Cómo hacer que Flux siempre solicite un elemento adelante
He creado un Flux
que utiliza subscribeOn(Subscribers.boundedElastic)
. Está envolviendo un Iterator
que he creado. Lo que quisiera es que el Flux
solicite el siguiente elemento en un hilo de trabajo mientras que el Subscriber
está trabajando en el resultado actual. Idealmente, el Flux
llamaría a hasNext() -> next() -> hasNext() y LUEGO enviaría el resultado de la llamada a next() al suscriptor. Intenté hacer esto con publish(2)
pensando que la precarga de 2 haría el truco pero no pareció hacer lo que quería. Aquí está el código para crear el Flux.
java
</searchresponse,>
@Override
public Flux<searchresponse> createDeepQueryFlux(@NonNull PITSearchInput input) {
validator.validate(input);
return Flux.<searchresponse, pointintimeiterator="">generate(
() -> new PointInTimeIterator(forwardingRestHighLevelClient, input),
(deepQueryIterator, sink) -> {
log.info("Generate called");
if (deepQueryIterator.hasNext()) {
sink.next(deepQueryIterator.next());
} else {
sink.complete();
}
return deepQueryIterator;
},
(deepQueryIterator) -> deepQueryIterator.shutdown())
.subscribeOn(Schedulers.boundedElastic());
}
davy.ai
Para lograr el comportamiento deseado de solicitar el siguiente elemento en un hilo de trabajo mientras el suscriptor está trabajando en el resultado actual, puedes usar el operador
flatMap
junto consubscribeOn
ypublishOn
.Aquí tienes un ejemplo de código:
En esta implementación, usamos
Flux.create
para gestionar manualmente el iterador y las solicitudes.onDispose
asegura que el iterador se cierre correctamente cuando el flujo se cancela.Para solicitudes sin límite, simplemente iteramos sobre el iterador y emitimos los resultados usando
emitter.next
.Para solicitudes limitadas, usamos
Flux.range
para crear un número fijo de solicitudes. Luego, usamosflatMap
para crear un flujo paralelo de instancias deMono
que llaman aiterator.next()
en un hilo de trabajo. UsamossubscribeOn
para especificar que elMono
debe ejecutarse en un hilo de trabajo ypublishOn
para especificar que el resultado debe emitirse en un solo hilo, para evitar problemas de concurrencia. Además, usamostakeWhile
para asegurarnos de que no se emitan resultados nulos, ya que nuestro iterador devuelve nulo cuando no hay más elementos.Al usar
flatMap
consubscribeOn
ypublishOn
, podemos lograr paralelismo al tiempo que nos aseguramos de que los resultados se emitan de manera controlada y segura.