Mongo change-Stream with Spring resume En la reanudación con Mongo change-stream usando Spring, en comparación con la opción “at” (en) y “startAfter” (empezar después), se garantiza la tolerancia a fallos en caso de pérdida de conexión.
No puedo encontrar una respuesta en stackOverflow ni en ninguna documentación,
Tengo el siguiente código de cambio de flujo (escuchar una base de datos no una colección específica)
Versión de Mongo es 4.2
@Configuration
public class DatabaseChangeStreamListener{
//Constructor, campos, etc...
@PostConstruct
public void initialize(){
MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
ChangeStreamRequest.ChangeStreamRequestOptions options = new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
container.start();
}
private ChangeStreamOptions buildChangeStreamOptions(){
return ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
.resumeAt(Instant.now().minusSeconds(1))
.build();
}
//más código
}
Quiero que el flujo comience a escuchar solo desde el momento de la iniciación del sistema, sin tomar nada previo en el op-log, ¿funcionará .resumeAt(Instant.now().minusSeconds(1))
?
¿Necesito usar el método startAfter
y, de ser así, cómo puedo encontrar el último resumeToken
en la base de datos?
¿O ya está listo para usar y no necesito agregar ninguna línea de reanudación/inicio?
Segunda pregunta, nunca detengo el contenedor (debe estar en funcionamiento mientras la aplicación se ejecute), en caso de desconexión de MongoDB y reconexión, ¿el listener en la configuración actual seguirá consumiendo mensajes? (Tengo problemas para simular la desconexión de la base de datos)
Si no volverá a reanudar el manejo de eventos, ¿qué debo cambiar en la configuración para que el cambio de flujo continúe y tome todos los eventos desde el último resumeToken
recibido antes de la desconexión?
He leído este gran artículo en medium change stream en producción, pero utiliza el cursor directamente, y quiero usar el DefaultMessageListenerContainer
de Spring, ya que es mucho más elegante.
davy.ai
Primera pregunta
Si desea que el flujo comience a escuchar solo desde el momento de inicio del sistema, sin tener en cuenta nada anterior en el op-log, debe eliminar la línea
resumeAt()
del métodobuildChangeStreamOptions()
. La línearesumeAt()
se utiliza para especificar el momento a partir del cual el flujo de cambios debe comenzar a escuchar, y si no se especifica, por defecto comienza a escuchar desde el último evento en el oplog.Segunda pregunta
Si hay una desconexión de MongoDB y una reconexión, entonces el listener en la configuración actual continuará consumiendo mensajes. Sin embargo, se recomienda utilizar un mecanismo de reintento para manejar el escenario de desconexión. Puede utilizar el método
retry()
delDefaultMessageListenerContainer
para establecer el mecanismo de reintento.Si desea que el flujo de cambios continúe y tome todos los eventos desde el último
resumeToken
recibido antes de la desconexión, entonces necesita almacenar elresumeToken
en una base de datos o un archivo, y cuando la aplicación inicie, leer elresumeToken
desde la base de datos o el archivo, y pasarlo al métodoresumeAfter()
en lugar del métodoresumeAt()
en el métodobuildChangeStreamOptions()
para continuar el flujo de cambios desde el últimoresumeToken
recibido.