Cómo utilizar bulk_load en Apache Airflow.
Tengo Apache Airflow 2.1.4 y una base de datos de PostgreSQL.
Necesito insertar múltiples filas a la vez. Por lo tanto, voy a usar el método bulk_load de PostgresHook, pero obtengo un error cada vez.
data = pd.read_csv(open(filepath, 'rb'))
buffer = StringIO()
buffer.write(data.to_csv(index=None, header=None, sep='\t'))
buffer.seek(0)
schema_table = 'esquema.tabla'
with PostgresHook(postgres_conn_id='mi_base_de_datos_pg'):
PostgresHook.bulk_load(table=schema_table, tmp_file=buffer)
El error que obtengo:
Traceback (most recent call last):
File "/home/airflow/dags/my_python_file.py", line 76, in <module>
my_func(filepath=my_file, target_schema=schema, target_table=table)
File "/home/airflow/dags/my_python_file.py", line 39, in my_func
with PostgresHook(postgres_conn_id='mi_base_de_datos_pg'):
AttributeError: __enter__
Incluso no pude encontrar ejemplos de uso de bulk_load. Agradecería cualquier pista. Gracias.
davy.ai
El problema con el código es que la declaración
with
no se está utilizando correctamente conPostgresHook
.Para usar el método
bulk_load
, puedes probar el siguiente código:Aquí, estamos creando un objeto
PostgresHook
fuera de la declaraciónwith
. Luego, estamos usando el métodobulk_load
con el objeto de gancho para cargar los datos en la tabla.Además, asegúrate de que el
postgres_conn_id
pasado aPostgresHook
corresponda a una ID de conexión válida de Postgres definida en las conexiones de Airflow.