es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo divertirse en RxGo

Estoy tratando de descubrir el último paso de mi flujo reactivo para lograr la ejecución paralela del último paso usando DoOnNext.

Al ejecutar el siguiente código, espero que thirdCounter = 2 y que cada “primer DoOnNext”, “segundo DoOnNext” y “tercer DoOnNext” se impriman dos veces (total 6 veces).

La impresión es la esperada y también el Map concatena correctamente las cadenas. Sin embargo, thirdCounter = 7, por lo que los pasos se invocan en exceso.

¿Qué me falta aquí?

Mi código:
“`
var thirdCounter int32
func localRun(names …string) {
observable := rxgo.Just(names)().
Map(func(_ context.Context, i interface{}) (interface{}, error) {
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "one")
return s, nil
}).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "two")
return s, nil
}).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
atomic.AddInt32(&thirdCounter, 1)
s := i.(string)
s = fmt.Sprintf("%s,%s", s, "three")
return s, nil
})

<pre><code>observable.DoOnNext(func(i interface{}) {
fmt.Println("primer DoOnNext", i)
})

observable.DoOnNext(func(i interface{}) {
fmt.Println("segundo DoOnNext", i)
})

observable.DoOnNext(func(i interface{}) {
fmt.Println("tercer DoOnNext", i)
})

for item := range observable.Last().Observe() {
fmt.Println(item.V)
}
fmt.Printf("Third Counter = %d\n", thirdCounter)
</code></pre>

}
func TestMocktFlow(t *testing.T) {
cs := make([]string, 0)
cs = append(cs, "Hola")
cs = append(cs, "Hola")
localRun(cs…)
}
“`

Tags:  ,

Answer

  1. Avatar for davy.ai

    En el código proporcionado, las funciones observable.DoOnNext() no se ejecutan porque la variable observable se reasigna a una nueva instancia con cada llamada a Map(). Para solucionar esto y lograr la ejecución paralela del paso final utilizando DoOnNext(), podemos usar el operador RxGo#Publish() que permite transmitir en múltiples suscriptores el Observable fuente. De esta manera, podemos crear Observables separados para cada una de las funciones DoOnNext() y suscribirnos a ellos por separado sin invocar la cadena múltiples veces.

    Aquí está el código modificado usando el operador RxGo#Publish():

    var thirdCounter int32
    
    func localRun(names ...string) {
        observable := rxgo.Just(names)().
            Map(func(_ context.Context, i interface{}) (interface{}, error) {
                s := i.(string)
                s = fmt.Sprintf("%s,%s", s, "one")
                return s, nil
            }).
            Map(func(_ context.Context, i interface{}) (interface{}, error) {
                s := i.(string)
                s = fmt.Sprintf("%s,%s", s, "two")
                return s, nil
            }).
            Map(func(_ context.Context, i interface{}) (interface{}, error) {
                atomic.AddInt32(&thirdCounter, 1)
                s := i.(string)
                s = fmt.Sprintf("%s,%s", s, "three")
                return s, nil
            })
    
        // Crear observables separados para cada una de las funciones DoOnNext
        obs1, obs2, obs3 := observable.Publish().RefCount(), observable.Publish().RefCount(), observable.Publish().RefCount()
    
        obs1.DoOnNext(func(i interface{}) {
            fmt.Println("primer DoOnNext", i)
        })
    
        obs2.DoOnNext(func(i interface{}) {
            fmt.Println("segundo DoOnNext", i)
        })
    
        obs3.DoOnNext(func(i interface{}) {
            fmt.Println("tercer DoOnNext", i)
        })
    
        // Suscribirse al último observable para activar la ejecución de la cadena
        for item := range observable.Last().Observe() {
            fmt.Println(item.V)
        }
        fmt.Printf("Tercer Contador = %d\n", thirdCounter)
    }
    
    func TestMocktFlow(t *testing.T) {
        cs := make([]string, 0)
        cs = append(cs, "Hola")
        cs = append(cs, "Hola")
        localRun(cs...)
    }
    

    En este código modificado, estamos utilizando el operador RxGo#Publish() para transmitir en múltiples Observables el Observable fuente, de los cuales nos podemos suscribir por separado para ejecutar las funciones DoOnNext() sin invocar la cadena múltiples veces. El operador RefCount() se utiliza para conectar automáticamente al Observable fuente solo cuando tenemos el primer suscriptor activo, y desconectarse del Observable fuente cuando el último suscriptor activo cancela su suscripción.

Comments are closed.