es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Kafka stream con AVRO Serde personalizado (sin esquema)

Tengo una aplicación de procesamiento de flujo que utiliza el formato de mensaje AVRO. Para la serialización y deserialización (Serde) se utiliza io.confluent.kafka.streams.serdes.avro.GenericAvroSerde.

Intenté crear un Serde AVRO personalizado como el siguiente:

public class CustomAvroSerde implements Serde {

Class<SpecificRecord> targetType;

public CustomAvroSerde() {

}

public CustomAvroSerde(Class<SpecificRecord> targetType) {
    this.targetType = targetType;
}

@Override
public Serializer<SpecificRecord> serializer() {
    return new Serializer<SpecificRecord>() {
        @Override
        public byte[] serialize(String s, SpecificRecord data) {
            byte[] result = null;
            if (data != null) {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                BinaryEncoder binaryEncoder =
                        EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

                DatumWriter<SpecificRecord> datumWriter = new SpecificDatumWriter<>(data.getSchema());

                try {
                    datumWriter.write(data, binaryEncoder);
                    binaryEncoder.flush();
                    byteArrayOutputStream.close();
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }

                result = byteArrayOutputStream.toByteArray();
            }
            return result;
        }
    };
}

@Override
public Deserializer<SpecificRecord> deserializer() {
    return new Deserializer<SpecificRecord>() {
        @Override
        public SpecificRecord deserialize(String s, byte[] data) {
            SpecificRecord result = null;
            if (data != null) {
                DatumReader<SpecificRecord> datumReader;
                try {
                    BinaryDecoder binaryDecoder = new DecoderFactory().createBinaryDecoder(data, null);
                    datumReader = new SpecificDatumReader<>()
                    //también se intentó suministrar el esquema como línea comentada a continuación
                    //datumReader = new SpecificDatumReader<>(new Schema.Parser().parse("esquema avro en forma de cadena"));
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, binaryDecoder);
                    result = datumReader.read(null, decoder);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return result;
        }
    };
}

}

El proceso de serialización funciona bien, pero al realizar la deserialización se produce un error de Nullpointer, ya que el Serde no tiene referencia del esquema AVRO en targetType durante la deserialización.

No tendré detalles del esquema hasta que se ejecute, por lo que busco un esquema generalizado que se proporcione en la deserialización.

El Serde de Confluent inicializa targetType con KafkaAvroDeserializer. ¿Cómo manejar esto? ¿Alguna pista?

Contenido editado
Se cambió la serialización y deserialización para utilizar SpecificRecord.
Después del cambio de código, se produjo una excepción Nullpointer como se muestra a continuación

java.lang.NullPointerException
at org.apache.avro.Schema.applyAliases(Schema.java:1787)
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:130)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)

Como se comentó en el código, se intentó con Scheme pasándolo a SpecificDatumReader mientras se construía. Cuando se intentó esto, se produjo una excepción de casting como se muestra a continuación,

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)

También se habilitó la bandera specific.avro.reader para que sea verdadera.
Fin del contenido editado

Gracias
Venkata

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que está teniendo problemas para crear un AVRO Serde personalizado para su aplicación de procesamiento de flujo. Ya ha intentado utilizar SpecificRecord para la serialización y deserialización, pero está encontrando problemas con NullPointerException y ClassCastException. También ha habilitado el indicador specific.avro.reader como true.

    Sería útil tener más información sobre su aplicación, como la versión de Kafka Streams que está utilizando, la versión de AVRO que está utilizando y los mensajes de error exactos que está recibiendo. Además, sería útil tener más información sobre el esquema que está utilizando, como el tipo de datos que contiene y la estructura del esquema.

    Si puede proporcionar más información, puede ayudarnos a entender mejor el problema y proporcionar una respuesta más detallada.

Comments are closed.