Utilice Smart Sensors y aún así obtenga variables de contexto.
Estoy en Airflow 2.1.4 y estoy tratando de modificar un sensor personalizado para que actúe como un “Sensor Inteligente”.
Entre otras cosas, para permitir que un sensor personalizado funcione como un Sensor Inteligente, necesitas proporcionarle una variable de clase “poke_context_fields”. Esto no está muy bien documentado, pero creo que es simplemente una lista de los argumentos de “init” que también deseas pasar a “self.poke()” cuando es llamado por el DAG/Fragmento del Sensor Inteligente (aunque podría estar equivocado).
Entonces lo tengo así:
poke_context_fields = [‘myarg1’, ‘myarg2’]
He probado esto, pero parece que hay un problema: cuando el DAG del Sensor Inteligente llama a “self.poke()”, reenvía esos argumentos como se espera, pero NO me proporciona la variable “context” regular que mi método “poke” espera. Desafortunadamente, mi código no funcionará sin acceso a esa variable, porque necesita ciertas propiedades como “context[‘ds’]”, “context[‘task_instance’]”, etc., que solo estarían disponibles en tiempo de ejecución y no cuando Python analiza la variable de clase.
He leído lo siguiente (https://github.com/apache/airflow/issues/11893), pero no lo comprendo al 100%. ¿Hay alguna solución alternativa para esto o debería concluir que no puedo usar Sensores Inteligentes y debería esperar para usar Operadores Diferibles que se lanzó en 2.2.0?
davy.ai
Según la documentación y tu comprensión,
poke_context_fields
es una variable de clase que permite que el sensor personalizado funcione como un Smart Sensor al enviar argumentos adicionales aself.poke()
cuando se llama. Sin embargo, parece que cuando el DAG/Shard del Smart Sensor llama aself.poke()
, la variablecontext
regular no se pasa junto con ella. Sin esta variable, ciertas propiedades comocontext['ds']
ycontext['task_instance']
requeridas por tu código pueden no estar disponibles, lo que hace que no sea utilizable como un Smart Sensor.El enlace que proporcionaste (https://github.com/apache/airflow/issues/11893) indica que
poke_context_fields
ha sido eliminado en Airflow 2.2.0 a favor depoke_context_kwargs
. Esta nueva variable no sufre el mismo problema quepoke_context_fields
y te permite pasar cualquier argumento de palabra clave que desees para que se retransmita aself.poke()
. Por lo tanto, actualizar a Airflow 2.2.0 y usarpoke_context_kwargs
en lugar depoke_context_fields
puede resolver tu problema.Alternativamente, podrías esperar a que se lancen los Operadores Diferibles y usarlos en lugar de los Smart Sensors. Los Operadores Diferibles te permiten especificar un intervalo de retraso antes de que se ejecute un operador, lo cual puede ser útil en ciertos casos donde la programación no es exacta o cuando deseas retrasar el inicio de un operador hasta que se cumpla una determinada condición.
En general, si no es factible actualizar a Airflow 2.2.0 y usar
poke_context_kwargs
, entonces esperar a que se lancen los Operadores Diferibles puede ser la mejor solución.