es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo consumir flujo infinito varias veces

Esto es lo que intento lograr:

Cuando alguien solicita `http://localhost/runIt`, me gustaría devolver datos de la caché que se actualizarán cada 6 segundos. A continuación, tengo un flujo (siempre el mismo que se almacena en el mapa) que se instancia por primera vez y comienza a emitir números 0,1,2,3,4… hasta el infinito.

¿Es posible hacer que este método del controlador de Spring MVC devuelva “1,2” en la primera solicitud y luego en la solicitud después de 7 segundos devuelva “3,4”, etc.?

Además, si lastRunIt no se actualiza durante 60 segundos, necesitaría terminar el flujo.

El código siguiente es algo que tenía en mente, pero actualmente no está funcionando en absoluto.

Map<integer,></integer,>> itos = new ConcurrentHashMap<>();
Instant lastRunIt;

@GetMapping(“runIt”)
public Flux runIt(){
lastRunIt = Instant.now();
return itos.computeIfAbsent(1, k ->
Flux.interval(Duration.ZERO, Duration.ofSeconds(3))
.doOnNext(x -> {
//dispose if no request for 60 seconds
if(lastRunIt.plusSeconds(60).isBefore(Instant.now())){
//someDispisable.dispose(); //<— HOW TO GET Disposable here?
}
System.out.println(x);
})
.cache(Duration.ofSeconds(6))
);

}

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Solución:

    Sí, es posible obtener el resultado deseado. El código que se muestra a continuación debería funcionar:

    Map<integer,></integer,><string>> itos = new ConcurrentHashMap<>();
    Instant lastRunIt = Instant.now();
    
    @GetMapping("runIt")
    public Flux<string> runIt() {
        int interval = 2;
        lastRunIt = Instant.now(); //actualiza la hora de la última solicitud
    
        return itos.computeIfAbsent(interval, k -> {
            Flux<string> flux = Flux
                .interval(Duration.ZERO, Duration.ofSeconds(1))
                .map(i -> i.toString()) //transforma los valores long en strings
                .window(Duration.ofSeconds(interval)) //agrupa los valores en ventanas de duración intervalo segundos
                .flatMap(w ->
                    w.take(2) //toma 2 valores de cada ventana
                     .reduce("", (a,b) -> a + "," + b) //concatena los 2 valores en un solo string
                )
                .doOnCancel(() -> System.out.println("Flujo desechado")) //esto se llamará cuando el flujo termine
                .take(Duration.ofSeconds(60)); //termina el flujo después de 60 segundos de inactividad
            return flux;
        })
        .cache(Duration.ofSeconds(6)); //almacena en caché el resultado durante 6 segundos
    }
    

    Explicación:

    El método runIt() devuelve un flujo que emite una secuencia de valores creada por el método Flux.interval(). El operador map() transforma estos valores de long a string. El operador window() agrupa los valores en ventanas de duración intervalo segundos. Luego, el operador flatMap() toma dos valores de cada ventana y los concatena en un solo string. Estos strings se emiten a través del flujo devuelto por runIt(). El operador doOnCancel() se llama cuando el flujo se desecha y, en este caso, imprime un mensaje en la consola. El operador take() garantiza que el flujo se termine si ha estado inactivo durante 60 segundos. Por último, el operador cache() asegura que el resultado se almacene en caché durante 6 segundos.

    El método computeIfAbsent() asegura que se devuelva el mismo flujo para cada valor de intervalo. Esto significa que si el método se llama dos veces con el mismo valor de intervalo, se devolverá el mismo flujo, lo que asegura que la caché se comparta entre las solicitudes con el mismo valor de intervalo. El ConcurrentHashMap asegura que esto sea seguro para subprocesos.

    Ten en cuenta que el operador take(2) en flatMap() toma dos valores de cada ventana. Si deseas devolver un número diferente de valores, puedes modificar este argumento. También ten en cuenta que el operador reduce() concatena los dos valores en un solo string separado por una coma. Si deseas utilizar un separador diferente, puedes modificar este argumento.

Comments are closed.