es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo puedo mantener a mi consumidor escuchando los mensajes en Azure Service Bus utilizando el Azure SDK para Golang v0.3.1?

He estado utilizando el módulo azure-sdk-for-go/sdk/messaging/azservicebus v0.3.1 para conectar mi consumidor con Azure Service Bus, pero el código implementado solo recibe un número fijo de mensajes y luego detiene la aplicación y me gustaría mantener el consumidor escuchando la cola. A continuación, se muestra mi código:

client, err := azservicebus.NewClientFromConnectionString("Cadena de conexión", nil)

if err != nil {
    log.Fatalf("Error al crear el cliente de Service Bus: %s", err.Error())
}

receiver, err := client.NewReceiverForQueue("cola", nil)

if err != nil {
    log.Fatalf("Error al crear el consumidor: %s", err.Error())
}

messages, err := receiver.ReceiveMessages(context.TODO(), 10, nil)

if err != nil {
    log.Fatalf("Error al recibir los mensajes: %s", err.Error())
}

for _, message := range messages {

    body, err := message.Body()

    if err != nil {
        log.Fatalf("Error al analizar el cuerpo del mensaje: %s", err.Error())
    }

    fmt.Println("Mensaje --->", string(body))

    err = receiver.CompleteMessage(context.TODO(), message)

    if err != nil {
        log.Fatalf("Error al completar el mensaje: %s", err.Error())
    }

    fmt.Printf("Mensaje recibido y completado\n")

}
Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para mantener al consumidor escuchando la cola, debes reemplazar la función ReceiveMessages por Receive.

    El código modificado se vería así:

        client, err := azservicebus.NewClientFromConnectionString("Cadena de conexión", nil)
    
        if err != nil {
            log.Fatalf("Error al crear el cliente de Service Bus: %s", err.Error())
        }
    
        receiver, err := client.NewReceiverForQueue("cola", nil)
    
        if err != nil {
            log.Fatalf("Error al crear el consumidor: %s", err.Error())
        }
    
        for {
            select {
            case <-ctx.Done():
                fmt.Println("El contexto fue cancelado.")
                return
            default:
                message, err := receiver.Receive(context.Background())
    
                if err != nil {
                    if azservicebus.ErrNoMessages == err {
                        time.Sleep(1 * time.Second)
                        continue
                    }
    
                    log.Fatalf("Error al recibir los mensajes: %s", err.Error())
                }
    
                body, err := message.Body()
    
                if err != nil {
                    log.Fatalf("Error al parsear el cuerpo del mensaje: %s", err.Error())
                }
    
                fmt.Println("Mensaje --->", string(body))
    
                err = receiver.CompleteMessage(context.Background(), message)
    
                if err != nil {
                    log.Fatalf("Error al completar el mensaje: %s", err.Error())
                }
    
                fmt.Printf("Mensaje recibido y completado\n")
            }
    
            time.Sleep(1 * time.Second)
        }
    

    El bucle for se ejecutará continuamente hasta que se cancele el contexto. La declaración select se bloquea hasta que se reciba un mensaje o se cancele el contexto. Si Receive devuelve azservicebus.ErrNoMessages, significa que no hay mensajes en la cola, por lo que el bucle continuará esperando 1 segundo antes de intentarlo nuevamente. Si se recibe un mensaje, se procesa y luego se repite el bucle.

Comments are closed.