es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo utilizar bulk_load en Apache Airflow.

Tengo Apache Airflow 2.1.4 y una base de datos de PostgreSQL.
Necesito insertar múltiples filas a la vez. Por lo tanto, voy a usar el método bulk_load de PostgresHook, pero obtengo un error cada vez.

data = pd.read_csv(open(filepath, 'rb'))
buffer = StringIO()
buffer.write(data.to_csv(index=None, header=None, sep='\t'))
buffer.seek(0)
schema_table = 'esquema.tabla'
with PostgresHook(postgres_conn_id='mi_base_de_datos_pg'):
    PostgresHook.bulk_load(table=schema_table, tmp_file=buffer)

El error que obtengo:

Traceback (most recent call last):
File "/home/airflow/dags/my_python_file.py", line 76, in <module>
my_func(filepath=my_file, target_schema=schema, target_table=table)
File "/home/airflow/dags/my_python_file.py", line 39, in my_func
with PostgresHook(postgres_conn_id='mi_base_de_datos_pg'):
AttributeError: __enter__

Incluso no pude encontrar ejemplos de uso de bulk_load. Agradecería cualquier pista. Gracias.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El problema con el código es que la declaración with no se está utilizando correctamente con PostgresHook.
    Para usar el método bulk_load, puedes probar el siguiente código:

    from airflow.providers.postgres.hooks.postgres import PostgresHook
    import pandas as pd
    from io import StringIO
    
    data = pd.read_csv(open(filepath, 'rb'))
    buffer = StringIO()
    buffer.write(data.to_csv(index=None, header=None, sep='\t'))
    buffer.seek(0)
    schema_table = 'schema.table'
    
    hook = PostgresHook(postgres_conn_id='my_pg_database')
    hook.bulk_load(table=schema_table, tmp_file=buffer)
    

    Aquí, estamos creando un objeto PostgresHook fuera de la declaración with. Luego, estamos usando el método bulk_load con el objeto de gancho para cargar los datos en la tabla.

    Además, asegúrate de que el postgres_conn_id pasado a PostgresHook corresponda a una ID de conexión válida de Postgres definida en las conexiones de Airflow.

Comments are closed.