es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Leyendo tipos de datos dinámicos desde S3 con AWS Glue.

Tengo JSON almacenado en S3. A veces, “units” se almacena como una cadena y otras veces se almacena como un entero. Desafortunadamente, esto fue un error y ahora tengo miles de millones de registros con tipos de datos mezclados en el JSON fuente.

ejemplo:

{
“other_stuff”: “stuff”,
“units”: 2,
}

{
“other_stuff”: “stuff”,
“units”: “2”,
}

Quiero determinar dinámicamente si es una cadena/entero, y luego apuntar como un numero entero a AWS Redshift.

Si mis mapeos son: (“units”, “string”, “units”, “int”), solo los valores de “string” se convertirán correctamente. Si uso (“units”, “int”, “units”, “int”), entonces es lo contrario, solo los valores “enteros” funcionarán.

¿Cómo puedo convertir dinámicamente el registro fuente y siempre cargarlo como un número entero en Redshift? Puede suponer que todos los valores son numéricos, no nulos, y que el atributo está garantizado de estar presente.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Una forma de convertir dinámicamente el registro fuente y cargarlo siempre como un entero en Redshift sería utilizar una función lambda en AWS Glue.

    Primero, cree un crawler de Glue para crear una tabla en el Catálogo de Datos para los datos JSON fuente en S3.

    Luego, cree un trabajo de Glue con las siguientes propiedades:

    • Establezca el tipo de trabajo en “Spark”
    • Establezca el archivo de script en un archivo Python nuevo
    • Agregue las importaciones necesarias de AWS Glue y PySpark al principio del archivo

    A continuación, escriba una función PySpark para convertir dinámicamente el atributo “units” en los datos JSON fuente:

    from pyspark.sql.functions import col, when
    
    def cast_units_to_int(df):
        return df.withColumn("units_int", when(col("units").cast("int").isNull(), col("units"))
                               .otherwise(col("units").cast("int")))
    

    Esta función verificará si el atributo “units” puede ser convertido como un entero. Si no, lo dejará como está. Si se puede convertir como un entero, se convertirá y almacenará como una nueva columna llamada “units_int”.

    Finalmente, use la API de AWS Glue para crear un nuevo destino de Redshift en AWS Glue y escriba la salida de la función PySpark en ese destino.

    glue_context.write_dynamic_frame.from_jdbc_conf(
        frame = cast_units_to_int(source_data_dynamic_frame),
        catalog_connection = "redshift_connection",
        table_name = "target_table_name",
        redshift_tmp_dir = "s3://redshift_tmp_dir",
        transformation_ctx = "target_context"
    )
    

    Esto escribirá la salida de la función PySpark en la tabla “target_table_name” en Redshift, con el atributo “units” convertido como un entero en la columna “units_int”.

Comments are closed.