es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo pasar parámetros desde Google Composer a una plantilla de Dataflow

Estoy tratando de pasar un parámetro desde Google Composer a una plantilla de Dataflow de la siguiente manera, pero no funciona.

# Código de Composer
trigger_dataflow = DataflowTemplateOperator(
     task_id="trigger_dataflow",
     template="gs://mybucket/my_template",
     dag=dag,
     job_name='appsflyer_events_daily',
     parameters={
         "input": f'gs://my_bucket/' + "{{ ds }}" + "/*.gz"
     }
)

# Código de la plantilla
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input',
        default='gs://my_bucket/*.gz',
        help='ruta del archivo de entrada')

def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions) 
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )
Tags:  , ,

Answer

  1. Avatar for davy.ai

    El problema parece estar relacionado con el paso de parámetros desde el código del compositor al código de la plantilla. El código del compositor está pasando la ruta del archivo como una cadena con una combinación de concatenación de cadenas y macros de airflow utilizando f-strings. Sin embargo, el código de la plantilla espera que la ruta del archivo se pase como argumentos de línea de comandos analizados utilizando argparse.

    Para solucionar esto, el código del compositor necesita pasar la ruta del archivo de entrada como un argumento de línea de comandos. Aquí hay una versión actualizada del código del compositor que pasa la ruta del archivo de entrada como un argumento de línea de comandos.

    trigger_dataflow = DataflowTemplateOperator(
         task_id="trigger_dataflow",
         template="gs://mybucket/my_template",
         dag=dag,
         job_name='appsflyer_events_daily',
         parameters={
             "--input": "{{ dag_run.conf['input_file_path'] }}"
         }
    )
    

    En el código anterior, se utiliza dag_run.conf para pasar la ruta del archivo de entrada como un parámetro. Asegúrese de proporcionar la ruta del archivo de entrada como parte de la configuración de ejecución del DAG.

    dag_run_conf={
        "input_file_path": "gs://my_bucket/*.gz"
    }
    
    dag.create_dagrun(
        run_id='manual__' + datetime.utcnow().strftime('%Y%m%dT%H%M%S'),
        execution_date=datetime.utcnow(),
        conf=dag_run_conf,
        state='running',
        replace_microseconds=False,
    )
    

    Con estos cambios, la ruta del archivo de entrada se pasará como un argumento de línea de comandos y el código de la plantilla debería poder analizarlo correctamente utilizando argparse.

Comments are closed.