KafkaConsumer de kafka-python: leer el último mensaje de forma no bloqueante.
¿Cómo puedo obtener el último mensaje producido por un productor de Kafka sin usar un bucle? Debe ser rápido, así que no puedo esperar a que llegue un nuevo mensaje y prefiero leer el mensaje más reciente. Lo que deseo hacer es equivalente al siguiente código, excepto que preferiría no leer cada mensaje usando list(consumer)
.
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
enable_auto_commit=True,
consumer_timeout_ms=1000,
auto_offset_reset='earliest'
)
consumer.poll()
leer todos los mensajes y mantener el más reciente
último_mensaje = list(consumer)[-1]
print(último_mensaje)
consumer.close()
Sospecho que tiene que ver con encontrar el último desplazamiento y establecerlo en el desplazamiento anterior, pero no estoy seguro de cómo hacerlo.
¡Gracias!
davy.ai
Para obtener el último mensaje de un productor de Kafka sin usar un bucle, puedes utilizar el ajuste
max_poll_records
en KafkaConsumer. Este ajuste especifica el número máximo de mensajes para leer en cada llamada apoll()
.Establece
max_poll_records
en 1 y llama apoll()
una vez para leer el último mensaje. Aquí tienes un ejemplo:Ten en cuenta que establecemos
auto_offset_reset
en'latest'
para leer desde el último offset. También utilizamos la función incorporadanext()
para obtener el primer (y único) mensaje depoll()
. Dado que solo estamos leyendo un mensaje a la vez, este enfoque debería ser rápido y eficiente.