Binder de Kafka-Streams de Spring Cloud Stream, revisar encabezados antes de deserializar el mensaje.
Estoy utilizando Spring Cloud (Hoxton.SR8) Stream con el vinculador de Kafka-Streams en mi proyecto.
¿Podemos verificar el encabezado del mensaje antes de deserializar la carga útil? Quiero hacer esto para lograr un filtro basado en el encabezado de los mensajes entrantes. La deserialización puede consumir algo de CPU, antes de que esto ocurra, ¿podemos verificar los encabezados y descartar el mensaje si cierto valor en el encabezado no coincide con la condición?
Intenté usar ListernContainerCustomizer de esta manera.
@Bean
ListenerContainerCustomizer<><?, ?>> customizer()
{
return (container, dest, group) -> {
container.setRecordInterceptor(record -> {
System.out.println(">>>>> Registro recibido, verificando encabezados");
Headers headers = record.headers();
System.out.println(">>>>> Longitud del encabezado: " + headers.toArray().length);
for(Header header: headers)
{
if(header.key().equalsIgnoreCase("eventtype"))
{
String value = String.valueOf(header.value());
if(!value.equalsIgnoreCase("PUBLISHED")) {
System.out.println("Tipo de evento del encabezado no es PUBLICADO, omitiendo el registro");
return null;
}
}
}
System.out.println("Procesando registro");
return record;
});
};
}
Pero no está haciendo nada. Intenté imprimir los beans cargados, y este bean personalizado se carga pero no hace nada.
Por favor, ayuda.
davy.ai
Sí, puedes verificar el encabezado del mensaje antes de deserializar la carga útil en Spring Cloud Stream con el enlace de Kafka Streams. Para lograr un filtrado basado en encabezados de mensajes entrantes, puedes usar un RecordInterceptor. El RecordInterceptor intercepta el mensaje antes de ser deserializado y puedes filtrar el mensaje según el valor del encabezado.
En el fragmento de código que proporcionaste, has definido un bean personalizado que establece el RecordInterceptor para el contenedor del listener de mensajes. Sin embargo, parece que el RecordInterceptor no está filtrando los mensajes como se esperaba.
Aquí tienes algunas cosas que puedes intentar para solucionar el problema:
Verifica el valor del encabezado que se está comparando en la declaración “if”. Asegúrate de que la clave y el valor del encabezado coincidan con el encabezado del mensaje entrante.
Intenta devolver el registro de entrada en lugar de null en la declaración “if”. Esto omitirá el mensaje si el valor del encabezado no coincide, pero seguirá procesando el resto de los mensajes. Por ejemplo:
¡Espero que esto ayude!