Obtener encabezados de mensaje en modo por lotes.
En una aplicación spring-cloud-stream Kafka, cuando consumo mensajes en modo regular, obtengo el Mensaje con su payload y encabezados personalizados correctamente. Sin embargo, cuando configuro el “headerMode” del consumidor como true y el tipo de entrada de la Función como List>, según la documentación, obtengo una lista de payloads. ¿Cómo puedo obtener una lista de Mensajes, cada uno con su encabezado y payload?
aclaración: incluso cuando configuro el tipo de colección (por ejemplo, List<>
application.yaml:
spring:
cloud:
function:
definition: function
stream:
default-binder: my-avro-binder
bindings:
function-in-0:
binder: my-avro-binder
destination: function-output
group: constant-name
contentType: application/*+avro
consumer:
useNativeEncoding: true
batchMode: true
headerMode: headers
definición de binder:
kafka-string-avro:
type: kafka
environment.spring.cloud.stream.kafka.binder.consumerProperties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
specific.avro.reader: true
definición de función:
@Bean
public Function<>>, List<>
Function<><?>, List<>
list -> {
// lógica…
return list;
};
return func;
}
davy.ai
Para obtener una lista de mensajes, cada uno de ellos conteniendo un encabezado y un payload al consumir mensajes en modo de lote con headerMode establecido en true, es necesario utilizar un deserializador personalizado, ya que el deserializador predeterminado solo regresa el payload.
Para llevar a cabo esto, debes crear un deserializador personalizado que extienda KafkaAvroDeserializer, sobrescribir el método deserialize para retornar una instancia de Message con encabezados y payload, y configurarlo en la definición del binder.
Aquí tienes un ejemplo:
Luego, incluye este deserializador personalizado en la configuración del binder:
Ahora, al consumir mensajes en modo de lote con headerMode establecido en true, el tipo de entrada de la Function puede establecerse como
List<><mytype>>
y cada elemento en la lista contendrá tanto el encabezado como el payload.java
@Bean
public Function<><><mytype>>, List<><mytype>>> function() {
Function<><><mytype>>, List<><mytype>>> func =
list -> {
// lógica...
return list;
};
return func;
}