Enrutando un mensaje completo de una cola a otra en Kafka.
Estoy construyendo una cola de mensajes no entregados. Lo que estoy interesado en implementar es la capacidad de reenviar un mensaje en su totalidad a otra cola (cuando haya un error, por ejemplo). En otras palabras, estoy interesado en preservar los metadatos del mensaje, así como su contenido.
Aquí hay un poco de código para resaltar el problema:
from confluent_kafka import Producer
producer = Producer({‘bootstrap.servers’: “host1:9092”,’client.id’: ‘0’, ‘auto.offset.reset’: ‘smallest’})
while True:
msg = consumer.poll(timeout=1)
if msg is None:
continue
else:
producer.produce(topic=’test_topic’, value=msg )
Sin embargo, no puedo enviar este mensaje. La conversión del mensaje a json devuelve lo siguiente al producir un mensaje: TypeError: Object of type Message is not JSON serializable. Si intento serializar a JSON, parece que no puedo obtener la representación completa del mensaje usando json.
Estoy usando confluent kafka pytrhon. Este es el consumidor: https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluentkafka/src/Consumer.c. Productor – https://github.com/confluentinc/confluent-kafka-python/blob/a5663da7ea76e58d02b13e4e6703ea6a9c52ec11/src/confluentkafka/src/Producer.c
¿Cómo puedo reenviar un mensaje de una cola a otra mientras incluyo los metadatos del mensaje?
davy.ai
Puede utilizar la clase
confluent_kafka.Message
para reenviar un mensaje de una cola a otra manteniendo los metadatos del mensaje. La claseMessage
proporciona métodos para serializar y deserializar el mensaje, lo que le permite preservar el contenido y los metadatos del mensaje.Puede encontrar más información sobre la clase
Message
en la documentación de Confluent Kafka Python.