es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

La validación de las Grandes Expectativas falló y el trabajo fue cancelado.

Estoy trabajando en una tarea de Monitoreo de Datos en la cual estoy utilizando el framework Great Expectation para monitorear la calidad de los datos. Estoy utilizando el conjunto airflow+big query+great expectation para lograr esto.

He establecido el parámetro is_blocking:False para expectation, pero el trabajo se interrumpe con una excepción y las tareas descendentes no se pueden ejecutar debido a esto. ¿Hay alguna forma de enviar notificaciones pero que la ejecución no se detenga?

Excepción detallada a continuación:

[2021-11-29 15:19:45,925] {taskinstance.py:1252} INFO – Exportando las siguientes variables de entorno:
AIRFLOW_CTX_DAG_OWNER=data-science
AIRFLOW_CTX_DAG_ID=abcd-data-ds-1
AIRFLOW_CTX_TASK_ID=ge-notify-data_monitoring-expect-5ff9677f
AIRFLOW_CTX_EXECUTION_DATE=2021-11-29T11:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-11-29T11:00:00+00:00
[2021-11-29 15:19:45,926] {great_expectations_notification_operator.py:42} INFO – Recuperando clave data-ds-v4__promo_roi_input_features_monitoring_expect_column_values_to_be_between47deadf091f092857156a30495953f3c_20211129T110000
[2021-11-29 15:19:45,986] {alerts.py:109} INFO – Enviando notificación a Slack
[2021-11-29 15:19:46,411] {great_expectations_notification_operator.py:73} ERROR – La validación falló en el data warehouse para abcd.xyz.is_outlier
[2021-11-29 15:19:46,430] {taskinstance.py:1463} ERROR – La tarea falló con una excepción
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1165, in run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1308, in _execute_task
result = task_copy.execute(context=context)
File "/opt/airflow/src/datahub/operators/expectations/great_expectations_notification_operator.py", line 79, in execute
raise AirflowException(message)
airflow.exceptions.AirflowException: La validación falló en el data warehouse para abcd.xyz.is_outlier
[2021-11-29 15:19:46,432] {taskinstance.py:1506} INFO – Marcando la tarea como FALLIDA. dag_id=curated-data-ds-v4, task_id=ge-notify-data_monitoring-expect
-5ff9677f, execution_date=20211129T110000, start_date=20211129T151945, end_date=20211129T151946
[2021-11-29 15:19:46,505] {local_task_job.py:151} INFO – La tarea salió con el código de retorno 1
[2021-11-29 15:19:46,557] {alerts.py:109} INFO – Enviando notificación a Slack
[2021-11-29 15:19:47,564] {local_task_job.py:261} INFO – Se programaron 0 tareas descendentes a partir de la verificación de programación posterior

Tags:  ,

Answer

  1. Avatar for davy.ai

    Una forma de lograr esto es envolviendo el código de expectation dentro de un bloque try-except y atrapando la excepción AirflowException. Dentro del bloque except, podemos agregar el código para enviar la notificación sin levantar el error, permitiendo así que las tareas aguas abajo se ejecuten. Aquí hay un ejemplo:

    from great_expectations.exceptions import ExpectationError
    from airflow.exceptions import AirflowException
    from airflow.operators.python_operator import PythonOperator
    
    def validate_data(**context):
        try:
            # código de la expectativa aquí
            is_blocking = context["params"]["is_blocking"]
            result = my_expectation.run(result_format='BOOLEAN_ONLY', is_blocking=is_blocking)
            if not result["success"]:
                raise ExpectationError("La expectativa falló")
        except AirflowException as e:
            # enviar notificación sin levantar el error
            send_notification(e)
        except ExpectationError as e:
            # enviar notificación y lanzar el error
            send_notification(e)
            raise e
    
    ge_task = PythonOperator(
        task_id="ge_validate_data",
        provide_context=True,
        python_callable=validate_data,
        op_kwargs={"params": {"is_blocking": False}},
        dag=my_dag
    )
    

    En este ejemplo, definimos una función validate_data que envuelve el código de la expectativa dentro de un bloque try-except. Si la expectativa falla y se levanta una AirflowException, enviamos la notificación pero no levantamos el error. Si se levanta un ExpectationError, enviamos la notificación y levantamos el error para detener la ejecución de la tarea.

    Luego usamos el PythonOperator para ejecutar la función validate_data con el argumento params que incluye el parámetro is_blocking establecido en False. Esto permite que la expectativa se ejecute sin bloquear las tareas aguas abajo, y que las notificaciones se envíen sin detener la ejecución de la tarea en caso de errores.

Comments are closed.