es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

AWS SQS, DLQ en Spring Boot

¿Cómo agrego una configuración DLQ a mi configuración SQS? No estoy seguro de cómo integrar una DLQ con mi cola existente. Estoy usando mensajes de aws y no JMS, por lo que mi anotación sería @SQSListener para mi método oyente. Tengo una clase de configuración que tiene lo siguiente:

@Bean
    public SimpleMessageListenerContainer messageListenerContainer(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
        simpleMessageListenerContainer.setQueueStopTimeout(queueStopTimeout*1000);
        simpleMessageListenerContainer.setMessageHandler(messageHandler(amazonSQSAsync));
        return simpleMessageListenerContainer;
    }
@Bean
public QueueMessageHandler messageHandler(AmazonSQSAsync amazonSQSAsync) {
    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync);
    QueueMessageHandler messageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    return messageHandler;
}

@Bean
public AmazonSQSAsync awsSqsAsync() {
    AmazonSQSAsyncClient amazonSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
    amazonSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(region)));
    return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient);
}

No pude encontrar ninguna documentación correcta para configurar adecuadamente los reintentos para que, si los reintentos superan el umbral, el mensaje debería ir a una cola de mensajes no entregados.

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para agregar una configuración de cola de mensajes no entregables (Dead-Letter Queue o DLQ) a su configuración de SQS, debe modificar su código existente siguiendo los siguientes pasos:

    1. Primero, debe crear una cola de SQS separada para su cola de mensajes no entregables (DLQ). Puede hacer esto llamando al método createQueue en el cliente AmazonSQSAsync y pasando un objeto CreateQueueRequest con el atributo queueName establecido en el nombre deseado de su DLQ. Anote el ARN (Nombre de Recurso de Amazon) de su DLQ que obtiene en la respuesta.
    2. A continuación, debe modificar la creación del bean QueueMessageHandler para establecer la propiedad messageHandler con un nuevo objeto DefaultMessageHandlerMethodFactory que tiene un método setDeadLetterQueueName llamado con el nombre de la DLQ creada previamente. También debe establecer la propiedad setMessageConverters con cualquier convertidor de mensaje que esté utilizando.

    @Bean
    public QueueMessageHandler messageHandler(AmazonSQSAsync amazonSQSAsync) {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync);
        queueMessageHandlerFactory.setDeadLetterQueueName(dlqName);
        queueMessageHandlerFactory.setMessageConverters(Arrays.asList(...)); // establezca cualquier convertidor de mensaje que esté utilizando
    
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverters(queueMessageHandlerFactory.getMessageConverters());
    
        QueueMessageHandler messageHandler = new QueueMessageHandler(queueMessageHandlerFactory);
        messageHandler.setMethodArgumentResolvers(messageHandlerMethodFactory.getArgumentResolvers());
        messageHandler.setCustomHeadersMapper(messageHandlerMethodFactory.getHeadersMapper());
        return messageHandler;
    }
    
    1. Finalmente, debe modificar la creación del bean SimpleMessageListenerContainerFactory para establecer el método setDeadLetterQueueName llamado con el nombre de la DLQ creada previamente. También debe establecer la propiedad setBatchErrorHandler con un nuevo objeto BatchErrorHandler que tiene una sobreescritura del método handle que toma una lista de argumentos Throwable, registra los errores y arroja el último.
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setTaskExecutor(taskExecutor());
        factory.setMaxNumberOfMessages(10);
        factory.setAutoStartup(true);
        factory.setWaitTimeOut(20);
        factory.setDeadLetterQueueName(dlqName); // establecer el nombre de la DLQ
        factory.setBatchErrorHandler(new BatchErrorHandler() {
            @Override
            public void handle(List<throwable> throwables) throws Exception {
                LOGGER.error("Se produjo un error al procesar el mensaje: {}", throwables.get(0).getMessage(), throwables.get(0));
                throw throwables.get(throwables.size() - 1);
            }
        });
        return factory;
    }
    

    Después de modificar su código con estos pasos, su método oyente consumirá mensajes de la cola de SQS y, si los reintentos exceden el umbral, el mensaje se moverá a la cola de mensajes no entregables (Dead Letter Queue o DLQ) para su análisis adicional.

Comments are closed.