es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Extrayendo un valor de campo (generalmente constante) de Apache Flink Datastream

Tengo un Datastream que contiene algunos campos como event_id, timestamp, etc. que permanecen constantes para muchos registros en la pipeline. Quiero usarlos en el nombre del archivo mientras lo escribo de nuevo en ParquetFormat usando StreamingFileSink. Podemos usar sufijos y prefijos si estamos utilizando constantes. Sin embargo, necesitaba ayuda para extraer el valor del registro que se puede utilizar para generar el nombre del archivo.

Patrón de nombre de archivo _ .parquet


Configuración de archivo de salida config = OutputFileConfig
.constructor()
.conPartPrefix("prefijo")
.conPartSuffix(".ext")
.build();

Estoy planeando usar esto, pero quiero ayuda para extraer el “prefijo” de los propios registros.

Cualquier pensamiento sobre esto realmente sería útil.
¡Gracias de antemano!

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para extraer los valores de Datastream para generar el nombre del archivo, puede usar la función map para transformar el flujo de entrada en un flujo de salida que contenga los valores deseados. Aquí hay un fragmento de código de ejemplo que demuestra este enfoque:

    DataStream<genericrecord> input = ...; // el Datastream de entrada
    
    DataStream<><string, long,="" string="">> output = input.map(record -> {
      // extraer los campos del registro
      String eventId = record.get("event_id").toString();
      long timestamp = record.get("timestamp");
      String otherField = record.get("someother-field").toString();
    
      // devolver una tupla que contenga los campos extraídos
      return Tuple3.of(eventId, timestamp, otherField);
    });
    
    // configure el StreamingFileSink con los valores extraídos
    StreamingFileSink<><string, long,="" string="">> sink = StreamingFileSink
      .forBulkFormat(new Path("/ruta/hacia/el/resultado"), ParquetAvroWriters.forGenericRecord(schema))
      .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd-HH"))
      .withOutputFileConfig(OutputFileConfig
        .builder()
        .withPartPrefix(value -> value.f0 + "-" + value.f1 + "-" + value.f2 + "-")
        .withPartSuffix(".parquet")
        .build())
      .build();
    
    // escribir el flujo de salida al sink
    output.addSink(sink);
    

    En este fragmento de código, usamos la función map para transformar el DataStream<genericrecord> de entrada en un DataStream<><string, long,="" string="">> de salida que contiene los campos extraídos en una tupla. Luego, pasamos este flujo de salida al StreamingFileSink y configuramos el OutputFileConfig para usar los campos extraídos en el prefijo del nombre del archivo. Finalmente, escribimos el flujo de salida al sink utilizando addSink.</string,></string,></string,>

Comments are closed.