es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Mongo change-Stream with Spring resume En la reanudación con Mongo change-stream usando Spring, en comparación con la opción “at” (en) y “startAfter” (empezar después), se garantiza la tolerancia a fallos en caso de pérdida de conexión.

No puedo encontrar una respuesta en stackOverflow ni en ninguna documentación,
Tengo el siguiente código de cambio de flujo (escuchar una base de datos no una colección específica)

Versión de Mongo es 4.2

@Configuration
public class DatabaseChangeStreamListener{

//Constructor, campos, etc...

@PostConstruct
public void initialize(){
    MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
    ChangeStreamRequest.ChangeStreamRequestOptions options = new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
    container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
    container.start();
}

private ChangeStreamOptions buildChangeStreamOptions(){
    return ChangeStreamOptions.builder()
    .returnFullDocumentOnUpdate()
    .filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
    .resumeAt(Instant.now().minusSeconds(1))
    .build();
}

//más código
}

Quiero que el flujo comience a escuchar solo desde el momento de la iniciación del sistema, sin tomar nada previo en el op-log, ¿funcionará .resumeAt(Instant.now().minusSeconds(1))?
¿Necesito usar el método startAfter y, de ser así, cómo puedo encontrar el último resumeToken en la base de datos?
¿O ya está listo para usar y no necesito agregar ninguna línea de reanudación/inicio?

Segunda pregunta, nunca detengo el contenedor (debe estar en funcionamiento mientras la aplicación se ejecute), en caso de desconexión de MongoDB y reconexión, ¿el listener en la configuración actual seguirá consumiendo mensajes? (Tengo problemas para simular la desconexión de la base de datos)

Si no volverá a reanudar el manejo de eventos, ¿qué debo cambiar en la configuración para que el cambio de flujo continúe y tome todos los eventos desde el último resumeToken recibido antes de la desconexión?
He leído este gran artículo en medium change stream en producción, pero utiliza el cursor directamente, y quiero usar el DefaultMessageListenerContainer de Spring, ya que es mucho más elegante.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Primera pregunta

    Si desea que el flujo comience a escuchar solo desde el momento de inicio del sistema, sin tener en cuenta nada anterior en el op-log, debe eliminar la línea resumeAt() del método buildChangeStreamOptions(). La línea resumeAt() se utiliza para especificar el momento a partir del cual el flujo de cambios debe comenzar a escuchar, y si no se especifica, por defecto comienza a escuchar desde el último evento en el oplog.

    Segunda pregunta

    Si hay una desconexión de MongoDB y una reconexión, entonces el listener en la configuración actual continuará consumiendo mensajes. Sin embargo, se recomienda utilizar un mecanismo de reintento para manejar el escenario de desconexión. Puede utilizar el método retry() del DefaultMessageListenerContainer para establecer el mecanismo de reintento.

    Si desea que el flujo de cambios continúe y tome todos los eventos desde el último resumeToken recibido antes de la desconexión, entonces necesita almacenar el resumeToken en una base de datos o un archivo, y cuando la aplicación inicie, leer el resumeToken desde la base de datos o el archivo, y pasarlo al método resumeAfter() en lugar del método resumeAt() en el método buildChangeStreamOptions() para continuar el flujo de cambios desde el último resumeToken recibido.

Comments are closed.