es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Equivalente a ChannelInterceptor para Spring Cloud Stream Kafka Streams Binder

Desarrollamos un marco interno de la empresa sobre Spring Boot y nos gustaría compatibilizar Kafka-Streams con Spring Cloud Stream. Necesitamos inyectar automáticamente algunos encabezados en todos los mensajes salientes. Hemos logrado esto con el Binder Kafka de Spring Cloud Stream al registrar un ChannelInterceptor personalizado, pero esto no funciona para Kafka Streams, ya que parecen seguir un camino diferente.

¿Existe algún equivalente a ChannelInterceptor para el binder de Kafka Streams de Spring Cloud Stream?

Encontré este personalizador/configurador personalizado:

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
  return factoryBean -> {

  factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
    @Override
    public void configureBuilder(final StreamsBuilder builder) {

    }

    @Override
    public void configureTopology(final Topology topology) {

    }
  });

  factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
    @Override
    public void customize(final KafkaStreams kafkaStreams) {

    }
  });

};

Mi última idea fue utilizar el método configureTopology para modificar automáticamente la Topology e insertar un Transformer justo antes del último nodo de destino. Sin embargo, para agregar este nuevo nodo, debo especificar el nodo padre, por lo que necesitaría conocer el nombre del último nodo de destino y tal vez todos los nombres de los nodos se generaron automáticamente por Kafka Streams… La única manera sería usar el método topology.describe() y tal vez analizar la salida de la cadena… Esto suena demasiado complicado en comparación con un simple ChannelInterceptor.

¿Alguna idea?

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Desafortunadamente, no parece haber un equivalente a ChannelInterceptor para Spring Cloud Stream Kafka Streams binder. Sin embargo, el personalizador/configurador proporcionado en la pregunta se puede utilizar para modificar la infraestructura de Kafka Streams y personalizar la instancia de Kafka Streams.

    Una posible solución para inyectar automáticamente encabezados en todos los mensajes salientes es utilizar el método configureTopology para modificar la Topology. La Topology se puede obtener del StreamsBuilder utilizando el método build. La Topology se puede recorrer para encontrar el último nodo de destino iterando sobre todos los nodos y comprobando los nodos de destino. Una vez encontrado el último nodo de destino, se puede insertar un nuevo Transformer justo antes para inyectar los encabezados.

    Desafortunadamente, como se menciona en la pregunta, los nombres de los nodos se generan automáticamente por Kafka Streams, por lo que puede que analizar la salida de topology.describe() sea la única forma de encontrar el último nodo de destino. Esta solución puede ser más compleja que utilizar un simple ChannelInterceptor.

    Otro enfoque podría ser modificar la instancia de KafkaStreams utilizando KafkaStreamsCustomizer. La instancia de KafkaStreams se puede obtener del StreamsBuilderFactoryBean. En el método customize, se puede utilizar el mismo enfoque descrito anteriormente para encontrar el último nodo de destino e inyectar los encabezados utilizando el Transformer.

Comments are closed.