La validación de las Grandes Expectativas falló y el trabajo fue cancelado.
Estoy trabajando en una tarea de Monitoreo de Datos en la cual estoy utilizando el framework Great Expectation para monitorear la calidad de los datos. Estoy utilizando el conjunto airflow+big query+great expectation para lograr esto.
He establecido el parámetro is_blocking:False
para expectation, pero el trabajo se interrumpe con una excepción y las tareas descendentes no se pueden ejecutar debido a esto. ¿Hay alguna forma de enviar notificaciones pero que la ejecución no se detenga?
Excepción detallada a continuación:
[2021-11-29 15:19:45,925] {taskinstance.py:1252} INFO – Exportando las siguientes variables de entorno:
AIRFLOW_CTX_DAG_OWNER=data-science
AIRFLOW_CTX_DAG_ID=abcd-data-ds-1
AIRFLOW_CTX_TASK_ID=ge-notify-data_monitoring-expect-5ff9677f
AIRFLOW_CTX_EXECUTION_DATE=2021-11-29T11:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-11-29T11:00:00+00:00
[2021-11-29 15:19:45,926] {great_expectations_notification_operator.py:42} INFO – Recuperando clave data-ds-v4__promo_roi_input_features_monitoring_expect_column_values_to_be_between47deadf091f092857156a30495953f3c_20211129T110000
[2021-11-29 15:19:45,986] {alerts.py:109} INFO – Enviando notificación a Slack
[2021-11-29 15:19:46,411] {great_expectations_notification_operator.py:73} ERROR – La validación falló en el data warehouse para abcd.xyz.is_outlier
[2021-11-29 15:19:46,430] {taskinstance.py:1463} ERROR – La tarea falló con una excepción
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1165, in run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1308, in _execute_task
result = task_copy.execute(context=context)
File "/opt/airflow/src/datahub/operators/expectations/great_expectations_notification_operator.py", line 79, in execute
raise AirflowException(message)
airflow.exceptions.AirflowException: La validación falló en el data warehouse para abcd.xyz.is_outlier
[2021-11-29 15:19:46,432] {taskinstance.py:1506} INFO – Marcando la tarea como FALLIDA. dag_id=curated-data-ds-v4, task_id=ge-notify-data_monitoring-expect-5ff9677f, execution_date=20211129T110000, start_date=20211129T151945, end_date=20211129T151946
[2021-11-29 15:19:46,505] {local_task_job.py:151} INFO – La tarea salió con el código de retorno 1
[2021-11-29 15:19:46,557] {alerts.py:109} INFO – Enviando notificación a Slack
[2021-11-29 15:19:47,564] {local_task_job.py:261} INFO – Se programaron 0 tareas descendentes a partir de la verificación de programación posterior
davy.ai
Una forma de lograr esto es envolviendo el código de
expectation
dentro de un bloque try-except y atrapando la excepciónAirflowException
. Dentro del bloqueexcept
, podemos agregar el código para enviar la notificación sin levantar el error, permitiendo así que las tareas aguas abajo se ejecuten. Aquí hay un ejemplo:En este ejemplo, definimos una función
validate_data
que envuelve el código de la expectativa dentro de un bloque try-except. Si la expectativa falla y se levanta unaAirflowException
, enviamos la notificación pero no levantamos el error. Si se levanta unExpectationError
, enviamos la notificación y levantamos el error para detener la ejecución de la tarea.Luego usamos el
PythonOperator
para ejecutar la funciónvalidate_data
con el argumentoparams
que incluye el parámetrois_blocking
establecido enFalse
. Esto permite que la expectativa se ejecute sin bloquear las tareas aguas abajo, y que las notificaciones se envíen sin detener la ejecución de la tarea en caso de errores.