Tag: AIRFLOW
De la documentación de Airflow: Debe evitar escribir código de nivel superior que no sea necesario para crear operadores y construir relaciones DAG entre ellos. […] Uno de los factores importantes que afecta el tiempo de carga del DAG, que podría pasar desapercibido para los desarrolladores de Python, es que . . . Read more
Estoy ejecutando dbt (v0.19.0) en Apache Airflow (v2.0.2) utilizando KuberneterPodOperator y enviando alertas a Slack en caso de errores. Dentro de dbt hay múltiples modelos de múltiples propietarios con dependencias cruzadas entre ellos, y todos se ejecutan juntos con el comando run. Por ejemplo: python3 KubernetesPodOperator( dbt_command="run", task_id="run_models", on_failure_callback=send_slack_alert, ) . . . Read more
Si ejecuto esto localmente en la CLI se ejecuta correctamente y copia los archivos desde otro bucket/key al mío en la ubicación correcta. aws s3 sync s3://client_export/ref/commissions/snapshot_date=2022-01-01/ s3://bi-dev/KSM/refinery29/commissions/snapshot_date=2022-01-01/ Cuando intento usar el S3CopyObjectOperator, veo el error NoSuchKey: copy_commissions_data = S3CopyObjectOperator( task_id=’copy_commissions_data’, aws_conn_id=’aws_default’, source_bucket_name=’client_export’, dest_bucket_name=’bi-dev’, source_bucket_key=’ref/commissions/snapshot_date=2022-01-01′, dest_bucket_key=’KSM/refix/commissions/snapshot_date=2022-01-01′, dag=dag ) También he . . . Read more
Tengo tres DAGs (digamos, DAG1, DAG2 y DAG3). Tengo un programador mensual para DAG1. DAG2 y DAG3 no deben ejecutarse directamente (sin planificador para estos) y solo deben ejecutarse cuando DAG1 se complete correctamente. Es decir, una vez que DAG1 esté completo, DAG2 y DAG3 deberán comenzar en paralelo. ¿Cuál . . . Read more
Flujo de trabajo de Airflow: def print_hello(**kwargs): task_params = kwargs['dag_run'].conf['task_payload'] print('Hola mundo con {}'.format(task_params)) def hello_world(): args = { "start_date": datetime(2022, 5, 1), "retries": 1, "sla": timedelta(hours=3) } dag_name = 'hola_mundo_a' dag = dag_utils.create_dag(dag_name, args=args, schedule_interval=None) test = PythonOperator( task_id='impresora_hola_mundo', python_callable=print_hello, provide_context=True, dag=dag) with_done_check((test), date_format="{{ ds }}", run_mode=get_mode()) return dag . . . Read more