es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo hacer que un pipeline de Kedro tome dataframes de entrada configurables?

He creado un flujo de trabajo en Kedro compuesto de diferentes pipelines de procesamiento de ciencia de datos. Estas pipeline son probadas de manera independiente.

Cuando ejecuto una pipeline en particular de Kedro de forma autónoma, la pipeline toma su entrada de un archivo CSV.

En el entorno de producción, las pipeline necesitan ser unidas. En producción, cuando estoy ejecutando esta pipeline (PL1) junto con otra pipeline (PL0), me gustaría que la pipeline (PL1) tome su entrada de la pipeline (PL0).

Entonces, ¿cómo hacer que la entrada de una pipeline de Kedro sea configurable dependiendo de la opción en tiempo de ejecución? La opción de tiempo de ejecución puede ser autónoma o de integración.

Agradecería cualquier ejemplo de código para lograr lo mismo.

Tags:  

Answer

  1. Avatar for davy.ai

    Para hacer que la entrada de un pipeline de Kedro sea configurable dependiendo de una opción en tiempo de ejecución, podemos utilizar el sistema de configuración de Kedro. Específicamente, podemos definir dos configuraciones distintas: una para un entorno independiente y otra para un entorno integrado. Luego, podemos utilizar la configuración para determinar las entradas a nuestro pipeline.

    Primero, necesitamos definir nuestras dos configuraciones en conf/base, creando dos archivos YAML distintos, uno para cada configuración. Por ejemplo, podríamos tener un archivo conf/base/standalone.yml con el siguiente contenido:

    pl1:
      input_file: data/input.csv
    

    Y un archivo conf/base/integrated.yml con el siguiente contenido:

    pl1:
      input_file: '{{ pl0.output_file }}'
    

    Aquí, estamos definiendo una clave de configuración pl1 con un valor input_file. En la configuración independiente, el archivo de entrada está codificado en data/input.csv. En la configuración integrada, el archivo de entrada está configurado para el archivo de salida del pipeline pl0.

    A continuación, necesitamos modificar nuestro código del pipeline para usar la configuración. Específicamente, necesitamos usar el objeto config que se pasa a la función del pipeline para determinar el archivo de entrada. Aquí hay un ejemplo:

    from kedro.io import CSVDataSet
    
    
    def process_data(input_file: str, output_dataset: CSVDataSet):
        # procesar datos y guardar en output_dataset
        pass
    
    
    def create_pipeline(**kwargs):
        config = kwargs.get('config', {})
        input_file = config['pl1']['input_file']
    
        pl1 = Pipeline([
            node(
                process_data,
                inputs={'input_file': input_file},
                outputs={'output_dataset': 'output_dataset'},
            ),
        ])
    
        return pl1
    

    Aquí, obtenemos la configuración pl1 del objeto config y la usamos para determinar el archivo de entrada para nuestra función process_data. Observe que estamos pasando la variable input_file al diccionario inputs de la función node.

    Finalmente, para ejecutar nuestro pipeline en modo independiente, podemos usar el siguiente comando:

    kedro run --env standalone
    

    Y para ejecutar nuestro pipeline en modo integrado, podemos usar el siguiente comando:

    kedro run --env integrated
    

    Cuando ejecutamos el pipeline en modo integrado, Kedro utilizará automáticamente el archivo de configuración integrated.yml, que establece el archivo de entrada en el archivo de salida del pipeline pl0.

    ¡Espero que esto ayude! Háganme saber si tienen alguna duda adicional.

Comments are closed.