es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo añadir columnas de trazabilidad en la integración del autoloader y ADF?

Estoy utilizando Azure Data Factory para copiar los datos de origen en la zona de aterrizaje (ADLS Gen2) y luego utilizar el autoloader para cargarlos en las tablas delta bronce. Todo funciona perfectamente excepto que no puedo derivar el nombre del pipeline, runid y la hora del disparador como columnas derivadas en el archivo parquet junto con la tabla de origen de entrada.

Aquí, el esquema es un structType construido utilizando el esquema real de la tabla del servidor SQL de origen, no cubre las columnas derivadas adicionales en ADF.

sourceFilePath = 'wasbs://landing-zone@dlslandingzonedev.blob.core.windows.net/' \
   + domain_name + '/' + database_name + '/' \
   + schema_table_name.replace(database_name+'.','') + '/'
df = (spark
     .readStream
     .format("cloudFiles")
     .options(**cloudFile)
     .schema(schema)
     .option("rescueDataColumn", "_rescued_data")
     .load(sourceFilePath)
     )
# Columnas de trazabilidad
# from pyspark.sql.functions import *
df = (
  df.withColumn("audit_fileName", input_file_name()) 
    .withColumn("audit_createdTimestamp", current_timestamp())
)

Aquí está el DF de escritura en streaming:

streamQuery = (df
           .writeStream
           .format("delta")
           .outputMode("append")
           .trigger(once=True)
           .queryName(queryName)
           .option("checkpointLocation",checkpointLocation)
           .option("mergeSchema", "true")
           .start(tablePath)
          )

Con mergeSchema True, esperaba que el flujo detectara 3 columnas adicionales de Data Factory al escribir en formato delta. ¿Es esta una limitación del parquet? ¿Debo leer los datos como CSV / JSON en su lugar? ¿O tengo que agregar la definición del esquema de las columnas derivadas?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema está en la definición del esquema, ya que no incluye las columnas adicionales derivadas en ADF. La opción mergeSchema solo funciona para fusionar cambios en columnas existentes y no para agregar nuevas columnas. Por lo tanto, para incluir las columnas derivadas en el esquema, deberás agregarlas a la definición del esquema.

    Alternativamente, también puedes leer los datos como CSV o JSON en lugar de Parquet si eso se adapta mejor a tus necesidades.

Comments are closed.