Tag: AIRFLOW-API
Flujo de trabajo de Airflow: def print_hello(**kwargs): task_params = kwargs['dag_run'].conf['task_payload'] print('Hola mundo con {}'.format(task_params)) def hello_world(): args = { "start_date": datetime(2022, 5, 1), "retries": 1, "sla": timedelta(hours=3) } dag_name = 'hola_mundo_a' dag = dag_utils.create_dag(dag_name, args=args, schedule_interval=None) test = PythonOperator( task_id='impresora_hola_mundo', python_callable=print_hello, provide_context=True, dag=dag) with_done_check((test), date_format="{{ ds }}", run_mode=get_mode()) return dag . . . Read more
Estoy escribiendo código de Airflow en el que estoy ejecutando un operador de Python para leer la entrada del usuario proporcionada desde la CLI y quiero usar el resultado devuelto por ese operador en el código de Python de nivel superior para generar tareas dinámicas. Usando Xcom, podemos realizar una . . . Read more
Cuando llamamos a un DAG mediante REST, AirFlow intenta ejecutarlo, pero si hay una excepción, devuelve el rastro de la pila como cuerpo de la respuesta envuelto en HTTP 200 OK. ¿Cómo podemos cambiar esa característica para que, en caso de cualquier error de ejecución, envíe un error 400 en . . . Read more
Usando Airflow 2.0.2, estoy tratando de usar la API de Airflow para activar ejecuciones de DAG. Cuando ejecuto un simple GET como curl -X GET –user “fooUser:passw0rd” “${ENDPOINT_URL}/api/v1/pools” Obtengo los resultados esperados: { “pools”: [ { “name”: “default_pool”, “occupied_slots”: 0, “open_slots”: 128, “queued_slots”: 0, “running_slots”: 0, “slots”: 128 } ], . . . Read more