Apache Flink on Kinesis Analytics: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0.
Background:
I have been trying to configure BATCH + STREAMING in the same Flink application deployed on the Kinesis Analytics runtime. The STREAMING part works fine, but I am having trouble adding BATCH support.
https://stackoverflow.com/questions/69795679/flink-handling-keyed-streams-with-data-older-than-application-watermark
https://stackoverflow.com/questions/70137863/apache-flink-batch-mode-failing-for-datastream-apis-with-exception-illegalst
The logic is roughly as follows:
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment
        .fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
                WatermarkStrategy.noWatermarks(),
                "Text File")
        .process(/* process function which transforms the input */)
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
        .keyBy(keyFunction)
        .window(TumblingEventTimeWindows.of(Time.days(x)))
        .process(processWindowFunction);
When I do this, I get the following exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_90bea66de1c231edf33913ecd54406c1_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
... 10 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:306)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:426)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 12 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via taskmanager.memory.managed.consumer-weights.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:302)
... 17 more
It seems that Kinesis Analytics does not allow customers to supply a flink-conf.yaml file in which taskmanager.memory.managed.consumer-weights could be defined. Is there any way to work around this?
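For context, on a self-managed Flink cluster the weights that the exception refers to would normally be set in flink-conf.yaml. A sketch of such an entry is shown below; the consumer names and default weights vary between Flink versions, so treat the exact values as assumptions:

```yaml
# Relative share of managed memory granted to each consumer type.
# A weight of 0 for a consumer type that the job actually uses leads to
# the "fraction of memory to allocate should not be 0" error above.
taskmanager.memory.managed.consumer-weights: OPERATOR:70,STATE_BACKEND:70,PYTHON:30
```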
davy.ai
The problem appears to stem from being unable to set
taskmanager.memory.managed.consumer-weights
due to the restrictions in Kinesis Analytics. However, according to the Flink documentation, when using the RocksDB state backend it is recommended to set the RocksDB cache size explicitly via StateBackend#setNumberOfTransferBuffers(int), which sets the number of memory segments available to the RocksDB cache. Accordingly, one possible workaround would be to set the number of transfer buffers explicitly in the Flink application code, by configuring the state backend on the
StreamExecutionEnvironment
. numTransferBuffers should be chosen based on the memory available in your Kinesis Analytics environment. This should allow RocksDB to allocate its cache without depending on the managed-memory configuration.
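A minimal sketch of the configuration this answer describes follows. Note one caveat: the method StateBackend#setNumberOfTransferBuffers(int) quoted above could not be verified against the Flink API; the closest documented setting on EmbeddedRocksDBStateBackend is setNumberOfTransferThreads(int), which controls snapshot transfer parallelism rather than cache memory, so it is used here as a stand-in. The class name, the value 4, and the method choice are all assumptions, not a confirmed fix:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendConfig {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the embedded RocksDB state backend in application code,
        // since Kinesis Analytics does not expose flink-conf.yaml.
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();

        // Illustrative value only: tune to the memory and CPU available in
        // your Kinesis Analytics environment. (Documented API stand-in for
        // the answer's unverified setNumberOfTransferBuffers.)
        backend.setNumberOfTransferThreads(4);

        env.setStateBackend(backend);
    }
}
```

Whether Kinesis Analytics honors a state backend set in code, rather than the one it manages itself, is also an open question and worth testing in your environment.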