es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo detener el consumo de mensajes en un AMQP durante un período de tiempo determinado (donde el tiempo no está fijo)?

Estoy tratando de lograr el siguiente escenario en mi aplicación:
1. Cuando mi aplicación está encendida, el mensaje del intercambio entrante debe ser consumido por la cola entrante.
2. Si ocurre alguna excepción/error, los mensajes son dirigidos a la Cola de Mensajes No Entregados.
3. Cuando hay un tiempo de inactividad en mi aplicación (no quiero consumir mensajes durante ese tiempo), estoy redirigiendo los mensajes a la Cola de Estacionamiento.
4. Cuando el tiempo de inactividad ha terminado, quiero consumir primero el mensaje de la Cola de Estacionamiento y luego empezar a consumir mensajes normalmente utilizando la Cola de Entrada.

Mi pregunta es: ¿Se pueden implementar estos escenarios? Aquí, principalmente me refiero al paso 4. Si la respuesta es afirmativa, ¿podría alguien orientarme en la dirección correcta?

Mi segunda pregunta es: ¿Es esta la forma correcta de lograr este escenario? ¿O hay una mejor forma de lograrlo?

Código añadido:

@RabbitListener(queues = "${com.rabbitmq.queueName}", id="msgId")
@RabbitListener(queues = "${com.rabbitmq.parkingQueueName}", id="parkingId")
public void consumeMessage(Message message) {
    try {
        log.info("Received message: {}",new String(message.getBody()));

        // verifique si la aplicación está inactiva
        if(val) {
            registry.getListenerContainer("msgId").stop();
            rabbitTemplate.send(rabbitMQConfig.getExchange(), rabbitMQConfig.getParkingRoutingKey(), message);
        }
    } catch(Exception e) {
        rabbitTemplate.send(rabbitMQConfig.getDeadLetterExchange(), rabbitMQConfig.getDeadLetterRoutingKey(), message);
    }
}

“`

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Solución para Lograr el Escenario en la Aplicación

    Paso 1

    Para consumir el mensaje del intercambio entrante cuando la aplicación está activa, podemos usar la anotación @RabbitListener en nuestro código. Esta anotación vincula un método a una cola de mensajes de RabbitMQ y maneja automáticamente la consumición de mensajes.

    Ejemplo:

    @RabbitListener(queues = "${com.rabbitmq.queueName}")
    public void consumeMessage(Message message) {
      // Manejar el mensaje consumido
    }
    

    Aquí, "${com.rabbitmq.queueName}" se refiere al nombre de la cola entrante. Este método se invocará automáticamente cuando haya un mensaje en la cola.

    Paso 2

    Si ocurre alguna excepción/error durante la consumición del mensaje, podemos utilizar la funcionalidad Dead Letter Exchange de RabbitMQ para redirigir el mensaje a una Cola Dead Letter. Las Colas Dead Letter se utilizan para almacenar mensajes que no se pueden procesar por algún motivo.

    Ejemplo:

    @RabbitListener(queues = "${com.rabbitmq.queueName}")
    public void consumeMessage(Message message) {
      try {
        // Manejar el mensaje consumido
      } catch (Exception e) {
        rabbitTemplate.send(rabbitMQConfig.getDeadLetterExchange(), rabbitMQConfig.getDeadLetterRoutingKey(), message);
      }
    }
    

    Aquí, rabbitTemplate.send se utiliza para enviar el mensaje al Dead Letter Exchange con la clave de enrutamiento proporcionada.

    Paso 3

    Para redirigir mensajes a la Cola ParkingLot durante un tiempo de inactividad, podemos dejar de consumir mensajes de la cola entrante y utilizar rabbitTemplate de RabbitMQ para enviar el mensaje a la Cola ParkingLot.

    Ejemplo:

    @RabbitListener(queues = "${com.rabbitmq.queueName}")
    public void consumeMessage(Message message) {
      try {
        // Manejar el mensaje consumido
    
        // Verificar si hay tiempo de inactividad en curso
        if(downtime) {
          registry.getListenerContainer("msgId").stop();
          rabbitTemplate.send(rabbitMQConfig.getExchange(), rabbitMQConfig.getParkingLotRoutingKey(), message);
        }
      } catch (Exception e) {
        rabbitTemplate.send(rabbitMQConfig.getDeadLetterExchange(), rabbitMQConfig.getDeadLetterRoutingKey(), message);
      }
    }
    

    Aquí, registry.getListenerContainer("msgId").stop() deja de consumir mensajes de la cola entrante al obtener el contenedor de escucha con el id proporcionado. Luego, se utiliza rabbitTemplate.send para enviar el mensaje a la Cola ParkingLot con la clave de enrutamiento proporcionada.

    Paso 4

    Para consumir el mensaje de la Cola ParkingLot cuando finalice el tiempo de inactividad y luego consumir mensajes normalmente utilizando la Cola Entrante, podemos primero consumir mensajes de la Cola ParkingLot y luego comenzar a consumir mensajes de la Cola Entrante.

    Ejemplo:

    @RabbitListener(queues = "${com.rabbitmq.parkingLotQueueName}", id="parkingLotId")
    public void consumeParkingLot(Message message) {
      // Manejar el mensaje consumido
    
      // Una vez finalizado el tiempo de inactividad, comenzar a consumir mensajes normalmente
      Message parkedMessage = rabbitTemplate.receive(rabbitMQConfig.getParkingLotQueueName());
      if (parkedMessage == null) {
        registry.getListenerContainer("msgId").start();
      } else {
        rabbitTemplate.send(rabbitMQConfig.getExchange(), rabbitMQConfig.getQueueName(), parkedMessage);
      }
    }
    
    @RabbitListener(queues = "${com.rabbitmq.queueName}", id="msgId")
    public void consumeMessage(Message message) {
      // Manejar el mensaje consumido
    }
    

    Aquí, @RabbitListener con "${com.rabbitmq.parkingLotQueueName}" y "parkingLotId" se utiliza para consumir mensajes de la Cola ParkingLot. Una vez finalizado el tiempo de inactividad, primero verificamos si hay mensajes estacionados en la Cola ParkingLot. Si los hay, los enviamos de vuelta a la Cola Entrante usando rabbitTemplate.send. Si no los hay, comenzamos a consumir mensajes de la Cola Entrante obteniendo el contenedor de escucha con el id "msgId" y activándolo.

    Conclusión

    Sí, el escenario proporcionado se puede implementar utilizando los pasos proporcionados anteriormente. La solución proporcionada es una forma correcta de lograr el escenario dado.

Comments are closed.