actualizar (fusionar) delta con streaming estructurado de spark
Necesito insertar/actualizar datos en tiempo real (con transmisión estructurada de Spark) en Python.
Estos datos se leen en tiempo real (formato csv) y luego se escriben como una tabla delta (aquí queremos actualizar los datos, por eso usamos merge into desde delta).
Estoy usando el motor delta con Databricks.
He codificado lo siguiente:
from delta.tables import *
spark = SparkSession.builder \
.config("spark.sql.streaming.schemaInference", "true")\
.appName("SparkTest") \
.getOrCreate()
sourcedf= spark.readStream.format("csv") \
.option("header", True) \
.load("/mnt/user/raw/test_input") #csv data que leemos en tiempo real
spark.conf.set("spark.sql.shuffle.partitions", "1")
spark.createDataFrame([], sourcedf.schema) \
.write.format("delta") \
.mode("overwrite") \
.saveAsTable("deltaTable")
def upsertToDelta(microBatchOutputDF, batchId):
microBatchOutputDF.createOrReplaceTempView("updates")
microBatchOutputDF._jdf.sparkSession().sql("""
MERGE INTO deltaTable t
USING updates s
ON s.Id = t.Id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
sourcedf.writeStream \
.format("delta") \
.foreachBatch(upsertToDelta) \
.outputMode("update") \
.option("checkpointLocation", "/mnt/user/raw/checkpoints/output")\
.option("path", "/mnt/user/raw/PARQUET/output") \
.start() \
.awaitTermination()
Pero nada se escribe como se esperaba en la ruta de salida, la ruta del punto de control se llena como se esperaba, y una visualización de la tabla delta me muestra resultados también.
display(table("deltaTable"))
En la interfaz de usuario de Spark, veo el paso de escritura de transmisión:
sourcedf.writeStream \ .format("delta") \ ....
first at Snapshot.scala:156+details
RDD: Delta Table State #1 - dbfs:/user/hive/warehouse/deltatable/_delta_log
¿Alguna idea de cómo solucionar esto para poder insertar/actualizar datos csv en tablas delta en S3 en tiempo real con Spark?
Saludos cordiales
davy.ai
Hay algunas cosas que se pueden verificar para resolver el problema de que los datos no se escriban en la ruta de salida:
outputMode
exista y que el usuario que ejecuta el código tenga permisos de escritura en esa ruta.Verifique si hay algún error o excepción arrojados por la función
upsertToDelta
. Una forma de hacer esto es agregar la opciónoutputMode("append")
a la llamada de la funciónwriteStream
y escribir el DataFramemicroBatchOutputDF
en una salida de consola. Por ejemplo:Luego, ejecute el código nuevamente y verifique la salida de la consola en busca de errores o excepciones arrojados.
Verifique si se está utilizando la columna
Id
correcta para la operación de combinación. Asegúrese de que la columnaId
exista tanto en ladeltaTable
como en el DataFrameupdates
, y que tenga el mismo tipo de dato en ambos.Verifique si hay otros procesos o trabajos que estén modificando la
deltaTable
al mismo tiempo, lo que podría estar causando conflictos durante la operación de combinación.