es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Procesamiento paralelo fin de ejecución en el proyecto Reactor.

Tengo el siguiente flujo reactivo que obtiene datos de una API de terceros y luego popula un hashMap en paralelo.

HashMap <String, List <String>> tempHashMap = new HashMap <>();
        Flux.fromIterable(cList)
                .parallel(20)
                .runOn(Schedulers.boundedElastic())
                .flatMap(cId -> {
                    List <String> lb = api.getlb(p,cId);

                    if(!lb.isEmpty()) {
                        tempHashMap.put(cId, lb);
                    }
                    return Flux.just(tempHashMap);
                })

                .sequential()
                .publishOn(Schedulers.single())
                .doOnNext(hashMap-> lb = processMap(hashMap)
                .doOnError(throwable -> {
                    log.error("Error al obtener la lista de lb: {}", throwable.getMessage());
                    return;
                })
                .subscribe();

Esperaba que el método processMap() se llamara solo una vez después de que se completara todo el procesamiento en paralelo, con la adición de sequential(). Sin embargo, se está llamando en cada hilo paralelo. ¿Alguien puede ayudarme a entender por qué?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    La razón por la que se llama al método processMap() en cada hilo paralelo se debe al uso del operador flatMap, que crea un nuevo flujo para cada elemento emitido por el flujo fuente. En su código, está devolviendo Flux.just(tempHashMap) dentro del operador flatMap, lo que crea un nuevo flujo que contiene el estado actual de tempHashMap para cada elemento en cList.

    Para solucionar este problema y garantizar que el método processMap() se llame solo una vez después de que se complete todo el procesamiento paralelo, puede utilizar el operador collectMap en lugar de flatMap. El operador collectMap acumula los elementos emitidos en un mapa y devuelve un Mono

    . Luego, puede suscribirse a este Mono y aplicar su lógica de procesamiento en el operador doOnSuccess, que se llamará solo una vez después de que se hayan coleccionado todos los elementos.

    Aquí hay un fragmento de código actualizado para lograr esto:

    HashMap<string,></string,><string>> tempHashMap = new HashMap<>();
    Flux.fromIterable(cList)
        .parallel(20)
        .runOn(Schedulers.boundedElastic())
        .flatMap(cId -> {
            List<string> lb = api.getlb(p,cId);
            if(!lb.isEmpty()) {
                tempHashMap.put(cId, lb);
            }
            return Mono.just(tempHashMap);
        })
        .sequential()
        .publishOn(Schedulers.single())
        .collectMap(hashMap -> hashMap)
        .doOnSuccess(hashMap -> processMap(hashMap))
        .doOnError(throwable -> {
            log.error("Error al obtener la lista de lb: {}", throwable.getMessage());
        })
        .subscribe();
    

    ¡Espero que esto ayude! Avíseme si tiene más preguntas.

Comments are closed.