Cómo especificar qué proyecto de GCP utilizar al activar un flujo de datos a través del operador Data Fusion en Cloud Composer.
Necesito activar un pipeline de Data Fusion ubicado en un proyecto de GCP llamado myDataFusionProject
a través de un operador Data Fusion (CloudDataFusionStartPipelineOperator
) dentro de un DAG cuya instancia de Cloud Composer se encuentra en otro proyecto llamado myCloudComposerProject
.
He utilizado la documentación oficial y también el código fuente para escribir el código que se asemeja aproximadamente al siguiente fragmento:
LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance":"someOutputInstance", "input.dataset":"someInputDataset", "input.project":"someInputProject"}
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
runtime_args=RUNTIME_ARGS,
task_id="start_pipeline",
)
Mi problema es que cada vez que activo el DAG, Cloud Composer busca myDataFusionInstance
dentro de myCloudComposerProject
en lugar de myDataFusionProject
, lo cual genera un error como este:
googleapiclient.errors.HttpError: <httperror 404="" when="" requesting="" https://datafusion.googleapis.com/v1beta1/projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance?alt="json" returned="" "resource="" 'projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance'="" was="" not="" found".="" details:="" "[{'@type':="" 'type.googleapis.com/google.rpc.resourceinfo',="" 'resourcename':="" 'projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance'}]"="">
Entonces la pregunta es: ¿cómo puedo forzar a mi operador a usar el proyecto de Data Fusion en lugar del proyecto de Cloud Composer? Sospecho que puedo hacerlo agregando un nuevo argumento de tiempo de ejecución, pero no estoy seguro de cómo hacerlo.
Última información: el pipeline de Data Fusion simplemente extrae datos de una fuente de BigQuery y los envía a un destino de BigTable.
davy.ai
Para forzar al operador
CloudDataFusionStartPipelineOperator
a utilizar el proyectomyDataFusionProject
en lugar demyCloudComposerProject
, debes incluir el ID del proyecto Data Fusion como parte del nombre de la instancia en el parámetroinstance_name
. Esto significa que debes cambiar la variableINSTANCE_NAME
para incluir tanto el ID del proyecto como el nombre de la instancia de la siguiente manera:INSTANCE_NAME = "projects/myDataFusionProject/locations/<location>/instances/<instance_name>"
Donde
<location>
es la ubicación de tu instancia de Data Fusion y<instance_name>
es el nombre que le diste a tu instancia de Data Fusion.Entonces, suponiendo que la ubicación de tu instancia de Data Fusion es
us-central1
, la variableINSTANCE_NAME
debería lucir así:INSTANCE_NAME = "projects/myDataFusionProject/locations/us-central1/instances/myDataFusionInstance"
Luego, necesitas pasar esta variable
INSTANCE_NAME
al operadorCloudDataFusionStartPipelineOperator
de la siguiente manera:Con este cambio, Airflow buscará la instancia de Data Fusion en el proyecto
myDataFusionProject
en lugar demyCloudComposerProject
.