es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Método foreach() con errores de Spark Streaming.

Estoy intentando escribir datos extraídos de Kafka en una tabla de Bigquery cada 120 segundos. Me gustaría realizar algunas operaciones adicionales que, según la documentación, deberían ser posibles dentro del método .foreach() o foreachBatch().

Como prueba, quería imprimir un mensaje simple cada vez que se extraen datos de Kafka y se escriben en BigQuery.

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime=’120 segundos’) \
.foreachBatch(imprimir(“¿se imprime en cada lote?”))
.format(“bigquery”).outputMode(“append”) \
.option(“temporaryGcsBucket”,ruta1) \
.option(“checkpointLocation”,ruta2) \
.option(“table”, tabla_kafka) \
.start()
batch_job.awaitTermination()

Esperaría que este mensaje se imprima cada 120 segundos en la salida de la celda de Jupyter Lab, en lugar de esto, se imprime solo una vez y sigue escribiendo en BigQuery.

Si intento usar .foreach() en lugar de foreachBatch()

batch_job=df_alarmsFromKafka.writeStream\
.trigger(processingTime=’120 seconds’) \
.foreach(imprimir(“¿se imprime en cada lote?”))
.format(“bigquery”).outputMode(“append”) \
.option(“temporaryGcsBucket”,ruta1) \
.option(“checkpointLocation”,ruta2) \
.option(“table”, tabla_kafka) \
.start()
batch_job.awaitTermination()

imprime el mensaje una vez e inmediatamente después muestra el siguiente error, que no pude depurar/entender:

/usr/lib/spark/python/pyspark/sql/streaming.py in foreach(self, f)
1335
1336 if not hasattr(f, ‘process’):
-> 1337 raise Exception(“El objeto proporcionado no tiene un método ‘proceso'”)
1338
1339 if not callable(getattr(f, ‘process’))):

Excepción: El objeto proporcionado no tiene un método ‘proceso’.

¿Estoy haciendo algo mal? ¿Cómo puedo realizar operaciones simples cada 120 segundos además de las realizadas directamente en el marco de datos evaluado df_alarmsFromKafka?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que estás intentando usar incorrectamente el método .foreach(). El método .foreach() requiere un objeto callable con un método process, como un objeto ForeachWriter. Puedes encontrar más información sobre el método .foreach() aquí.

    Si deseas realizar operaciones adicionales cada 120 segundos, puedes utilizar el método .foreachBatch(). Este método toma una función como argumento, la cual se llamará para cada lote de datos. Puedes encontrar más información sobre el método .foreachBatch() aquí.

Comments are closed.