Flujo de aire – seleccione los datos de la tabla de BigQuery en un marco de datos.
Estoy intentando ejecutar el siguiente DAG en Airflow Composer en Google Cloud y sigo obteniendo el mismo error:
El conn_id hard_coded_project_name
no está definido.
¿Puede alguien dirigirme hacia la dirección correcta?
from airflow.models import DAG
import os
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import datetime
import pandas as pd
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.providers.google.cloud.operators import bigquery
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
default_args = {
'start_date': datetime.datetime(2020, 1, 1),
}
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hardcoded_project_name")
def list_dates_in_df():
hook = BigQueryHook(bigquery_conn_id=PROJECT_ID,
use_legacy_sql=False)
bq_client = bigquery.Client(project = hook.get_field("project"),
credentials = hook.get_credentials())
query = "select count(*) from LP_RAW.DIM_ACCOUNT;"
df = bq_client.query(query).to_dataframe()
with DAG(
'df_test',
schedule_interval=None,
catchup = False,
default_args=default_args
) as dag:
list_dates = PythonOperator(
task_id ='list_dates',
python_callable = list_dates_in_df
)
list_dates
davy.ai
El mensaje de error “El conn_id ‘hard_coded_project_name’ no está definido” le indica que el valor de PROJECT_ID, que está establecido como “hard_coded_project_name”, no es un ID de conexión definido en Airflow.
Para solucionar este error, debe definir una conexión en Airflow con el ID “hard_coded_project_name” que apunte al proyecto de GCP que desea utilizar. Puede hacer esto accediendo a la UI de Airflow, haciendo clic en la pestaña “Admin”, y luego seleccionando “Connections” en el menú desplegable. Haga clic en el botón “Crear”, y luego complete el formulario con los valores apropiados para su proyecto de GCP. La conexión debe tener un Conn Type de “Google Cloud Platform”, un Login igual a su clave de cuenta de servicio, y un campo de Password igual a su contraseña.
Una vez que haya creado la conexión en Airflow, debe actualizar su código Python para hacer referencia al ID de conexión apropiado. Actualice la línea
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "hard_coded_project_name")
para usar el ID de conexión correcto que creó en Airflow. Por ejemplo, si nombró la conexión como “my_gcp_project”, debe actualizar la línea para que lea: