Equivalente a ChannelInterceptor para Spring Cloud Stream Kafka Streams Binder
Desarrollamos un marco interno de la empresa sobre Spring Boot y nos gustaría compatibilizar Kafka-Streams con Spring Cloud Stream. Necesitamos inyectar automáticamente algunos encabezados en todos los mensajes salientes. Hemos logrado esto con el Binder Kafka de Spring Cloud Stream al registrar un ChannelInterceptor
personalizado, pero esto no funciona para Kafka Streams, ya que parecen seguir un camino diferente.
¿Existe algún equivalente a ChannelInterceptor
para el binder de Kafka Streams de Spring Cloud Stream?
Encontré este personalizador/configurador personalizado:
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(final StreamsBuilder builder) {
}
@Override
public void configureTopology(final Topology topology) {
}
});
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(final KafkaStreams kafkaStreams) {
}
});
};
Mi última idea fue utilizar el método configureTopology
para modificar automáticamente la Topology
e insertar un Transformer
justo antes del último nodo de destino. Sin embargo, para agregar este nuevo nodo, debo especificar el nodo padre, por lo que necesitaría conocer el nombre del último nodo de destino y tal vez todos los nombres de los nodos se generaron automáticamente por Kafka Streams… La única manera sería usar el método topology.describe() y tal vez analizar la salida de la cadena… Esto suena demasiado complicado en comparación con un simple ChannelInterceptor
.
¿Alguna idea?
davy.ai
Desafortunadamente, no parece haber un equivalente a
ChannelInterceptor
para Spring Cloud Stream Kafka Streams binder. Sin embargo, el personalizador/configurador proporcionado en la pregunta se puede utilizar para modificar la infraestructura de Kafka Streams y personalizar la instancia de Kafka Streams.Una posible solución para inyectar automáticamente encabezados en todos los mensajes salientes es utilizar el método
configureTopology
para modificar laTopology
. LaTopology
se puede obtener delStreamsBuilder
utilizando el métodobuild
. LaTopology
se puede recorrer para encontrar el último nodo de destino iterando sobre todos los nodos y comprobando los nodos de destino. Una vez encontrado el último nodo de destino, se puede insertar un nuevoTransformer
justo antes para inyectar los encabezados.Desafortunadamente, como se menciona en la pregunta, los nombres de los nodos se generan automáticamente por Kafka Streams, por lo que puede que analizar la salida de
topology.describe()
sea la única forma de encontrar el último nodo de destino. Esta solución puede ser más compleja que utilizar un simpleChannelInterceptor
.Otro enfoque podría ser modificar la instancia de
KafkaStreams
utilizandoKafkaStreamsCustomizer
. La instancia deKafkaStreams
se puede obtener delStreamsBuilderFactoryBean
. En el métodocustomize
, se puede utilizar el mismo enfoque descrito anteriormente para encontrar el último nodo de destino e inyectar los encabezados utilizando elTransformer
.