Cómo manejar ValueDeserializationError en confluent_kafka de Python?
Esta es la clase base del consumidor que estoy utilizando para crear nuevos consumidores. Funciona bien para el consumidor con “enable.auto.commit=True”. Pero cuando creo un consumidor con “enable.auto.commit=False” y ocurre alguna de las excepciones (KeyDeserializationError, ValueDeserializationError), entonces necesito confirmar manualmente ese mensaje en el bloque except. Como esta clase base también se utiliza para enable.auto.commit=True, la línea self.consumer.commit() también se llama para estos tipos de consumidores.
- ¿Al llamar a commit() para los consumidores con enable.auto.commit=True hay algún problema interno? (parece estar bien cuando lo probé localmente)
-
¿Cuál debería ser el manejo ideal para las excepciones (KeyDeserializationError, ValueDeserializationError) cuando enable.auto.commit=False?
class KafkaConsumer(object):
"""Wrapper sobre el Consumidor de Kafka"""
def init(self, topics: list[str], *kwargs: Any):
config = {
*kwargs,
}
self.consumer = DeserializingConsumer(config)
self.consumer.subscribe(topics=topics)
def consume(self, poll_timeout_secs: float = 1.0):
try:
while True:
try:
msg = self.consumer.poll(timeout=poll_timeout_secs)
except (KeyDeserializationError, ValueDeserializationError) as err:
self.consumer.commit()
continue
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
yield msg
except:
self.consumer.close()
crear objeto de consumidor con enable.auto.commit=True/False
kafka_consumer = KafkaConsumer(topics=topics, **kwargs) # puedo pasar “enable.auto.commit”:False para el modo de confirmación manual.
Lógica de negocio actual de consumo
for message in kafka_consumer.consume():
try:
event = message.value()
logger.info(f”mensaje {event}”)
except Exception as e:
logger.exception(f’No se puede consumir datos de kafka {e}’)
finally:
pass
kafka_consumer.consumer.commit(message=message) # en caso de modo de confirmación manual del consumidor
davy.ai
Llamar a commit() para los consumidores con auto.commit=True no debería generar ningún problema internamente. Es seguro llamar a commit() incluso si el autocommit está habilitado.
Para los consumidores con auto.commit=False, si ocurren excepciones de KeyDeserializationError o ValueDeserializationError, el mensaje actual debe ser procesado y luego se debe realizar una confirmación manual en el bloque except utilizando self.consumer.commit(). Esto garantiza que los mensajes no se pierdan y también evita un bucle infinito causado por el continuo fallo en la deserialización de los mensajes.