es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo manejar ValueDeserializationError en confluent_kafka de Python?

Esta es la clase base del consumidor que estoy utilizando para crear nuevos consumidores. Funciona bien para el consumidor con “enable.auto.commit=True”. Pero cuando creo un consumidor con “enable.auto.commit=False” y ocurre alguna de las excepciones (KeyDeserializationError, ValueDeserializationError), entonces necesito confirmar manualmente ese mensaje en el bloque except. Como esta clase base también se utiliza para enable.auto.commit=True, la línea self.consumer.commit() también se llama para estos tipos de consumidores.

  1. ¿Al llamar a commit() para los consumidores con enable.auto.commit=True hay algún problema interno? (parece estar bien cuando lo probé localmente)

  2. ¿Cuál debería ser el manejo ideal para las excepciones (KeyDeserializationError, ValueDeserializationError) cuando enable.auto.commit=False?

class KafkaConsumer(object):
    """Wrapper sobre el Consumidor de Kafka"""
    def init(self, topics: list[str], *kwargs: Any):
        config = {
            *kwargs,
        }
        self.consumer = DeserializingConsumer(config)
        self.consumer.subscribe(topics=topics)

def consume(self, poll_timeout_secs: float = 1.0):
    try:
        while True:
            try:
                msg = self.consumer.poll(timeout=poll_timeout_secs)
            except (KeyDeserializationError, ValueDeserializationError) as err:
                self.consumer.commit()
                continue
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                yield msg
    except:
        self.consumer.close()

crear objeto de consumidor con enable.auto.commit=True/False

kafka_consumer = KafkaConsumer(topics=topics, **kwargs) # puedo pasar “enable.auto.commit”:False para el modo de confirmación manual.

Lógica de negocio actual de consumo

for message in kafka_consumer.consume():
try:
event = message.value()
logger.info(f”mensaje {event}”)
except Exception as e:
logger.exception(f’No se puede consumir datos de kafka {e}’)
finally:
pass

kafka_consumer.consumer.commit(message=message) # en caso de modo de confirmación manual del consumidor

Tags:  , , , ,

Answer

  1. Avatar for davy.ai
    1. ¿Al llamar a commit() para los consumidores con auto.commit=True genera algún problema internamente?

    Llamar a commit() para los consumidores con auto.commit=True no debería generar ningún problema internamente. Es seguro llamar a commit() incluso si el autocommit está habilitado.

    1. ¿Cuál debería ser el manejo ideal para las excepciones (KeyDeserializationError, ValueDeserializationError) cuando auto.commit=False?

    Para los consumidores con auto.commit=False, si ocurren excepciones de KeyDeserializationError o ValueDeserializationError, el mensaje actual debe ser procesado y luego se debe realizar una confirmación manual en el bloque except utilizando self.consumer.commit(). Esto garantiza que los mensajes no se pierdan y también evita un bucle infinito causado por el continuo fallo en la deserialización de los mensajes.

Comments are closed.