Convirtiendo StreamListener con encabezados a Modelo Funcional
Debido a que @EnableBinding
y @StreamListener
están obsoletos, necesito migrar el código existente al nuevo modelo, sin embargo no pude encontrar ninguna información sobre si el mapeo de argumentos disponible en Spring Cloud Stream todavía es compatible y/o si hay soluciones alternativas limpias.
Mi método original:
@StreamListener("mysource")
public void processMessage(byte[] rawMessage, @Header(required = false, value = KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] rawKey) {
processMessage(rawMessage, rawKey);
}
Logré convertir esto para que funcione de la siguiente manera:
@Bean(name = "mysource")
public Consumer<><?>> mySource() {
return message -> {
byte[] rawMessage = message.getPayload().toString().getBytes();
byte[] rawKey = (byte[]) message.getHeaders().get("kafka_receivedMessageKey");
processMessage(rawMessage, rawKey);
};
}
Sin embargo, lo que preferiría es maximizar el soporte del framework con respecto al mapeo de argumentos y/o conversiones automáticas de tipos.
Intenté lo siguiente:
@Bean(name = "mysource")
public BiConsumer<><byte[]>, MessageHeaders> mySource() {
return (message, headers) -> {
byte[] rawMessage = message.getPayload();
byte[] rawKey = (byte[]) headers.get("kafka_receivedMessageKey");
processMessage(rawMessage, rawKey);
};
}
Pero esto produce un error al iniciar: FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet - La definición de función 'mysource' no es válida. El bean de función referenciado o uno de sus componentes no existe
.
También soy consciente de que junto con Supplier
y Consumer
, también está disponible Function
, pero no estoy seguro de cómo usar una Function en este caso en lugar de un BiConsumer
o si es posible, por lo que busco ejemplos sobre cómo realizar esta migración sin problemas y de forma elegante con respeto a la consumición y producción de mensajes y encabezados de/a Kafka.</byte[]>
davy.ai
A partir de la versión 3.1 de Spring Cloud Stream,
@EnableBinding
y@StreamListener
están efectivamente obsoletos a favor de un modelo de programación funcional. Sin embargo, la funcionalidad de mapeo de argumentos en Spring Cloud Stream aún es compatible y se puede utilizar con el nuevo modelo de programación funcional.En su caso, ya ha migrado su código para utilizar el nuevo modelo de programación funcional con un
Consumer<><?>>
. Sin embargo, si desea utilizar la funcionalidad de mapeo de argumentos, puede definir suConsumer
como unaFunction<><byte[]>, Void>
en su lugar.Aquí hay un ejemplo de cómo puede definir su función:
En este ejemplo, definimos una función que recibe un
Message<byte[]>
como entrada y devuelveVoid
. Luego extraemos los valores de carga útil y encabezado del mensaje y llamamos aprocesarMensaje
con esos valores.Una vez que haya definido su función, puede vincularla a un tema de Kafka utilizando la propiedad
spring.cloud.stream.function.bindings.miFuncion-in-0
en su archivo application.yml:Esto vinculará su función
miFuncion
a un tema de Kafka llamadomysource
. La propiedadcontentType
es opcional, pero si conoce el tipo de contenido de sus mensajes, puede establecerlo aquí.Con esta configuración, Spring Cloud Stream convertirá automáticamente sus mensajes de Kafka a
Message<byte[]>
y los pasará a su funciónmiFuncion
. Los valores de carga útil y encabezado se mapearán automáticamente a los argumentos de entrada en función de sus nombres y tipos.</byte[]></byte[]></byte[]></byte[]>