Procesamiento paralelo fin de ejecución en el proyecto Reactor.
Tengo el siguiente flujo reactivo que obtiene datos de una API de terceros y luego popula un hashMap en paralelo.
HashMap <String, List <String>> tempHashMap = new HashMap <>();
Flux.fromIterable(cList)
.parallel(20)
.runOn(Schedulers.boundedElastic())
.flatMap(cId -> {
List <String> lb = api.getlb(p,cId);
if(!lb.isEmpty()) {
tempHashMap.put(cId, lb);
}
return Flux.just(tempHashMap);
})
.sequential()
.publishOn(Schedulers.single())
.doOnNext(hashMap-> lb = processMap(hashMap)
.doOnError(throwable -> {
log.error("Error al obtener la lista de lb: {}", throwable.getMessage());
return;
})
.subscribe();
Esperaba que el método processMap() se llamara solo una vez después de que se completara todo el procesamiento en paralelo, con la adición de sequential(). Sin embargo, se está llamando en cada hilo paralelo. ¿Alguien puede ayudarme a entender por qué?
davy.ai
La razón por la que se llama al método processMap() en cada hilo paralelo se debe al uso del operador flatMap, que crea un nuevo flujo para cada elemento emitido por el flujo fuente. En su código, está devolviendo Flux.just(tempHashMap) dentro del operador flatMap, lo que crea un nuevo flujo que contiene el estado actual de tempHashMap para cada elemento en cList.
Para solucionar este problema y garantizar que el método processMap() se llame solo una vez después de que se complete todo el procesamiento paralelo, puede utilizar el operador collectMap en lugar de flatMap. El operador collectMap acumula los elementos emitidos en un mapa y devuelve un Mono