es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Java – escribe en Kafka desde Spark

Soy nuevo en Kafka y Spark, así que desde ya pido disculpas si la pregunta se considera estúpida :).

Estoy tratando de entender cómo se procesan exactamente los datos y se almacenan en caché (desde Spark) antes de ser escritos en Kafka.

Digamos que tenemos el siguiente código que maneja el código que se ejecutará por partición (ya que estamos usando mapPartitions()):

Dataset resultsDF = scenarios.mapPartitions((MapPartitionsFunction<Obj1, String>) permutations -> {
Iterable iterable = () -> permutations;
return StreamSupport.stream(iterable.spliterator(), false).map(permutation -> {
log.info(“Hacer un trabajo….”);
return “abc”;
}).iterator();
}, Encoders.STRING());

Y más tarde escribo este dataset en Kafka utilizando lo siguiente:

        resultsDF
                .write()
                .format("kafka")
                .options(options)
                .save();

donde las opciones son opciones de Kafka como group.id, kafka.bootstrap.servers y topic.

Lo que estoy tratando de entender es cómo se procesa el conjunto de datos: ¿Spark recorre las particiones, recopila todos los resultados del cálculo y los agrega al conjunto de datos, generando así un DS muy grande en memoria? (Estoy buscando las posibles causas de errores de falta de memoria).
Y si es así, ¿hay una manera limpia de escribir datos en Kafka por procesamiento de particiones en lugar de recopilar todo en el conjunto de datos primero? Algo como invocar write() directamente dentro de la función mapPartitions o algo así.

EDITAR: Sé que los datos no se procesan realmente hasta que se llama a ‘write()’, sin embargo, todavía no estoy seguro de lo que esta escritura realmente escribe; por lo tanto, cómo puedo ver el plan de instrucción para el conjunto de datos con más detalle: necesito confirmar que los datos se están escribiendo en kafka directamente con cada cálculo en lugar de en un conjunto de datos después de que se hayan completado todos los cálculos.
Intenté rastrear los registros de la aplicación versus los registros de kafka, pero para ser honesto, no fue realmente útil: los registros detallados de kafka solo están disponibles para el tema, pero este se crea solo después de que realmente llamo a write() (en mi caso específico, el topic se asigna dinámicamente justo antes de procesar los resultados).

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Cuando llama a write() en un Dataset, Spark procesará los datos en el Dataset y los escribirá en Kafka. Los datos no se recopilan en un gran conjunto de datos en memoria antes de ser escritos en Kafka.

    Puede verificar el plan de instrucciones para el conjunto de datos con más detalle llamando a explain() en el Dataset. Esto le mostrará los pasos que Spark tomará para procesar los datos y escribirlos en Kafka.

    También puede verificar los registros de Kafka para ver cuándo se están escribiendo datos en el tema. Los registros le mostrarán cuándo se están escribiendo datos en el tema y qué datos se están escribiendo.

Comments are closed.