es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Convirtiendo StreamListener con encabezados a Modelo Funcional

Debido a que @EnableBinding y @StreamListener están obsoletos, necesito migrar el código existente al nuevo modelo, sin embargo no pude encontrar ninguna información sobre si el mapeo de argumentos disponible en Spring Cloud Stream todavía es compatible y/o si hay soluciones alternativas limpias.

Mi método original:

@StreamListener("mysource")
public void processMessage(byte[] rawMessage, @Header(required = false, value = KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] rawKey) {
    processMessage(rawMessage, rawKey);
}

Logré convertir esto para que funcione de la siguiente manera:

@Bean(name = "mysource")
public Consumer<><?>> mySource() {
    return message -> {
        byte[] rawMessage = message.getPayload().toString().getBytes();
        byte[] rawKey = (byte[]) message.getHeaders().get("kafka_receivedMessageKey");
        processMessage(rawMessage, rawKey);
    };
}

Sin embargo, lo que preferiría es maximizar el soporte del framework con respecto al mapeo de argumentos y/o conversiones automáticas de tipos.
Intenté lo siguiente:

@Bean(name = "mysource")
public BiConsumer<><byte[]>, MessageHeaders> mySource() {
    return (message, headers) -> {
        byte[] rawMessage = message.getPayload();
        byte[] rawKey = (byte[]) headers.get("kafka_receivedMessageKey");
        processMessage(rawMessage, rawKey);
    };
}

Pero esto produce un error al iniciar: FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet - La definición de función 'mysource' no es válida. El bean de función referenciado o uno de sus componentes no existe.

También soy consciente de que junto con Supplier y Consumer, también está disponible Function, pero no estoy seguro de cómo usar una Function en este caso en lugar de un BiConsumer o si es posible, por lo que busco ejemplos sobre cómo realizar esta migración sin problemas y de forma elegante con respeto a la consumición y producción de mensajes y encabezados de/a Kafka.</byte[]>

Tags:  , ,

Answer

  1. Avatar for davy.ai

    A partir de la versión 3.1 de Spring Cloud Stream, @EnableBinding y @StreamListener están efectivamente obsoletos a favor de un modelo de programación funcional. Sin embargo, la funcionalidad de mapeo de argumentos en Spring Cloud Stream aún es compatible y se puede utilizar con el nuevo modelo de programación funcional.

    En su caso, ya ha migrado su código para utilizar el nuevo modelo de programación funcional con un Consumer<><?>>. Sin embargo, si desea utilizar la funcionalidad de mapeo de argumentos, puede definir su Consumer como una Function<><byte[]>, Void> en su lugar.

    Aquí hay un ejemplo de cómo puede definir su función:

    @Bean
    public Function<><byte[]>, Void> miFuncion() {
        return message -> {
            byte[] mensajeRaw = message.getPayload();
            byte[] claveRaw = (byte[]) message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
            procesarMensaje(mensajeRaw, claveRaw);
            return null;
        };
    }
    

    En este ejemplo, definimos una función que recibe un Message<byte[]> como entrada y devuelve Void. Luego extraemos los valores de carga útil y encabezado del mensaje y llamamos a procesarMensaje con esos valores.

    Una vez que haya definido su función, puede vincularla a un tema de Kafka utilizando la propiedad spring.cloud.stream.function.bindings.miFuncion-in-0 en su archivo application.yml:

    spring:
        cloud:
            stream:
                function:
                    bindings:
                        miFuncion-in-0:
                            destination: mysource
                            contentType: application/octet-stream
    

    Esto vinculará su función miFuncion a un tema de Kafka llamado mysource. La propiedad contentType es opcional, pero si conoce el tipo de contenido de sus mensajes, puede establecerlo aquí.

    Con esta configuración, Spring Cloud Stream convertirá automáticamente sus mensajes de Kafka a Message<byte[]> y los pasará a su función miFuncion. Los valores de carga útil y encabezado se mapearán automáticamente a los argumentos de entrada en función de sus nombres y tipos.</byte[]></byte[]></byte[]></byte[]>

Comments are closed.