es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

actualizar (fusionar) delta con streaming estructurado de spark

Necesito insertar/actualizar datos en tiempo real (con transmisión estructurada de Spark) en Python.
Estos datos se leen en tiempo real (formato csv) y luego se escriben como una tabla delta (aquí queremos actualizar los datos, por eso usamos merge into desde delta).
Estoy usando el motor delta con Databricks.
He codificado lo siguiente:

from delta.tables import *
spark = SparkSession.builder \
  .config("spark.sql.streaming.schemaInference", "true")\
  .appName("SparkTest") \
  .getOrCreate()
sourcedf= spark.readStream.format("csv") \
  .option("header", True) \
  .load("/mnt/user/raw/test_input") #csv data que leemos en tiempo real
spark.conf.set("spark.sql.shuffle.partitions", "1")

spark.createDataFrame([], sourcedf.schema) \
  .write.format("delta") \
  .mode("overwrite") \
  .saveAsTable("deltaTable")

def upsertToDelta(microBatchOutputDF, batchId): 
  microBatchOutputDF.createOrReplaceTempView("updates")
  microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO deltaTable t
    USING updates s
    ON s.Id = t.Id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

sourcedf.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .option("checkpointLocation", "/mnt/user/raw/checkpoints/output")\
  .option("path", "/mnt/user/raw/PARQUET/output") \
  .start() \ 
  .awaitTermination()

Pero nada se escribe como se esperaba en la ruta de salida, la ruta del punto de control se llena como se esperaba, y una visualización de la tabla delta me muestra resultados también.

display(table("deltaTable"))

En la interfaz de usuario de Spark, veo el paso de escritura de transmisión:

sourcedf.writeStream \ .format("delta") \ ....
first at Snapshot.scala:156+details
RDD: Delta Table State #1 - dbfs:/user/hive/warehouse/deltatable/_delta_log

¿Alguna idea de cómo solucionar esto para poder insertar/actualizar datos csv en tablas delta en S3 en tiempo real con Spark?
Saludos cordiales

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Hay algunas cosas que se pueden verificar para resolver el problema de que los datos no se escriban en la ruta de salida:

    1. Asegúrese de que la ruta proporcionada en la opción outputMode exista y que el usuario que ejecuta el código tenga permisos de escritura en esa ruta.
    2. Verifique si hay algún error o excepción arrojados por la función upsertToDelta. Una forma de hacer esto es agregar la opción outputMode("append") a la llamada de la función writeStream y escribir el DataFrame microBatchOutputDF en una salida de consola. Por ejemplo:

    sourcedf.writeStream
      .format("delta")
      .foreachBatch(upsertToDelta)
      .outputMode("append")
      .option("checkpointLocation", "/mnt/user/raw/checkpoints/output")
      .option("path", "/mnt/user/raw/PARQUET/output")
      .start()
      .awaitTermination()
    

    Luego, ejecute el código nuevamente y verifique la salida de la consola en busca de errores o excepciones arrojados.

    1. Verifique si se está utilizando la columna Id correcta para la operación de combinación. Asegúrese de que la columna Id exista tanto en la deltaTable como en el DataFrame updates, y que tenga el mismo tipo de dato en ambos.

    2. Verifique si hay otros procesos o trabajos que estén modificando la deltaTable al mismo tiempo, lo que podría estar causando conflictos durante la operación de combinación.

Comments are closed.