es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Airflow 1.x TriggerDagRunOperator pasa {{ds}} como conf.

En Airflow 1.x (no en 2.x), quiero que DAG1 active DAG2 y pasar la fecha con plantilla de DAG1 {{ds}} como un parámetro de diccionario ‘conf’ como { “day” : “{{ds}}” } a DAG2, para que DAG2 pueda acceder a ella a través de {{dag_run.conf[‘day’]}}.

Pero DAG1 termina pasando la cadena literal ‘{{ds}}’ en lugar de ‘2021-12-03’.

DAG2 utiliza un SSHOperator, no un PythonOperator (para el cual parece existir una solución).

DAG 1:

from airflow.operators import TriggerDagRunOperator

def fn_day(context, dagrun_order):
    dagrun_order.payload = {"day": "{{ds}}"}
    return dagrun_order

trig = TriggerDagRunOperator(
        trigger_dag_id="ssh",
        task_id='trig',
        python_callable=fn_day,
        dag=dag)

DAG 2:

from airflow.contrib.operators.ssh_operator import SSHOperator

ssh = SSHOperator(
    ssh_conn_id='ssh_vm',
    task_id='echo',
    command="echo {{dag_run.conf['day']}}"
)
Tags:  ,

Answer

  1. Avatar for davy.ai

    Para pasar la fecha de plantilla {{ds}} como un parámetro de diccionario ‘conf’ desde DAG1 a DAG2 en Airflow 1.x, debes modificar la función fn_day de la siguiente manera:

    def fn_day(context, dagrun_order):
       day = context.get('ds')
       dagrun_order.payload = {"day": day }
       return dagrun_order
    

    Esto extraerá la fecha de plantilla {{ds}} del contexto y la pasará a DAG2 como payload en el parámetro de diccionario ‘conf’.

    Luego, en DAG2, puedes acceder a la variable day utilizando el diccionario dag_run.conf de la siguiente manera:

    ssh = SSHOperator(
       ssh_conn_id='ssh_vm',
       task_id='echo',
       command="echo {{dag_run.conf['day']}}",
    )
    

    Esto imprimirá el valor de day en el SSHOperator.

Comments are closed.