es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo procesar datos históricos utilizando Reactor?

Supongamos que tengo dos fuentes de eventos históricos, y que los eventos de cada fuente están ordenados cronológicamente.

¿Cómo puedo fusionar estas fuentes utilizando Reactor para que los eventos en el Flujo fusionado se emitan en orden cronológico?

En RxNET, se puede utilizar una combinación de Observable.Generate() y HistoricalScheduler para crear un Observable a partir de una fuente de eventos históricos de manera que las emisiones se programen según los tiempos de los eventos (como se detalla aquí), pero no puedo encontrar una forma equivalente en Reactor.

¿Quizás podría utilizar de alguna manera Flux.generate() con VirtualTimeScheduler?

A continuación se muestra un ejemplo sencillo:

public class Program {

public record Event(Instant time, String id) {}

public static void main(String[] args) {

var source1 = Arrays.asList(
    new Event(Instant.ofEpochMilli(10), "a"),
    new Event(Instant.ofEpochMilli(30), "c"),
    new Event(Instant.ofEpochMilli(50), "e")
);

var source2 = Arrays.asList(
    new Event(Instant.ofEpochMilli(20), "b"),
    new Event(Instant.ofEpochMilli(40), "d"),
    new Event(Instant.ofEpochMilli(60), "f")
);

Flux.fromIterable(source1)
    .mergeWith(Flux.fromIterable(source2))
    .subscribe(e -> System.out.println(e.id));

// output actual:
// a
// c
// e
// b
// d
// f

// output deseado:
// a
// b
// c
// d
// e
// f

}
}
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Desafortunadamente, actualmente no existe un equivalente directo de Observable.Generate() y HistoricalScheduler en Reactor. Sin embargo, una forma de combinar las fuentes mientras se conserva el orden cronológico es utilizar el operador sorted() junto con un comparador personalizado que compare los marcos de tiempo Instant de cada evento.

    Aquí tienes un ejemplo de implementación:

    Flux.fromIterable(source1)
        .mergeWith(Flux.fromIterable(source2))
        .sorted(Comparator.comparing(Event::getTime))
        .map(Event::getId)
        .subscribe(System.out::println);
    

    En este fragmento de código, se utiliza Flux.fromIterable() para crear dos instancias separadas de Flux a partir de cada fuente de eventos históricos. Luego, se utiliza el operador mergeWith() para combinar las dos instancias de Flux en una sola. El operador sorted() aplica un comparador personalizado que compara los marcos de tiempo Instant de cada evento y ordena el Flux combinado en consecuencia. Finalmente, se utiliza el operador map() para extraer el campo id de cada objeto Event y se imprime en la consola.

    Esto debería producir el orden cronológico deseado de los eventos:

    a
    b
    c
    d
    e
    f
    

    Ten en cuenta que debido a que el operador sorted() almacena en búfer todos los valores emitidos hasta que el Flux esté completo, este enfoque puede no ser adecuado para fuentes de Flux muy grandes o infinitas. En esos casos, puede ser necesario implementar una solución más especializada utilizando operadores como groupBy() y concat() o mergeSequential().

Comments are closed.