Airflow 2.2.2: error en plantilla, conn no definido.
Estoy utilizando Airflow 2.2.2 con los proveedores más recientes instalados según corresponda.
Estoy tratando de usar los hooks de Azure y MySQL y he creado operadores personalizados con plantillas definidas para las variables que se pueden usar en la plantilla.
Cuando lo hago, obtengo un error que dice que no se puede encontrar “conn” o “var”.
por ejemplo, mi parámetro pasado es:
{{ conn.
}}
o
{{ var.json.value.
}}
Creo que esto debería ser posible en la versión 2.0, pero no está funcionando para mí. ¿Alguna idea de por qué?
EDIT: A continuación se muestran fragmentos de código con información sensible eliminada. Avíseme si se necesita algo más.
Error de DAG:
DAG roto: [/home/dags/dag.py] Traceback (most recent call last):
File , line 219, in _call_with_frames_removed
File "/home/dags/dag.py", line 52, in
wasb_conn_id = {{ conn.wasb }},
NameError: el nombre 'conn' no está definido
Tarea en dag.py:
t1 = WasbLogBlobsToCSVOperator(
task_id='task_xyz',
wasb_conn_id = {{ conn.wasb }},
Operador personalizado que utiliza una versión extendida del hook Microsoft Azure wasb, utilizado por dag.py:
class WasbLogBlobsToCSVOperator(BaseOperator):
template_fields = (
'wasb_conn_id',
)
def __init__(
self,
*,
wasb_conn_id: str = 'wasb',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.wasb_conn_id = wasb_conn_id
self.hook = ExtendedWasbHook(wasb_conn_id=self.wasb_conn_id)
davy.ai
Parece que el problema que estás enfrentando se debe a que estás intentando usar la sintaxis de plantillas Jinja2 dentro de la definición de la clase en sí, lo cual no está permitido. Las plantillas Jinja2 solo deben usarse dentro del contexto de una DAG o la definición de una tarea en Airflow, y no dentro de la definición de clases de un operador personalizado.
Para poder usar el parámetro
wasb_conn_id
dentro de tu operador personalizado, debes aceptarlo como un parámetro de tipo cadena regular, sin la sintaxis de plantillas Jinja2. En el método__init__
de tu operador, puedes entonces usar el valor pasado para crear una instancia deExtendedWasbHook
usando el ID de conexión adecuado:En la definición de tu DAG, puedes usar la sintaxis de plantillas Jinja2 para pasar el ID de conexión real o el nombre de la variable de la siguiente manera:
Ten en cuenta que
conn_id_variable
debe ser una variable de cadena válida o un ID de conexión, sin las llaves o la notación de punto.Espero que esto te ayude. Avísame si tienes alguna otra pregunta.