Cómo emitir un error filtrado si el flujo está vacío.
Tengo código síncrono que quiero convertir en no bloqueante con reactor.
Quiero llamar en paralelo a diferentes URI, las llamadas pueden devolver una respuesta, un error o nada.
Hay 3 casos:
- Una solicitud devuelve una respuesta, la retorno sin esperar a que se completen las otras solicitudes. Si otras solicitudes devolvieron errores anteriormente, descarto los errores.
- Al menos una solicitud devuelve un error y ninguna otra solicitud devuelve una respuesta, retorno un error.
- Todas las solicitudes no devuelven nada (ni respuesta ni error), no retorno nada.
Ya hice esto de manera síncrona:
AtomicReference<webclientresponseexception> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest retorna un Mono que emite una respuesta, un error o nada
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.blockFirst();
return Pair.of(responseBody, responseException.get());
Quiero eliminar la llamada bloqueante y devolver un Mono.
Por lo que entiendo, estoy manteniendo un estado con el error que ocurrió y no puedo tener un estado con reactor.
¿Cómo puedo realizar un seguimiento de los errores que ocurrieron pero no emitirlos, ya que quiero ver si otras solicitudes emiten un resultado más tarde?
¿Funciona esta versión?
AtomicReference<webclientresponseexception> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest retorna un Mono que emite una respuesta, un error o nada
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
¿El AtomicReference se cerrará como un cierre?
(no traduje el código y la salida ya que no es necesario)
davy.ai
Para realizar un seguimiento de los errores sin emitirlos, puedes utilizar el operador
materialize()
para convertir el flujo en un flujo de objetosSignal<t>
, donde cada señal representa un elemento, un error o la finalización de un Mono. Luego puedes utilizar el operadorreduce()
para acumular las señales y realizar un seguimiento de los errores que ocurran, y utilizar el operadorfilter()
para eliminar las señales que representen un Mono vacío. Por último, puedes utilizar el operadorflatMap()
para invocar el métodosendRequest()
para cada URL, y utilizar el operadorthen()
para ignorar los elementos emitidos por el Mono resultante y solo considerar su señal de finalización.Aquí tienes un ejemplo de cómo puedes implementar esto:
En esta implementación, el operador
reduce()
acumula las señales y devuelve la primera señal de error, si la hay. Si no hay señales de error, devuelve el primer elemento o señal de finalización. La señal resultante se procesa luego por el operadorflatMap()
, que devuelve un error si la señal es una señal de error, un Mono con un AtomicReference vacío si la señal es una señal de finalización, o un Mono con el elemento si la señal es una señal de elemento. El operadorfilter()
elimina cualquier Mono vacío, y el último operadorflatMap()
devuelve un flujo de cuerpos de respuesta sin errores.En cuanto al AtomicReference, será capturado por el cierre y compartido entre las diferentes invocaciones del operador
onErrorContinue()
. Sin embargo, debes tener en cuenta que el operadorMono.defer()
debe utilizarse para diferir la creación del Mono de error hasta que realmente se necesite, para evitar invocarresponseException.get()
cuando no haya señal de error.