es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Apache Flink con Kinesis Analytics: java.lang.IllegalArgumentException: La fracción de memoria a asignar no debe ser 0.

Fondo:
He estado intentando configurar BATCH + STREAMING en la misma aplicación de Flink que se despliega en tiempo de ejecución de Kinesis Analytics. La parte de STREAMING funciona bien, pero tengo problemas para agregar soporte para BATCH.

https://stackoverflow.com/questions/69795679/flink-handling-keyed-streams-with-data-older-than-application-watermark

https://stackoverflow.com/questions/70137863/apache-flink-batch-mode-failing-for-datastream-apis-with-exception-illegalst

La lógica es algo así como esto:

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<detectionevent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

Al hacer esto, obtengo la siguiente excepción:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator<em>90bea66de1c231edf33913ecd54406c1</em>(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:306)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:426)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via <code>taskmanager.memory.managed.consumer-weights</code>.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:302)
    ... 17 more

Parece que Kinesis Analytics no permite a los clientes definir un archivo flink-conf.yaml para definir taskmanager.memory.managed.consumer-weights. ¿Hay alguna forma de solucionar esto?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El problema parece estar relacionado con la incapacidad de establecer taskmanager.memory.managed.consumer-weights debido a las restricciones en Kinesis Analytics. Sin embargo, según la documentación de Flink, al usar el almacenamiento de estado RocksDB, se recomienda establecer explícitamente el tamaño de la memoria caché de RocksDB mediante StateBackend#setNumberOfTransferBuffers(int), que establece el número de segmentos de memoria disponibles para la caché de RocksDB.

    En consecuencia, una posible solución sería establecer el número de buffers de transferencia explícitamente en el código de la aplicación de Flink. Esto puede hacerse modificando la configuración de StreamExecutionEnvironment de la siguiente manera:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new RocksDBStateBackend(checkpointPath));
    
    // Establecer el número de buffers de transferencia para la caché de RocksDB
    env.getConfig().setInteger(RocksDBStateBackendOptions.NUMBER_OF_TRANSFER_THREADS, numTransferBuffers);
    
    ...
    
    env.execute(jobName);
    

    numTransferBuffers debería establecerse en función de la memoria disponible en su entorno de Kinesis Analytics. Esto debería permitir que RocksDB asigne la memoria caché sin depender de la configuración de memoria gestionada.

Comments are closed.