es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Airflow 2.2.2: error en plantilla, conn no definido.

Estoy utilizando Airflow 2.2.2 con los proveedores más recientes instalados según corresponda.

Estoy tratando de usar los hooks de Azure y MySQL y he creado operadores personalizados con plantillas definidas para las variables que se pueden usar en la plantilla.

Cuando lo hago, obtengo un error que dice que no se puede encontrar “conn” o “var”.

por ejemplo, mi parámetro pasado es:

{{ conn. }}

o

{{ var.json.value. }}

Creo que esto debería ser posible en la versión 2.0, pero no está funcionando para mí. ¿Alguna idea de por qué?

EDIT: A continuación se muestran fragmentos de código con información sensible eliminada. Avíseme si se necesita algo más.

Error de DAG:

DAG roto: [/home/dags/dag.py] Traceback (most recent call last):
  File , line 219, in _call_with_frames_removed
  File "/home/dags/dag.py", line 52, in 
    wasb_conn_id = {{ conn.wasb }},
NameError: el nombre 'conn' no está definido

Tarea en dag.py:

    t1 = WasbLogBlobsToCSVOperator(
        task_id='task_xyz',
        wasb_conn_id = {{ conn.wasb }},

Operador personalizado que utiliza una versión extendida del hook Microsoft Azure wasb, utilizado por dag.py:

class WasbLogBlobsToCSVOperator(BaseOperator):        

    template_fields = (
        'wasb_conn_id',
        )

    def __init__(
        self,
        *,
        wasb_conn_id: str = 'wasb',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wasb_conn_id = wasb_conn_id
        self.hook = ExtendedWasbHook(wasb_conn_id=self.wasb_conn_id)

Tags:  ,

Answer

  1. Avatar for davy.ai

    Parece que el problema que estás enfrentando se debe a que estás intentando usar la sintaxis de plantillas Jinja2 dentro de la definición de la clase en sí, lo cual no está permitido. Las plantillas Jinja2 solo deben usarse dentro del contexto de una DAG o la definición de una tarea en Airflow, y no dentro de la definición de clases de un operador personalizado.

    Para poder usar el parámetro wasb_conn_id dentro de tu operador personalizado, debes aceptarlo como un parámetro de tipo cadena regular, sin la sintaxis de plantillas Jinja2. En el método __init__ de tu operador, puedes entonces usar el valor pasado para crear una instancia de ExtendedWasbHook usando el ID de conexión adecuado:

    class WasbLogBlobsToCSVOperator(BaseOperator):
    
        template_fields = (
            'wasb_conn_id',
        )
    
        def __init__(
            self,
            *,
            wasb_conn_id: str = 'wasb',
            **kwargs,
        ) -> None:
            super().__init__(**kwargs)
            self.wasb_conn_id = wasb_conn_id
            self.hook = ExtendedWasbHook(wasb_conn_id=self.wasb_conn_id)
    

    En la definición de tu DAG, puedes usar la sintaxis de plantillas Jinja2 para pasar el ID de conexión real o el nombre de la variable de la siguiente manera:

    t1 = WasbLogBlobsToCSVOperator(
        task_id='task_xyz',
        wasb_conn_id="{{ conn_id_variable }}",
        ...
    )
    

    Ten en cuenta que conn_id_variable debe ser una variable de cadena válida o un ID de conexión, sin las llaves o la notación de punto.

    Espero que esto te ayude. Avísame si tienes alguna otra pregunta.

Comments are closed.