Cómo almacenar en caché tareas asíncronas costosas para esperar aquellas que ya están en progreso?
Tengo una caché de avance que calcula algunos valores costosos. En algunos casos, debo realizar una llamada costosa al mismo recurso. En una situación en la que la caché de avance ya está calculando el valor, me gustaría .await
hasta que se haya completado este cálculo en curso.
Mi código actual (simplificado) está estructurado de manera similar a esto:
struct MyStruct {
cache: Caché, // caché para los resultados
}
impl MyStruct {
async fn compute(&self) -> ExpensiveThing { ... }
async fn forward_cache_compute(&self, identifier: &str) {
// realizar un cálculo costoso y almacenarlo en caché:
...
let value = self.compute().await // .... tarda 100ms ...
self.cache.insert(identifier, value)
// considerar si es posible guardar un futuro de compute() o una variable condicional para esperar a "identifier"
}
async fn get_from_cache_or_compute_if_needed(&self, identifier: &str) -> ExpensiveThing {
// me gustaría verificar si la caché de avance ya está calculando y devolver ese valor si es posible (¿compartir un futuro?)
if let Some(cached_value) = self.cache.get(identifier) {
// utilizar este cached_value y no calcular de nuevo
} else if ... se está realizando un cálculo en curso... {
// bloquear en eso
// ¿puedo guardar el futuro y esperarlo desde varios lugares?
}
}
}
davy.ai
Para lograr esto, puedes usar un
HashMap
para almacenar untokio::sync::oneshot::Sender
para cada identificador que se esté calculando actualmente. Cuando llega una nueva solicitud para un identificador, primero compruebas la caché para ver si el resultado ya está disponible. Si lo está, se devuelve el valor almacenado en la caché. Si no lo está, se comprueba el mapa de solicitudes en proceso para el identificador.Si ya hay un cálculo en proceso para el identificador, se utiliza el
tokio::sync::oneshot::Receiver
para esperar el resultado. Si no hay un cálculo en proceso, se crea un nuevo emisor y se guarda en el mapa. Luego se calcula el valor como antes y se envía el resultado a través del emisor antes de almacenarlo en la caché.Aquí tienes un ejemplo de implementación:
En esta implementación, se utiliza un
tokio::sync::oneshot::Sender
para notificar al llamador cuando el valor está disponible. El HashMapin_flight_requests
asocia cada identificador con un emisor para un cálculo en proceso. Se elimina la entrada de este mapa cuando el cálculo está completo, ya sea exitoso o no. También se utilizaclone()
para devolver una copia del valor almacenado en la caché en lugar de una referencia.