Airflow 1.x TriggerDagRunOperator pasa {{ds}} como conf.
En Airflow 1.x (no en 2.x), quiero que DAG1 active DAG2 y pasar la fecha con plantilla de DAG1 {{ds}} como un parámetro de diccionario ‘conf’ como { “day” : “{{ds}}” } a DAG2, para que DAG2 pueda acceder a ella a través de {{dag_run.conf[‘day’]}}.
Pero DAG1 termina pasando la cadena literal ‘{{ds}}’ en lugar de ‘2021-12-03’.
DAG2 utiliza un SSHOperator, no un PythonOperator (para el cual parece existir una solución).
DAG 1:
from airflow.operators import TriggerDagRunOperator
def fn_day(context, dagrun_order):
dagrun_order.payload = {"day": "{{ds}}"}
return dagrun_order
trig = TriggerDagRunOperator(
trigger_dag_id="ssh",
task_id='trig',
python_callable=fn_day,
dag=dag)
DAG 2:
from airflow.contrib.operators.ssh_operator import SSHOperator
ssh = SSHOperator(
ssh_conn_id='ssh_vm',
task_id='echo',
command="echo {{dag_run.conf['day']}}"
)
davy.ai
Para pasar la fecha de plantilla
{{ds}}
como un parámetro de diccionario ‘conf’ desde DAG1 a DAG2 en Airflow 1.x, debes modificar la funciónfn_day
de la siguiente manera:Esto extraerá la fecha de plantilla
{{ds}}
del contexto y la pasará a DAG2 como payload en el parámetro de diccionario ‘conf’.Luego, en DAG2, puedes acceder a la variable
day
utilizando el diccionariodag_run.conf
de la siguiente manera:Esto imprimirá el valor de
day
en el SSHOperator.