From cdd397fe8f43211cf26c1a268a12df9a11069e96 Mon Sep 17 00:00:00 2001 From: Mr-Wiseguy Date: Mon, 23 Oct 2023 18:01:29 -0400 Subject: [PATCH] Restored old thread scheduler code --- portultra/mesgqueue.cpp | 316 ++++++++++--------------- portultra/multilibultra.hpp | 20 +- portultra/scheduler.cpp | 421 +++++++++++++--------------------- portultra/threads.cpp | 60 ++--- src/portultra_translation.cpp | 3 +- 5 files changed, 338 insertions(+), 482 deletions(-) diff --git a/portultra/mesgqueue.cpp b/portultra/mesgqueue.cpp index b979c65..fd7a0bb 100644 --- a/portultra/mesgqueue.cpp +++ b/portultra/mesgqueue.cpp @@ -5,58 +5,6 @@ #include "multilibultra.hpp" #include "recomp.h" -#if defined(_M_X64) -static inline void spinlock_pause() { - _mm_pause(); -} -#elif defined(__x86_64__) -static inline void spinlock_pause() { - __builtin_ia32_pause(); -} -#else -#error "No spinlock_pause implementation for current architecture" -#endif - -template -class atomic_spinlock { - static_assert(sizeof(std::atomic) == sizeof(T), "atomic_spinlock must be used with a type that is the same size as its atomic counterpart"); - static_assert(std::atomic::is_always_lock_free, "atomic_spinlock must be used with an always lock-free atomic type"); - std::atomic_ref locked_; -public: - atomic_spinlock(T& flag) : locked_{ flag } {} - - void lock() { - // Loop until the lock is acquired. - while (true) { - // Try to acquire the lock. - if (!locked_.exchange(true, std::memory_order_acquire)) { - // If it was acquired then exit the loop. - break; - } - // Otherwise, wait until the lock is no longer acquired. - // Doing this instead of constantly trying to acquire the lock reduces cache coherency traffic. - while (locked_.load(std::memory_order_relaxed)) { - // Add a platform-specific pause instruction to reduce load unit traffic. - spinlock_pause(); - } - } - } - - void unlock() { - // Release the lock by setting it to false. - locked_.store(false, std::memory_order_release); - } -}; - -class mesg_queue_lock { - OSMesgQueue* queue_; - atomic_spinlock spinlock_; -public: - mesg_queue_lock(OSMesgQueue* mq) : queue_{ mq }, spinlock_{ mq->lock } {} - void lock() { spinlock_.lock(); } - void unlock() { spinlock_.unlock(); } -}; - extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) { OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); mq->blocked_on_recv = NULLPTR; @@ -65,7 +13,6 @@ extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) ms mq->msg = msg; mq->validCount = 0; mq->first = 0; - mq->lock = false; } s32 MQ_GET_COUNT(OSMesgQueue *mq) { @@ -100,159 +47,148 @@ bool thread_queue_empty(RDRAM_ARG PTR(OSThread)* queue) { return *queue == NULLPTR; } -std::mutex test_mutex{}; +extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + + // Prevent accidentally blocking anything that isn't a game thread + if (!Multilibultra::is_game_thread()) { + flags = OS_MESG_NOBLOCK; + } -// Attempts to put a message into a queue. -// If the queue is not full, returns true and pops a thread from the blocked on receive list. -// If the queue is full and this is a blocking send, places the current thread into the blocked on send list -// for the message queue, marks the current thread as being blocked on a queue and returns false. -bool mesg_queue_try_insert(RDRAM_ARG OSMesgQueue* mq, OSMesg msg, OSThread*& to_run, bool jam, bool blocking) { - //mesg_queue_lock lock{ mq }; - std::lock_guard guard{ test_mutex }; + Multilibultra::disable_preemption(); - // If the queue is full, insert this thread into the blocked on send queue and return false. - if (MQ_IS_FULL(mq)) { - if (blocking) { - thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread()); - // TODO is it safe to use the schedule queue here while in the message queue lock? - Multilibultra::block_self(PASS_RDRAM1); + if (flags == OS_MESG_NOBLOCK) { + // If non-blocking, fail if the queue is full + if (MQ_IS_FULL(mq)) { + Multilibultra::enable_preemption(); + return -1; + } + } else { + // Otherwise, yield this thread until the queue has room + while (MQ_IS_FULL(mq)) { + debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, Multilibultra::this_thread())->id); + thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread()); + Multilibultra::enable_preemption(); + Multilibultra::pause_self(PASS_RDRAM1); + Multilibultra::disable_preemption(); } - to_run = nullptr; - return false; } + + s32 last = (mq->first + mq->validCount) % mq->msgCount; + TO_PTR(OSMesg, mq->msg)[last] = msg; + mq->validCount++; + + OSThread* to_run = nullptr; - // The queue wasn't full, so place the message into it. - if (jam) { - // Insert this message at the start of the queue. - mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount; - TO_PTR(OSMesg, mq->msg)[mq->first] = msg; - mq->validCount++; - } - else { - // Insert this message at the end of the queue. - s32 last = (mq->first + mq->validCount) % mq->msgCount; - TO_PTR(OSMesg, mq->msg)[last] = msg; - mq->validCount++; - } - - // Pop a thread from the blocked on recv queue to wake afterwards. if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) { to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv); } - - return true; -} - -// Attempts to remove a message from a queue. -// If the queue is not empty, returns true and pops a thread from the blocked on send list. -// If the queue is empty and this is a blocking receive, places the current thread into the blocked on receive list -// for the message queue, marks the current thread as being blocked on a queue and returns false. -bool mesg_queue_try_remove(RDRAM_ARG OSMesgQueue* mq, PTR(OSMesg) msg_out, OSThread*& to_run, bool blocking) { - //mesg_queue_lock lock{ mq }; - std::lock_guard guard{ test_mutex }; - - // If the queue is full, insert this thread into the blocked on receive queue and return false. - if (MQ_IS_EMPTY(mq)) { - if (blocking) { - thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, Multilibultra::this_thread()); - // TODO is it safe to use the schedule queue here while in the message queue lock? - Multilibultra::block_self(PASS_RDRAM1); - } - to_run = nullptr; - return false; - } - - // The queue wasn't empty, so remove the first message from it. - if (msg_out != NULLPTR) { - *TO_PTR(OSMesg, msg_out) = TO_PTR(OSMesg, mq->msg)[mq->first]; - } - mq->first = (mq->first + 1) % mq->msgCount; - mq->validCount--; - - // Pop a thread from the blocked on send queue to wake afterwards. - if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) { - to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send); - } - - return true; -} - -enum class MesgQueueActionType { - Send, - Jam, - Receive -}; - -s32 mesg_queue_action(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, PTR(OSMesg) msg_out, s32 flags, MesgQueueActionType action) { - OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_); - OSThread* this_thread = TO_PTR(OSThread, Multilibultra::this_thread()); - bool is_blocking = flags != OS_MESG_NOBLOCK; - - // Prevent accidentally blocking anything that isn't a game thread - if (!Multilibultra::is_game_thread()) { - is_blocking = false; - } - - OSThread* to_run = nullptr; - - // Repeatedly attempt to send the message until it's successful. - while (true) { - // Try to insert/remove the message into the queue depending on the action. - bool success = false; - switch (action) { - case MesgQueueActionType::Send: - success = mesg_queue_try_insert(PASS_RDRAM mq, msg, to_run, false, is_blocking); - break; - case MesgQueueActionType::Jam: - success = mesg_queue_try_insert(PASS_RDRAM mq, msg, to_run, true, is_blocking); - break; - case MesgQueueActionType::Receive: - success = mesg_queue_try_remove(PASS_RDRAM mq, msg_out, to_run, is_blocking); - break; - } - - // If successful, don't block. - if (success) { - //goto after; - break; - } - - // Otherwise if the action was unsuccessful but wasn't blocking, return -1 to indicate a failure. - if (!is_blocking) { - return -1; - } - - // The action failed, so pause this thread until unblocked by the queue. - debug_printf("[Message Queue] Thread %d is blocked on %s\n", this_thread->id, action == MesgQueueActionType::Receive ? "receive" : "send"); - - // Wait for it this thread be resumed. - Multilibultra::wait_for_resumed(PASS_RDRAM1); - } - //after: - - // If any thread was blocked on receiving from this queue, wake it. + + Multilibultra::enable_preemption(); if (to_run) { debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); - Multilibultra::unblock_thread(to_run); - - // If the unblocked thread is higher priority than this one, pause this thread so it can take over. - if (Multilibultra::is_game_thread() && to_run->priority > this_thread->priority) { - Multilibultra::yield_self(PASS_RDRAM1); - Multilibultra::wait_for_resumed(PASS_RDRAM1); + if (Multilibultra::is_game_thread()) { + OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread()); + if (to_run->priority > self->priority) { + Multilibultra::swap_to_thread(PASS_RDRAM to_run); + } else { + Multilibultra::schedule_running_thread(to_run); + } + } else { + Multilibultra::schedule_running_thread(to_run); } } - return 0; } -extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { - return mesg_queue_action(PASS_RDRAM mq_, msg, NULLPTR, flags, MesgQueueActionType::Send); -} - extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { - return mesg_queue_action(PASS_RDRAM mq_, msg, NULLPTR, flags, MesgQueueActionType::Jam); + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + Multilibultra::disable_preemption(); + + if (flags == OS_MESG_NOBLOCK) { + // If non-blocking, fail if the queue is full + if (MQ_IS_FULL(mq)) { + Multilibultra::enable_preemption(); + return -1; + } + } else { + // Otherwise, yield this thread in a loop until the queue is no longer full + while (MQ_IS_FULL(mq)) { + debug_printf("[Message Queue] Thread %d is blocked on jam\n", TO_PTR(OSThread, Multilibultra::this_thread())->id); + thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread()); + Multilibultra::enable_preemption(); + Multilibultra::pause_self(PASS_RDRAM1); + Multilibultra::disable_preemption(); + } + } + + mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount; + TO_PTR(OSMesg, mq->msg)[mq->first] = msg; + mq->validCount++; + + OSThread *to_run = nullptr; + + if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) { + to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv); + } + + Multilibultra::enable_preemption(); + if (to_run) { + debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); + OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); + if (to_run->priority > self->priority) { + Multilibultra::swap_to_thread(PASS_RDRAM to_run); + } else { + Multilibultra::schedule_running_thread(to_run); + } + } + return 0; } -extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_out_, s32 flags) { - return mesg_queue_action(PASS_RDRAM mq_, NULLPTR, msg_out_, flags, MesgQueueActionType::Receive); +extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) { + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + OSMesg *msg = TO_PTR(OSMesg, msg_); + Multilibultra::disable_preemption(); + + if (flags == OS_MESG_NOBLOCK) { + // If non-blocking, fail if the queue is empty + if (MQ_IS_EMPTY(mq)) { + Multilibultra::enable_preemption(); + return -1; + } + } else { + // Otherwise, yield this thread in a loop until the queue is no longer full + while (MQ_IS_EMPTY(mq)) { + debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, Multilibultra::this_thread())->id); + thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, Multilibultra::this_thread()); + Multilibultra::enable_preemption(); + Multilibultra::pause_self(PASS_RDRAM1); + Multilibultra::disable_preemption(); + } + } + + if (msg_ != NULLPTR) { + *msg = TO_PTR(OSMesg, mq->msg)[mq->first]; + } + + mq->first = (mq->first + 1) % mq->msgCount; + mq->validCount--; + + OSThread *to_run = nullptr; + + if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) { + to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send); + } + + Multilibultra::enable_preemption(); + if (to_run) { + debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); + OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); + if (to_run->priority > self->priority) { + Multilibultra::swap_to_thread(PASS_RDRAM to_run); + } else { + Multilibultra::schedule_running_thread(to_run); + } + } + return 0; } diff --git a/portultra/multilibultra.hpp b/portultra/multilibultra.hpp index 4ab705b..b34a9fe 100644 --- a/portultra/multilibultra.hpp +++ b/portultra/multilibultra.hpp @@ -23,8 +23,7 @@ struct UltraThreadContext { std::thread host_thread; - std::atomic_bool scheduled; - std::atomic_bool descheduled; + std::atomic_bool running; std::atomic_bool initialized; }; @@ -52,13 +51,16 @@ void save_init(); void init_scheduler(); void init_events(uint8_t* rdram, uint8_t* rom, WindowHandle window_handle); void init_timers(RDRAM_ARG1); +void set_self_paused(RDRAM_ARG1); void yield_self(RDRAM_ARG1); void block_self(RDRAM_ARG1); void unblock_thread(OSThread* t); void wait_for_resumed(RDRAM_ARG1); void swap_to_thread(RDRAM_ARG OSThread *to); +void pause_thread_impl(OSThread *t); void resume_thread_impl(OSThread* t); void schedule_running_thread(OSThread *t); +void pause_self(RDRAM_ARG1); void halt_self(RDRAM_ARG1); void stop_thread(OSThread *t); void cleanup_thread(OSThread *t); @@ -74,6 +76,8 @@ enum class ThreadPriority { void set_native_thread_name(const std::string& name); void set_native_thread_priority(ThreadPriority pri); PTR(OSThread) this_thread(); +void disable_preemption(); +void enable_preemption(); void notify_scheduler(); void reprioritize_thread(OSThread *t, OSPri pri); void set_main_thread(); @@ -118,11 +122,19 @@ struct gfx_callbacks_t { }; void set_gfx_callbacks(const gfx_callbacks_t* callbacks); +class preemption_guard { +public: + preemption_guard(); + ~preemption_guard(); +private: + std::lock_guard lock; +}; + } // namespace Multilibultra #define MIN(a, b) ((a) < (b) ? (a) : (b)) -//#define debug_printf(...) -#define debug_printf(...) printf(__VA_ARGS__); +#define debug_printf(...) +//#define debug_printf(...) printf(__VA_ARGS__); #endif diff --git a/portultra/scheduler.cpp b/portultra/scheduler.cpp index 2000678..f4d7a87 100644 --- a/portultra/scheduler.cpp +++ b/portultra/scheduler.cpp @@ -2,10 +2,7 @@ #include #include #include -#include -#include -#include "blockingconcurrentqueue.h" #include "multilibultra.hpp" class OSThreadComparator { @@ -25,201 +22,119 @@ public: return false; } - // remove element and re-heap - this->c.erase(it); - std::make_heap(this->c.begin(), this->c.end(), this->comp); + if (it == this->c.begin()) { + // deque the top element + this->pop(); + } else { + // remove element and re-heap + this->c.erase(it); + std::make_heap(this->c.begin(), this->c.end(), this->comp); + } return true; } - - void print() { - std::vector backup = this->c; - debug_printf("[Scheduler] Scheduled Threads:\n"); - while (!empty()) { - OSThread* t = top(); - pop(); - debug_printf(" %d: pri %d state %d\n", t->id, t->priority, t->state); - } - this->c = backup; - } - - bool contains(OSThread* t) { - return std::find(this->c.begin(), this->c.end(), t) != this->c.end(); - } }; -struct NotifySchedulerAction { - -}; - -struct ScheduleThreadAction { - OSThread* t; -}; - -struct StopThreadAction { - OSThread* t; -}; - -struct CleanupThreadAction { - OSThread* t; -}; - -struct ReprioritizeThreadAction { - OSThread* t; - OSPri pri; -}; - -struct YieldedThreadAction { - OSThread* t; -}; - -struct BlockedThreadAction { - OSThread* t; -}; - -struct UnblockThreadAction { - OSThread* t; -}; - -using ThreadAction = std::variant; - static struct { - moodycamel::BlockingConcurrentQueue action_queue{}; - OSThread* running_thread; + std::vector to_schedule; + std::vector to_stop; + std::vector to_cleanup; + std::vector> to_reprioritize; + std::mutex mutex; + // OSThread* running_thread; + std::atomic_int notify_count; + std::atomic_int action_count; + + bool can_preempt; + std::mutex premption_mutex; } scheduler_context{}; -void handle_thread_queueing(thread_queue_t& running_thread_queue, const ScheduleThreadAction& action) { - OSThread* to_schedule = action.t; - debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id); +void handle_thread_queueing(thread_queue_t& running_thread_queue) { + std::lock_guard lock{scheduler_context.mutex}; - // Do not schedule the thread if it's waiting on a message queue - if (to_schedule->state == OSThreadState::BLOCKED_STOPPED) { - to_schedule->state = OSThreadState::BLOCKED_PAUSED; - } - else { - to_schedule->state = OSThreadState::PAUSED; + if (!scheduler_context.to_schedule.empty()) { + OSThread* to_schedule = scheduler_context.to_schedule.back(); + scheduler_context.to_schedule.pop_back(); + scheduler_context.action_count.fetch_sub(1); + debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id); running_thread_queue.push(to_schedule); } } -void handle_thread_stopping(thread_queue_t& running_thread_queue, const StopThreadAction& action) { - OSThread* to_stop = action.t; - debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); +void handle_thread_stopping(thread_queue_t& running_thread_queue) { + std::lock_guard lock{scheduler_context.mutex}; - running_thread_queue.remove(to_stop); - if (running_thread_queue.contains(to_stop)) { - assert(false); - } - - if (to_stop->state == OSThreadState::BLOCKED_PAUSED) { - to_stop->state = OSThreadState::BLOCKED_STOPPED; - } - else { - to_stop->state = OSThreadState::STOPPED; + while (!scheduler_context.to_stop.empty()) { + OSThread* to_stop = scheduler_context.to_stop.back(); + scheduler_context.to_stop.pop_back(); + scheduler_context.action_count.fetch_sub(1); + debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); + running_thread_queue.remove(to_stop); } } -void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread, const CleanupThreadAction& action) { - OSThread* to_cleanup = action.t; - - debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); - running_thread_queue.remove(to_cleanup); - // If the cleaned up thread was the running thread, schedule a new one to run. - if (to_cleanup == cur_running_thread) { - // If there's a thread queued to run, set it as the new running thread. - if (!running_thread_queue.empty()) { - cur_running_thread = running_thread_queue.top(); - } - // Otherwise, set the running thread to null so the next thread that can be run gets started. - else { - cur_running_thread = nullptr; +void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { + std::lock_guard lock{scheduler_context.mutex}; + + while (!scheduler_context.to_cleanup.empty()) { + OSThread* to_cleanup = scheduler_context.to_cleanup.back(); + scheduler_context.to_cleanup.pop_back(); + scheduler_context.action_count.fetch_sub(1); + + debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); + running_thread_queue.remove(to_cleanup); + // If the cleaned up thread was the running thread, schedule a new one to run. + if (to_cleanup == cur_running_thread) { + // If there's a thread queued to run, set it as the new running thread. + if (!running_thread_queue.empty()) { + cur_running_thread = running_thread_queue.top(); + } + // Otherwise, set the running thread to null so the next thread that can be run gets started. + else { + cur_running_thread = nullptr; + } } + to_cleanup->context->host_thread.join(); + delete to_cleanup->context; + to_cleanup->context = nullptr; } - to_cleanup->context->host_thread.join(); - delete to_cleanup->context; - to_cleanup->context = nullptr; } -void handle_thread_reprioritization(thread_queue_t& running_thread_queue, const ReprioritizeThreadAction& action) { - OSThread* to_reprioritize = action.t; - OSPri pri = action.pri; - - debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize->id, pri); - running_thread_queue.remove(to_reprioritize); - to_reprioritize->priority = pri; - running_thread_queue.push(to_reprioritize); +void handle_thread_reprioritization(thread_queue_t& running_thread_queue) { + std::lock_guard lock{scheduler_context.mutex}; + + while (!scheduler_context.to_reprioritize.empty()) { + const std::pair to_reprioritize = scheduler_context.to_reprioritize.back(); + scheduler_context.to_reprioritize.pop_back(); + scheduler_context.action_count.fetch_sub(1); + + debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize.first->id, to_reprioritize.second); + running_thread_queue.remove(to_reprioritize.first); + to_reprioritize.first->priority = to_reprioritize.second; + running_thread_queue.push(to_reprioritize.first); + } } -void handle_thread_yielded(thread_queue_t& running_thread_queue, const YieldedThreadAction& action) { - OSThread* yielded = action.t; - - debug_printf("[Scheduler] Thread %d has yielded\n", yielded->id); - // Remove the yielded thread from the thread queue. If it was in the queue then re-add it so that it's placed after any other threads with the same priority. - if (running_thread_queue.remove(yielded)) { - running_thread_queue.push(yielded); - } - yielded->state = OSThreadState::PAUSED; - debug_printf("[Scheduler] Set thread %d to PAUSED\n", yielded->id); -} - -void handle_thread_blocked(thread_queue_t& running_thread_queue, const BlockedThreadAction& action) { - OSThread* blocked = action.t; - - debug_printf("[Scheduler] Thread %d has been blocked\n", blocked->id); - // Remove the thread from the running queue. - running_thread_queue.remove(blocked); - - // Update the thread's state accordingly. - if (blocked->state == OSThreadState::STOPPED) { - blocked->state = OSThreadState::BLOCKED_STOPPED; - } - else if (blocked->state == OSThreadState::RUNNING) { - blocked->state = OSThreadState::BLOCKED_PAUSED; - } - else { - assert(false); - } - running_thread_queue.remove(blocked); -} - -void handle_thread_unblocking(thread_queue_t& running_thread_queue, const UnblockThreadAction& action) { - OSThread* unblocked = action.t; - - // Do nothing if this thread has already been unblocked. - if (unblocked->state != OSThreadState::BLOCKED_STOPPED && unblocked->state != OSThreadState::BLOCKED_PAUSED) { - return; - } - - debug_printf("[Scheduler] Thread %d has been unblocked\n", unblocked->id); - // Update the thread's state accordingly. - if (unblocked->state == OSThreadState::BLOCKED_STOPPED) { - unblocked->state = OSThreadState::STOPPED; - } - else if (unblocked->state == OSThreadState::BLOCKED_PAUSED) { - // The thread wasn't stopped, so put it back in the running queue now that it's been unblocked. - unblocked->state = OSThreadState::PAUSED; - running_thread_queue.push(unblocked); - } - else { - assert(false); +void handle_scheduler_notifications() { + std::lock_guard lock{scheduler_context.mutex}; + int32_t notify_count = scheduler_context.notify_count.exchange(0); + if (notify_count) { + debug_printf("Received %d notifications\n", notify_count); + scheduler_context.action_count.fetch_sub(notify_count); } } void swap_running_thread(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { if (running_thread_queue.size() > 0) { OSThread* new_running_thread = running_thread_queue.top(); - // If the running thread has changed or the running thread is paused, run the running thread - if (cur_running_thread != new_running_thread || (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING)) { + if (cur_running_thread != new_running_thread) { if (cur_running_thread && cur_running_thread->state == OSThreadState::RUNNING) { debug_printf("[Scheduler] Need to wait for thread %d to pause itself\n", cur_running_thread->id); return; + } else { + debug_printf("[Scheduler] Switching execution to thread %d (%d)\n", new_running_thread->id, new_running_thread->priority); } - debug_printf("[Scheduler] Switching execution to thread %d (%d)\n", new_running_thread->id, new_running_thread->priority); Multilibultra::resume_thread_impl(new_running_thread); - if (cur_running_thread) { - cur_running_thread->context->descheduled.store(true); - cur_running_thread->context->descheduled.notify_all(); - } cur_running_thread = new_running_thread; } else if (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING) { Multilibultra::resume_thread_impl(cur_running_thread); @@ -233,45 +148,26 @@ void scheduler_func() { thread_queue_t running_thread_queue{}; OSThread* cur_running_thread = nullptr; - Multilibultra::set_native_thread_name("Scheduler Thread"); - Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::VeryHigh); - while (true) { - using namespace std::chrono_literals; - ThreadAction action{}; OSThread* old_running_thread = cur_running_thread; - //scheduler_context.action_queue.wait_dequeue_timed(action, 1ms); - scheduler_context.action_queue.wait_dequeue(action); + scheduler_context.action_count.wait(0); - if (std::get_if(&action) == nullptr) { - // Determine the action type and act on it - if (const auto* notify_action = std::get_if(&action)) { - // Nothing to do - } - else if (const auto* stop_action = std::get_if(&action)) { - handle_thread_stopping(running_thread_queue, *stop_action); - } - else if (const auto* cleanup_action = std::get_if(&action)) { - handle_thread_cleanup(running_thread_queue, cur_running_thread, *cleanup_action); - } - else if (const auto* schedule_action = std::get_if(&action)) { - handle_thread_queueing(running_thread_queue, *schedule_action); - } - else if (const auto* reprioritize_action = std::get_if(&action)) { - handle_thread_reprioritization(running_thread_queue, *reprioritize_action); - } - else if (const auto* yielded_action = std::get_if(&action)) { - handle_thread_yielded(running_thread_queue, *yielded_action); - } - else if (const auto* blocked_action = std::get_if(&action)) { - handle_thread_blocked(running_thread_queue, *blocked_action); - } - else if (const auto* unblock_action = std::get_if(&action)) { - handle_thread_unblocking(running_thread_queue, *unblock_action); - } - } + std::lock_guard lock{scheduler_context.premption_mutex}; + + // Handle notifications + handle_scheduler_notifications(); - running_thread_queue.print(); + // Handle stopping threads + handle_thread_stopping(running_thread_queue); + + // Handle cleaning up threads + handle_thread_cleanup(running_thread_queue, cur_running_thread); + + // Handle queueing threads to run + handle_thread_queueing(running_thread_queue); + + // Handle threads that have changed priority + handle_thread_reprioritization(running_thread_queue); // Determine which thread to run, stopping the current running thread if necessary swap_running_thread(running_thread_queue, cur_running_thread); @@ -291,90 +187,97 @@ extern "C" void do_yield() { namespace Multilibultra { void init_scheduler() { + scheduler_context.can_preempt = true; std::thread scheduler_thread{scheduler_func}; scheduler_thread.detach(); } void schedule_running_thread(OSThread *t) { - debug_printf("[Thread] Queuing Thread %d to be scheduled\n", t->id); - scheduler_context.action_queue.enqueue(ScheduleThreadAction{t}); + debug_printf("[Scheduler] Queuing Thread %d to be scheduled\n", t->id); + std::lock_guard lock{scheduler_context.mutex}; + scheduler_context.to_schedule.push_back(t); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); } void swap_to_thread(RDRAM_ARG OSThread *to) { OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); - debug_printf("[Thread] Scheduling swap from thread %d to %d\n", self->id, to->id); - - // Tell the scheduler that the swapped-to thread is ready to run and that this thread is yielding. - schedule_running_thread(to); - yield_self(PASS_RDRAM1); - - // Wait for the scheduler to resume this thread. - wait_for_resumed(PASS_RDRAM1); + debug_printf("[Scheduler] Scheduling swap from thread %d to %d\n", self->id, to->id); + { + std::lock_guard lock{scheduler_context.mutex}; + scheduler_context.to_schedule.push_back(to); + Multilibultra::set_self_paused(PASS_RDRAM1); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); + } + Multilibultra::wait_for_resumed(PASS_RDRAM1); } void reprioritize_thread(OSThread *t, OSPri pri) { - debug_printf("[Thread] Adjusting Thread %d priority to %d\n", t->id, pri); - - scheduler_context.action_queue.enqueue(ReprioritizeThreadAction{t, pri}); + debug_printf("[Scheduler] Adjusting Thread %d priority to %d\n", t->id, pri); + std::lock_guard lock{scheduler_context.mutex}; + scheduler_context.to_reprioritize.emplace_back(t, pri); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); } -void stop_thread(OSThread *t) { - debug_printf("[Thread] Queueing stopping of thread %d\n", t->id); - - scheduler_context.action_queue.enqueue(StopThreadAction{t}); -} - -void Multilibultra::yield_self(RDRAM_ARG1) { - OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread()); - debug_printf("[Thread] Thread %d yielding itself\n", self->id); - - scheduler_context.action_queue.enqueue(YieldedThreadAction{ self }); -} - -void Multilibultra::block_self(RDRAM_ARG1) { - OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread()); - debug_printf("[Thread] Thread %d has been blocked\n", self->id); - - scheduler_context.action_queue.enqueue(BlockedThreadAction{ self }); - -} - -void Multilibultra::unblock_thread(OSThread *t) { - debug_printf("[Thread] Unblocking thread %d\n", t->id); - - scheduler_context.action_queue.enqueue(UnblockThreadAction{ t }); -} - -void halt_self(RDRAM_ARG1) { - OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread()); - debug_printf("[Thread] Thread %d pausing itself\n", self->id); - - stop_thread(self); - yield_self(PASS_RDRAM1); - wait_for_resumed(PASS_RDRAM1); +void pause_self(RDRAM_ARG1) { + OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); + debug_printf("[Scheduler] Thread %d pausing itself\n", self->id); + { + std::lock_guard lock{scheduler_context.mutex}; + Multilibultra::set_self_paused(PASS_RDRAM1); + scheduler_context.to_stop.push_back(self); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); + } + Multilibultra::wait_for_resumed(PASS_RDRAM1); } void cleanup_thread(OSThread *t) { - scheduler_context.action_queue.enqueue(CleanupThreadAction{t}); + std::lock_guard lock{scheduler_context.mutex}; + scheduler_context.to_cleanup.push_back(t); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); +} + +void disable_preemption() { + scheduler_context.premption_mutex.lock(); + if (Multilibultra::is_game_thread()) { + scheduler_context.can_preempt = false; + } +} + +void enable_preemption() { + if (Multilibultra::is_game_thread()) { + scheduler_context.can_preempt = true; + } +#pragma warning(push) +#pragma warning( disable : 26110) + scheduler_context.premption_mutex.unlock(); +#pragma warning( pop ) +} + +// lock's constructor is called first, so can_preempt is set after locking +preemption_guard::preemption_guard() : lock{scheduler_context.premption_mutex} { + scheduler_context.can_preempt = false; +} + +// lock's destructor is called last, so can_preempt is set before unlocking +preemption_guard::~preemption_guard() { + scheduler_context.can_preempt = true; } void notify_scheduler() { - scheduler_context.action_queue.enqueue(NotifySchedulerAction{}); -} - -void resume_thread_impl(OSThread* t) { - if (t->state == OSThreadState::PREEMPTED) { - // Nothing to do here - } - t->state = OSThreadState::RUNNING; - debug_printf("[Scheduler] Set thread %d to RUNNING\n", t->id); - t->context->scheduled.store(true); - t->context->scheduled.notify_all(); + std::lock_guard lock{scheduler_context.mutex}; + scheduler_context.notify_count.fetch_add(1); + scheduler_context.action_count.fetch_add(1); + scheduler_context.action_count.notify_all(); } } extern "C" void pause_self(uint8_t* rdram) { - Multilibultra::halt_self(rdram); + Multilibultra::pause_self(rdram); } diff --git a/portultra/threads.cpp b/portultra/threads.cpp index 36cee41..9d6e69c 100644 --- a/portultra/threads.cpp +++ b/portultra/threads.cpp @@ -131,6 +131,7 @@ static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entry debug_printf("[Thread] Thread waiting to be started: %d\n", self->id); // Wait until the thread is marked as running. + Multilibultra::set_self_paused(PASS_RDRAM1); Multilibultra::wait_for_resumed(PASS_RDRAM1); debug_printf("[Thread] Thread started: %d\n", self->id); @@ -150,7 +151,7 @@ extern "C" void osStartThread(RDRAM_ARG PTR(OSThread) t_) { OSThread* t = TO_PTR(OSThread, t_); debug_printf("[os] Start Thread %d\n", t->id); - // Wait until the thread is initialized to indicate that it's queued to be started. + // Wait until the thread is initialized to indicate that it's action_queued to be started. t->context->initialized.wait(false); debug_printf("[os] Thread %d is ready to be started\n", t->id); @@ -175,33 +176,20 @@ extern "C" void osCreateThread(RDRAM_ARG PTR(OSThread) t_, OSId id, PTR(thread_f t->next = NULLPTR; t->priority = pri; t->id = id; - t->state = OSThreadState::STOPPED; + t->state = OSThreadState::PAUSED; t->sp = sp - 0x10; // Set up the first stack frame t->destroyed = false; // Spawn a new thread, which will immediately pause itself and wait until it's been started. t->context = new UltraThreadContext{}; t->context->initialized.store(false); - t->context->scheduled.store(false); - t->context->descheduled.store(true); + t->context->running.store(false); t->context->host_thread = std::thread{_thread_func, PASS_RDRAM t_, entrypoint, arg}; } extern "C" void osStopThread(RDRAM_ARG PTR(OSThread) t_) { - // If null is passed in as the thread then the calling thread is stopping itself. - if (t_ == NULLPTR) { - t_ = Multilibultra::this_thread(); - } - - // Remove the thread in question from the scheduler so it doesn't get scheduled again. - OSThread* t = TO_PTR(OSThread, t_); - Multilibultra::stop_thread(t); - - // If a thread is stopping itself, tell the scheduler that it has yielded. - if (t_ == Multilibultra::this_thread()) { - Multilibultra::yield_self(PASS_RDRAM1); - } + assert(false); } extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) { @@ -217,12 +205,6 @@ extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) { } } -// TODO make the thread queue stable to ensure correct yielding behavior -extern "C" void osYieldThread(RDRAM_ARG1) { - Multilibultra::yield_self(PASS_RDRAM1); - Multilibultra::wait_for_resumed(PASS_RDRAM1); -} - extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) { if (t == NULLPTR) { t = thread_self; @@ -230,12 +212,13 @@ extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) { bool pause_self = false; if (pri > TO_PTR(OSThread, thread_self)->priority) { pause_self = true; + Multilibultra::set_self_paused(PASS_RDRAM1); } else if (t == thread_self && pri < TO_PTR(OSThread, thread_self)->priority) { pause_self = true; + Multilibultra::set_self_paused(PASS_RDRAM1); } Multilibultra::reprioritize_thread(TO_PTR(OSThread, t), pri); if (pause_self) { - Multilibultra::yield_self(PASS_RDRAM1); Multilibultra::wait_for_resumed(PASS_RDRAM1); } } @@ -254,6 +237,15 @@ extern "C" OSId osGetThreadId(RDRAM_ARG PTR(OSThread) t) { return TO_PTR(OSThread, t)->id; } +// TODO yield thread, need a stable priority queue in the scheduler + +void Multilibultra::set_self_paused(RDRAM_ARG1) { + debug_printf("[Thread] Thread pausing itself: %d\n", TO_PTR(OSThread, thread_self)->id); + TO_PTR(OSThread, thread_self)->state = OSThreadState::PAUSED; + TO_PTR(OSThread, thread_self)->context->running.store(false); + TO_PTR(OSThread, thread_self)->context->running.notify_all(); +} + void check_destroyed(OSThread* t) { if (t->destroyed) { throw thread_terminated{}; @@ -262,13 +254,25 @@ void check_destroyed(OSThread* t) { void Multilibultra::wait_for_resumed(RDRAM_ARG1) { check_destroyed(TO_PTR(OSThread, thread_self)); - //TO_PTR(OSThread, thread_self)->context->descheduled.wait(false); - //TO_PTR(OSThread, thread_self)->context->descheduled.store(false); - TO_PTR(OSThread, thread_self)->context->scheduled.wait(false); - TO_PTR(OSThread, thread_self)->context->scheduled.store(false); + TO_PTR(OSThread, thread_self)->context->running.wait(false); check_destroyed(TO_PTR(OSThread, thread_self)); } +void Multilibultra::pause_thread_impl(OSThread* t) { + t->state = OSThreadState::PREEMPTED; + t->context->running.store(false); + t->context->running.notify_all(); +} + +void Multilibultra::resume_thread_impl(OSThread *t) { + if (t->state == OSThreadState::PREEMPTED) { + // Nothing to do here + } + t->state = OSThreadState::RUNNING; + t->context->running.store(true); + t->context->running.notify_all(); +} + PTR(OSThread) Multilibultra::this_thread() { return thread_self; } diff --git a/src/portultra_translation.cpp b/src/portultra_translation.cpp index e32ca68..bed5aa7 100644 --- a/src/portultra_translation.cpp +++ b/src/portultra_translation.cpp @@ -29,7 +29,8 @@ extern "C" void osDestroyThread_recomp(uint8_t * rdram, recomp_context * ctx) { } extern "C" void osYieldThread_recomp(uint8_t * rdram, recomp_context * ctx) { - osYieldThread(rdram); + assert(false); + // osYieldThread(rdram); } extern "C" void osSetThreadPri_recomp(uint8_t* rdram, recomp_context* ctx) {