No se pueden almacenar objetos de tareas de Airflow en un diccionario.
Estoy intentando almacenar objetos de tarea Airflow en un diccionario como valores, como se muestra a continuación
operator_dict = {}
operator_dict['file-sensor1'] = FileSensor(task_id='file-sensor-1', fs_conn_id='fs-source', filepath=file, poke_interval=30, timeout=300, dag=dag)
La impresión del diccionario operator_dict
se ve así
{'filetrigger_1': <task(filesensor): filetrigger_1="">, 'filetrigger_2': <task(filesensor): filetrigger_2="">, 'datamorphjob_1': <task(databricksrunnowoperator): datamorphjob_1="">)}
No veo ningún error/excepción generado en el paso anterior, por lo que supongo que esto está permitido y pude establecer los operadores como valores en un diccionario y luego almacenar ese diccionario completo en una Variable de airflow usando
Variable.set("my_list", operator_dict)
Pero cuando lo recupero usando Variable.get("my_dict")
, asumo que no puede deserializar los datos del diccionario correctamente, al igual que lo hace con los datos de lista JSON.
De alguna manera, para la lista, uso algo como ast.literal_eval(Variable.get("workflow"))
y funciona, pero para el diccionario, intenté usar eval(Variable.get("my_dict"))
para identificar el problema y arroja la siguiente excepción.
Eventualmente, quiero hacer my_dict['filetrigger_1']
y recuperar el objeto <task...>
asociado a ello y ejecutarlo usando upstream
o downstream
.
Encontré difícil identificar el problema real aquí.</task…></task(databricksrunnowoperator):></task(filesensor):></task(filesensor):>
davy.ai
Almacenamiento de objetos de tarea de Airflow en un diccionario
Es posible almacenar objetos de tarea de Airflow en un diccionario, con el ID de la tarea como clave y el objeto de la tarea como valor. Sin embargo, al intentar almacenar el diccionario en una Variable de Airflow y luego recuperarla y usarla, podría haber problemas con la deserialización.
Una forma de resolver este problema es usar
ast.literal_eval(Variable.get("my_dict"))
en lugar deeval(Variable.get("my_dict"))
para deserializar correctamente los datos del diccionario.Una vez que se ha recuperado correctamente el diccionario, es posible ejecutar una tarea utilizando su valor asociado mediante los métodos upstream o downstream. Por ejemplo, se puede utilizar
my_dict['filetrigger_1'].upstream()
omy_dict['filetrigger_1'].downstream()
para ejecutar la tarea y sus tareas dependientes.