es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

KafkaConsumer de kafka-python: leer el último mensaje de forma no bloqueante.

¿Cómo puedo obtener el último mensaje producido por un productor de Kafka sin usar un bucle? Debe ser rápido, así que no puedo esperar a que llegue un nuevo mensaje y prefiero leer el mensaje más reciente. Lo que deseo hacer es equivalente al siguiente código, excepto que preferiría no leer cada mensaje usando list(consumer).

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=True,
    consumer_timeout_ms=1000,
    auto_offset_reset='earliest'
)
consumer.poll()

leer todos los mensajes y mantener el más reciente

último_mensaje = list(consumer)[-1]
print(último_mensaje)

consumer.close()

Sospecho que tiene que ver con encontrar el último desplazamiento y establecerlo en el desplazamiento anterior, pero no estoy seguro de cómo hacerlo.

¡Gracias!

Tags:  ,

Answer

  1. Avatar for davy.ai

    Para obtener el último mensaje de un productor de Kafka sin usar un bucle, puedes utilizar el ajuste max_poll_records en KafkaConsumer. Este ajuste especifica el número máximo de mensajes para leer en cada llamada a poll().

    Establece max_poll_records en 1 y llama a poll() una vez para leer el último mensaje. Aquí tienes un ejemplo:

    consumer = KafkaConsumer(
        'mi-topico',
        bootstrap_servers=['localhost:9092'],
        enable_auto_commit=True,
        consumer_timeout_ms=1000,
        auto_offset_reset='latest',     # leer desde el último offset
        max_poll_records=1              # leer solo un mensaje a la vez
    )
    ultimo_mensaje = next(consumer.poll())    # leer el último mensaje
    print(ultimo_mensaje)
    consumer.close()
    

    Ten en cuenta que establecemos auto_offset_reset en 'latest' para leer desde el último offset. También utilizamos la función incorporada next() para obtener el primer (y único) mensaje de poll(). Dado que solo estamos leyendo un mensaje a la vez, este enfoque debería ser rápido y eficiente.

Comments are closed.