Cómo procesar datos históricos utilizando Reactor?
Supongamos que tengo dos fuentes de eventos históricos, y que los eventos de cada fuente están ordenados cronológicamente.
¿Cómo puedo fusionar estas fuentes utilizando Reactor para que los eventos en el Flujo fusionado se emitan en orden cronológico?
En RxNET, se puede utilizar una combinación de Observable.Generate()
y HistoricalScheduler
para crear un Observable
a partir de una fuente de eventos históricos de manera que las emisiones se programen según los tiempos de los eventos (como se detalla aquí), pero no puedo encontrar una forma equivalente en Reactor.
¿Quizás podría utilizar de alguna manera Flux.generate()
con VirtualTimeScheduler
?
A continuación se muestra un ejemplo sencillo:
public class Program {
public record Event(Instant time, String id) {}
public static void main(String[] args) {
var source1 = Arrays.asList(
new Event(Instant.ofEpochMilli(10), "a"),
new Event(Instant.ofEpochMilli(30), "c"),
new Event(Instant.ofEpochMilli(50), "e")
);
var source2 = Arrays.asList(
new Event(Instant.ofEpochMilli(20), "b"),
new Event(Instant.ofEpochMilli(40), "d"),
new Event(Instant.ofEpochMilli(60), "f")
);
Flux.fromIterable(source1)
.mergeWith(Flux.fromIterable(source2))
.subscribe(e -> System.out.println(e.id));
// output actual:
// a
// c
// e
// b
// d
// f
// output deseado:
// a
// b
// c
// d
// e
// f
}
}
davy.ai
Desafortunadamente, actualmente no existe un equivalente directo de
Observable.Generate()
yHistoricalScheduler
en Reactor. Sin embargo, una forma de combinar las fuentes mientras se conserva el orden cronológico es utilizar el operadorsorted()
junto con un comparador personalizado que compare los marcos de tiempoInstant
de cada evento.Aquí tienes un ejemplo de implementación:
En este fragmento de código, se utiliza
Flux.fromIterable()
para crear dos instancias separadas de Flux a partir de cada fuente de eventos históricos. Luego, se utiliza el operadormergeWith()
para combinar las dos instancias de Flux en una sola. El operadorsorted()
aplica un comparador personalizado que compara los marcos de tiempoInstant
de cada evento y ordena el Flux combinado en consecuencia. Finalmente, se utiliza el operadormap()
para extraer el campoid
de cada objetoEvent
y se imprime en la consola.Esto debería producir el orden cronológico deseado de los eventos:
Ten en cuenta que debido a que el operador
sorted()
almacena en búfer todos los valores emitidos hasta que el Flux esté completo, este enfoque puede no ser adecuado para fuentes de Flux muy grandes o infinitas. En esos casos, puede ser necesario implementar una solución más especializada utilizando operadores comogroupBy()
yconcat()
omergeSequential()
.