es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo hacer que Flux siempre solicite un elemento adelante

He creado un Flux que utiliza subscribeOn(Subscribers.boundedElastic). Está envolviendo un Iterator que he creado. Lo que quisiera es que el Flux solicite el siguiente elemento en un hilo de trabajo mientras que el Subscriber está trabajando en el resultado actual. Idealmente, el Flux llamaría a hasNext() -> next() -> hasNext() y LUEGO enviaría el resultado de la llamada a next() al suscriptor. Intenté hacer esto con publish(2) pensando que la precarga de 2 haría el truco pero no pareció hacer lo que quería. Aquí está el código para crear el Flux.

java
@Override
public Flux<searchresponse> createDeepQueryFlux(@NonNull PITSearchInput input) {
validator.validate(input);
return Flux.<searchresponse, pointintimeiterator="">generate(
() -> new PointInTimeIterator(forwardingRestHighLevelClient, input),
(deepQueryIterator, sink) -> {
log.info("Generate called");
if (deepQueryIterator.hasNext()) {
sink.next(deepQueryIterator.next());
} else {
sink.complete();
}
return deepQueryIterator;
},
(deepQueryIterator) -> deepQueryIterator.shutdown())
.subscribeOn(Schedulers.boundedElastic());
}
</searchresponse,>

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para lograr el comportamiento deseado de solicitar el siguiente elemento en un hilo de trabajo mientras el suscriptor está trabajando en el resultado actual, puedes usar el operador flatMap junto con subscribeOn y publishOn.

    Aquí tienes un ejemplo de código:

    @Override
    public Flux<searchresponse> createDeepQueryFlux(@NonNull PITSearchInput input) {
      validator.validate(input);
      return Flux.create(emitter -> {
        PointInTimeIterator iterator = new PointInTimeIterator(forwardingRestHighLevelClient, input);
        emitter.onDispose(iterator::shutdown);
        emitter.onRequest(n -> {
          if (n == Long.MAX_VALUE) {
            // solicitudes sin límite, podemos usar nuestro iterador directamente
            while (iterator.hasNext()) {
              emitter.next(iterator.next());
            }
            emitter.complete();
          } else {
            // solicitud limitada, usemos flatMap para lograr paralelismo
            Flux.range(1, n)
              .flatMap(i -> Mono.fromCallable(iterator::next)
                .subscribeOn(Schedulers.parallel())
                .publishOn(Schedulers.single())
              )
              .takeWhile(Objects::nonNull)
              .subscribe(emitter::next, emitter::error, emitter::complete);
          }
        });
      });
    }
    

    En esta implementación, usamos Flux.create para gestionar manualmente el iterador y las solicitudes. onDispose asegura que el iterador se cierre correctamente cuando el flujo se cancela.

    Para solicitudes sin límite, simplemente iteramos sobre el iterador y emitimos los resultados usando emitter.next.

    Para solicitudes limitadas, usamos Flux.range para crear un número fijo de solicitudes. Luego, usamos flatMap para crear un flujo paralelo de instancias de Mono que llaman a iterator.next() en un hilo de trabajo. Usamos subscribeOn para especificar que el Mono debe ejecutarse en un hilo de trabajo y publishOn para especificar que el resultado debe emitirse en un solo hilo, para evitar problemas de concurrencia. Además, usamos takeWhile para asegurarnos de que no se emitan resultados nulos, ya que nuestro iterador devuelve nulo cuando no hay más elementos.

    Al usar flatMap con subscribeOn y publishOn, podemos lograr paralelismo al tiempo que nos aseguramos de que los resultados se emitan de manera controlada y segura.

Comments are closed.