es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo ejecutar Flux en paralelo, pero recopilar los resultados en secuencia.

Quiero ejecutar Flux en paralelo, pero recopilar el resultado en secuencia. Supongamos que tengo un flux de [3,2,1], después de realizar alguna tarea en paralelo, espero que el resultado siga siendo [3,2,1].

val mono = Flux.fromIterable(3 downTo 1)
      .map { it.toString() }
      // esto devolverá el mismo número
      .flatMap { number -> task(number) }
      .doOnNext { println("Número de $it") }
      // Obtuve 1, 3, 2 lo cual es bueno porque quiero que se ejecute en paralelo
      .collectList()
      .doOnNext { println(it) }
      // Todavía obtuve [1,3,2], pero quiero que sea [3,2,1] según el orden iterable.
      .block()
Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para ejecutar un Flujo en paralelo pero recoger el resultado en secuencia, puede utilizar el operador groupJoin() para unir los resultados en el orden definido por el iterable original. Aquí está un ejemplo de código:

    val flux = Flux.fromIterable(3 downTo 1)
            .map { it.toString() }
            .flatMap { number -> task(number).subscribeOn(Schedulers.parallel()) }
            // use groupJoin to collect the results in sequence 
            .groupBy(Function.identity()) // group by the original value 
            .groupJoin(Flux.range(1, 3),
                    { key -> Mono.just(key) }, // get the grouped Flux for each key
                    { i -> Mono.just(i) }, // get the index Flux
                    { key, group -> group.map { value -> key } } // map to the key for each value in the group
            )
            .flatMap { it } // flatten the Flux of grouped values
            .doOnNext { println("Número de $it") }
            .collectList()
            .doOnNext { println(it) }
            .block()
    

    En este ejemplo, usamos flatMap con subscribeOn(Schedulers.parallel()) para ejecutar las tareas en paralelo. Luego usamos groupBy para agrupar los resultados por el valor original (por ejemplo, “1”, “2”, “3”). También usamos groupJoin para unir los grupos en el orden original, utilizando un Flux separado con enteros del 1 al 3. Finalmente, aplanamos el Flujo de valores agrupados y los recogemos en una lista.

    El resultado debería ser la lista original [3,2,1] en el orden definido por el iterable. Note que los operadores doOnNext se utilizan para imprimir la salida, pero no son necesarios para la operación real.

Comments are closed.