es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Crear un flujo de cliente gRPC es asíncrono?

Tengo el siguiente archivo .proto:

service ChatService {
rpc Login(LoginRequest) returns (LoginResponse) {}
rpc Logout(LogoutRequest) returns (LogoutResponse) {}
rpc Stream(stream StreamRequest) returns (stream StreamResponse) {}
}

Inicialmente, cualquier cliente llamará al método “Login” y luego al método “Stream”. Si un cliente inicia sesión en el servicio (servidor), se envía un mensaje de difusión a través del cliente de streaming. Quiero probar esta funcionalidad en una prueba unitaria y esto es lo que tengo hasta ahora:

func Test_Login_ShouldBroadcastLoginNotificationThroughStream(t *testing.T) {
waitc := make(chan struct{})
loginSuccess := false

ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(dialer()), grpc.WithInsecure())
if err != nil {
    t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client1 := NewChatServiceClient(conn)
// paso 1: iniciar sesión en el cliente
r, err := client1.Login(ctx, &LoginRequest{Name: "test"})
if err != nil {
    t.Fatal("Error al iniciar sesión en el cliente")
}

// paso 2: comenzar a transmitir
md := metadata.New(map[string]string{tokenHeader: r.Token})
ctx = metadata.NewOutgoingContext(ctx, md)
sc1, err := client1.Stream(ctx) // ESTO NO PARECE ESTAR BLOQUEANDO
if err != nil {
    t.Fatal("Error al transmitir en el cliente1")
}
t.Log("finalizó la configuración del flujo del cliente 1")

// // paso 3: comenzar a escuchar los mensajes entrantes
go func() {
    res, _ := sc1.Recv()
    t.Logf("se recibió una respuesta del cliente 1 %+v", res)
    m := res.Event.(*StreamResponse_ClientLogin)
    if m.ClientLogin.Name == "test2" {
        loginSuccess = true
    }
    close(waitc)
}()

// es necesario pausar durante unos segundos antes de iniciar el cliente 2 para que comience el streaming
// paso 4: crear otro cliente
client2 := NewChatServiceClient(conn)
_, err = client2.Login(ctx, &LoginRequest{Name: "test2"})
if err != nil {
    t.Fatal("Error al iniciar sesión en el cliente")
}
t.Log("el cliente 2 inició sesión")

<-waitc
sc1.CloseSend()
// no es necesario iniciar la transmisión aquí ya que el cliente 1 está transmitiendo
if !loginSuccess {
    t.Fail()
}

}

Los pasos son los siguientes:

  1. El cliente 1 inicia sesión.
  2. El cliente 1 llama a “stream” y espera nuevos mensajes de inicio de sesión.
  3. El cliente 2 inicia sesión, en este punto el servidor enviará un mensaje de inicio de sesión correcto al cliente 1.

Mi problema es que en el “paso 2”, aunque el cliente llama a “Stream”, antes de que el servidor pueda configurar los flujos (completar el método) para “cliente1”, ya ha comenzado a procesar el método de inicio de sesión de “cliente2”. Tenía la impresión de que “sc1, err := client1.Stream(ctx)” es sincrónico y se bloqueará hasta que el servidor procese ese método, pero esto no parece ser el caso y parece que devuelve inmediatamente. ¿Me estoy perdiendo algo aquí?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, es correcto que el método Stream debería bloquear hasta que el servidor haya configurado el stream para cliente1. Sin embargo, es posible que el servidor sea lento en responder o que esté encontrando errores que están causando que el método devuelva de inmediato.

    Para depurar este problema, puedes intentar agregar registros de depuración o puntos de interrupción en el código del servidor para ver si está manejando adecuadamente la solicitud de transmisión de cliente1. También puedes intentar usar un protocolo de transporte diferente para ver si esto resuelve el problema. Alternativamente, también puedes intentar implementar un mecanismo de reintento en el lado del cliente para seguir intentando la solicitud de Stream hasta que tenga éxito.

Comments are closed.