es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo ejecutar contenedores de Cloud Run en un DAG de Airflow?

Estoy tratando de ejecutar un contenedor con Cloud Run como una tarea de una DAG de Airflow.

Parece que no hay cosas como un CloudRunOperator o similar y no puedo encontrar nada en la documentación (tanto de Cloud Run como de Airflow).

¿Alguien ha lidiado alguna vez con este problema?
Si es así, ¿cómo puedo ejecutar un contenedor con Cloud Run y manejar xcom?

¡Gracias de antemano!

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Ejecución de un contenedor con Cloud Run como tarea en una DAG de Airflow

    Si estás intentando ejecutar un contenedor con Cloud Run como tarea de una DAG de Airflow, es posible que te enfrentes a algunos desafíos, ya que Airflow no cuenta con un operador integrado llamado CloudRunOperator. Sin embargo, puedes utilizar el operador BashOperator para ejecutar un comando de interfaz de línea de comandos (CLI) de Cloud Run y desplegar tu contenedor.

    Para hacer esto, primero necesitas instalar el Cloud SDK y autenticarte con tu proyecto de Google Cloud Platform (GCP). Luego, puedes utilizar el comando gcloud run deploy para desplegar tu contenedor con las configuraciones requeridas.

    Para manejar xcom, puedes utilizar el BashOperator para exportar la salida de tu tarea de Cloud Run a xcom. Por ejemplo, puedes agregar la bandera –format=json a tu comando gcloud run deploy y utilizar jq para extraer la salida deseada y exportarla a xcom.

    Aquí tienes un ejemplo de DAG que ejecuta un contenedor con Cloud Run y exporta la salida a xcom:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2021, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'cloud_run_dag',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
        catchup=False,
    )
    
    deploy_task = BashOperator(
        task_id='deploy_container',
        bash_command='gcloud run deploy <service_name> --image=<image_name> --platform=managed --region=<region> --format=json | jq \'{output: .status.url}\' | xargs echo',
        dag=dag,
    )
    
    export_task = BashOperator(
        task_id='export_output',
        bash_command='echo {{ ti.xcom_pull(task_ids="deploy_container", key="output") }}',
        dag=dag,
    )
    

    En este ejemplo, el comando gcloud run deploy despliega un contenedor con el nombre de servicio, nombre de imagen y región especificados. La salida del comando se redirige a jq para extraer la URL de estado, que luego se exporta a xcom utilizando xargs.

    La tarea export_task utiliza el BashOperator para mostrar la salida de la tarea deploy_task desde xcom.

    Ten en cuenta que este es solo un ejemplo y es posible que debas modificarlo para adaptarlo a tu caso de uso específico.

    Espero que esto te ayude. Déjame saber si tienes alguna pregunta adicional.

Comments are closed.