es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo especificar qué proyecto de GCP utilizar al activar un flujo de datos a través del operador Data Fusion en Cloud Composer.

Necesito activar un pipeline de Data Fusion ubicado en un proyecto de GCP llamado myDataFusionProject a través de un operador Data Fusion (CloudDataFusionStartPipelineOperator) dentro de un DAG cuya instancia de Cloud Composer se encuentra en otro proyecto llamado myCloudComposerProject.

He utilizado la documentación oficial y también el código fuente para escribir el código que se asemeja aproximadamente al siguiente fragmento:

LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance":"someOutputInstance", "input.dataset":"someInputDataset", "input.project":"someInputProject"}

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)

Mi problema es que cada vez que activo el DAG, Cloud Composer busca myDataFusionInstance dentro de myCloudComposerProject en lugar de myDataFusionProject, lo cual genera un error como este:

googleapiclient.errors.HttpError: <httperror 404="" when="" requesting="" https://datafusion.googleapis.com/v1beta1/projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance?alt="json" returned="" "resource="" 'projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance'="" was="" not="" found".="" details:="" "[{'@type':="" 'type.googleapis.com/google.rpc.resourceinfo',="" 'resourcename':="" 'projects/mycloudcomposerproject/locations/somelocation/instances/mydatafusioninstance'}]"="">

Entonces la pregunta es: ¿cómo puedo forzar a mi operador a usar el proyecto de Data Fusion en lugar del proyecto de Cloud Composer? Sospecho que puedo hacerlo agregando un nuevo argumento de tiempo de ejecución, pero no estoy seguro de cómo hacerlo.

Última información: el pipeline de Data Fusion simplemente extrae datos de una fuente de BigQuery y los envía a un destino de BigTable.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para forzar al operador CloudDataFusionStartPipelineOperator a utilizar el proyecto myDataFusionProject en lugar de myCloudComposerProject, debes incluir el ID del proyecto Data Fusion como parte del nombre de la instancia en el parámetro instance_name. Esto significa que debes cambiar la variable INSTANCE_NAME para incluir tanto el ID del proyecto como el nombre de la instancia de la siguiente manera:

    INSTANCE_NAME = "projects/myDataFusionProject/locations/<location>/instances/<instance_name>"

    Donde <location> es la ubicación de tu instancia de Data Fusion y <instance_name> es el nombre que le diste a tu instancia de Data Fusion.

    Entonces, suponiendo que la ubicación de tu instancia de Data Fusion es us-central1, la variable INSTANCE_NAME debería lucir así:

    INSTANCE_NAME = "projects/myDataFusionProject/locations/us-central1/instances/myDataFusionInstance"

    Luego, necesitas pasar esta variable INSTANCE_NAME al operador CloudDataFusionStartPipelineOperator de la siguiente manera:

    start_pipeline = CloudDataFusionStartPipelineOperator(
        location=LOCATION,
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        runtime_args=RUNTIME_ARGS,
        task_id="start_pipeline",
    )
    

    Con este cambio, Airflow buscará la instancia de Data Fusion en el proyecto myDataFusionProject en lugar de myCloudComposerProject.

Comments are closed.