es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo comparar dos esquemas en Databricks notebook en Python.

Voy a ingresar datos utilizando un cuaderno de Databricks. Quiero validar el esquema de los datos ingresados contra lo que espero que sea el esquema de estos datos.

Entonces básicamente tengo:

    validation_schema = StructType([
      StructField("a", StringType(), True),
      StructField("b", IntegerType(), False),
      StructField("c", StringType(), False),
      StructField("d", StringType(), False)
    ])

    data_ingested_good = [("foo",1,"blabla","36636"),
     ("foo",2,"booboo","40288"),
     ("bar",3,"fafa","42114"),
     ("bar",4,"jojo","39192"),
     ("baz",5,"jiji","32432")
    ]

    data_ingested_bad = [("foo","1","blabla","36636"),
     ("foo","2","booboo","40288"),
     ("bar","3","fafa","42114"),
     ("bar","4","jojo","39192"),
     ("baz","5","jiji","32432")
    ]

    data_ingested_good.printSchema()
    data_ingested_bad.printSchema()
    validation_schema.printSchema()

He visto preguntas similares pero las respuestas siempre están en Scala.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para validar el esquema de los datos ingestados con respecto al esquema esperado utilizando el cuaderno de Databricks y Python, se pueden seguir los siguientes pasos:

    1. Importar las librerías de Spark necesarias:
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    from pyspark.sql.functions import col
    
    1. Definir el esquema esperado utilizando las clases StructType y StructField:
    validation_schema = StructType([
      StructField("a", StringType(), True),
      StructField("b", IntegerType(), False),
      StructField("c", StringType(), False),
      StructField("d", StringType(), False)
    ])
    
    1. Convertir los datos ingestados (en este caso, las listas data_ingested_good y data_ingested_bad) en un DataFrame de Spark:
    df_good = spark.createDataFrame(data_ingested_good, validation_schema)
    df_bad = spark.createDataFrame(data_ingested_bad, validation_schema)
    
    1. Imprimir el esquema de los dos DataFrames para compararlos con el esquema esperado:
    df_good.printSchema()
    df_bad.printSchema()
    
    1. Verificar la validez de los DataFrames con respecto al esquema esperado utilizando la función validate:
    def validate(df, schema):
        try:
            # Verificar si todas las columnas están presentes y son del tipo correcto
            for field in schema.fields:
                if field.name not in df.columns or df.schema[field.name].dataType != field.dataType:
                    return False
            # Verificar si hay columnas adicionales en el DataFrame que no están en el esquema
            for col_name in df.columns:
                if col_name not in [field.name for field in schema.fields]:
                    return False
        except:
            return False
        return True
    
    print(validate(df_good, validation_schema)) # devuelve True
    print(validate(df_bad, validation_schema)) # devuelve False
    

    La función validate verifica si todas las columnas en el DataFrame están presentes y son del tipo de dato correcto según se especifica en el argumento schema. También verifica si hay columnas adicionales en el DataFrame que no están en el esquema. La función devuelve True si el DataFrame es válido con respecto al esquema, y False en caso contrario.

    Nota: Es importante importar las librerías de Spark necesarias y utilizar la función col para asegurar que el código se ejecute sin errores.

    Este código se puede ejecutar directamente en un cuaderno de Databricks utilizando el tiempo de ejecución de Databricks para Apache Spark.

Comments are closed.