es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Problema de validación de datos en Apache Camel.

Soy nuevo en Camel, leyendo un archivo de datos (archivo de texto plano) utilizando múltiples hilos.
Capaz de leer datos y almacenarlos en un pojo. Tengo un problema con la validación.
El problema está en la validación. Mientras leo línea por línea, almaceno registros en el contexto en un mapa. Ahora la validación debe ser a través de la línea. Algunos registros tienen identificadores que deben estar definidos en un registro anterior. Aquí está mi ruta: –

get("readerService","fileReader")
  .log(...)
  .process(e->readerService.init(...))
  .split(...)
      .streaming()
      .parallelProcessing(...)
      .threads(...)
      .process(e -> readerService.read(e, ctx)
      // transforma el registro leído en un pojo y lo almacena en el mapa en ctx
      .process(e -> recTransformer(e, ctx)
      .process(e -> myValidator(e, ctx)
      .end()

El problema es que el servicio de lectura está leyendo 10 datos aleatoriamente y el transformador los lee, crea un pojo y lo guarda en el mapa en el contexto. Pero el validador también se llama cada vez que el registro completo no se ha leído.
En algunos casos está intentando validar un registro cuando el registro anterior no se ha almacenado en el mapa debido al procesamiento paralelo.

Me gustaría saber cómo llamar a .process(e->myValidator(e, ctx) después de que se haya leído el archivo completo y el mapa esté completo. Eso resolverá mi problema. Cualquier sugerencia será genial.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para resolver el problema de validar el archivo de datos después de que todo el archivo se haya leído y almacenado en el mapa, se puede utilizar el patrón de agregador en Apache Camel. El patrón de agregador se puede usar para acumular todos los datos en el mapa y luego invocar el proceso de validación después de que se haya procesado todo el archivo.

    Aquí hay un ejemplo de ruta utilizando el patrón de agregador:

    from("readerService/fileReader")
      .log(...)
      .process(e -> readerService.init(...))
      .split(...)
          .streaming()
          .parallelProcessing(...)
          .threads(...)
          .process(e -> readerService.read(e, ctx))
          .process(e -> recTransformer(e, ctx))
          .aggregate(constant(true), new MyAggregationStrategy())
                 // use constant(true) to aggregate all messages
                 // use MyAggregationStrategy to aggregate data into map
          .completionSize(1) // complete aggregation after one message
          .process(e -> myValidator(e, ctx))
          .end()
    

    En la ruta anterior, el agregador se crea utilizando el método aggregate. La declaración constant(true) se utiliza para agregar todos los mensajes. La estrategia de agregación MyAggregationStrategy se utiliza para agregar los datos en el mapa.

    La declaración .completionSize(1) se utiliza para completar la agregación después de que se haya agregado un mensaje. Esto asegurará que el archivo de datos se procese por completo antes de que se invoque el proceso de validación.

    Finalmente, la declaración .process(e -> myValidator(e, ctx)) se utiliza para invocar el proceso de validación después de que se haya leído y almacenado todo el archivo en el mapa.

    Con este enfoque, puedes asegurar que el validador se llame después de que el archivo se haya leído y almacenado completamente en el mapa, y evitar la validación de datos incompletos debido al procesamiento en paralelo.

Comments are closed.