es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Puedes aplicar un esquema definido manualmente al intentar leer archivos delta en databricks?

He creado un esquema manual utilizando structtype, etc.

Tengo un nivel de plata 1 en ALDS que contiene archivos delta sin procesar. Necesito aplicar el esquema manual a este nivel de plata 1, que luego se escribirá como nivel de plata 2. Así que simplemente necesito leer los datos de SL1, aplicar el esquema manual y luego escribirlo en SL2.

Recibo el siguiente error cuando intento leer los datos de SL1:

java.lang.UnsupportedOperationException: DeltaDataSource source no admite esquemas especificados por el usuario.

¿Alguien sabe cómo proceder? A continuación, el código que utilicé para leer los archivos delta.

vbak = spark.read.format(read_format).schema(vbak_schema).load(load_path)

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Desafortunadamente, el mensaje de error sugiere que la fuente DeltaDataSource no admite un esquema especificado por el usuario, por lo que el método utilizado para leer los datos (spark.read.format(read_format).schema(vbak_schema).load(load_path)) no funcionará.

    Para proceder, es posible que deba considerar métodos alternativos para aplicar el esquema manual a los datos. Una opción podría ser aplicar el esquema durante el proceso ETL (Extract, Transform, Load) antes de que los datos se escriban en la capa de nivel 1 (silver level 1). Alternativamente, podría intentar crear una tabla Delta con el esquema manual y luego usar el comando MERGE INTO para escribir los datos de la capa de nivel 1 en la tabla Delta con el esquema especificado.

    Por ejemplo, podría intentar el siguiente código para crear una tabla Delta con el esquema manual:

    spark.sql("CREATE TABLE sl2 USING DELTA LOCATION '<ruta-a-sl2>' AS SELECT * FROM sl1 WHERE 1=0")
    spark.sql("ALTER TABLE sl2 ADD COLUMNS (<esquema-manual>)")
    

    Luego, podría utilizar el comando MERGE INTO para escribir los datos de la capa de nivel 1 en la tabla Delta con el esquema especificado:

    spark.sql("MERGE INTO sl2 t USING sl1 s ON <condicion-de-union> WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
    

    Tenga en cuenta que <condicion-de-union> debe especificar las columnas que se utilizan para unir las tablas, y * se utiliza para indicar que todas las columnas deben actualizarse o insertarse.

Comments are closed.