es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo configurar la propiedad de reintentos del consumidor de Kafka desde application.properties en Spring Boot?

En spring-boot

application.yml:

kafka:
   bootstrap-servers: localhost:9092
   listener:
    concurrency: 10
    ack-mode: MANUAL
   producer:
    topic: test-record
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 3
    orn-record:
      timeout: 3
    #acks: 1
   consumer:
    groupId: test-record
    topic: test
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Al utilizar la configuración anterior, podemos evitar la configuración basada en Java web (bean) en spring boot, lo cual es una gran ventaja.

P: ¿Podemos agregar un controlador de errores de Kafka y propiedades de número de reintento de consumidor de Kafka desde application.properties / application.yml?

No pude encontrar ninguna referencia ni documentación al respecto, por lo que espero llegar a alguna conclusión, solo debido a este problema ahora tengo que ir a la configuración basada en Java web en spring boot y eliminar la configuración de propiedades, lo cual es volver a la forma antigua en spring. Creo que debería haber alguna solución alternativa y podríamos lograr esto a través de la configuración del archivo de propiedades.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Es posible configurar los manejadores de errores de kafka y el número de reintentos en application.yml o application.properties. En el ejemplo dado, retries ya está configurado para el productor. Para configurar manejadores de errores y reintentos para los consumidores, se pueden añadir las siguientes propiedades:

    kafka:
      consumer:
        properties:
          max.poll.records: 500
          max.poll.interval.ms: 60000
          session.timeout.ms: 30000
          max.poll.interval.ms: 60000
          max.partition.fetch.bytes: 1048576
          fetch.min.bytes: 1
          fetch.max.wait.ms: 1000
          retry.backoff.ms: 1000
    

    En este ejemplo, se han configurado algunas propiedades comunes para el consumidor. También se pueden añadir propiedades personalizadas según sea necesario.

    Además, para el manejo de errores, se puede usar la anotación @KafkaListener y especificar la clase ErrorHandler. Por ejemplo:

    @KafkaListener(topics = "test-topic", errorHandler = MyCustomErrorHandler.class)
    public void listen(ConsumerRecord<string, string=""> record) {
        // do something with the record
    }
    

    En este ejemplo, se ha utilizado la clase MyCustomErrorHandler como manejador de errores. Se puede implementar su propia clase de manejo de errores mediante la implementación de la interfaz org.springframework.kafka.listener.ErrorHandler.</string,>

Comments are closed.