es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Problema al configurar Serdes para Kafka Streams.

Coloqué un objeto JSON en mi tema “commits”. Quiero consumir mensajes usando Kafka Streams, pero hay un error.

@Configuration
@EnableKafka
@EnableKafkaStreams
public class AnalyzerConfiguration {
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<string, object=""> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, commitSerde().getClass());

        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public Serde<commit> commitSerde() {
        return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(Commit.class));
    }

    @Bean
    public KStream<string, commit=""> kStream(StreamsBuilder builder) {
        KStream<string, commit=""> stream = builder.stream("commits", Consumed.with(Serdes.String(), commitSerde()));

        KTable<string, long=""> commitsCount = stream
            .mapValues(Commit::getAuthorName)
            .selectKey((key, word) -> word)
            .groupByKey()
            .count(Materialized.as("Counts"));

        commitsCount.toStream().to("commits-count", Produced.with(Serdes.String(), Serdes.Long()));

        return stream;
    }
}

El registro dice:

Excepción en el hilo “test-streams-469f5ee6-d0de-472e-a602-a7b6d11f2e1c-StreamThread-1” org.apache.kafka.streams.errors.StreamsException: No se pudo configurar la clase serde de valor org.apache.kafka.common.serialization.Serdes$WrapperSerde

Causado por: org.apache.kafka.common.KafkaException: No se pudo encontrar un constructor público sin argumentos para org.apache.kafka.common.serialization.Serdes$WrapperSerde

Causado por: java.lang.NoSuchMethodException</string,></string,></string,></string,>

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    El mensaje de error sugiere que hay un problema con la serialización/deserialización del valor. Específicamente, parece que el código no puede encontrar un constructor sin argumentos para la clase Serdes$WrapperSerde. Esto probablemente se debe a que la clase Serdes no proporciona un constructor sin argumentos para su clase WrapperSerde.

    Para solucionar este problema, es posible que desee modificar el método commitSerde para utilizar una implementación personalizada de Serde que pueda serializar/deserializar correctamente los objetos Commit. Por ejemplo:

    @Bean
    public Serde<commit> commitSerde() {
        JsonSerializer<commit> serializer = new JsonSerializer<>();
        JsonDeserializer<commit> deserializer = new JsonDeserializer<>(Commit.class);
        return SerdeFactory.createJsonSerde(serializer, deserializer);
    }
    

    Aquí, utilizamos una clase SerdeFactory personalizada para crear un nuevo objeto JsonSerde que utiliza las clases JsonSerializer y JsonDeserializer para serializar/deserializar datos. Esto debería eliminar la necesidad de la clase WrapperSerde y resolver el error.

Comments are closed.