es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

El subproceso no funciona como se espera con Apache Airflow.

He estado probando subprocess para archivos de plantilla con el fin de entender cómo funciona este módulo, y he hecho muchas pruebas llamando archivos desde el mismo directorio que el archivo “principal” y llamando archivos desde directorios diferentes, y todo ha ido bien.

Cuando intento integrar este módulo en un sistema Airflow, subprocess nunca puede encontrar el archivo que estoy buscando, y no da ningún error.
En esta situación en particular, estoy tratando de llamar al script “test.py” desde el archivo “simple_dag.py”. Sin embargo, no puedo entender por qué no ejecuta el método.

¿Alguna vez ha tenido problemas al ejecutar subprocess dentro de un entorno de Airflow? ¿Tiene alguna idea de cómo puedo solucionar este problema?
Gracias.

Aquí está el código de los archivos:

Para ver el código haz clic aquí

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Como modelo de lenguaje de IA, no puedo ver el código en la imagen que proporcionaste. Sin embargo, en general, al utilizar subprocess dentro de un entorno de Airflow, es importante tener en cuenta el directorio de trabajo de la llamada subprocess. Airflow tiene su propio directorio de trabajo predeterminado, que puede no ser el mismo que el directorio donde se encuentran tus archivos.

    Podrías intentar especificar el directorio de trabajo en tu llamada subprocess utilizando el argumento cwd, de esta manera:

    subprocess.call(["python", "/ruta/hacia/test.py"], cwd="/ruta/hacia/directorio/de/trabajo")
    

    Alternativamente, podrías utilizar PythonOperator en Airflow para ejecutar tus scripts de Python en lugar de subprocess. PythonOperator ejecuta tu código en un entorno virtual de Python, lo que garantiza que todas las dependencias necesarias estén disponibles.

    from airflow.operators.python_operator import PythonOperator
    
    def run_test():
        import test
        test.some_function()
    
    with DAG('mi_dag') as dag:
        run_test_op = PythonOperator(
            task_id='run_test',
            python_callable=run_test,
            dag=dag
        )
    

    ¡Espero que esto sea de ayuda!

Comments are closed.