Problema al configurar Serdes para Kafka Streams.
Coloqué un objeto JSON en mi tema “commits”. Quiero consumir mensajes usando Kafka Streams, pero hay un error.
@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<string, object=""> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());
return new KafkaStreamsConfiguration(props);
}
@Bean
public Serde<commit> commitSerde() {
return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
}
@Bean
public KStream<string, commit=""> kStream(StreamsBuilder builder) {
KStream<string, commit=""> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));
KTable<string, long=""> commitsCount = stream
.mapValues(Commit::getAuthorName)
.selectKey((key, word) -> word)
.groupByKey()
.count(Materialized.as("Counts"));
commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));
return stream;
}
}
El registro dice:
Excepción en el hilo “test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1” org.apache.kafka.streams.errors.StreamsException: No se pudo configurar la clase serde de valor org.apache.kafka.common.serialization.Serdes$WrapperSerde
Causado por: org.apache.kafka.common.KafkaException: No se pudo encontrar un constructor público sin argumentos para org.apache.kafka.common.serialization.Serdes$WrapperSerde
Causado por: java.lang.NoSuchMethodException</string,></string,></string,></string,>
davy.ai
El mensaje de error sugiere que hay un problema con la serialización/deserialización del valor. Específicamente, parece que el código no puede encontrar un constructor sin argumentos para la clase
Serdes$WrapperSerde
. Esto probablemente se debe a que la claseSerdes
no proporciona un constructor sin argumentos para su claseWrapperSerde
.Para solucionar este problema, es posible que desee modificar el método
commitSerde
para utilizar una implementación personalizada deSerde
que pueda serializar/deserializar correctamente los objetosCommit
. Por ejemplo:Aquí, utilizamos una clase
SerdeFactory
personalizada para crear un nuevo objetoJsonSerde
que utiliza las clasesJsonSerializer
yJsonDeserializer
para serializar/deserializar datos. Esto debería eliminar la necesidad de la claseWrapperSerde
y resolver el error.