diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 9ad7c4f471..8816ddda16 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -3,7 +3,6 @@ #pragma once -#include #include #include #include @@ -22,8 +21,10 @@ template class WorkQueueThreadBase final { public: + using FunctionType = std::function; + WorkQueueThreadBase() = default; - WorkQueueThreadBase(std::string name, std::function function) + WorkQueueThreadBase(std::string name, FunctionType function) { Reset(std::move(name), std::move(function)); } @@ -31,11 +32,10 @@ public: // Shuts the current work thread down (if any) and starts a new thread with the given function // Note: Some consumers of this API push items to the queue before starting the thread. - void Reset(std::string name, std::function function) + void Reset(std::string name, FunctionType function) { auto lg = GetLockGuard(); Shutdown(); - m_run_thread.store(true, std::memory_order_relaxed); m_thread = std::thread(std::bind_front(&WorkQueueThreadBase::ThreadLoop, this), std::move(name), std::move(function)); } @@ -56,25 +56,30 @@ public: void Cancel() { auto lg = GetLockGuard(); - if (IsRunning()) - { - m_skip_work.store(true, std::memory_order_relaxed); - WaitForCompletion(); - m_skip_work.store(false, std::memory_order_relaxed); - } - else - { - m_items.Clear(); - } + + // Fast path avoids round trip thread communication and saves ~20us. + if (m_items.Empty()) + return; + + RunCommand([&] { m_items.Clear(); }); } // Tells the worker thread to stop when its queue is empty. // Blocks until the worker thread exits. Does nothing if thread isn't running. - void Shutdown() { StopThread(true); } + void Shutdown() + { + auto lg = GetLockGuard(); + WaitForCompletion(); + StopThread(); + } // Tells the worker thread to stop immediately, potentially leaving work in the queue. // Blocks until the worker thread exits. Does nothing if thread isn't running. - void Stop() { StopThread(false); } + void Stop() + { + auto lg = GetLockGuard(); + StopThread(); + } // Stops the worker thread ASAP and empties the queue. void StopAndCancel() @@ -94,18 +99,33 @@ public: } private: - void StopThread(bool wait_for_completion) + using CommandFunction = std::function; + + // Blocking. + void RunCommand(CommandFunction cmd) { - auto lg = GetLockGuard(); - - if (wait_for_completion) - WaitForCompletion(); - - if (m_run_thread.exchange(false, std::memory_order_relaxed)) + if (!IsRunning()) { - m_event.Set(); - m_thread.join(); + std::invoke(cmd); + return; } + + m_commands.Emplace(std::move(cmd)); + m_event.Set(); + m_commands.WaitForEmpty(); + } + + // Stop immediately. + void StopThread() + { + if (!m_thread.joinable()) + return; + + // empty-function shutdown signal. + m_commands.Emplace(CommandFunction{}); + m_event.Set(); + m_thread.join(); + m_commands.Clear(); } auto GetLockGuard() @@ -124,24 +144,29 @@ private: bool IsRunning() { return m_thread.joinable(); } - void ThreadLoop(const std::string& thread_name, const std::function& function) + void ThreadLoop(const std::string& thread_name, const FunctionType& function) { Common::SetCurrentThreadName(thread_name.c_str()); - while (m_run_thread.load(std::memory_order_relaxed)) + while (true) { + while (!m_commands.Empty()) + { + CommandFunction& command = m_commands.Front(); + // empty-function shutdown signal. + if (!command) + return; + + std::invoke(command); + m_commands.Pop(); + } + if (m_items.Empty()) { m_event.Wait(); continue; } - if (m_skip_work.load(std::memory_order_relaxed)) - { - m_items.Clear(); - continue; - } - function(std::move(m_items.Front())); m_items.Pop(); } @@ -149,9 +174,8 @@ private: std::thread m_thread; Common::WaitableSPSCQueue m_items; + Common::WaitableSPSCQueue m_commands; Common::Event m_event; - std::atomic_bool m_skip_work = false; - std::atomic_bool m_run_thread = false; using DummyMutex = std::type_identity; using ProducerMutex = std::conditional_t;