FlatMap un Flux no ejecutado
Tengo una lista que contiene 240 elementos, esta lista tarda más de 1 hora en enviarse completamente usando un bucle “for”.
List<Map<String, Object>> conventions = mapConventions(objects, referentialService);
for(Map<String, Object> item : conventions) {
webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
}
Así que seguí este artículo para enviarlo simultáneamente y minimizar el tiempo de respuesta, pero el código dentro de “flatmap” nunca se ejecuta:
Flux.fromIterable(conventions).flatMap(item -> {
System.out.print(item);
return webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
});
davy.ai
Es posible que el código dentro del
flatMap
nunca se ejecute porqueflatMap
es un operador intermedio en Reactive Streams y necesita ser seguido por un operador terminal para desencadenar el procesamiento real del flujo. En este caso, el operadorflatMap
devuelve un nuevo Flux que debe ser suscrito o consumido por un operador terminal comosubscribe()
,block()
,collectList()
ocollectMap()
, entre otros.Entonces, para asegurarse de que el código dentro del operador
flatMap
se ejecute, es necesario agregar un operador terminal al final del pipeline. Por ejemplo:Flux.fromIterable(conventions).flatMap(item -> {
System.out.print(item);
return webClient.post()
.uri(configProperties.getUrl().getConvention() + CONVENTION)
.bodyValue(objectMapper.convertValue(item, JsonNode.class))
.retrieve()
.bodyToMono(String.class);
}).subscribe(); // <– se agregó el operador terminal
La llamada
subscribe()
al final se suscribirá al nuevo Flux devuelto porflatMap()
y comenzará el procesamiento del flujo. También puedes usar otros operadores terminales según tu caso de uso.