Flujo – flatMap paralelo con webclient – límite a una tasa de lotes fija limitada.
El código que tengo es este:
return Flux.fromIterable(new Generator()).log()
.flatMap(
s ->
webClient
.head()
.uri(
MessageFormat.format(
"/my-{2,number,#00}.xml",
channel, timestamp, s))
.exchangeToMono(r -> Mono.just(r.statusCode()))
.filter(HttpStatus::is2xxSuccessful)
.map(r -> s),
6) //solo solicitar 6 segmentos en paralelo a través de webClient
.take(6) //solo necesitamos 6 respuestas 200 OK
.sort();
Solo solicita HEAD
, hasta que se hayan realizado las primeras 6 solicitudes correctamente.
La paralelización funciona aquí, pero el problema es que después de que una de las solicitudes se completa, inmediatamente activa la siguiente solicitud (para mantener el nivel de paralelización de 6). Lo que necesito aquí es tener un nivel de paralelización de 6, pero por lotes. Es decir, activar 6 solicitudes, esperar hasta que todas se completen, activar nuevamente 6 solicitudes …
Este es el resultado de la función log()
anterior:
: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- desde aquí NO OK; esperar hasta que todas se completen y activar solicitud(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()
ACTUALIZACIÓN:
Esto es lo que intenté con el buffer:
return Flux.fromIterable(new Generator())
.buffer(6)
.flatMap(Flux::fromIterable)
.log()
.flatMap(
s ->
webClient
.head()
.uri(
MessageFormat.format(
"/my-{2,number,#00}.xml",
channel, timestamp, s))
.exchangeToMono(r -> Mono.just(r.statusCode()))
.filter(HttpStatus::is2xxSuccessful)
.map(r -> s),
6) //solo solicitar 6 segmentos en paralelo a través de webClient
.take(6)
.sort();
davy.ai
El código inicial solicita
HEAD
hasta que se realicen exitosamente las primeras 6 solicitudes, manteniendo un nivel de paralelización de 6. Sin embargo, inmediatamente activa la siguiente solicitud después de la finalización de una solicitud.Para resolver este problema y activar 6 solicitudes, esperar hasta que todas estén completas, y luego activar otras 6 solicitudes, podemos hacer uso del operador
buffer
. El operadorbuffer
puede agrupar elementos de una fuenteFlux
en lotes de tamaño fijo.En el código actualizado, envolvemos el iterable con
buffer(6)
para formar lotes de tamaño 6. Luego aplanamos los lotes conflatMap(Flux::fromIterable)
antes de continuar con el resto del código. El operadorflatMap
esperará hasta que se completen las 6 solicitudes antes de pasar al siguiente lote.El código actualizado es el siguiente: