es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo configuro a un consumidor para que compruebe más de un esquema al escuchar múltiples temas?

Estoy trabajando en un proyecto para una gran empresa con millones de usuarios. Estamos intentando convertir su arquitectura basada en REST a una arquitectura basada en eventos. La arquitectura actual involucra un servicio, al que llamaremos Servicio-A, que realiza 7 llamadas REST cuando un usuario inicia sesión.

En lugar de llamar a los 7 servicios para obtener esos datos cuando el usuario inicia sesión, queremos modificar esos 7 servicios para que generen eventos cuando haya actualizaciones en los datos. Luego, haremos que el Servicio-A escuche en 7 temas diferentes de Kafka y guarde esas actualizaciones en la base de datos.

Es una aplicación de Java Spring Boot. Estamos utilizando AWS MSK para alojar nuestro clúster de Kafka y estamos utilizando AWS Glue para el registro de esquemas. Puedo configurar mi consumidor en el Servicio-A para escuchar 7 temas, pero no sé cómo hacer que el Servicio-A verifique 7 esquemas diferentes al consumir un mensaje de uno de esos 7 temas.

Hasta ahora, la única configuración que he encontrado para el consumidor de Kafka es una propiedad que toma un nombre de esquema.

Aquí está mi archivo de configuración yaml:

spring:
  kafka:
    listener:
      ack-mode: manual_immediate
    consumer:
      enable-auto-commit: false
      group-id: my-group
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: com.app.somepackage.domain
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer
      auto-offset-reset: earliest
    bootstrap-servers: <my-msk-url>
    properties:
      region: us-west-2
      schemaName: my-first-schema
      registry.name: my-registry-name
      avroRecordType: SPECIFIC_RECORD
Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Para consumir mensajes de varios temas con esquemas diferentes, puedes usar el MultiSchemaRegistryClient proporcionado por la biblioteca AWS Glue. Este cliente te permite obtener las definiciones de esquema por tema y versión. Aquí tienes un ejemplo de cómo puedes utilizar esta biblioteca para consumir mensajes de múltiples temas con esquemas diferentes:

    @Autowired
    private KafkaProperties kafkaProperties;
    
    @Value("${spring.kafka.topic1}")
    private String topic1;
    
    @Value("${spring.kafka.topic2}")
    private String topic2;
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<string, specificrecord=""> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<string, specificrecord=""> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // Establece el número de hilos para consumir mensajes
        return factory;
    }
    
    @Bean
    public ConsumerFactory<string, specificrecord=""> consumerFactory() {
        Map<string, object=""> props = kafkaProperties.buildConsumerProperties();
        props.put("schema.registry.client", new MultiSchemaRegistryClient(new URL(kafkaProperties.getProperties().get("schema.registry.url"))));
        // Establece el deserializador para utilizar SpecificRecordDeserializer proporcionado por la biblioteca AWS Glue
        props.put("value.deserializer", SpecificRecordDeserializer.class);
        // Establece las definiciones de esquema para cada tema
        props.put("topic.schema." + topic1, "com.example.Topic1Record");
        props.put("topic.schema." + topic2, "com.example.Topic2Record");
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    @KafkaListener(topics = "${spring.kafka.topic1},${spring.kafka.topic2}")
    public void listen(Object message) {
        log.info("Mensaje recibido: {}", message);
        // Procesa el mensaje
    }
    

    En este ejemplo, estamos utilizando MultiSchemaRegistryClient para obtener las definiciones de esquema para cada tema. También estamos configurando el deserializador para utilizar SpecificRecordDeserializer proporcionado por la biblioteca AWS Glue, que puede deserializar mensajes utilizando las definiciones de esquema obtenidas del registro de esquemas.

    Ten en cuenta que necesitas establecer las definiciones de esquema para cada tema utilizando el prefijo “topic.schema.” seguido del nombre del tema, como se muestra en el ejemplo. También necesitas establecer las propiedades “spring.kafka.topic1” y “spring.kafka.topic2” en tu archivo application.properties con los nombres de tus temas.</string,></string,></string,></string,>

Comments are closed.