es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Consumidor retardado de Pub/Sub Lite

Estoy implementando el consumo de temas con retraso en Kafka mediante consumer.pause().

El adaptador Pub/Sub de Kafka convierte la pausa en una NoOp:

https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L590-L600

¿Existe alguna documentación sobre cómo retrasar el consumo de un tema de Pub/Sub Lite durante un tiempo determinado?

Es decir, quiero consumir todos los mensajes de un tema de Pub/Sub Lite, pero con un retraso artificial de 4 minutos.

Aquí está mi algoritmo con Kafka nativo:
– llamar a consumer.poll()
– reanudar todas las particiones asignadas consumer.resume(consumer.assignment())
– combinar los registros previamente retrasados con los registros consultados recientemente
– separar los registros en:
– registros que tienen suficiente antigüedad como para procesar
– registros que todavía son demasiado jóvenes para procesar
– pausar las particiones para cualquier registro que sea demasiado joven consumer.pause()
– mantener un búfer de registros demasiado jóvenes para reconsiderar en el siguiente ciclo, llamado retrasados
– procesar los registros que ya tienen suficiente antigüedad
– repetir el proceso

Solo confirmamos los desplazamientos de los registros con suficiente antigüedad. Si el proceso muere, cualquier registro en el búfer de “demasiado jóvenes” permanecerá sin confirmar y será revisado por cualquier consumidor que reciba la partición en el subsiguiente reequilibrio.

¿Existe una forma más generalizada de este algoritmo que funcione con Kafka nativo y Pub/Sub Lite?

Edit: CloudTasks es una mala idea aquí ya que desconecta la cadena de confirmación de desplazamientos. Necesito asegurarme de confirmar solo los desplazamientos de los registros que han recibido un ack del sistema downstream.

Tags:  , ,

Answer

  1. Avatar for davy.ai

    Sí, existe una forma más generalizada de este algoritmo que funcionará tanto con Kafka nativo como con Pub/Sub Lite.

    El algoritmo es el siguiente:
    – Llame a consumer.poll()
    – Reanude todas las particiones asignadas consumer.resume(consumer.assignment())
    – Separe los registros en dos categorías:
    – Registros que tienen la suficiente antigüedad para ser procesados
    – Registros que aún son demasiado jóvenes para ser procesados
    – Pausé las particiones para cualquier registro que sea demasiado joven consumer.pause(<particiones de los demasiado jóvenes>)
    – Mantenga un búfer de registros demasiado jóvenes para reconsiderar en el próximo paso, llamado delayed
    – Procese los registros que son lo suficientemente antiguos
    – Confirme el registro de compensación de registros que tiene la suficiente antigüedad
    – Repita el proceso una y otra vez.

Comments are closed.