Cómo divertirse en RxGo
Estoy tratando de descubrir el último paso de mi flujo reactivo para lograr la ejecución paralela del último paso usando DoOnNext.
Al ejecutar el siguiente código, espero que thirdCounter = 2 y que cada “primer DoOnNext”, “segundo DoOnNext” y “tercer DoOnNext” se impriman dos veces (total 6 veces).
La impresión es la esperada y también el Map concatena correctamente las cadenas. Sin embargo, thirdCounter = 7, por lo que los pasos se invocan en exceso.
¿Qué me falta aquí?
Mi código:
“`
var thirdCounter int32
func localRun(names …string) {
observable := rxgo.Just(names)().
Map(func(_ context.Context, i interface{}) (interface{}, error) {
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "one")
return s, nil
}).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "two")
return s, nil
}).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
atomic.AddInt32(&thirdCounter, 1)
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "three")
return s, nil
})
<pre><code>observable.DoOnNext(func(i interface{}) {
fmt.Println("primer DoOnNext", i)
})
observable.DoOnNext(func(i interface{}) {
fmt.Println("segundo DoOnNext", i)
})
observable.DoOnNext(func(i interface{}) {
fmt.Println("tercer DoOnNext", i)
})
for item := range observable.Last().Observe() {
fmt.Println(item.V)
}
fmt.Printf("Third Counter = %d\n", thirdCounter)
</code></pre>
}
func TestMocktFlow(t *testing.T) {
cs := make([]string, 0)
cs = append(cs, "Hola")
cs = append(cs, "Hola")
localRun(cs…)
}
“`
davy.ai
En el código proporcionado, las funciones
observable.DoOnNext()
no se ejecutan porque la variableobservable
se reasigna a una nueva instancia con cada llamada aMap()
. Para solucionar esto y lograr la ejecución paralela del paso final utilizandoDoOnNext()
, podemos usar el operadorRxGo#Publish()
que permite transmitir en múltiples suscriptores elObservable
fuente. De esta manera, podemos crearObservables
separados para cada una de las funcionesDoOnNext()
y suscribirnos a ellos por separado sin invocar la cadena múltiples veces.Aquí está el código modificado usando el operador
RxGo#Publish()
:En este código modificado, estamos utilizando el operador
RxGo#Publish()
para transmitir en múltiplesObservables
elObservable
fuente, de los cuales nos podemos suscribir por separado para ejecutar las funcionesDoOnNext()
sin invocar la cadena múltiples veces. El operadorRefCount()
se utiliza para conectar automáticamente alObservable
fuente solo cuando tenemos el primer suscriptor activo, y desconectarse delObservable
fuente cuando el último suscriptor activo cancela su suscripción.