es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Apache Flink: El modo por lotes falla para las API de Datastream con la excepción `IllegalStateException: No se permite la realización de puntos de control con entradas ordenadas`.

Una continuación de esto: https://stackoverflow.com/questions/69795679/flink-handling-keyed-streams-with-data-older-than-application-watermark

Basado en la sugerencia, he estado tratando de agregar soporte para Batch en la misma aplicación Flink que estaba usando las API de Datastream.

La lógica es algo así:

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.readTextFile(“nombreDeArchivo”)
.process(function de proceso que transforma la entrada)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(orderness)
.withTimestampAssigner(
(SerializableTimestampAssigner) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x días))
.process(processWindowFunction);

Según los documentos públicos, mi comprensión era que simplemente necesitaba cambiar la fuente a una acotada. Sin embargo, el procesamiento anterior sigue fallando en el desencadenador del evento después del paso de la ventana con la siguiente excepción:

java.lang.IllegalStateException: Checkpointing is not allowed with sorted inputs.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)

El archivo de entrada contiene eventos históricos para múltiples claves. Los datos para una clave dada están ordenados, pero los datos generales no lo están. También he agregado un evento al final de cada clave con la marca de tiempo = MAX_WATERMARK para indicar el final del flujo clave. También lo intenté para una sola clave pero el procesamiento falló con la misma excepción.

Nota: No he activado la verificación de puntos de control. También he intentado desactivar explícitamente la verificación de puntos de control sin éxito.

EDITAR – 1

Agregando más detalles:
Intenté cambiar y usar FileSource para leer archivos, pero todavía obtengo la misma excepción.

environment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
“Archivo de texto”)

El primer paso de procesamiento y división de claves funciona. Sin embargo, falla después de eso. Intenté eliminar la ventana y agregar un paso de proceso simple pero sigue fallando. No hay Sink explícito. La última función de proceso simplemente actualiza una base de datos.

¿Hay algo que me esté perdiendo?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error sugiere que la entrada del operador está ordenada, lo que impide el checkpointing. Una razón para ordenar la entrada podría ser para permitir una recuperación eficiente del estado en caso de un fallo. Sin embargo, en el caso de secuencias acotadas, como el procesamiento por lotes, la ordenación no es necesaria ya que todos los datos entran en memoria y pueden procesarse en una sola pasada sin recuperación.

    La solución es deshabilitar explícitamente la ordenación, utilizando el método setParallelism(1). Esto asegura que la entrada no está ordenada y se puede habilitar el checkpointing.

    Aquí está el código actualizado:

    streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
    DataStream<string> inputData = streamExecutionEnvironment.readTextFile("nombreArchivo")
        .setParallelism(1); // deshabilitar la ordenación para el procesamiento por lotes
    inputData
        .process(función de procesamiento que transforma la entrada)
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<detectionevent>forBoundedOutOfOrderness(orderness)
            .withTimestampAssigner(
                (SerializableTimestampAssigner<event>) (event, l) -> event.getEventTime()))
        .keyBy(keyFunction)
        .window(TumblingEventWindow.of(Time.days(x)))
        .process(processWindowFunction);
    

    Es importante establecer explícitamente la paralelización en 1, ya que el comportamiento predeterminado es ordenar las secuencias con clave. Esto se aplica no solo a readTextFile, sino también a otras fuentes acotadas, como fromElements y fromCollection.

    Finalmente, dado que la entrada está acotada, se puede habilitar el checkpointing de manera segura para proporcionar tolerancia a fallas en caso de fallos.

Comments are closed.