Reactores de hundimiento. Muchos con repetición variable.
Suponga que deseo una llegada de vuelo Flux
. El Sink
estaría basado en la siguiente clase:
class Flight {
String code;
Date eta;
boolean arrived;
}
Un Map
de code
-> Flight
sería el Sink
real. Nuevos vuelos pueden ser añadidos (emit
) y eta
puede cambiar (causando un emit
). Las llegadas se emiten como arrived=true
y luego se eliminan del Map
en la llegada del vuelo (por simplicidad).
Si yo tuviera un Sinks.many().multicast().replay()
, ¿cómo puedo eliminar cambios previos en eta
y eliminar los vuelos “arrived” de los internos del Sink
, de tal forma que un nuevo Subscriber
no obtenga “todo”, pero sí lo último de cada Flight
entrante, menos las llegadas?
Sospecho que necesitaría crear un nuevo Sinks.many().unicast()
para cada nuevo Subscriber
y emitir
el contenido más reciente del Map
al nuevo Subscriber
y continuar emit
actualizaciones de eventos de cambio del Map
.
Sin embargo, ¿cómo hago un seguimiento de estos Sinks
para saber a cuáles debo emitir
? ¿Y cómo sé cuando un Sink
se desuscribe o se dispone, para que pueda detener el emit
a ese Sink
desechado?
Podría hacer el seguimiento de los suscriptores a través de Flux.create(FluxSink)
, onRequest()
, onDispose()
, etc., lo que efectivamente me dio un unicast()
. ¿Me falta entender la diferencia entre FluxSink
y Sinks
?
davy.ai
Para lograr el comportamiento deseado, puedes crear un
Flux
utilizandoSinks.many().multicast().replay()
y aplicar operaciones de transformación en él.Con esta implementación, cada nuevo suscriptor solo recibirá los últimos objetos
Flight
entrantes, menos los que llegaron y los cambios previos eneta
. No es necesario crear un nuevoSinks.many().unicast()
para cada nuevo suscriptor.Puedes realizar un seguimiento de los suscriptores llamando a los métodos
doOnSubscribe
ydoOnCancel
en elFlux
creado a partir del operadorSinks.many()
. Cuando un suscriptor cancela su suscripción, puedes anular el registro en el Sink de origen.FluxSink
ySinks
son similares en el sentido de que ambos te permiten emitir objetos a múltiples suscriptores. La principal diferencia es queFluxSink
es una API basada en callback que se controla manualmente, mientras queSinks
es una API declarativa que proporciona varios tipos de Sinks para diferentes casos de uso.