es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

¿Cómo recuerda NATS / JetStream a los suscriptores?

Estoy dando mis primeros pasos con NATS y viendo un comportamiento que no puedo entender, incluso después de leer cuidadosamente los documentos. Tengo un servidor NATS local (2.6.5) en ejecución. Fue iniciado con:

./nats-server -js

Genero algunos mensajes con el siguiente código:

async def main():
nc = await nats.connect()
js = nc.jetstream()

await js.delete_stream(name="hello")
await js.add_stream(
    name="hello",
    subjects=["hello"],
)
for i in range(0, 10):
    await js.publish("hello", f"hello world: {i}".encode())
await nc.close()

if name == “main“:
asyncio.run(main())

Si ejecuto el código y ejecuto ./nats stream ls, veo 10 mensajes. Hasta aquí todo bien. Luego ejecuto mi consumidor:

async def main():
nc = await nats.connect()
js = nc.jetstream()
sub = await js.pull_subscribe(“hello”, “hello”)

msg_count = 0
while msg_count < 10:
    for msg in await sub.fetch(1):
        print("Received:", msg.data)
        msg_count = msg_count + 1

        # Intenta nackear cada tercer mensaje
        if msg_count % 3 == 0:
            await msg.nak()
        else:
            await msg.ack()

await nc.close()

if name == “main“:
asyncio.run(main())

La salida muestra:

Received: b’hello world: 0′
Received: b’hello world: 1′
Received: b’hello world: 2′
Received: b’hello world: 2′
Received: b’hello world: 3′
Received: b’hello world: 4′
Received: b’hello world: 4′
Received: b’hello world: 5′
Received: b’hello world: 6′
Received: b’hello world: 6′

Eso tiene sentido: saco 10 mensajes. Cada tercer mensaje es “nakeado”, por lo que es recuperado de nuevo por la próxima llamada. Si vuelvo a ejecutar el script, la salida es:

Received: b’hello world: 7′
Received: b’hello world: 8′
Received: b’hello world: 9′
Received: b’hello world: 9′

Y después de unos segundos obtengo un tiempo de espera. Obviamente, NATS de alguna manera recuerda mi script y continúa entregando mensajes. Pero no entiendo cómo sucede esto. ¿Hay un cursor “global” en la secuencia? Pero en ese caso, varios clientes interferirían, lo cual no tiene sentido para mí. Así que asumo que NATS de alguna manera recuerda a mi cliente. ¿Cómo? ¿Y cómo le diría a NATS que quiero reiniciar? También apreciaría un enlace a la documentación que obviamente me perdí.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Parece que NATS utiliza una secuencia de transmisión para la entrega de mensajes y su consumidor realiza un seguimiento del último número de secuencia que ha procesado. Cuando reinicia su script, NATS recuerda el último número de secuencia que su cliente ha procesado y continúa desde allí.

    Para indicar a NATS que desea reiniciar desde el principio de la secuencia, puede especificar una política de entrega al crear la suscripción. Por ejemplo, puede usar deliver_last para recibir solo el último mensaje en la secuencia o deliver_all_available para recibir todos los mensajes disponibles en la secuencia.

    La documentación sobre políticas de entrega se puede encontrar aquí: https://docs.nats.io/jetstream/concepts/streaming-delivery-policies

    Además, puede restablecer el estado de su cliente eliminando la suscripción y creando una nueva con la política de entrega deseada.

    La documentación sobre cómo trabajar con suscripciones se puede encontrar aquí: https://docs.nats.io/jetstream/nats_streaming_subscriptions

    En general, parece que NATS está diseñado para manejar múltiples clientes que acceden a la misma secuencia, y utiliza una combinación de números de secuencia y políticas de entrega para garantizar la entrega confiable de mensajes.

Comments are closed.