es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo almacenar en caché tareas asíncronas costosas para esperar aquellas que ya están en progreso?

Tengo una caché de avance que calcula algunos valores costosos. En algunos casos, debo realizar una llamada costosa al mismo recurso. En una situación en la que la caché de avance ya está calculando el valor, me gustaría .await hasta que se haya completado este cálculo en curso.

Mi código actual (simplificado) está estructurado de manera similar a esto:

struct MyStruct {
    cache: Caché, // caché para los resultados
}

impl MyStruct {
    async fn compute(&self) -> ExpensiveThing { ... }

    async fn forward_cache_compute(&self, identifier: &str) {
        // realizar un cálculo costoso y almacenarlo en caché:
        ... 
        let value = self.compute().await // .... tarda 100ms ...
        self.cache.insert(identifier, value) 
        // considerar si es posible guardar un futuro de compute() o una variable condicional para esperar a "identifier"
    }

    async fn get_from_cache_or_compute_if_needed(&self, identifier: &str) -> ExpensiveThing {
        // me gustaría verificar si la caché de avance ya está calculando y devolver ese valor si es posible (¿compartir un futuro?)
        if let Some(cached_value) = self.cache.get(identifier) {
            // utilizar este cached_value y no calcular de nuevo
        } else if ... se está realizando un cálculo en curso... {
            // bloquear en eso
            // ¿puedo guardar el futuro y esperarlo desde varios lugares?
        }
    }
}
Tags:  , ,

Answer

  1. Avatar for davy.ai

    Para lograr esto, puedes usar un HashMap para almacenar un tokio::sync::oneshot::Sender para cada identificador que se esté calculando actualmente. Cuando llega una nueva solicitud para un identificador, primero compruebas la caché para ver si el resultado ya está disponible. Si lo está, se devuelve el valor almacenado en la caché. Si no lo está, se comprueba el mapa de solicitudes en proceso para el identificador.

    Si ya hay un cálculo en proceso para el identificador, se utiliza el tokio::sync::oneshot::Receiver para esperar el resultado. Si no hay un cálculo en proceso, se crea un nuevo emisor y se guarda en el mapa. Luego se calcula el valor como antes y se envía el resultado a través del emisor antes de almacenarlo en la caché.

    Aquí tienes un ejemplo de implementación:

    use std::collections::HashMap;
    use tokio::sync::oneshot;
    
    struct MyStruct {
        cache: Cache, // caché para los resultados
        in_flight_requests: HashMap<string,></string,><expensivething>>,
    }
    
    impl MyStruct {
        async fn compute(&self) -> ExpensiveThing { ... }
    
        async fn forward_cache_compute(&self, identifier: &str) {
            let (tx, rx) = oneshot::channel();
    
            // realizar un cálculo costoso y almacenarlo en caché:
            let value = self.compute().await;
    
            // almacenar el valor en la caché
            self.cache.insert(identifier, value);
    
            // notificar a las solicitudes en espera que el valor está disponible
            let _ = tx.send(value);
    
            // eliminar la entrada de la solicitud en proceso
            self.in_flight_requests.remove(identifier);
        }
    
        async fn get_from_cache_or_compute_if_needed(&self, identifier: &str) -> ExpensiveThing {
            // comprobar si el valor ya está en la caché
            if let Some(cached_value) = self.cache.get(identifier) {
                return cached_value.clone();
            }
    
            // comprobar si ya hay una solicitud en proceso
            if let Some(sender) = self.in_flight_requests.get(identifier) {
                // esperar a que se calcule y se envíe el valor a través del canal
                match sender.await {
                    Ok(value) => return value,
                    Err(_) => () // manejar el error de manera adecuada
                }
            }
    
            // si llegamos a este punto, no hay una solicitud en proceso para el identificador
            let (tx, rx) = oneshot::channel();
    
            // agregar una nueva entrada al mapa de solicitudes en proceso
            self.in_flight_requests.insert(identifier.to_owned(), tx);
    
            // iniciar el cálculo costoso en segundo plano
            tokio::spawn(async move {
                self.forward_cache_compute(identifier).await;
            });
    
            // esperar a que se calcule y se envíe el valor a través del canal
            match rx.await {
                Ok(value) => value,
                Err(_) => {/* manejar el error de manera adecuada */}
            }
        }
    }
    

    En esta implementación, se utiliza un tokio::sync::oneshot::Sender para notificar al llamador cuando el valor está disponible. El HashMap in_flight_requests asocia cada identificador con un emisor para un cálculo en proceso. Se elimina la entrada de este mapa cuando el cálculo está completo, ya sea exitoso o no. También se utiliza clone() para devolver una copia del valor almacenado en la caché en lugar de una referencia.

Comments are closed.