es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Binder de Kafka-Streams de Spring Cloud Stream, revisar encabezados antes de deserializar el mensaje.

Estoy utilizando Spring Cloud (Hoxton.SR8) Stream con el vinculador de Kafka-Streams en mi proyecto.

¿Podemos verificar el encabezado del mensaje antes de deserializar la carga útil? Quiero hacer esto para lograr un filtro basado en el encabezado de los mensajes entrantes. La deserialización puede consumir algo de CPU, antes de que esto ocurra, ¿podemos verificar los encabezados y descartar el mensaje si cierto valor en el encabezado no coincide con la condición?

Intenté usar ListernContainerCustomizer de esta manera.

@Bean
ListenerContainerCustomizer<><?, ?>> customizer() 
{
    return (container, dest, group) -> {
        container.setRecordInterceptor(record -> {
            System.out.println(">>>>> Registro recibido, verificando encabezados");
            Headers headers = record.headers();
            System.out.println(">>>>> Longitud del encabezado: " + headers.toArray().length);
            for(Header header: headers)
            {
                if(header.key().equalsIgnoreCase("eventtype"))
                {
                    String value = String.valueOf(header.value());
                    if(!value.equalsIgnoreCase("PUBLISHED")) {
                        System.out.println("Tipo de evento del encabezado no es PUBLICADO, omitiendo el registro");
                        return null;
                    }
                }
            }
            System.out.println("Procesando registro");
            return record;
        });
    };
}

Pero no está haciendo nada. Intenté imprimir los beans cargados, y este bean personalizado se carga pero no hace nada.

Por favor, ayuda.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, puedes verificar el encabezado del mensaje antes de deserializar la carga útil en Spring Cloud Stream con el enlace de Kafka Streams. Para lograr un filtrado basado en encabezados de mensajes entrantes, puedes usar un RecordInterceptor. El RecordInterceptor intercepta el mensaje antes de ser deserializado y puedes filtrar el mensaje según el valor del encabezado.

    En el fragmento de código que proporcionaste, has definido un bean personalizado que establece el RecordInterceptor para el contenedor del listener de mensajes. Sin embargo, parece que el RecordInterceptor no está filtrando los mensajes como se esperaba.

    Aquí tienes algunas cosas que puedes intentar para solucionar el problema:

    1. Verifica que el bean personalizado esté siendo tomado por Spring. Puedes agregar algunas declaraciones de registro en el método personalizado para ver si se llama durante el inicio.
    2. Verifica el valor del encabezado que se está comparando en la declaración “if”. Asegúrate de que la clave y el valor del encabezado coincidan con el encabezado del mensaje entrante.

    3. Intenta devolver el registro de entrada en lugar de null en la declaración “if”. Esto omitirá el mensaje si el valor del encabezado no coincide, pero seguirá procesando el resto de los mensajes. Por ejemplo:

    if(!value.equalsIgnoreCase("PUBLISHED")) {
        System.out.println("El tipo de evento del encabezado no es PUBLISHED, omitiendo el registro");
        return record;
    }
    
    1. Habilita el registro DEBUG para Spring Cloud Stream y Kafka Streams para obtener más información sobre el procesamiento del mensaje.

    ¡Espero que esto ayude!

Comments are closed.