es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

AWS: escritura desde un Pandas dataframe a DynamoDB

Tengo un trabajo de AWS Glue escrito en Python. Dentro de él hay un gran dataframe de Pandas, y los datos contenidos en él deben ser escritos en DynamoDB.

Actualmente estoy utilizando la funcionalidad “write_dynamic_frame” de Glue para lograr esto, ya que maneja problemas como la generación de errores “500 SlowDown” que pueden ocurrir a veces al escribir grandes cantidades de datos en un corto período de tiempo.

Funciona, pero la escritura real de datos en la base de datos es bastante lenta (más de 2 minutos para escribir 1,000 registros).

Mi proceso actual se ve así:

my_df = {popule el dataframe de Pandas...}
table_name = "nombre_de_mi_tabla_dynamodb"

# Pandas -> Spark -> DynamicFrame
spark_df: DataFrame = spark.createDataFrame(my_df)
result_df: DynamicFrame = DynamicFrame.fromDF(spark_df, glue_context, "result_df")

num_partitions: int = result_df.toDF().rdd.getNumPartitions()

glue_context.write_dynamic_frame.from_options(
    frame=result_df,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.output.tableName": table_name,
        "dynamodb.throughput.write.percent": "1.5",
        "dynamodb.output.retry": "30"
    }
)

¿Existe algún mecanismo para la escritura en lotes de datos en DynamoDB? Tengo más de un millón de registros que necesito escribir.

Gracias por cualquier ayuda.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, existe un mecanismo para escribir datos en lotes en DynamoDB que puede ayudar a mejorar la velocidad de escritura de datos. Una opción es dividir los datos en lotes más pequeños y escribir cada lote en la base de datos por separado. El SDK de AWS para Python (Boto3) proporciona una función llamada batch_write_item() que se puede utilizar para escribir varios elementos en DynamoDB en una sola solicitud. Aquí tienes un ejemplo de cómo modificar tu código para utilizar esta función:

    import boto3
    from boto3.dynamodb.batch_write import BatchWriter
    
    mi_df = {llenar el dataframe de Pandas...}
    nombre_tabla = "nombre_mi_tabla_dynamodb"
    tamaño_lote = 500  # ajustar según necesidad
    
    # Pandas -> Spark -> DynamicFrame
    spark_df = spark.createDataFrame(mi_df)
    result_df = DynamicFrame.fromDF(spark_df, glue_context, "result_df")
    
    # Dividir los datos en lotes más pequeños
    datos = result_df.toDF().toPandas().to_dict('records')
    lotes = [datos[i:i+tamaño_lote] for i in range(0, len(datos), tamaño_lote)]
    
    # Conectar a DynamoDB
    dynamodb = boto3.resource('dynamodb')
    tabla = dynamodb.Table(nombre_tabla)
    
    # Escribir cada lote en la base de datos
    with BatchWriter(tabla) as lote:
        for datos_lote in lotes:
            for elemento in datos_lote:
                lote.put_item(Item=elemento)
    

    Este código dividirá los datos en lotes más pequeños de 500 registros cada uno (puedes ajustar el tamaño del lote según necesites) y escribirá cada lote en DynamoDB utilizando la función batch_write_item(). Esto debería ayudar a mejorar la velocidad de escritura de datos en la base de datos.

Comments are closed.