es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Proyecto Reactor: buffer con ejecución paralela.

Necesito copiar datos de una fuente (en paralelo) a otra en lotes.

Hice esto:

Flux.generate((SynchronousSink<String> sink) -> {
                    try {
                        String val = dataSource.getNextItem();
                        if (val == null) {
                            sink.complete();
                            return;
                        }
                        sink.next(val);

                } catch (InterruptedException e) {
                    sink.error(e);
                }
            })
            .parallel(4)
            .runOn(Schedulers.parallel())
            .doOnNext(dataTarget::write)
            .sequential()
            .blockLast();
class dataSource{
  public Item getNextItem(){ 
    //...
  }
}
class dataTarget{
  public void write(List<Item> items){ 
    //...
  }
}

Se reciben datos en paralelo, pero se escribe uno a la vez.

Necesito recolectarlos en lotes (como de 10 elementos) y luego escribir el lote.

¿Cómo puedo hacer eso?

ACTUALIZACIÓN:

La idea principal es que la fuente es el sistema de mensajería (por ejemplo, rabbitmq o nats) que es adecuado para enviar eficientemente mensajes uno por uno, pero el objetivo es la base de datos que es más eficiente para insertar un lote.

Por lo tanto, el resultado final debería ser como – recibo mensajes en paralelo hasta que el búfer no esté lleno, luego escribo todo el búfer en la base de datos de una sola vez.

Es fácil de hacer en Java regular, pero en el caso de los flujos, no entiendo cómo hacerlo. ¿Cómo se almacenan en búfer los datos y cómo se pausa el lector hasta que el escritor esté listo para recibir la siguiente parte?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para copiar datos de una fuente a otra en lotes, se puede modificar el código de la siguiente manera:

    Flux.generate((SynchronousSink<><item>> sink) -> {
            List<item> items = new ArrayList<>();
            for (int i = 0; i < 10; i++) { // recolectar 10 items
                try {
                    Item val = dataSource.getNextItem();
                    if (val == null) {
                        sink.complete();
                        return;
                    }
                    items.add(val);
                } catch (InterruptedException e) {
                    sink.error(e);
                }
            }
            sink.next(items); // escribir el lote de 10 items
        })
        .parallel(4)
        .runOn(Schedulers.parallel())
        .doOnNext(dataTarget::write)
        .sequential()
        .blockLast();
    

    Aquí, usamos generate para generar una lista de 10 items a la vez desde la dataSource, y emitir la lista al destino dataTarget. Switch from emitting items one at a time to emitting batches of 10 items at a time.

    Nota: Item es el tipo de dato de su fuente y destino de datos.

    Para asegurarse de que el lector se detenga hasta que el escritor esté listo para obtener la siguiente parte, se puede usar la retroalimentación con el operador limitRate. Por ejemplo:

    Flux.generate((SynchronousSink<><item>> sink) -> {
            List<item> items = new ArrayList<>();
            for (int i = 0; i < 10; i++) { // recolectar 10 items
                try {
                    Item val = dataSource.getNextItem();
                    if (val == null) {
                        sink.complete();
                        return;
                    }
                    items.add(val);
                } catch (InterruptedException e) {
                    sink.error(e);
                }
            }
            sink.next(items); // escribir el lote de 10 items
        })
        .limitRate(4) // especificar la tasa de items que pueden ser solicitados a la vez
        .parallel(4)
        .runOn(Schedulers.parallel())
        .doOnNext(dataTarget::write)
        .sequential()
        .blockLast();
    

    Aquí, el operador limitRate asegura que solo se soliciten 4 lotes de 10 elementos a la vez. Los operadores parallel y runOn aseguran que los lotes sean procesados en paralelo.

    En general, este enfoque debería permitirle copiar eficientemente datos en lotes desde su sistema de mensajería a su base de datos.

Comments are closed.