es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Cómo escribir una std::queue con un hilo que solo ejecuta la “tarea más reciente no ejecutada” de la cola?

Necesito crear una cola de tareas de std::queue que sea ejecutada por un hilo que solamente esté ejecutando la última tarea de la lista.

Las tareas son, por ejemplo, tareas de copia de archivo. Pero aquí está la parte importante. No tengo que correr siempre toda la lista de tareas. Simplemente quiero ejecutar la última tarea no ejecutada de la cola y vaciar la cola después de sacar la tarea para ejecutar. Esta parte puede ser complicada. Tengo que tener cuidado de no limpiar la cola cuando haya tareas pendientes sin ejecutar 🙂

El siguiente es el código completo:

#include <atomic>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

using CopyTask = std::function<void(void)>;

class TaskQueue {
public:
~TaskQueue() {
StopThread();
}

void CopyTaskCompleted() {
    std::unique_lock&lt;std::mutex&gt; que_lock(m_push_mutex);
    m_trigger = true;
    que_lock.unlock();
    m_condition_variable.notify_one();
    std::cout &lt;&lt; &quot; CopyTaskCompleted llamado &quot; &lt;&lt; std::endl;
}

void PushCopyTask(CopyTask task) {
    std::lock_guard&lt;std::mutex&gt; lock(m_push_mutex);
    std::cout &lt;&lt; &quot; PushCopyTask llamado &quot; &lt;&lt; std::endl;

    if (!m_thread.joinable()) {
        StartThreadLooper();
        task();
        return;
    }

    if (!m_queue.empty()) {
        m_queue.push(task);
    } else {
        // Si la cola está vacía, entonces no hay tareas pendientes. Ejecutar simplemente la tarea sin ponerla en la cola
        task();
    }
}

private:
std::queue<CopyTask> m_queue;
std::mutex m_queue_mutex;
std::mutex m_push_mutex;
std::condition_variable m_condition_variable;
std::thread m_thread;
std::atomic<bool> m_running {false};
bool m_trigger {false};

void StartThreadLooper() {
    m_running = true;
    std::cout &lt;&lt; &quot; El hilo Looper está por comenzar &quot; &lt;&lt; std::endl;
    m_thread = std::thread( [this] {
        while (m_running) {
            std::cout &lt;&lt; &quot; Hilo Looper ejecutándose &quot; &lt;&lt; std::endl;
            WaitForNotification();
            if (!m_queue.empty()) {
                // Quiero ejecutar la última tarea de la cola aquí y cancelar el resto de las tareas
                CopyTask latest_task_to_execute = m_queue.back();
                m_queue = {};
                latest_task_to_execute();
            }
        }
    });
}

void StopThread() {
    std::unique_lock&lt;std::mutex&gt; lock(m_queue_mutex);
    m_running = false;
    if (m_thread.joinable()) {
        try {
            m_thread.join();
        }
        catch (const std::exception&amp; e) {
            // Log
        }
    }
    std::cout &lt;&lt; &quot; El hilo Looper está saliendo &quot; &lt;&lt; std::endl;
}

void WaitForNotification() {
    std::unique_lock&lt;std::mutex&gt; que_lock(m_queue_mutex);
    m_condition_variable.wait(que_lock, [this] {
        return m_trigger;
    });
    m_trigger = false;
}

};

int main() {
std::cout << " — Comienzo del programa — " << std::endl;

TaskQueue task_queue;
task_queue.PushCopyTask([&amp;task_queue](){
    std::thread t1([&amp;] {
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout &lt;&lt; &quot; PushCopyTask número 1 terminó de ejecutarse &quot; &lt;&lt; std::endl;
        task_queue.CopyTaskCompleted();
    });
});

task_queue.PushCopyTask([&amp;task_queue](){
    std::thread t2([&amp;] {
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout &lt;&lt; &quot; PushCopyTask número 2 terminó de ejecutarse &quot; &lt;&lt; std::endl;
        task_queue.CopyTaskCompleted();
    });
});

task_queue.PushCopyTask([&amp;task_queue](){
    std::thread t3([&amp;] {
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout &lt;&lt; &quot; PushCopyTask número 3 terminó de ejecutarse &quot; &lt;&lt; std::endl;
        task_queue.CopyTaskCompleted();
    });
});

task_queue.PushCopyTask([&amp;task_queue](){
    std::thread t4([&amp;] {
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout &lt;&lt; &quot; PushCopyTask número 4 terminó de ejecutarse &quot; &lt;&lt; std::endl;
        task_queue.CopyTaskCompleted();
    });
});

std::cout &lt;&lt; &quot; -- Fin del programa -- &quot; &lt;&lt; std::endl;
return 0;

}

