Tag: FLINK-BATCH
Tengo un caso de uso en el que ejecuto algunos cálculos en una parte de los datos y estos cálculos dependen del contexto (estado intermedio). Por ejemplo: tengo algunas órdenes y realizo algunos cálculos en ellas. Los cálculos se realizan para las órdenes agrupadas por el campo de símbolo. class . . . Read more
Me he dado cuenta de que cada vez que ejecuto un nuevo trabajo, tarda aproximadamente un 20% más en comparación con el tiempo cuando lo lanzo de nuevo. ¿Flink guarda en caché algunos resultados y los reutiliza si se ejecuta un trabajo varias veces? Si es así, ¿cómo puedo controlarlo? . . . Read more
Fondo: He estado intentando configurar BATCH + STREAMING en la misma aplicación de Flink que se despliega en tiempo de ejecución de Kinesis Analytics. La parte de STREAMING funciona bien, pero tengo problemas para agregar soporte para BATCH. https://stackoverflow.com/questions/69795679/flink-handling-keyed-streams-with-data-older-than-application-watermark https://stackoverflow.com/questions/70137863/apache-flink-batch-mode-failing-for-datastream-apis-with-exception-illegalst La lógica es algo así como esto: streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), . . . Read more
Una continuación de esto: https://stackoverflow.com/questions/69795679/flink-handling-keyed-streams-with-data-older-than-application-watermark Basado en la sugerencia, he estado tratando de agregar soporte para Batch en la misma aplicación Flink que estaba usando las API de Datastream. La lógica es algo así: streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); streamExecutionEnvironment.readTextFile(“nombreDeArchivo”) .process(function de proceso que transforma la entrada) .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness(orderness) .withTimestampAssigner( (SerializableTimestampAssigner) (event, l) -> . . . Read more
Tengo un Datastream que contiene algunos campos como event_id, timestamp, etc. que permanecen constantes para muchos registros en la pipeline. Quiero usarlos en el nombre del archivo mientras lo escribo de nuevo en ParquetFormat usando StreamingFileSink. Podemos usar sufijos y prefijos si estamos utilizando constantes. Sin embargo, necesitaba ayuda para . . . Read more