es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo puedo programar mi DAG para que se ejecute cada 5 minutos?

Estoy utilizando GCP Cloud Composer y tengo una instancia de Airflow y un DAG.

Los argumentos predeterminados del DAG son los siguientes:

# Define the default arguments for the DAG.
default_args = {
    # If the start date is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG
    # uploads.
    "start_date": datetime.datetime(2021, 12, 2) #ayer
    ,"owner": "foobar"
    ,"schedule_interval": "*/5 * * * *"
    ,"tags": ["create_empty_dataset", "BigQuery"]
}

El DAG se está ejecutando correctamente, pero el horario (cada 5 minutos) no lo está. Intenté usar datetime.timedelta(minutes=5) pero no funcionó. Solo se ejecuta una vez y eso es todo. Esto es lo que veo:

DAG

¿Algún idea? Puedo editar y compartir el código del DAG si es necesario. Es simple.

¡Gracias!

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Parece que el schedule_interval establecido en los argumentos por defecto del DAG no está funcionando como se esperaba. Una posible solución sería verificar el parámetro catchup en el DAG para ver si está establecido en False. Si está establecido en True, entonces Airflow rellenará todas las ejecuciones programadas que se hayan perdido y, por lo tanto, es posible que no se ejecute con la frecuencia deseada.

    Para resolver este problema, podemos establecer el parámetro catchup en False. Esto garantizará que Airflow ejecute el DAG en el intervalo programado a partir de la start_date especificada en los argumentos por defecto.

    Así es cómo se establece el parámetro catchup en False en el DAG:

    # Importar el módulo DAG
    from airflow import DAG
    
    # Importar los operadores requeridos
    from airflow.operators import BashOperator
    from datetime import datetime
    
    # Definir los argumentos por defecto para el DAG.
    default_args = {
        "start_date": datetime(2021, 12, 2), #ayer
        "owner": "foobar",
        "schedule_interval": "*/5 * * * *",
        "tags": ["create_empty_dataset", "BigQuery"],
        "catchup": False     # Establecer el parámetro catchup en False
    }
    
    # Definir el DAG
    dag = DAG(
        "my_dag",
        default_args=default_args,
        catchup=False    # Establecer el parámetro catchup en False también en la definición del DAG
    )
    
    # Definir las tareas para el DAG
    task1 = BashOperator(task_id='task1', bash_command='echo "Hola Mundo"', dag=dag)
    
    

    Además de establecer el parámetro catchup en False, también es importante asegurarse de que la start_date esté establecida en una fecha anterior a la hora actual.

    ¡Espero que esto ayude a resolver el problema con el DAG que no se ejecuta en el horario deseado! Avísame si necesitas más ayuda.

Comments are closed.