es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Flink SlidingEventTimeWindows no funciona como se espera.

Tengo una ejecución de transmisión configurada como

object FlinkSlidingEventTimeExample extends App {
  case class Trx(timestamp:Long, id:String, trx:String, count:Int)
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  val watermarkS1 = WatermarkStrategy
    .forBoundedOutOfOrderness[Trx](Duration.ofSeconds(15))
    .withTimestampAssigner(new SerializableTimestampAssigner[Trx] {
      override def extractTimestamp(element: Trx, recordTimestamp: Long): Long = element.timestamp
    })
  val s1 = env.socketTextStream("localhost", 9999)
    .flatMap(l => l.split(" "))
    .map(l => Trx(timestamp = l.split(",")(0).toLong, id = l.split(",")(1), trx = l.split(",")(2), count = 1))
    .assignTimestampsAndWatermarks(watermarkS1)
    .keyBy(l => l.id)
    .window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5))) // No funciona
    //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))) // Funciona
    .sum("count")
    .print
  env.execute("FlinkSlidingEventTimeExample")
}

Ya he definido un watermark, pero no pude entender por qué no está produciendo nada. ¿Alguien tiene alguna idea? Mi versión de Flink es 1.14.0

Mi archivo build.sbt es así:

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-runtime-web" % "1.14.0"
libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.14.0"
libraryDependencies += "org.apache.flink" % "flink-queryable-state-runtime" % "1.14.0"

Estoy ingresando datos de entrada desde un socket (puerto: 9999) de la siguiente manera:

1640375790000,1,trx1
1640375815000,1,trx2
1640375841000,1,trx3
1640375741000,1,trx4

He intentado dar un valor de tiempo más grande que el tamaño de la ventana, pero aún no funciona.

Captura de pantalla de la interfaz de usuario web de Flink:
web-ui
watermarks

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Basado en el código y las capturas de pantalla proporcionados, parece que el problema se encuentra en la definición de la ventana deslizante. Específicamente, la línea .window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5))) no está funcionando como se esperaba.

    Una posible razón para esto es que el tamaño de la ventana (20 segundos) no está alineado con la frecuencia de generación de marcas de agua (15 segundos). Esto puede hacer que ciertos eventos caigan fuera de la ventana, ya que se consideran demasiado tarde cuando llega la marca de agua. Para solucionar esto, podemos intentar aumentar el tamaño de la ventana a un múltiplo del intervalo de la marca de agua, por ejemplo, Time.seconds(30).

    Otra posible razón para el problema es que el generador de datos no está produciendo suficientes eventos para que se active la ventana deslizante. Esto se puede ver en la captura de pantalla de los datos en la pestaña “Watermarks” de Flink, donde vemos una única marca de agua sin eventos asociados. Para solucionar esto, podemos aumentar la frecuencia o el volumen de los datos entrantes, o disminuir el tamaño de la ventana deslizante para que se active con más frecuencia.

    En resumen, algunas posibles soluciones al problema son:
    – Alinear el tamaño de la ventana con la frecuencia de generación de marcas de agua.
    – Aumentar la frecuencia o el volumen de los datos entrantes.
    – Disminuir el tamaño de la ventana deslizante para que se active con más frecuencia.

Comments are closed.