es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Flujo – flatMap paralelo con webclient – límite a una tasa de lotes fija limitada.

El código que tengo es este:

return Flux.fromIterable(new Generator()).log()
        .flatMap(
             s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //solo solicitar 6 segmentos en paralelo a través de webClient
        .take(6) //solo necesitamos 6 respuestas 200 OK
        .sort();

Solo solicita HEAD, hasta que se hayan realizado las primeras 6 solicitudes correctamente.

La paralelización funciona aquí, pero el problema es que después de que una de las solicitudes se completa, inmediatamente activa la siguiente solicitud (para mantener el nivel de paralelización de 6). Lo que necesito aquí es tener un nivel de paralelización de 6, pero por lotes. Es decir, activar 6 solicitudes, esperar hasta que todas se completen, activar nuevamente 6 solicitudes …

Este es el resultado de la función log() anterior:

: | request(6)
: | onNext(7)
: | onNext(17)
: | onNext(27)
: | onNext(37)
: | onNext(47)
: | onNext(57)
: | request(1) <---- desde aquí NO OK; esperar hasta que todas se completen y activar solicitud(6)
: | onNext(8)
: | request(1)
: | onNext(18)
: | request(1)
: | onNext(28)
: | request(1)
: | onNext(38)
: | request(1)
: | onNext(48)
: | request(1)
: | onNext(58)
: | cancel()

ACTUALIZACIÓN:

Esto es lo que intenté con el buffer:

return Flux.fromIterable(new Generator())
        .buffer(6)
        .flatMap(Flux::fromIterable)
        .log()
        .flatMap(
             s ->
                webClient
                    .head()
                    .uri(
                        MessageFormat.format(
                            "/my-{2,number,#00}.xml",
                            channel, timestamp, s))
                    .exchangeToMono(r -> Mono.just(r.statusCode()))
                    .filter(HttpStatus::is2xxSuccessful)
                    .map(r -> s),
            6)  //solo solicitar 6 segmentos en paralelo a través de webClient
        .take(6)
        .sort();
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El código inicial solicita HEAD hasta que se realicen exitosamente las primeras 6 solicitudes, manteniendo un nivel de paralelización de 6. Sin embargo, inmediatamente activa la siguiente solicitud después de la finalización de una solicitud.

    Para resolver este problema y activar 6 solicitudes, esperar hasta que todas estén completas, y luego activar otras 6 solicitudes, podemos hacer uso del operador buffer. El operador buffer puede agrupar elementos de una fuente Flux en lotes de tamaño fijo.

    En el código actualizado, envolvemos el iterable con buffer(6) para formar lotes de tamaño 6. Luego aplanamos los lotes con flatMap(Flux::fromIterable) antes de continuar con el resto del código. El operador flatMap esperará hasta que se completen las 6 solicitudes antes de pasar al siguiente lote.

    El código actualizado es el siguiente:

    return Flux.fromIterable(new Generator())
            .buffer(6)
            .flatMap(Flux::fromIterable)
            .log()
            .flatMap(
                s ->
                    webClient
                        .head()
                        .uri(
                            MessageFormat.format(
                                "/my-{2,number,#00}.xml",
                                channel, timestamp, s))
                        .exchangeToMono(r -> Mono.just(r.statusCode()))
                        .filter(HttpStatus::is2xxSuccessful)
                        .map(r -> s),
                6)
            .take(6)
            .sort();
    

Comments are closed.