Kafka stream con AVRO Serde personalizado (sin esquema)
Tengo una aplicación de procesamiento de flujo que utiliza el formato de mensaje AVRO. Para la serialización y deserialización (Serde) se utiliza io.confluent.kafka.streams.serdes.avro.GenericAvroSerde.
Intenté crear un Serde AVRO personalizado como el siguiente:
public class CustomAvroSerde implements Serde
Class<SpecificRecord> targetType;
public CustomAvroSerde() {
}
public CustomAvroSerde(Class<SpecificRecord> targetType) {
this.targetType = targetType;
}
@Override
public Serializer<SpecificRecord> serializer() {
return new Serializer<SpecificRecord>() {
@Override
public byte[] serialize(String s, SpecificRecord data) {
byte[] result = null;
if (data != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<SpecificRecord> datumWriter = new SpecificDatumWriter<>(data.getSchema());
try {
datumWriter.write(data, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
result = byteArrayOutputStream.toByteArray();
}
return result;
}
};
}
@Override
public Deserializer<SpecificRecord> deserializer() {
return new Deserializer<SpecificRecord>() {
@Override
public SpecificRecord deserialize(String s, byte[] data) {
SpecificRecord result = null;
if (data != null) {
DatumReader<SpecificRecord> datumReader;
try {
BinaryDecoder binaryDecoder = new DecoderFactory().createBinaryDecoder(data, null);
datumReader = new SpecificDatumReader<>()
//también se intentó suministrar el esquema como línea comentada a continuación
//datumReader = new SpecificDatumReader<>(new Schema.Parser().parse("esquema avro en forma de cadena"));
Decoder decoder = DecoderFactory.get().binaryDecoder(data, binaryDecoder);
result = datumReader.read(null, decoder);
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
};
}
}
El proceso de serialización funciona bien, pero al realizar la deserialización se produce un error de Nullpointer, ya que el Serde no tiene referencia del esquema AVRO en targetType durante la deserialización.
No tendré detalles del esquema hasta que se ejecute, por lo que busco un esquema generalizado que se proporcione en la deserialización.
El Serde de Confluent inicializa targetType con KafkaAvroDeserializer. ¿Cómo manejar esto? ¿Alguna pista?
Contenido editado
Se cambió la serialización y deserialización para utilizar SpecificRecord.
Después del cambio de código, se produjo una excepción Nullpointer como se muestra a continuación
java.lang.NullPointerException
at org.apache.avro.Schema.applyAliases(Schema.java:1787)
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:130)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
Como se comentó en el código, se intentó con Scheme pasándolo a SpecificDatumReader mientras se construía. Cuando se intentó esto, se produjo una excepción de casting como se muestra a continuación,
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:74)
at com.hcl.test.CustomAvroSerde$2.deserialize(CustomAvroSerde.java:56)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
También se habilitó la bandera specific.avro.reader para que sea verdadera.
Fin del contenido editado
Gracias
Venkata
davy.ai
Parece que está teniendo problemas para crear un AVRO Serde personalizado para su aplicación de procesamiento de flujo. Ya ha intentado utilizar
SpecificRecord
para la serialización y deserialización, pero está encontrando problemas conNullPointerException
yClassCastException
. También ha habilitado el indicadorspecific.avro.reader
comotrue
.Sería útil tener más información sobre su aplicación, como la versión de Kafka Streams que está utilizando, la versión de AVRO que está utilizando y los mensajes de error exactos que está recibiendo. Además, sería útil tener más información sobre el esquema que está utilizando, como el tipo de datos que contiene y la estructura del esquema.
Si puede proporcionar más información, puede ayudarnos a entender mejor el problema y proporcionar una respuesta más detallada.