es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo prevenir que Spark mantenga datos antiguos que provocan falta de memoria en Spark Structured Streaming

Estoy utilizando el streaming estructurado en Spark pero estoy teniendo dificultades para entender los datos mantenidos en memoria. Actualmente estoy ejecutando Spark 2.4.7, que dice (Guía de programación de streaming estructurado)

La idea clave en el streaming estructurado es tratar un flujo de datos en vivo como una tabla a la que se le está agregando continuamente.

Entiendo esto como que Spark agrega todos los datos entrantes a una tabla ilimitada que nunca se trunca, es decir, seguirá creciendo indefinidamente.

Entiendo el concepto y por qué es bueno, por ejemplo, cuando quiero realizar una agregación basada en la hora del evento, puedo usar withWatermark para decirle a Spark qué columna es la hora del evento y luego especificar cuán atrasado quiero recibir los datos, y hacer que Spark descarte todo lo anterior a eso.

Sin embargo, supongamos que quiero realizar una agregación en algo que no es la hora del evento. Tengo un caso de uso en el que cada mensaje en Kafka contiene una serie de puntos de datos. Entonces, uso explode_outer para crear varias filas para cada mensaje, y para estas filas (dentro del mismo mensaje) me gustaría realizar una agregación basada en el id del mensaje (obteniendo max, min, avg, etc.). Mi pregunta es, ¿mantendrá Spark todos los datos “antiguos” ya que es así como funciona el streaming estructurado, lo que provocará problemas de OOM? ¿Y la única forma de evitar esto es agregar una marca de agua “ficticia” utilizando, por ejemplo, el momento en que recibí el mensaje e incluirla en mi groupBy?

Y en el otro caso de uso, donde ni siquiera quiero hacer un groupBy, simplemente quiero hacer alguna transformación en cada mensaje y luego pasarlo, solo me importa el “lote” actual. ¿Spark mantendrá también todos los mensajes antiguos obligándome a utilizar una marca de agua “ficticia” junto con un groupBy (incluyendo el id del mensaje en el groupBy y tomando, por ejemplo, el max de todas las columnas)?

Sé que puedo usar los buenos y antiguos DStreams para eliminar mi problema y simplemente manejar cada mensaje por separado, pero luego pierdo todas las ventajas del streaming estructurado.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Cuando se utiliza Structured Streaming en Spark, el flujo de datos entrante se trata como una tabla no acotada que crece indefinidamente. Esto significa que todos los datos entrantes se mantienen en memoria y no se truncan, lo que puede provocar problemas con el uso de memoria (problemas de OOM) si no se manejan correctamente.

    Si desea hacer agregaciones en algo que no sea el tiempo de evento, puede utilizar las funciones groupBy y agg para agrupar los datos por una determinada columna y luego realizar agregaciones en esos datos agrupados. Sin embargo, si le preocupa los problemas de memoria, puede agregar una “marca de agua” ficticia (withWatermark) en una columna que tenga un valor monótono creciente, como el tiempo en el que recibió el mensaje. Esto permitirá a Spark desechar datos antiguos después de un cierto período de tiempo y evitar problemas de memoria.

    Si solo le importa el lote de datos actual y no desea realizar ninguna agrupación o agregación, simplemente puede aplicar transformaciones en cada mensaje y enviarlo. En este caso, Spark solo mantendrá en memoria el lote de datos actual y no almacenará datos antiguos.

    En general, Structured Streaming puede ser una herramienta poderosa para trabajar con flujos de datos en vivo, pero es importante manejar correctamente el uso de memoria para evitar cualquier problema. Agregar marcas de agua ficticias y grupos puede ayudar a evitar problemas de memoria y permitirle trabajar con los datos de una manera más manejable.

Comments are closed.