es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Configurar para iniciar un nuevo span en KafkaMessageListenerContainer no en el listener (Kafka + Sleuth)

Soy un traductor de español que traduce documentos de TI. Traduzca lo siguiente al español. No traduzca el código y la salida en markdown.

Soy nuevo en kafka + sleuth. Actualmente me he enfrentado a un problema, el span se crea en MessageListenerMethodInterceptor que se activa en KafkaListener, pero en caso de excepción estamos perdiendo spanId y traceId en ErrorHandler. ¿Es posible configurar Sleuth para iniciar un span en KafkaMessageListenerContainer?

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, es posible configurar Spring Cloud Sleuth para que inicie trazas en KafkaMessageListenerContainer en lugar de MessageListenerMethodInterceptor. Esto se puede lograr configurando el bean “TracingMessageListenerDecorator” en su configuración de Spring.

    Aquí hay un ejemplo de cómo podría hacerlo:

    @Bean
    public TracingMessageListenerDecorator tracingMessageListenerDecorator(Tracer tracer) {
        SpanExtractor<kafkamessageheaders> extractor = new KafkaMessageHeaderExtractor();
        return new TracingMessageListenerDecorator(tracer, extractor, new KafkaMessagingSpanManager(tracer));
    }
    
    @Bean
    public KafkaListenerContainerFactory<><string, string="">> kafkaListenerContainerFactory(
        TracingMessageListenerDecorator tracingMessageListenerDecorator) {
    
        ConcurrentKafkaListenerContainerFactory<string, string=""> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        factory.setConcurrency(3);
        factory.setAfterReceivePostProcessors(tracingMessageListenerDecorator);
        return factory;
    }
    

    En la configuración anterior, hemos definido un bean “TracingMessageListenerDecorator” que usa un “KafkaMessagingSpanManager” para iniciar trazas en el “KafkaMessageListenerContainer”. Luego, pasamos este “TracingMessageListenerDecorator” como un “afterReceivePostProcessor” para “ConcurrentKafkaListenerContainerFactory”.

    Con esta configuración, cuando “KafkaMessageListenerContainer” recibe un mensaje nuevo, Sleuth iniciará una nueva traza para él. Si ocurre una excepción durante el procesamiento del mensaje, se invocará la lógica de manejo de errores en “SeekToCurrentErrorHandler”, pero la traza actual seguirá estando disponible con las IDs de traza y span intactas.</string,></string,>

Comments are closed.