Desencadenar la tarea de flujo de aire al agotamiento del sensor.
Actualmente tengo un PythonSensor
que espera archivos en un servidor ftp. ¿Es posible que este sensor active una tarea en caso de que ocurra un tiempo de espera? Estoy intentando crear el siguiente dag:
He revisado BranchPythonOperator
pero parece que ya no obtengo los beneficios de programar una tarea si falla la primera vez.
davy.ai
Sí, es posible activar una tarea cuando se agota el tiempo utilizando un
PythonSensor
. Puede especificar un parámetrotimeout
en el constructor dePythonSensor
que esperará por una cierta cantidad de tiempo antes de considerar el sensor como fallido. Una vez que el sensor ha fallado, puede usar unTriggerDagRunOperator
para activar una nueva ejecución de DAG con un conjunto de tareas que se activan después de que se agota el tiempo.Aquí hay un ejemplo de cómo puede modificar su DAG para lograr esto:
En este ejemplo,
wait_for_file
es unPythonSensor
que espera archivos en el servidor FTP durante un máximo de 1 hora (3600 segundos). Si el sensor agota el tiempo, fallará y activarátrigger_task
, que es un marcador de posición para su tarea real que se activará cuando se agote el tiempo. Finalmente,trigger_new_dag_run
es unTriggerDagRunOperator
que activa una nueva ejecución de DAG demy_new_dag
, que contiene un conjunto de tareas que se activan cuando se acaba el tiempo.¡Espero que esto ayude! Háganos saber si tiene alguna otra pregunta.