es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Reactores de hundimiento. Muchos con repetición variable.

Suponga que deseo una llegada de vuelo Flux. El Sink estaría basado en la siguiente clase:


class Flight {
String code;
Date eta;
boolean arrived;
}

Un Map de code -> Flight sería el Sink real. Nuevos vuelos pueden ser añadidos (emit) y eta puede cambiar (causando un emit). Las llegadas se emiten como arrived=true y luego se eliminan del Map en la llegada del vuelo (por simplicidad).

Si yo tuviera un Sinks.many().multicast().replay(), ¿cómo puedo eliminar cambios previos en eta y eliminar los vuelos “arrived” de los internos del Sink, de tal forma que un nuevo Subscriber no obtenga “todo”, pero sí lo último de cada Flight entrante, menos las llegadas?

Sospecho que necesitaría crear un nuevo Sinks.many().unicast() para cada nuevo Subscriber y emitir el contenido más reciente del Map al nuevo Subscriber y continuar emit actualizaciones de eventos de cambio del Map.

Sin embargo, ¿cómo hago un seguimiento de estos Sinks para saber a cuáles debo emitir? ¿Y cómo sé cuando un Sink se desuscribe o se dispone, para que pueda detener el emit a ese Sink desechado?

Podría hacer el seguimiento de los suscriptores a través de Flux.create(FluxSink), onRequest(), onDispose(), etc., lo que efectivamente me dio un unicast(). ¿Me falta entender la diferencia entre FluxSink y Sinks?

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para lograr el comportamiento deseado, puedes crear un Flux utilizando Sinks.many().multicast().replay() y aplicar operaciones de transformación en él.

    Sinks.Many<flight> source = Sinks.many().multicast().replay();
    
    Flux<flight> arrivals = source.asFlux()
        .distinctUntilChanged(Flight::getCode) // solo emite el último vuelo entrante con el mismo código
        .filter(flight -> !flight.arrived) // filtra los vuelos llegados
        .map(flight -> new Flight(flight.code, flight.eta, false)); // crea una copia del objeto Flight para eliminar los cambios previos en eta
    
    // agrega un nuevo vuelo al Sink de origen
    void addFlight(Flight flight) {
        source.tryEmitNext(flight);
    }
    
    // actualiza eta de un vuelo ya existente en el Sink de origen
    void updateFlight(Flight flight) {
        // encuentra el vuelo en el Sink de origen y actualiza su eta
        // mientras emite el vuelo actualizado para activar la tubería de transformación
        source.tryEmitNext(source.tryPollNext()
            .map(f -> {
                if (f.code.equals(flight.code)) {
                    f.eta = flight.eta;
                }
                return f;
            })
            .orElse(null));
    }
    
    // marca un vuelo como llegado y lo elimina del Sink de origen
    void removeFlight(String code) {
        source.tryEmitNext(source.tryPollNext()
            .filter(f -> !f.code.equals(code))
            .orElse(null));
    }
    

    Con esta implementación, cada nuevo suscriptor solo recibirá los últimos objetos Flight entrantes, menos los que llegaron y los cambios previos en eta. No es necesario crear un nuevo Sinks.many().unicast() para cada nuevo suscriptor.

    Puedes realizar un seguimiento de los suscriptores llamando a los métodos doOnSubscribe y doOnCancel en el Flux creado a partir del operador Sinks.many(). Cuando un suscriptor cancela su suscripción, puedes anular el registro en el Sink de origen.

    Set<long> subscriberIds = new HashSet<>(); // realiza un seguimiento de los id de los suscriptores
    
    Flux<flight> arrivals = source.asFlux()
        .doOnSubscribe(subscription -> {
            long id = subscription.hashCode();
            subscriberIds.add(id);
            subscription.onCancel(() -> {
                subscriberIds.remove(id);
                // verifica si no hay más suscriptores y elimina el Sink de origen
                if (subscriberIds.isEmpty()) {
                    source.tryEmitComplete();
                }
            });
        })
        .distinctUntilChanged(Flight::getCode)
        .filter(flight -> !flight.arrived)
        .map(flight -> new Flight(flight.code, flight.eta, false));
    
    // agrega un nuevo vuelo al Sink de origen y lo emite a todos los suscriptores
    void addFlight(Flight flight) {
        source.tryEmitNext(flight);
        for (Long id : subscriberIds) {
            arrivals.subscribeOn(Schedulers.parallel()).subscribe();
        }
    }
    
    // actualiza eta de un vuelo ya existente en el Sink de origen y lo emite a todos los suscriptores
    void updateFlight(Flight flight) {
        source.tryEmitNext(source.tryPollNext()
            .map(f -> {
                if (f.code.equals(flight.code)) {
                    f.eta = flight.eta;
                }
                return f;
            })
            .orElse(null));
        for (Long id : subscriberIds) {
            arrivals.subscribeOn(Schedulers.parallel()).subscribe();
        }
    }
    
    // marca un vuelo como llegado y lo elimina del Sink de origen y emite los vuelos actualizados a todos los suscriptores
    void removeFlight(String code) {
        source.tryEmitNext(source.tryPollNext()
            .filter(f -> !f.code.equals(code))
            .orElse(null));
        for (Long id : subscriberIds) {
            arrivals.subscribeOn(Schedulers.parallel()).subscribe();
        }
    }
    

    FluxSink y Sinks son similares en el sentido de que ambos te permiten emitir objetos a múltiples suscriptores. La principal diferencia es que FluxSink es una API basada en callback que se controla manualmente, mientras que Sinks es una API declarativa que proporciona varios tipos de Sinks para diferentes casos de uso.

Comments are closed.