es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Obtener encabezados de mensaje en modo por lotes.

En una aplicación spring-cloud-stream Kafka, cuando consumo mensajes en modo regular, obtengo el Mensaje con su payload y encabezados personalizados correctamente. Sin embargo, cuando configuro el “headerMode” del consumidor como true y el tipo de entrada de la Función como List, según la documentación, obtengo una lista de payloads. ¿Cómo puedo obtener una lista de Mensajes, cada uno con su encabezado y payload?

aclaración: incluso cuando configuro el tipo de colección (por ejemplo, List<>>), siempre devuelve solo los payloads (List) en tiempo de ejecución.

application.yaml:

spring:
cloud:
function:
definition: function
stream:
default-binder: my-avro-binder
bindings:
function-in-0:
binder: my-avro-binder
destination: function-output
group: constant-name
contentType: application/*+avro
consumer:
useNativeEncoding: true
batchMode: true
headerMode: headers

definición de binder:

kafka-string-avro:
type: kafka
environment.spring.cloud.stream.kafka.binder.consumerProperties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
specific.avro.reader: true

definición de función:

@Bean
public Function<>, List<>>> function() {
Function<><?>, List<>>> func =
list -> {
// lógica…
return list;
};
return func;
}

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para obtener una lista de mensajes, cada uno de ellos conteniendo un encabezado y un payload al consumir mensajes en modo de lote con headerMode establecido en true, es necesario utilizar un deserializador personalizado, ya que el deserializador predeterminado solo regresa el payload.

    Para llevar a cabo esto, debes crear un deserializador personalizado que extienda KafkaAvroDeserializer, sobrescribir el método deserialize para retornar una instancia de Message con encabezados y payload, y configurarlo en la definición del binder.

    Aquí tienes un ejemplo:

    public class MyTypeDeserializer extends KafkaAvroDeserializer {
        @Override
        public Object deserialize(String topic, byte[] data) {
            Message<mytype> message = new GenericMessage<>(super.deserialize(topic, data));
            return MessageBuilder.fromMessage(message).copyHeaders(message.getHeaders()).build();
        }
    }
    

    Luego, incluye este deserializador personalizado en la configuración del binder:

    kafka-string-avro:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.consumerProperties:
            ...
            value.deserializer: com.example.MyTypeDeserializer
    

    Ahora, al consumir mensajes en modo de lote con headerMode establecido en true, el tipo de entrada de la Function puede establecerse como List<><mytype>> y cada elemento en la lista contendrá tanto el encabezado como el payload.

    java
    @Bean
    public Function<><><mytype>>, List<><mytype>>> function() {
    Function<><><mytype>>, List<><mytype>>> func =
    list -> {
    // lógica...
    return list;
    };
    return func;
    }

Comments are closed.