es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo emitir un error filtrado si el flujo está vacío.

Tengo código síncrono que quiero convertir en no bloqueante con reactor.

Quiero llamar en paralelo a diferentes URI, las llamadas pueden devolver una respuesta, un error o nada.

Hay 3 casos:

  • Una solicitud devuelve una respuesta, la retorno sin esperar a que se completen las otras solicitudes. Si otras solicitudes devolvieron errores anteriormente, descarto los errores.
  • Al menos una solicitud devuelve un error y ninguna otra solicitud devuelve una respuesta, retorno un error.
  • Todas las solicitudes no devuelven nada (ni respuesta ni error), no retorno nada.

Ya hice esto de manera síncrona:

AtomicReference<webclientresponseexception> responseException = new AtomicReference<>();

String responseBody = Flux.fromIterable(uriList)
        .flatMap(url -> repo.sendRequest(uri)) 
        // sendRequest retorna un Mono que emite una respuesta, un error o nada
        .onErrorContinue(WebClientResponseException.class, (error, element) -> {
            var webclientError = (WebClientResponseException) error;
            responseException.set(webclientError);
        })
        .blockFirst();

return Pair.of(responseBody, responseException.get());

Quiero eliminar la llamada bloqueante y devolver un Mono.

Por lo que entiendo, estoy manteniendo un estado con el error que ocurrió y no puedo tener un estado con reactor.

¿Cómo puedo realizar un seguimiento de los errores que ocurrieron pero no emitirlos, ya que quiero ver si otras solicitudes emiten un resultado más tarde?

¿Funciona esta versión?

AtomicReference<webclientresponseexception> responseException = new AtomicReference<>();

return Flux.fromIterable(uriList)
        .flatMap(url -> repo.sendRequest(uri)) 
        // sendRequest retorna un Mono que emite una respuesta, un error o nada
        .onErrorContinue(WebClientResponseException.class, (error, element) -> {
            var webclientError = (WebClientResponseException) error;
            responseException.set(webclientError);
        })
        .next()
        .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));

¿El AtomicReference se cerrará como un cierre?

(no traduje el código y la salida ya que no es necesario)

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para realizar un seguimiento de los errores sin emitirlos, puedes utilizar el operador materialize() para convertir el flujo en un flujo de objetos Signal<t>, donde cada señal representa un elemento, un error o la finalización de un Mono. Luego puedes utilizar el operador reduce() para acumular las señales y realizar un seguimiento de los errores que ocurran, y utilizar el operador filter() para eliminar las señales que representen un Mono vacío. Por último, puedes utilizar el operador flatMap() para invocar el método sendRequest() para cada URL, y utilizar el operador then() para ignorar los elementos emitidos por el Mono resultante y solo considerar su señal de finalización.

    Aquí tienes un ejemplo de cómo puedes implementar esto:

    return Flux.fromIterable(uriList)
        .flatMap(url -> repo.sendRequest(uri).materialize())
        .reduce((signal1, signal2) -> {
            if (signal1.isOnError()) {
                return signal1;
            } else if (signal2.isOnError()) {
                return signal2;
            } else {
                return signal1;
            }
        })
        .flatMap(signal -> {
            if (signal.isOnError()) {
                return Mono.error(signal.getThrowable());
            } else if (signal.isOnComplete()) {
                AtomicReference<webclientresponseexception> responseException = new AtomicReference<>();
                return Mono.just(responseException).then();
            } else {
                return Mono.just(signal.get());
            }
        })
        .filter(signal -> !signal.getType().equals(SignalType.ON_COMPLETE))
        .flatMap(signal -> Mono.justOrEmpty(signal.get()))
    

    En esta implementación, el operador reduce() acumula las señales y devuelve la primera señal de error, si la hay. Si no hay señales de error, devuelve el primer elemento o señal de finalización. La señal resultante se procesa luego por el operador flatMap(), que devuelve un error si la señal es una señal de error, un Mono con un AtomicReference vacío si la señal es una señal de finalización, o un Mono con el elemento si la señal es una señal de elemento. El operador filter() elimina cualquier Mono vacío, y el último operador flatMap() devuelve un flujo de cuerpos de respuesta sin errores.

    En cuanto al AtomicReference, será capturado por el cierre y compartido entre las diferentes invocaciones del operador onErrorContinue(). Sin embargo, debes tener en cuenta que el operador Mono.defer() debe utilizarse para diferir la creación del Mono de error hasta que realmente se necesite, para evitar invocar responseException.get() cuando no haya señal de error.

Comments are closed.