Tag: SPRING-CLOUD-STREAM-BINDER-KAFKA
Desarrollamos un marco interno de la empresa sobre Spring Boot y nos gustaría compatibilizar Kafka-Streams con Spring Cloud Stream. Necesitamos inyectar automáticamente algunos encabezados en todos los mensajes salientes. Hemos logrado esto con el Binder Kafka de Spring Cloud Stream al registrar un ChannelInterceptor personalizado, pero esto no funciona para . . . Read more
Tenemos un microservicio que lee registros de varios temas de Kafka. Para cada tema de entrada, tenemos un tema de DLQ separado para almacenar registros que no se pudieron procesar por cualquier motivo (formato de registro no válido, los registros no se pudieron analizar utilizando el esquema Avro, no se . . . Read more
Soy nuevo en Kafka. Estoy usando Spring Cloud Stream Kafka en mi proyecto, también estoy escribiendo código de manera funcional. Quiero consumir un mensaje de un único tema de entrada, construir diferentes modelos a partir del mensaje y publicar cada modelo en un tema diferente (es decir, 1 modelo -> . . . Read more
@StreamListener(“notification-input-channel”) @SendTo(“notification-output-channel”) public KStream<string,notification> process(KStream<string,posinvoice> input) { KStream<string,notification> notificationStream=input. filter((k,v)->v.getCustomerType.equals(“Prime”)). mapValues(v->recordBuilder.getNotification(v)); return notificationStream; } En primer lugar, soy completamente nuevo en Kafka. Ahora, respecto a mi pregunta: en el KStream de entrada, estoy obteniendo el valor como String y Kafka luego lo convierte en un objeto PosInvoice. Pero en el . . . Read more
Estoy actualizando una aplicación Spring Boot a Boot 2.6.1, Cloud 2021.0.0 y Cloud Stream 3.2.1. Esta aplicación tiene una serie de KStreams como: @Bean fun processEvent(): Function<><id, eventreceived=””>, KStream<id, updatedinfo?=””>> { … } Ahora la aplicación se bloquea al iniciar: Caused by: org.springframework.kafka.KafkaException: No se pudo iniciar el stream; la . . . Read more