From 5b594048db7413baf9df5e9c2f78a99c262ab93f Mon Sep 17 00:00:00 2001 From: Mr-Wiseguy Date: Fri, 7 Jul 2023 14:21:06 -0400 Subject: [PATCH] Rewrote scheduler to use concurrent queues instead of atomic waits, added thread priorities --- Makefile | 4 +- portultra/events.cpp | 11 ++ portultra/multilibultra.hpp | 11 ++ portultra/scheduler.cpp | 210 +++++++++++++++-------------------- portultra/threads.cpp | 51 +++++++-- portultra/timer.cpp | 3 + src/recomp.cpp | 2 + thirdparty/concurrentqueue.h | 2 +- 8 files changed, 164 insertions(+), 130 deletions(-) diff --git a/Makefile b/Makefile index 19e4c14..57a9d9e 100644 --- a/Makefile +++ b/Makefile @@ -56,8 +56,8 @@ UCRT_DIR ?= C:\Program Files (x86)\Windows Kits\10\lib\10.0.22000.0\ucrt\x6 SDK_DIR ?= C:\Program Files (x86)\Windows Kits\10\lib\10.0.22000.0\um\x64 WARNFLAGS := -Wall -Wextra -Wpedantic -Wno-gnu-anonymous-struct -CFLAGS := -ffunction-sections -fdata-sections $(OPTFLAGS) $(WARNFLAGS) -c -CXXFLAGS := -ffunction-sections -fdata-sections $(OPTFLAGS) $(WARNFLAGS) -std=c++20 -c +CFLAGS := -ffunction-sections -fdata-sections -march=nehalem $(OPTFLAGS) $(WARNFLAGS) -c +CXXFLAGS := -ffunction-sections -fdata-sections -march=nehalem $(OPTFLAGS) $(WARNFLAGS) -std=c++20 -c CPPFLAGS := -Iinclude -Ithirdparty LDFLAGS := -v -Wl,/OPT:REF $(OPTFLAGS) $(LIBS) -L"$(LIB_DIR:;=)" -L"$(UCRT_DIR:;=)" -L"$(SDK_DIR:;=)" lib/RT64/$(CONFIG)/RT64.lib diff --git a/portultra/events.cpp b/portultra/events.cpp index f825f05..51de43a 100644 --- a/portultra/events.cpp +++ b/portultra/events.cpp @@ -94,6 +94,10 @@ extern "C" void osViSetEvent(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, u32 ret } void vi_thread_func() { + Multilibultra::set_native_thread_name("VI Thread"); + // This thread should be prioritized over every other thread in the application, as it's what allows + // the game to generate new audio and gfx lists. + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::Critical); using namespace std::chrono_literals; uint64_t total_vis = 0; @@ -198,6 +202,9 @@ void run_rsp_microcode(uint8_t* rdram, const OSTask* task, RspUcodeFunc* ucode_f void task_thread_func(uint8_t* rdram, uint8_t* rom, std::atomic_flag* thread_ready) { + Multilibultra::set_native_thread_name("SP Task Thread"); + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::Normal); + // Notify the caller thread that this thread is ready. thread_ready->test_and_set(); thread_ready->notify_all(); @@ -230,6 +237,10 @@ void task_thread_func(uint8_t* rdram, uint8_t* rom, std::atomic_flag* thread_rea void gfx_thread_func(uint8_t* rdram, uint8_t* rom, std::atomic_flag* thread_ready, void* window_handle) { using namespace std::chrono_literals; + + Multilibultra::set_native_thread_name("Gfx Thread"); + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::Normal); + RT64Init(rom, rdram, window_handle); rsp_constants_init(); diff --git a/portultra/multilibultra.hpp b/portultra/multilibultra.hpp index 2f11f3d..61607f1 100644 --- a/portultra/multilibultra.hpp +++ b/portultra/multilibultra.hpp @@ -34,6 +34,17 @@ void resume_thread_impl(OSThread *t); void schedule_running_thread(OSThread *t); void pause_self(RDRAM_ARG1); void cleanup_thread(OSThread *t); + +enum class ThreadPriority { + Low, + Normal, + High, + VeryHigh, + Critical +}; + +void set_native_thread_name(const std::string& name); +void set_native_thread_priority(ThreadPriority pri); PTR(OSThread) this_thread(); void disable_preemption(); void enable_preemption(); diff --git a/portultra/scheduler.cpp b/portultra/scheduler.cpp index f4d7a87..69e50ef 100644 --- a/portultra/scheduler.cpp +++ b/portultra/scheduler.cpp @@ -2,7 +2,9 @@ #include #include #include +#include +#include "blockingconcurrentqueue.h" #include "multilibultra.hpp" class OSThreadComparator { @@ -35,93 +37,78 @@ public: } }; +struct NotifySchedulerAction { + +}; + +struct ScheduleThreadAction { + OSThread* t; +}; + +struct StopThreadAction { + OSThread* t; +}; + +struct CleanupThreadAction { + OSThread* t; +}; + +struct ReprioritizeThreadAction { + OSThread* t; + OSPri pri; +}; + +using ThreadAction = std::variant; + static struct { - std::vector to_schedule; - std::vector to_stop; - std::vector to_cleanup; - std::vector> to_reprioritize; - std::mutex mutex; - // OSThread* running_thread; - std::atomic_int notify_count; - std::atomic_int action_count; + moodycamel::BlockingConcurrentQueue action_queue{}; + OSThread* running_thread; bool can_preempt; std::mutex premption_mutex; } scheduler_context{}; -void handle_thread_queueing(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - if (!scheduler_context.to_schedule.empty()) { - OSThread* to_schedule = scheduler_context.to_schedule.back(); - scheduler_context.to_schedule.pop_back(); - scheduler_context.action_count.fetch_sub(1); - debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id); - running_thread_queue.push(to_schedule); - } +void handle_thread_queueing(thread_queue_t& running_thread_queue, const ScheduleThreadAction& action) { + OSThread* to_schedule = action.t; + debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id); + running_thread_queue.push(to_schedule); } -void handle_thread_stopping(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_stop.empty()) { - OSThread* to_stop = scheduler_context.to_stop.back(); - scheduler_context.to_stop.pop_back(); - scheduler_context.action_count.fetch_sub(1); - debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); - running_thread_queue.remove(to_stop); - } +void handle_thread_stopping(thread_queue_t& running_thread_queue, const StopThreadAction& action) { + OSThread* to_stop = action.t; + debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); + running_thread_queue.remove(to_stop); } -void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_cleanup.empty()) { - OSThread* to_cleanup = scheduler_context.to_cleanup.back(); - scheduler_context.to_cleanup.pop_back(); - scheduler_context.action_count.fetch_sub(1); - - debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); - running_thread_queue.remove(to_cleanup); - // If the cleaned up thread was the running thread, schedule a new one to run. - if (to_cleanup == cur_running_thread) { - // If there's a thread queued to run, set it as the new running thread. - if (!running_thread_queue.empty()) { - cur_running_thread = running_thread_queue.top(); - } - // Otherwise, set the running thread to null so the next thread that can be run gets started. - else { - cur_running_thread = nullptr; - } +void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread, const CleanupThreadAction& action) { + OSThread* to_cleanup = action.t; + + debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); + running_thread_queue.remove(to_cleanup); + // If the cleaned up thread was the running thread, schedule a new one to run. + if (to_cleanup == cur_running_thread) { + // If there's a thread queued to run, set it as the new running thread. + if (!running_thread_queue.empty()) { + cur_running_thread = running_thread_queue.top(); + } + // Otherwise, set the running thread to null so the next thread that can be run gets started. + else { + cur_running_thread = nullptr; } - to_cleanup->context->host_thread.join(); - delete to_cleanup->context; - to_cleanup->context = nullptr; } + to_cleanup->context->host_thread.join(); + delete to_cleanup->context; + to_cleanup->context = nullptr; } -void handle_thread_reprioritization(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_reprioritize.empty()) { - const std::pair to_reprioritize = scheduler_context.to_reprioritize.back(); - scheduler_context.to_reprioritize.pop_back(); - scheduler_context.action_count.fetch_sub(1); - - debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize.first->id, to_reprioritize.second); - running_thread_queue.remove(to_reprioritize.first); - to_reprioritize.first->priority = to_reprioritize.second; - running_thread_queue.push(to_reprioritize.first); - } -} - -void handle_scheduler_notifications() { - std::lock_guard lock{scheduler_context.mutex}; - int32_t notify_count = scheduler_context.notify_count.exchange(0); - if (notify_count) { - debug_printf("Received %d notifications\n", notify_count); - scheduler_context.action_count.fetch_sub(notify_count); - } +void handle_thread_reprioritization(thread_queue_t& running_thread_queue, const ReprioritizeThreadAction& action) { + OSThread* to_reprioritize = action.t; + OSPri pri = action.pri; + + debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize->id, pri); + running_thread_queue.remove(to_reprioritize); + to_reprioritize->priority = pri; + running_thread_queue.push(to_reprioritize); } void swap_running_thread(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { @@ -148,26 +135,32 @@ void scheduler_func() { thread_queue_t running_thread_queue{}; OSThread* cur_running_thread = nullptr; + Multilibultra::set_native_thread_name("Scheduler Thread"); + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::VeryHigh); + while (true) { + ThreadAction action; OSThread* old_running_thread = cur_running_thread; - scheduler_context.action_count.wait(0); + scheduler_context.action_queue.wait_dequeue(action); std::lock_guard lock{scheduler_context.premption_mutex}; - - // Handle notifications - handle_scheduler_notifications(); - // Handle stopping threads - handle_thread_stopping(running_thread_queue); - - // Handle cleaning up threads - handle_thread_cleanup(running_thread_queue, cur_running_thread); - - // Handle queueing threads to run - handle_thread_queueing(running_thread_queue); - - // Handle threads that have changed priority - handle_thread_reprioritization(running_thread_queue); + // Determine the action type and act on it + if (const auto* cleanup_action = std::get_if(&action)) { + // Nothing to do + } + else if (const auto* stop_action = std::get_if(&action)) { + handle_thread_stopping(running_thread_queue, *stop_action); + } + else if (const auto* cleanup_action = std::get_if(&action)) { + handle_thread_cleanup(running_thread_queue, cur_running_thread, *cleanup_action); + } + else if (const auto* schedule_action = std::get_if(&action)) { + handle_thread_queueing(running_thread_queue, *schedule_action); + } + else if (const auto* reprioritize_action = std::get_if(&action)) { + handle_thread_reprioritization(running_thread_queue, *reprioritize_action); + } // Determine which thread to run, stopping the current running thread if necessary swap_running_thread(running_thread_queue, cur_running_thread); @@ -194,51 +187,35 @@ void init_scheduler() { void schedule_running_thread(OSThread *t) { debug_printf("[Scheduler] Queuing Thread %d to be scheduled\n", t->id); - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_schedule.push_back(t); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); + scheduler_context.action_queue.enqueue(ScheduleThreadAction{t}); } void swap_to_thread(RDRAM_ARG OSThread *to) { OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); debug_printf("[Scheduler] Scheduling swap from thread %d to %d\n", self->id, to->id); - { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_schedule.push_back(to); - Multilibultra::set_self_paused(PASS_RDRAM1); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); - } + + Multilibultra::set_self_paused(PASS_RDRAM1); + scheduler_context.action_queue.enqueue(ScheduleThreadAction{to}); Multilibultra::wait_for_resumed(PASS_RDRAM1); } void reprioritize_thread(OSThread *t, OSPri pri) { debug_printf("[Scheduler] Adjusting Thread %d priority to %d\n", t->id, pri); - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_reprioritize.emplace_back(t, pri); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); + + scheduler_context.action_queue.enqueue(ReprioritizeThreadAction{t, pri}); } void pause_self(RDRAM_ARG1) { OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); debug_printf("[Scheduler] Thread %d pausing itself\n", self->id); - { - std::lock_guard lock{scheduler_context.mutex}; - Multilibultra::set_self_paused(PASS_RDRAM1); - scheduler_context.to_stop.push_back(self); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); - } + + Multilibultra::set_self_paused(PASS_RDRAM1); + scheduler_context.action_queue.enqueue(StopThreadAction{self}); Multilibultra::wait_for_resumed(PASS_RDRAM1); } void cleanup_thread(OSThread *t) { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_cleanup.push_back(t); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); + scheduler_context.action_queue.enqueue(CleanupThreadAction{t}); } void disable_preemption() { @@ -269,10 +246,7 @@ preemption_guard::~preemption_guard() { } void notify_scheduler() { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.notify_count.fetch_add(1); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); + scheduler_context.action_queue.enqueue(NotifySchedulerAction{}); } } diff --git a/portultra/threads.cpp b/portultra/threads.cpp index 663ff44..6d8f273 100644 --- a/portultra/threads.cpp +++ b/portultra/threads.cpp @@ -43,6 +43,45 @@ void run_thread_function(uint8_t* rdram, uint64_t addr, uint64_t sp, uint64_t ar struct thread_terminated : std::exception {}; +#ifdef _WIN32 +void Multilibultra::set_native_thread_name(const std::string& name) { + std::wstring wname{name.begin(), name.end()}; + + HRESULT r; + r = SetThreadDescription( + GetCurrentThread(), + wname.c_str() + ); +} + +void Multilibultra::set_native_thread_priority(ThreadPriority pri) { + int nPriority = THREAD_PRIORITY_NORMAL; + + // Convert ThreadPriority to Win32 priority + switch (pri) { + case ThreadPriority::Low: + nPriority = THREAD_PRIORITY_BELOW_NORMAL; + break; + case ThreadPriority::Normal: + nPriority = THREAD_PRIORITY_NORMAL; + break; + case ThreadPriority::High: + nPriority = THREAD_PRIORITY_ABOVE_NORMAL; + break; + case ThreadPriority::VeryHigh: + nPriority = THREAD_PRIORITY_HIGHEST; + break; + case ThreadPriority::Critical: + nPriority = THREAD_PRIORITY_TIME_CRITICAL; + break; + default: + throw std::runtime_error("Invalid thread priority!"); + break; + } + // SetThreadPriority(GetCurrentThread(), nPriority); +} +#endif + static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entrypoint, PTR(void) arg) { OSThread *self = TO_PTR(OSThread, self_); debug_printf("[Thread] Thread created: %d\n", self->id); @@ -50,23 +89,17 @@ static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entry is_game_thread = true; // Set the thread name -#ifdef _WIN32 - std::wstring thread_name = L"Game Thread " + std::to_wstring(self->id); - HRESULT r; - r = SetThreadDescription( - GetCurrentThread(), - thread_name.c_str() - ); -#endif + Multilibultra::set_native_thread_name("Game Thread " + std::to_string(self->id)); + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::High); // Set initialized to false to indicate that this thread can be started. + Multilibultra::set_self_paused(PASS_RDRAM1); self->context->initialized.store(true); self->context->initialized.notify_all(); debug_printf("[Thread] Thread waiting to be started: %d\n", self->id); // Wait until the thread is marked as running. - Multilibultra::set_self_paused(PASS_RDRAM1); Multilibultra::wait_for_resumed(PASS_RDRAM1); debug_printf("[Thread] Thread started: %d\n", self->id); diff --git a/portultra/timer.cpp b/portultra/timer.cpp index 5379139..2be4f1a 100644 --- a/portultra/timer.cpp +++ b/portultra/timer.cpp @@ -61,6 +61,9 @@ uint64_t time_now() { } void timer_thread(RDRAM_ARG1) { + Multilibultra::set_native_thread_name("Timer Thread"); + Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::VeryHigh); + // Lambda comparator function to keep the set ordered auto timer_sort = [PASS_RDRAM1](PTR(OSTimer) a_, PTR(OSTimer) b_) { OSTimer* a = TO_PTR(OSTimer, a_); diff --git a/src/recomp.cpp b/src/recomp.cpp index 9fc34ae..220211a 100644 --- a/src/recomp.cpp +++ b/src/recomp.cpp @@ -138,6 +138,8 @@ __declspec(dllexport) extern "C" void start(void* window_handle, const Multilibu Multilibultra::set_input_callbacks(input_callbacks); std::thread game_thread{[](void* window_handle) { debug_printf("[Recomp] Starting\n"); + + Multilibultra::set_native_thread_name("Game Start Thread"); Multilibultra::preinit(rdram_buffer.get(), rom.get(), window_handle); diff --git a/thirdparty/concurrentqueue.h b/thirdparty/concurrentqueue.h index 4b2ad79..bcb06a1 100644 --- a/thirdparty/concurrentqueue.h +++ b/thirdparty/concurrentqueue.h @@ -378,7 +378,7 @@ struct ConcurrentQueueDefaultTraits // Recommended values are on the order of 1000-10000 unless the number of // consumer threads exceeds the number of idle cores (in which case try 0-100). // Only affects instances of the BlockingConcurrentQueue. - static const int MAX_SEMA_SPINS = 10000; + static const int MAX_SEMA_SPINS = 100; // Whether to recycle dynamically-allocated blocks into an internal free list or // not. If false, only pre-allocated blocks (controlled by the constructor