es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo asegurarse de que los datos en el consumo de Kafka no se pierdan.

Ahora mi servidor está consumiendo el mensaje de kafka.
Luego envío el mensaje a un canal y hago “MarkMessage” (autocommit).
golang
func (h MsgConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
h.msgQueue <- msg
// se hará autocommit
sess.MarkMessage(msg, “”)
}
return nil
}

El goroutine del trabajador usa un for-range para leer el canal y enviar los datos a una función de procesamiento (como Insertar en la base de datos).

La pregunta es ¿cómo puedo asegurarme de que los datos en el canal no se pierdan cuando el servidor se cae?

Tengo un plan ahora:
Cada vez que inserte un mensaje en la base de datos, grabaré el offset del mensaje al mismo tiempo.
Si el servicio se cae, cuando lo reinicie, leeré la base de datos para obtener el último offset y lo estableceré en Kafka para que se vuelva a consumir.

¿Es correcto?

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Sí, ese es un enfoque correcto. Puede almacenar el desplazamiento en la base de datos y luego usarlo para establecer el desplazamiento en Kafka cuando el servicio se reinicie. Esto garantizará que no se pierdan mensajes cuando el servicio se interrumpa.

Comments are closed.