es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo importar un paquete personalizado de Python en Cloud Composer?

Estoy aprendiendo a usar Google Cloud Composer y parte de ello es entender cómo importar el paquete personalizado de Python de nuestra organización en la dag. Llegué al punto en el que puedo importar scripts simples colocándolos en la carpeta de dependencias dentro de la carpeta dags, pero eso es prácticamente todo.

No entiendo cómo importar el paquete completo que contiene múltiples scripts y dónde entra en juego setup.py. ¿Cómo estructurarlo en el almacenamiento en la nube y usarlo en la dag?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para importar el paquete personalizado de Python de su organización en un DAG de Cloud Composer, debe seguir los siguientes pasos:

    1. Cree un archivo de distribución del paquete (como un archivo .tar.gz) que contenga todos los archivos de módulo y el archivo setup.py de su paquete personalizado.
    2. Cargue el archivo de distribución del paquete en Google Cloud Storage.

    3. En Cloud Composer, cree un nuevo DAG o abra un DAG existente en el que desee utilizar el paquete personalizado.

    4. Cree un nuevo archivo Python dentro del directorio de DAGs e importe los módulos de su paquete personalizado usando el nombre completo del paquete. Por ejemplo, si su paquete personalizado se llama “mi_paquete” y contiene un módulo llamado “mi_modulo”, la declaración de importación sería:

    from mi_paquete import mi_modulo
    
    1. En el mismo archivo Python, use el gcs_download_operator del paquete airflow.contrib.operators para descargar el archivo de distribución del paquete de Cloud Storage al sistema de archivos local del entorno de Cloud Composer. Por ejemplo:
    from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator
    from datetime import datetime, timedelta
    
    ayer = datetime.combine(
        datetime.today() - timedelta(1), datetime.min.time())
    
    tarea_descarga_paquete = GoogleCloudStorageDownloadOperator(
        task_id="tarea_descarga_paquete",
        bucket="mi-bucket",
        object="ruta/al/paquete.tar.gz",
        filename="/home/airflow/gcs/data/paquete.tar.gz",
        start_date=ayer,
        )
    

    Tenga en cuenta que debe especificar los parámetros bucket y object para indicar la ubicación del archivo de distribución del paquete en Cloud Storage, y el parámetro filename para indicar el nombre del archivo local donde se debe guardar el archivo descargado.

    1. Después de que se descargue el paquete, use el PythonOperator para instalar el paquete e importar los módulos en su código DAG. Por ejemplo:
    from airflow.operators.python_operator import PythonOperator
    
    def instalar_paquete():
        !pip install /home/airflow/gcs/data/paquete.tar.gz
    
    with DAG(dag_id='mi_dag', start_date=ayer) as dag:
    
        tarea_instalar_paquete = PythonOperator(
            task_id="tarea_instalar_paquete",
            python_callable=instalar_paquete,
            )
    
        tarea_descarga_paquete >> tarea_instalar_paquete
    
        tarea_mi_modulo = PythonOperator(
            task_id="tarea_mi_modulo",
            python_callable=mi_modulo.mi_funcion,
            )
    
        tarea_instalar_paquete >> tarea_mi_modulo
    
    

    En este ejemplo, la tarea_instalar_paquete utiliza la utilidad pip dentro de una función de Python para instalar el archivo de distribución del paquete. Luego, la tarea_mi_modulo utiliza el módulo importado de su paquete personalizado para activar una función llamada mi_funcion.

    Por último, asegúrese de que su paquete personalizado y sus dependencias sean compatibles con la versión de Python utilizada por Cloud Composer, que actualmente es Python 3.6.

Comments are closed.