Tag: PROJECT-REACTOR
Necesito copiar datos de una fuente (en paralelo) a otra en lotes. Hice esto: Flux.generate((SynchronousSink<String> sink) -> { try { String val = dataSource.getNextItem(); if (val == null) { sink.complete(); return; } sink.next(val); } catch (InterruptedException e) { sink.error(e); } }) .parallel(4) .runOn(Schedulers.parallel()) .doOnNext(dataTarget::write) .sequential() .blockLast(); class dataSource{ public Item . . . Read more
Tengo un método que toma una lista de elementos, hace una petición web y luego devuelve los elementos que no pudieron ser procesados (algunos elementos pueden ser procesados correctamente, algunos pueden haber fallado y solo se devuelven los que fallaron o una lista vacía): // devuelve una lista de elementos . . . Read more
Tengo un método java-lang @Service public class MyService { public Mono<Integer> processData() { … // operación reactiva muy larga } } En el flujo normal del programa, llamo a este método de forma asíncrona a través de un evento de Kafka. Para fines de prueba, necesito exponer el método como . . . Read more
Suponga que deseo una llegada de vuelo Flux. El Sink estaría basado en la siguiente clase: class Flight { String code; Date eta; boolean arrived; } Un Map de code -> Flight sería el Sink real. Nuevos vuelos pueden ser añadidos (emit) y eta puede cambiar (causando un emit). Las . . . Read more
Quiero dividir el Flux en múltiples List (o sub-Flux) de manera que cada una contenga hasta 20 elementos. Mi Flux: “a”, “s”, “x”,… Convertido a: List<string>, List<string> o Flux<string>, Flux<string> cada una con hasta 20 elementos.