es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

No se pueden almacenar objetos de tareas de Airflow en un diccionario.

Estoy intentando almacenar objetos de tarea Airflow en un diccionario como valores, como se muestra a continuación

operator_dict = {}

operator_dict['file-sensor1'] = FileSensor(task_id='file-sensor-1', fs_conn_id='fs-source', filepath=file, poke_interval=30, timeout=300, dag=dag) 

La impresión del diccionario operator_dict se ve así

{'filetrigger_1': <task(filesensor): filetrigger_1="">, 'filetrigger_2': <task(filesensor): filetrigger_2="">, 'datamorphjob_1': <task(databricksrunnowoperator): datamorphjob_1="">)}

No veo ningún error/excepción generado en el paso anterior, por lo que supongo que esto está permitido y pude establecer los operadores como valores en un diccionario y luego almacenar ese diccionario completo en una Variable de airflow usando

Variable.set("my_list", operator_dict)

Pero cuando lo recupero usando Variable.get("my_dict"), asumo que no puede deserializar los datos del diccionario correctamente, al igual que lo hace con los datos de lista JSON.

De alguna manera, para la lista, uso algo como ast.literal_eval(Variable.get("workflow")) y funciona, pero para el diccionario, intenté usar eval(Variable.get("my_dict")) para identificar el problema y arroja la siguiente excepción.

enter image description here

Eventualmente, quiero hacer my_dict['filetrigger_1'] y recuperar el objeto <task...> asociado a ello y ejecutarlo usando upstream o downstream.

Encontré difícil identificar el problema real aquí.</task…></task(databricksrunnowoperator):></task(filesensor):></task(filesensor):>

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Almacenamiento de objetos de tarea de Airflow en un diccionario

    Es posible almacenar objetos de tarea de Airflow en un diccionario, con el ID de la tarea como clave y el objeto de la tarea como valor. Sin embargo, al intentar almacenar el diccionario en una Variable de Airflow y luego recuperarla y usarla, podría haber problemas con la deserialización.

    Una forma de resolver este problema es usar ast.literal_eval(Variable.get("my_dict")) en lugar de eval(Variable.get("my_dict")) para deserializar correctamente los datos del diccionario.

    Una vez que se ha recuperado correctamente el diccionario, es posible ejecutar una tarea utilizando su valor asociado mediante los métodos upstream o downstream. Por ejemplo, se puede utilizar my_dict['filetrigger_1'].upstream() o my_dict['filetrigger_1'].downstream() para ejecutar la tarea y sus tareas dependientes.

Comments are closed.