es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Tag: AIRFLOW

¿Cómo puedo enviar los registros dbt desde Airflow hasta el propietario correspondiente?

Estoy ejecutando dbt (v0.19.0) en Apache Airflow (v2.0.2) utilizando KuberneterPodOperator y enviando alertas a Slack en caso de errores. Dentro de dbt hay múltiples modelos de múltiples propietarios con dependencias cruzadas entre ellos, y todos se ejecutan juntos con el comando run. Por ejemplo: python3 KubernetesPodOperator( dbt_command="run", task_id="run_models", on_failure_callback=send_slack_alert, ) . . . Read more

S3CopyObjectOperator – Se produjo un error (NoSuchKey) al llamar a la operación CopyObject: La clave especificada no existe.

Si ejecuto esto localmente en la CLI se ejecuta correctamente y copia los archivos desde otro bucket/key al mío en la ubicación correcta. aws s3 sync s3://client_export/ref/commissions/snapshot_date=2022-01-01/ s3://bi-dev/KSM/refinery29/commissions/snapshot_date=2022-01-01/ Cuando intento usar el S3CopyObjectOperator, veo el error NoSuchKey: copy_commissions_data = S3CopyObjectOperator( task_id=’copy_commissions_data’, aws_conn_id=’aws_default’, source_bucket_name=’client_export’, dest_bucket_name=’bi-dev’, source_bucket_key=’ref/commissions/snapshot_date=2022-01-01′, dest_bucket_key=’KSM/refix/commissions/snapshot_date=2022-01-01′, dag=dag ) También he . . . Read more

Orquestación de DAGs de Airflow

Tengo tres DAGs (digamos, DAG1, DAG2 y DAG3). Tengo un programador mensual para DAG1. DAG2 y DAG3 no deben ejecutarse directamente (sin planificador para estos) y solo deben ejecutarse cuando DAG1 se complete correctamente. Es decir, una vez que DAG1 esté completo, DAG2 y DAG3 deberán comenzar en paralelo. ¿Cuál . . . Read more

Después de activar externamente el flujo de trabajo de Airflow utilizando la API experimental, continúa en ejecución pero no ejecuta el flujo de trabajo.

Flujo de trabajo de Airflow: def print_hello(**kwargs): task_params = kwargs['dag_run'].conf['task_payload'] print('Hola mundo con {}'.format(task_params)) def hello_world(): args = { "start_date": datetime(2022, 5, 1), "retries": 1, "sla": timedelta(hours=3) } dag_name = 'hola_mundo_a' dag = dag_utils.create_dag(dag_name, args=args, schedule_interval=None) test = PythonOperator( task_id='impresora_hola_mundo', python_callable=print_hello, provide_context=True, dag=dag) with_done_check((test), date_format="{{ ds }}", run_mode=get_mode()) return dag . . . Read more