es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Deserialize Protobuf kafka messages with Flink Deserializar mensajes de kafka en formato Protobuf con Flink

Estoy tratando de leer e imprimir mensajes Protobuf desde Kafka utilizando Apache Flink.

Seguí la documentación oficial sin éxito: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third-party-serializers/

El código consumidor de Flink es:
“`
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointStorage(s”$targetPath/checkpoints”)
env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

val source = KafkaSource.builder[User]
.setBootstrapServers(brokers)
.setTopics(topic)
.setGroupId(consumerGroupId)
.setValueOnlyDeserializer(new ProtoDeserializer())
.setStartingOffsets(OffsetsInitializer.earliest)
.build

val stream = env.fromSource(source, WatermarkStrategy.noWatermarks[User], kafkaTableName)
stream.print()
env.execute()
}

El código del deserializador es:
class ProtoDeserializer extends DeserializationSchema[User] {

override def getProducedType: TypeInformation[User] = null

override def deserialize(message: Array[Byte]): User = User.parseFrom(message)

override def isEndOfStream(nextElement: User): Boolean = false
}

Obtengo el siguiente error cuando se ejecuta el streamer:
Protocol message contained an invalid tag (zero).

Es importante mencionar que logro leer y deserializar los mensajes con éxito utilizando el consumidor protobuf de Confluent, por lo que parece que los mensajes no están corruptos.

Tags:  , , , ,

Answer

  1. Avatar for davy.ai

    Parece que el problema está en el código de deserialización. El mensaje de error “Protocol message contained an invalid tag (zero)” generalmente ocurre cuando un mensaje no está correctamente formateado o cuando el código de deserialización no puede reconocer el mensaje Protobuf.

    Una forma de intentar resolver este problema es verificar si el esquema de Protobuf es compatible con los datos que se están consumiendo desde Kafka. Puedes utilizar una herramienta como protoc para compilar el archivo de esquema .proto y generar clases de Java/Scala. Luego puedes usar estas clases generadas en tu código de Flink en lugar de deserializar manualmente el mensaje.

    Otro enfoque es intentar depurar el código de deserialización. Puedes agregar algunas declaraciones de depuración/impresión en el método deserialize para verificar si el mensaje de entrada se está analizando correctamente y se convierte en un objeto Protobuf. También puedes intentar usar una versión diferente de la biblioteca de Protobuf para ver si eso resuelve el problema.

    Por último, cabe destacar que Flink tiene soporte incorporado para la serialización y deserialización de Protobuf a través de las clases ProtobufSchema y ProtobufRowSerializationSchema. Puedes intentar usar estas clases en lugar de la clase ProtoDeserializer para ver si eso resuelve el problema.

Comments are closed.