es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

RabbitMQ BasicAck dando mensajes de error de cierre de canal en la consola – Spring AMQP

@EnableRabbit
@Service
public class RabbitMqListenerWithReply {

@Autowired
RabbitTemplate rabbitTemplate;

@Value(“${test.rabbitmq.exchange}”)
private String rabbitMQExchange;

@Value(“${test.rabbitmq.routingkey}”)
private String rabbitMQRoutingKey;

@RabbitListener(queues = “${test.rabbitmq.queue}”)
public void receiveMessage(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG)long tag) throws IOException {
System.out.println(message.toString());
channel.basicAck(tag, false);
System.out.println(“Ackd”);

}
}

Estoy intentando implementar un oyente que reconozca el mensaje después de consumirlo, sin embargo, la consola muestra este error, ¿podría alguien indicarme dónde podría haber cometido un error?

Este es el error que estoy obteniendo actualmente cada vez que ejecuto el editor para publicar el mensaje en el intercambio.

2022-01-03 09:35:33.192 ERROR 27280 — [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Cierre de canal: error del canal; método del protocolo: #method<channel.close>(código de respuesta = 406, texto de respuesta = PRECONDITION_FAILED – unknown delivery tag 1, id de clase = 60, id de método = 80)

2022-01-03 09:35:34.208 INFO 27280 — [ntContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Reinicio del consumidor@52d7ab79: etiquetas=[[amq.ctag-0pIM_-TROTIxXRsAJpmOrw]], canal=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@71369e1a Shared Rabbit Connection: SimpleConnection@60b616c8 [delegate=amqp://guest@127.0.0.1:5672/, puerto local= 57660], acknowledgeMode=AUTO tamaño de cola local=0

Así es como estoy publicando mi mensaje actualmente.

@RestController
@RequestMapping(“api/test”)
public class RestAPIController {
@Autowired
RabbitTemplate rabbitTemplate;

@Value(“${test.rabbitmq.exchange}”)
private String rabbitMQExchange;

@Value(“${test.rabbitmq.routingkey}”)
private String rabbitMQRoutingKey;

@Value(“${test.rabbitmq.queue}”)
private String rabbitMQQueue;

@GetMapping(“{message}”)
public String testAPI(@PathVariable(“message”) String message) {
System.out.println(“Mensaje enviado: ” + message);
rabbitTemplate.convertAndSend(rabbitMQExchange, rabbitMQRoutingKey , message);
return “El mensaje fue enviado”;
}

}</channel.close>

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error indica que hay un problema con la etiqueta de entrega utilizada para confirmar el mensaje. Parece que la etiqueta de entrega utilizada en el método basicAck no es válida, o se utiliza la misma etiqueta de entrega para varios mensajes.

    Para solucionar este problema, puedes intentar generar una etiqueta de entrega única para cada mensaje utilizando el método channel.getNextPublishSeqNo(). Puedes almacenar la etiqueta de entrega en un mapa o junto con la carga del mensaje para asegurarte de que se utiliza la etiqueta de entrega correcta para cada mensaje.

    Aquí tienes un ejemplo de cómo puedes modificar el código del oyente para utilizar una etiqueta de entrega única:

    @RabbitListener(queues = "${test.rabbitmq.queue}")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println("Mensaje recibido: " + message);
    
        // procesar el mensaje aquí
    
        // generar etiqueta de entrega única
        long ackTag = channel.getNextPublishSeqNo();
    
        // confirmar el mensaje con la etiqueta de entrega única
        channel.basicAck(deliveryTag, false);
    
        System.out.println("Mensaje confirmado con etiqueta: " + ackTag);  
    }
    

    También puedes intentar establecer el acknowledgeMode del contenedor del oyente de mensajes en MANUAL para confirmar los mensajes manualmente. Para hacer esto, agrega la siguiente propiedad a tu archivo application.properties:

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    Luego, modifica el código del oyente para utilizar el modo AcknowledgeMode.MANUAL:

    @RabbitListener(queues = "${test.rabbitmq.queue}")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println("Mensaje recibido: " + message);
    
        // procesar el mensaje aquí
    
        // confirmar el mensaje manualmente
        channel.basicAck(deliveryTag, false);
    
        System.out.println("Mensaje confirmado");  
    }
    

    Espero que esto te ayude a resolver tu problema. Hazme saber si tienes alguna otra pregunta.

Comments are closed.