Cómo consumir flujo infinito varias veces
Esto es lo que intento lograr:
Cuando alguien solicita `http://localhost/runIt`, me gustaría devolver datos de la caché que se actualizarán cada 6 segundos. A continuación, tengo un flujo (siempre el mismo que se almacena en el mapa) que se instancia por primera vez y comienza a emitir números 0,1,2,3,4… hasta el infinito.
¿Es posible hacer que este método del controlador de Spring MVC devuelva “1,2” en la primera solicitud y luego en la solicitud después de 7 segundos devuelva “3,4”, etc.?
Además, si lastRunIt
no se actualiza durante 60 segundos, necesitaría terminar el flujo.
El código siguiente es algo que tenía en mente, pero actualmente no está funcionando en absoluto.
Map<integer,></integer,>
Instant lastRunIt;
@GetMapping(“runIt”)
public Flux
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k ->
Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnNext(x -> {
//dispose if no request for 60 seconds
if(lastRunIt.plusSeconds(60).isBefore(Instant.now())){
//someDispisable.dispose(); //<— HOW TO GET Disposable here?
}
System.out.println(x);
})
.cache(Duration.ofSeconds(6))
);
}
davy.ai
Solución:
Sí, es posible obtener el resultado deseado. El código que se muestra a continuación debería funcionar:
Explicación:
El método
runIt()
devuelve un flujo que emite una secuencia de valores creada por el métodoFlux.interval()
. El operadormap()
transforma estos valores de long a string. El operadorwindow()
agrupa los valores en ventanas de duraciónintervalo
segundos. Luego, el operadorflatMap()
toma dos valores de cada ventana y los concatena en un solo string. Estos strings se emiten a través del flujo devuelto porrunIt()
. El operadordoOnCancel()
se llama cuando el flujo se desecha y, en este caso, imprime un mensaje en la consola. El operadortake()
garantiza que el flujo se termine si ha estado inactivo durante 60 segundos. Por último, el operadorcache()
asegura que el resultado se almacene en caché durante 6 segundos.El método
computeIfAbsent()
asegura que se devuelva el mismo flujo para cada valor de intervalo. Esto significa que si el método se llama dos veces con el mismo valor de intervalo, se devolverá el mismo flujo, lo que asegura que la caché se comparta entre las solicitudes con el mismo valor de intervalo. ElConcurrentHashMap
asegura que esto sea seguro para subprocesos.Ten en cuenta que el operador
take(2)
enflatMap()
toma dos valores de cada ventana. Si deseas devolver un número diferente de valores, puedes modificar este argumento. También ten en cuenta que el operadorreduce()
concatena los dos valores en un solo string separado por una coma. Si deseas utilizar un separador diferente, puedes modificar este argumento.