es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Variante Flux.switchOnNext que cambia cuando el próximo publicador emite en lugar de cuando es emitido.

Reactor tiene el operador switchOnNext que refleja una secuencia de editores, cancelando la suscripción anterior cada vez que un nuevo editor está disponible:

Para mi caso de uso, necesito una variación de esta temática, donde en lugar de cancelar el primer editor antes de suscribirme al siguiente, continúo reflejando la secuencia del editor 1 hasta el punto en que el editor 2 emite su primer elemento y solo en ese momento hago el cambio, como se muestra en este diagrama de mármol (para cualquier persona que encuentre esta pregunta más tarde, esto no es un diagrama de un operador existente en la documentación de Reactor, es uno que acabo de esbozar yo mismo):

Aprecio que en el caso general esto podría implicar que el operador mantenga un número ilimitado de suscripciones esperando a que uno de ellos emita antes de cancelar los demás, pero para mi caso de uso, sé que el flujo inicial de flujos es finito (por lo que no necesariamente necesito una solución de publicador de publicadores completamente general, algo que funcione para una lista finita de N editores sería suficiente).

¿Alguien puede ver una combinación ingeniosa de los operadores existentes que implemente este comportamiento o necesito escribirlo desde cero?

Tags:  

Answer

  1. Avatar for davy.ai

    Hasta donde sé, no existe un operador existente en Reactor que implemente el comportamiento que estás describiendo. Sin embargo, debería ser posible escribir dicho operador utilizando una combinación de los operadores concat y takeUntilOther.

    La idea básica sería concatenar las emisiones del primer publicador con las emisiones del segundo publicador una vez que emita su primer elemento. El operador takeUntilOther se utilizaría para asegurarse de que las emisiones del primer publicador se detengan una vez que el segundo publicador emita su primer elemento.

    Aquí tienes un ejemplo de código que implementa este comportamiento:

    Flux<><t>> source = ...; // El flujo de flujo
    
    Flux<t> result = source
        .concatMap(publisher -> {
            Flux<t> flux = Flux.from(publisher);
    
            // Utilizar una señal especial "never" para representar la ausencia de una "emisión"
            Flux<t> never = Mono.never();
    
            // Tomar hasta la señal "never" si este es el primer publicador en la secuencia,
            // de lo contrario, tomar hasta el primer elemento del siguiente publicador
            return flux.takeUntilOther(Flux.concat(Flux.just(never), source.next().flatMapMany(next -> Flux.from(next))));
        });
    

    La expresión source.next().flatMapMany(next -> Flux.from(next)) se utiliza para obtener el siguiente publicador en la secuencia y convertirlo en un Flux. La expresión Flux.concat(Flux.just(never), ...) se utiliza para agregar una señal “never” a las emisiones del siguiente publicador, asegurando que el operador takeUntilOther no detenga inmediatamente las emisiones del publicador actual.

Comments are closed.