diff --git a/Source/Core/Common/WorkQueueThread.h b/Source/Core/Common/WorkQueueThread.h index 2a756f3d7e..a6cb71b7f9 100644 --- a/Source/Core/Common/WorkQueueThread.h +++ b/Source/Core/Common/WorkQueueThread.h @@ -25,8 +25,8 @@ public: void Reset(std::function function) { Shutdown(); - m_shutdown.Clear(); - m_cancelled.Clear(); + std::lock_guard lg(m_lock); + m_cancelled = false; m_function = std::move(function); m_thread = std::thread(&WorkQueueThread::ThreadLoop, this); } @@ -34,85 +34,107 @@ public: template void EmplaceItem(Args&&... args) { - if (!m_cancelled.IsSet()) + std::lock_guard lg(m_lock); + if (m_shutdown) { - std::lock_guard lg(m_lock); - m_items.emplace(std::forward(args)...); + return; } - m_wakeup.Set(); + m_items.emplace(std::forward(args)...); + m_idle = false; + m_worker_cond_var.notify_one(); } void Push(T&& item) { - if (!m_cancelled.IsSet()) + std::lock_guard lg(m_lock); + if (m_cancelled) { - std::lock_guard lg(m_lock); - m_items.push(item); + return; } - m_wakeup.Set(); + m_items.push(item); + m_idle = false; + m_worker_cond_var.notify_one(); } void Push(const T& item) { - if (!m_cancelled.IsSet()) - { std::lock_guard lg(m_lock); + if (m_cancelled) + { + return; + } m_items.push(item); - } - m_wakeup.Set(); + m_idle = false; + m_worker_cond_var.notify_one(); } void Clear() { - { - std::lock_guard lg(m_lock); - m_items = std::queue(); - } - m_wakeup.Set(); + std::lock_guard lg(m_lock); + m_items = std::queue(); + m_worker_cond_var.notify_one(); } void Cancel() { - m_cancelled.Set(); - Clear(); - Shutdown(); - } + if (!m_thread.joinable()) + { + return; + } - bool IsCancelled() const { return m_cancelled.IsSet(); } + { + std::unique_lock lg(m_lock); + m_items = std::queue(); + m_cancelled = true; + m_shutdown = true; + m_worker_cond_var.notify_one(); + } + m_thread.join(); + } void Shutdown() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_shutdown.Set(); - m_wakeup.Set(); - m_thread.join(); + return; } + + { + std::unique_lock lg(m_lock); + m_shutdown = true; + m_worker_cond_var.notify_one(); + } + m_thread.join(); } // Doesn't return until the most recent function invocation has finished. - void FlushOne() + void ClearAndFlush() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_flush.Set(); - Clear(); - m_flushed.Wait(); + return; } + + std::unique_lock lg(m_lock); + m_items = std::queue(); + m_wait_cond_var.wait(lg, [&] { + return m_idle; + }); } - // Doesn't return until the queue is empty. + // Doesn't return until the most recent function invocation has finished. void Flush() { - if (m_thread.joinable()) + if (!m_thread.joinable()) { - m_flush.Set(); - m_wakeup.Set(); - m_flushed.Wait(); + return; } - } - bool IsFlushing() const { return m_flush.IsSet() || m_shutdown.IsSet(); } + std::unique_lock lg(m_lock); + m_wait_cond_var.wait(lg, [&] { + return m_idle; + }); + } private: void ThreadLoop() @@ -121,41 +143,37 @@ private: while (true) { - m_wakeup.Wait(); - - while (true) + std::unique_lock lg(m_lock); + if (m_items.empty()) { - std::unique_lock lg(m_lock); - if (m_items.empty()) + m_idle = true; + m_wait_cond_var.notify_all(); + m_worker_cond_var.wait(lg, [&] { + return m_shutdown || !m_items.empty(); + }); + if (m_shutdown) { - if (m_flush.IsSet()) - { - m_flush.Clear(); - m_flushed.Set(); - } break; } - T item{std::move(m_items.front())}; - m_items.pop(); - lg.unlock(); - - m_function(std::move(item)); + continue; } + T item{std::move(m_items.front())}; + m_items.pop(); + lg.unlock(); - if (m_shutdown.IsSet()) - break; + m_function(std::move(item)); } } std::function m_function; std::thread m_thread; - Common::Event m_wakeup; - Common::Flag m_shutdown; - Common::Flag m_cancelled; - Common::Flag m_flush; - Common::Event m_flushed; std::mutex m_lock; std::queue m_items; + std::condition_variable m_wait_cond_var; + std::condition_variable m_worker_cond_var; + bool m_idle = true; + bool m_shutdown = false; + bool m_cancelled = false; }; } // namespace Common