es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo puedo enviar los registros dbt desde Airflow hasta el propietario correspondiente?

Estoy ejecutando dbt (v0.19.0) en Apache Airflow (v2.0.2) utilizando KuberneterPodOperator y enviando alertas a Slack en caso de errores. Dentro de dbt hay múltiples modelos de múltiples propietarios con dependencias cruzadas entre ellos, y todos se ejecutan juntos con el comando run.

Por ejemplo:
python3
KubernetesPodOperator(
dbt_command="run",
task_id="run_models",
on_failure_callback=send_slack_alert,
)

Estoy tratando de asegurarme de que cada propietario de modelo reciba la alerta correspondiente en su canal relevante.

Para explicar mejor mi problema, supongamos que hay dos modelos dentro de dbt: Modelo-A y Modelo-B. Modelo-A es propiedad del Equipo-A y Modelo-B es propiedad del Equipo-B. Con este enfoque (porque hay un solo comando dbt run), si hay un fallo en Modelo-A, el fallo aparecerá en los registros compartidos de Modelo-A y Modelo-B. Supongamos también que tanto el Equipo-A como el Equipo-B tienen sus propios canales de alerta. Sin embargo, debido a que dbt se ejecuta con un solo comando, todas las alertas se envían a un canal común.

Ahora imagina tener muchos modelos (Modelo-A, Modelo-B…..Modelo-Z). ¿Cómo puedo mejorar el proceso existente para asegurarme de que las fallas en Modelo-A se envíen al canal de alerta del Equipo-A, las fallas en Modelo-B se envíen al canal de alerta del Equipo-B… y así sucesivamente?

¿Cómo puedo enviar errores de dbt (ejecutándose dentro de Airflow) al propietario relevante para hacer que las alertas sean accionables?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Una posible solución a este problema es utilizar etiquetas dbt para identificar el propietario de cada modelo y luego utilizar esas etiquetas para dirigir las alertas al canal apropiado. Específicamente, podrías agregar una etiqueta a cada modelo que especifique el equipo que lo posee. Por ejemplo, podrías agregar las siguientes etiquetas a tus modelos dbt:

    tags:
      team: A-Team 
    
    tags:
      team: B-Team 
    

    Luego, modifica la función send_slack_alert para incluir lógica que extraiga la etiqueta del equipo del modelo fallido y dirija la alerta al canal apropiado. Aquí tienes un ejemplo de cómo podrías modificar la función send_slack_alert para hacer esto:

    def send_slack_alert(context):
        dag_id = context['dag'].dag_id
        task_id = context['task_instance'].task_id
        log_url = context['task_instance'].log_url
        execution_date = context['execution_date'].strftime('%Y-%m-%d %H:%M:%S')
        owner = ''
    
        # Extraer etiquetas del modelo fallido
        dbt_output = context['ti'].xcom_pull(task_ids=task_id)
        for model in dbt_output['results']:
            if model['status'] == 'error':
                owner = model['metadata']['tags'].get('team', '')
    
        # Dirigir alerta al canal apropiado
        if owner == 'A-Team':
            slack_hook = 'https://hooks.slack.com/services/xxxxx/yyyyy/zzzzz'
        elif owner == 'B-Team':
            slack_hook = 'https://hooks.slack.com/services/aaaaa/bbbbb/cccccc'
        # agregar más cláusulas elif para otros equipos según sea necesario
    
        # Enviar alerta a Slack
        slack_msg = f"ALERTA: el trabajo de dbt falló para {dag_id}:{task_id} en {execution_date}. Registros: {log_url}"
        response = requests.post(slack_hook, json={"text": slack_msg})
        response.raise_for_status()
    

    Esta función modificada extrae la etiqueta de equipo del modelo fallido (suponiendo que el modelo fue etiquetado con una etiqueta ‘team’) y luego dirige la alerta al canal apropiado según el valor de la etiqueta. Puedes definir los ganchos de Slack para cada equipo en las cláusulas if-elif de la función.

    Para ejecutar esta función modificada con dbt, puedes pasarla como parámetro on_failure_callback al KubernetesPodOperator. Por ejemplo:

    KubernetesPodOperator(
      dbt_command="run",
      task_id="run_models",
      on_failure_callback=send_slack_alert,
    )
    

    Con este enfoque, puedes asegurarte de que las alertas para los modelos fallidos se dirijan a los canales adecuados del equipo, lo que facilita que los equipos identifiquen y solucionen problemas.

Comments are closed.