Proyecto Reactor: buffer con ejecución paralela.
Necesito copiar datos de una fuente (en paralelo) a otra en lotes.
Hice esto:
Flux.generate((SynchronousSink<String> sink) -> {
try {
String val = dataSource.getNextItem();
if (val == null) {
sink.complete();
return;
}
sink.next(val);
} catch (InterruptedException e) {
sink.error(e);
}
})
.parallel(4)
.runOn(Schedulers.parallel())
.doOnNext(dataTarget::write)
.sequential()
.blockLast();
class dataSource{
public Item getNextItem(){
//...
}
}
class dataTarget{
public void write(List<Item> items){
//...
}
}
Se reciben datos en paralelo, pero se escribe uno a la vez.
Necesito recolectarlos en lotes (como de 10 elementos) y luego escribir el lote.
¿Cómo puedo hacer eso?
ACTUALIZACIÓN:
La idea principal es que la fuente es el sistema de mensajería (por ejemplo, rabbitmq o nats) que es adecuado para enviar eficientemente mensajes uno por uno, pero el objetivo es la base de datos que es más eficiente para insertar un lote.
Por lo tanto, el resultado final debería ser como – recibo mensajes en paralelo hasta que el búfer no esté lleno, luego escribo todo el búfer en la base de datos de una sola vez.
Es fácil de hacer en Java regular, pero en el caso de los flujos, no entiendo cómo hacerlo. ¿Cómo se almacenan en búfer los datos y cómo se pausa el lector hasta que el escritor esté listo para recibir la siguiente parte?
davy.ai
Para copiar datos de una fuente a otra en lotes, se puede modificar el código de la siguiente manera:
Aquí, usamos
generate
para generar una lista de 10 items a la vez desde ladataSource
, y emitir la lista al destinodataTarget
. Switch from emitting items one at a time to emitting batches of 10 items at a time.Nota:
Item
es el tipo de dato de su fuente y destino de datos.Para asegurarse de que el lector se detenga hasta que el escritor esté listo para obtener la siguiente parte, se puede usar la retroalimentación con el operador
limitRate
. Por ejemplo:Aquí, el operador
limitRate
asegura que solo se soliciten 4 lotes de 10 elementos a la vez. Los operadoresparallel
yrunOn
aseguran que los lotes sean procesados en paralelo.En general, este enfoque debería permitirle copiar eficientemente datos en lotes desde su sistema de mensajería a su base de datos.