Pero lo anterior no funciona. ¿Parece que estoy haciendo algo mal con la condition_variable?

¿Cómo puedo escribir una cola de tareas con una API pública simple como la clase TaskQueue anterior donde puedo pasar un método para ejecutar y siempre debe ejecutar el último método cuando tenga la oportunidad?

Env de desarrollo:
Macos Big Sure con compilador C++ clang.

Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para crear una cola std::queue de tareas que sea ejecutada por un hilo que solo ejecuta la tarea más reciente de la lista, se puede usar una std::condition_variable para notificar al hilo cuando hay una tarea para ejecutar. Aquí mostramos cómo se puede modificar la clase TaskQueue para lograr esto:

    “`c++
    #include
    #include
    #include
    #include
    #include
    #include

    using CopyTask = std::function<void(void)>;

    class TaskQueue {
    public:
    ~TaskQueue() {
    DetenerHilo();
    }

    void CopiaDeTareaCompletada() {
        std::unique_lock<std::mutex> que_lock(m_push_mutex);
        m_trigger = true;
        que_lock.unlock();
        m_condition_variable.notify_one();
        std::cout << "CopiaDeTareaCompletada llamada" << std::endl;
    }
    
    void AgregarCopiaDeTarea(CopyTask task) {
        std::lock_guard<std::mutex> lock(m_push_mutex);
        std::cout << "AgregarCopiaDeTarea llamada" << std::endl;
    
        if (!m_thread.joinable()) {
            IniciarBucleHilo();
        }
    
        m_queue.push(task);
        m_trigger = true;
        m_condition_variable.notify_one();
    }
    

    private:
    std::queue m_queue;
    std::mutex m_queue_mutex;
    std::mutex m_push_mutex;
    std::condition_variable m_condition_variable;
    std::thread m_thread;
    std::atomic m_running{false};
    bool m_trigger{false};

    void IniciarBucleHilo() {
        m_running = true;
        std::cout << "Hilo de bucle se va a iniciar" << std::endl;
        m_thread = std::thread([this] {
            while (m_running) {
                std::cout << "Hilo de bucle corriendo" << std::endl;
                EsperarNotificacion();
                if (!m_queue.empty()) {
                    m_queue.back()();
                    std::lock_guard<std::mutex> lock(m_queue_mutex);
                    while (!m_queue.empty()) m_queue.pop();
                }
            }
        });
    }
    
    void DetenerHilo() {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_running = false;
        if (m_thread.joinable()) {
            try {
                m_thread.join();
            }
            catch (const std::exception &e) {
                // Log
            }
        }
        std::cout << "Hilo de bucle finalizando" << std::endl;
    }
    
    void EsperarNotificacion() {
        std::unique_lock<std::mutex> que_lock(m_push_mutex);
        m_condition_variable.wait(que_lock, [this] {
            return m_trigger;
        });
        m_trigger = false;
    }
    

    };

    int main() {
    std::cout << “– Inicio del programa –” << std::endl;

    TaskQueue task_queue;
    task_queue.AgregarCopiaDeTarea([](){
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout << "AgregarCopiaDeTarea número 1 ha terminado de ejecutarse" << std::endl;
    });
    
    task_queue.AgregarCopiaDeTarea([](){
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout << "AgregarCopiaDeTarea número 2 ha terminado de ejecutarse" << std::endl;
    });
    
    task_queue.AgregarCopiaDeTarea([](){
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout << "AgregarCopiaDeTarea número 3 ha terminado de ejecutarse" << std::endl;
    });
    
    task_queue.AgregarCopiaDeTarea([](){
        std::this_thread::sleep_for(std::chrono::milliseconds(70));
        std::cout << "AgregarCopiaDeTarea número 4 ha terminado de ejecutarse" << std::endl;
    });
    
    std::this_thread::sleep_for(std::chrono::milliseconds(250));
    std::cout << "-- Fin del programa --" << std::endl;
    return 0;
    

    }
    “`

    En el código anterior, modificamos el método AgregarCopiaDeTarea para agregar siempre la tarea a la cola y activar la variable de condición para notificar al hilo de bucle que hay una nueva tarea disponible.

    Modificamos el hilo de bucle para ejecutar la tarea más reciente en la cola (usando m_queue.back()()) y luego limpiar la cola (usando while (!m_queue.empty()) m_queue.pop();).

    También agregamos un breve retraso al final de la función main para permitir que el hilo de bucle finalice su ejecución antes de que el programa salga.

    Con estas modificaciones, la clase TaskQueue debería funcionar como se esperaba, con una API pública simple y siempre ejecutando la tarea más reciente.</void(void)>

Comments are closed.