diff --git a/CMakeLists.txt b/CMakeLists.txt index defa53c..e959cde 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,7 +104,8 @@ set (SOURCES ${CMAKE_SOURCE_DIR}/ultramodern/mesgqueue.cpp ${CMAKE_SOURCE_DIR}/ultramodern/misc_ultra.cpp ${CMAKE_SOURCE_DIR}/ultramodern/port_main.c - ${CMAKE_SOURCE_DIR}/ultramodern/scheduler.cpp + ${CMAKE_SOURCE_DIR}/ultramodern/scheduling.cpp + ${CMAKE_SOURCE_DIR}/ultramodern/threadqueue.cpp ${CMAKE_SOURCE_DIR}/ultramodern/task_win32.cpp ${CMAKE_SOURCE_DIR}/ultramodern/threads.cpp ${CMAKE_SOURCE_DIR}/ultramodern/timer.cpp diff --git a/src/game/input.cpp b/src/game/input.cpp index 96d5843..bde18fa 100644 --- a/src/game/input.cpp +++ b/src/game/input.cpp @@ -1,4 +1,5 @@ #include +#include #include "../ultramodern/ultramodern.hpp" #include "recomp.h" diff --git a/src/recomp/print.cpp b/src/recomp/print.cpp index 7c7bd23..b0ea771 100644 --- a/src/recomp/print.cpp +++ b/src/recomp/print.cpp @@ -1,3 +1,5 @@ +#include + #include "../ultramodern/ultra64.h" #include "../ultramodern/ultramodern.hpp" #include "recomp.h" diff --git a/src/recomp/recomp.cpp b/src/recomp/recomp.cpp index 10ac659..25eb2f6 100644 --- a/src/recomp/recomp.cpp +++ b/src/recomp/recomp.cpp @@ -419,7 +419,11 @@ void recomp::start(ultramodern::WindowHandle window_handle, const ultramodern::a recomp::message_box("Error opening stored ROM! Please restart this program."); } init(rdram, &context); - recomp_entrypoint(rdram, &context); + try { + recomp_entrypoint(rdram, &context); + } catch (ultramodern::thread_terminated& terminated) { + + } break; case recomp::Game::Quit: break; @@ -436,4 +440,5 @@ void recomp::start(ultramodern::WindowHandle window_handle, const ultramodern::a } game_thread.join(); ultramodern::join_event_threads(); + ultramodern::join_thread_cleaner_thread(); } diff --git a/ultramodern/events.cpp b/ultramodern/events.cpp index e80ede5..6ba64dd 100644 --- a/ultramodern/events.cpp +++ b/ultramodern/events.cpp @@ -64,7 +64,8 @@ static struct { std::mutex message_mutex; uint8_t* rdram; moodycamel::BlockingConcurrentQueue action_queue{}; - std::atomic sp_task = nullptr; + moodycamel::BlockingConcurrentQueue sp_task_queue{}; + moodycamel::ConcurrentQueue deleted_threads{}; } events_context{}; extern "C" void osSetEventMesg(RDRAM_ARG OSEvent event_id, PTR(OSMesgQueue) mq_, OSMesg msg) { @@ -227,26 +228,22 @@ void run_rsp_microcode(uint8_t* rdram, const OSTask* task, RspUcodeFunc* ucode_f } -void task_thread_func(uint8_t* rdram, std::atomic_flag* thread_ready) { +void task_thread_func(uint8_t* rdram, moodycamel::LightweightSemaphore* thread_ready) { ultramodern::set_native_thread_name("SP Task Thread"); ultramodern::set_native_thread_priority(ultramodern::ThreadPriority::Normal); // Notify the caller thread that this thread is ready. - thread_ready->test_and_set(); - thread_ready->notify_all(); + thread_ready->signal(); while (true) { // Wait until an RSP task has been sent - events_context.sp_task.wait(nullptr); + OSTask* task; + events_context.sp_task_queue.wait_dequeue(task); - if (exited) { + if (task == nullptr) { return; } - // Retrieve the task pointer and clear the pending RSP task - OSTask* task = events_context.sp_task; - events_context.sp_task.store(nullptr); - // Run the correct function based on the task type if (task->t.type == M_AUDTASK) { run_rsp_microcode(rdram, task, aspMain); @@ -296,7 +293,7 @@ uint32_t ultramodern::get_display_refresh_rate() { return display_refresh_rate.load(); } -void gfx_thread_func(uint8_t* rdram, std::atomic_flag* thread_ready, ultramodern::WindowHandle window_handle) { +void gfx_thread_func(uint8_t* rdram, moodycamel::LightweightSemaphore* thread_ready, ultramodern::WindowHandle window_handle) { bool enabled_instant_present = false; using namespace std::chrono_literals; @@ -317,8 +314,7 @@ void gfx_thread_func(uint8_t* rdram, std::atomic_flag* thread_ready, ultramodern rsp_constants_init(); // Notify the caller thread that this thread is ready. - thread_ready->test_and_set(); - thread_ready->notify_all(); + thread_ready->signal(); while (!exited) { // Try to pull an action from the queue @@ -522,8 +518,7 @@ void ultramodern::submit_rsp_task(RDRAM_ARG PTR(OSTask) task_) { } // Set all other tasks as the RSP task else { - events_context.sp_task.store(task); - events_context.sp_task.notify_all(); + events_context.sp_task_queue.enqueue(task); } } @@ -533,16 +528,16 @@ void ultramodern::send_si_message() { } void ultramodern::init_events(uint8_t* rdram, ultramodern::WindowHandle window_handle) { - std::atomic_flag gfx_thread_ready; - std::atomic_flag task_thread_ready; + moodycamel::LightweightSemaphore gfx_thread_ready; + moodycamel::LightweightSemaphore task_thread_ready; events_context.rdram = rdram; events_context.sp.gfx_thread = std::thread{ gfx_thread_func, rdram, &gfx_thread_ready, window_handle }; events_context.sp.task_thread = std::thread{ task_thread_func, rdram, &task_thread_ready }; // Wait for the two sp threads to be ready before continuing to prevent the game from // running before we're able to handle RSP tasks. - gfx_thread_ready.wait(false); - task_thread_ready.wait(false); + gfx_thread_ready.wait(); + task_thread_ready.wait(); events_context.vi.thread = std::thread{ vi_thread_func }; } @@ -551,16 +546,7 @@ void ultramodern::join_event_threads() { events_context.sp.gfx_thread.join(); events_context.vi.thread.join(); - // Send a dummy RSP task so that the task thread is able to exit it's atomic wait and terminate. - OSTask dummy_task{}; - OSTask* expected = nullptr; - - // Attempt to exchange the task with the dummy task one until it was nullptr, as that indicates the - // task thread was ready for a new task. - do { - expected = nullptr; - } while (!events_context.sp_task.compare_exchange_weak(expected, &dummy_task)); - events_context.sp_task.notify_all(); - + // Send a null RSP task to indicate that the RSP task thread should exit. + events_context.sp_task_queue.enqueue(nullptr); events_context.sp.task_thread.join(); } diff --git a/ultramodern/mesgqueue.cpp b/ultramodern/mesgqueue.cpp index 141b751..f32b174 100644 --- a/ultramodern/mesgqueue.cpp +++ b/ultramodern/mesgqueue.cpp @@ -1,10 +1,38 @@ #include -#include + +#include "blockingconcurrentqueue.h" #include "ultra64.h" #include "ultramodern.hpp" #include "recomp.h" +struct QueuedMessage { + PTR(OSMesgQueue) mq; + OSMesg mesg; + bool jam; +}; + +static moodycamel::BlockingConcurrentQueue external_messages {}; + +void enqueue_external_message(PTR(OSMesgQueue) mq, OSMesg msg, bool jam) { + external_messages.enqueue({mq, msg, jam}); +} + +bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block); + +void dequeue_external_messages(RDRAM_ARG1) { + QueuedMessage to_send; + while (external_messages.try_dequeue(to_send)) { + do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false); + } +} + +void ultramodern::wait_for_external_message(RDRAM_ARG1) { + QueuedMessage to_send; + external_messages.wait_dequeue(to_send); + do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false); +} + extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) { OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); mq->blocked_on_recv = NULLPTR; @@ -27,168 +55,136 @@ s32 MQ_IS_FULL(OSMesgQueue* mq) { return MQ_GET_COUNT(mq) >= mq->msgCount; } -void thread_queue_insert(RDRAM_ARG PTR(OSThread)* queue, PTR(OSThread) toadd_) { - PTR(OSThread)* cur = queue; - OSThread* toadd = TO_PTR(OSThread, toadd_); - while (*cur && TO_PTR(OSThread, *cur)->priority > toadd->priority) { - cur = &TO_PTR(OSThread, *cur)->next; - } - toadd->next = (*cur); - *cur = toadd_; -} - -OSThread* thread_queue_pop(RDRAM_ARG PTR(OSThread)* queue) { - PTR(OSThread) ret = *queue; - *queue = TO_PTR(OSThread, ret)->next; - return TO_PTR(OSThread, ret); -} - -bool thread_queue_empty(RDRAM_ARG PTR(OSThread)* queue) { - return *queue == NULLPTR; -} - -extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { - OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); - - // Prevent accidentally blocking anything that isn't a game thread - if (!ultramodern::is_game_thread()) { - flags = OS_MESG_NOBLOCK; - } - - ultramodern::disable_preemption(); - - if (flags == OS_MESG_NOBLOCK) { - // If non-blocking, fail if the queue is full +bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block) { + OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_); + if (!block) { + // If non-blocking, fail if the queue is full. if (MQ_IS_FULL(mq)) { - ultramodern::enable_preemption(); - return -1; + return false; } - } else { - // Otherwise, yield this thread until the queue has room + } + else { + // Otherwise, yield this thread until the queue has room. while (MQ_IS_FULL(mq)) { debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, ultramodern::this_thread())->id); - thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, ultramodern::this_thread()); - ultramodern::enable_preemption(); - ultramodern::pause_self(PASS_RDRAM1); - ultramodern::disable_preemption(); + ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_send), ultramodern::this_thread()); + ultramodern::run_next_thread(PASS_RDRAM1); + ultramodern::wait_for_resumed(PASS_RDRAM1); } } - s32 last = (mq->first + mq->validCount) % mq->msgCount; - TO_PTR(OSMesg, mq->msg)[last] = msg; - mq->validCount++; - - OSThread* to_run = nullptr; + if (jam) { + // Jams insert at the head of the message queue's buffer. + mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount; + TO_PTR(OSMesg, mq->msg)[mq->first] = msg; + mq->validCount++; + } + else { + // Sends insert at the tail of the message queue's buffer. + s32 last = (mq->first + mq->validCount) % mq->msgCount; + TO_PTR(OSMesg, mq->msg)[last] = msg; + mq->validCount++; + } - if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) { - to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv); + // If any threads were blocked on receiving from this message queue, pop the first one and schedule it. + PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv); + if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) { + ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue)); } - ultramodern::enable_preemption(); - if (to_run) { - debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); - if (ultramodern::is_game_thread()) { - OSThread* self = TO_PTR(OSThread, ultramodern::this_thread()); - if (to_run->priority > self->priority) { - ultramodern::swap_to_thread(PASS_RDRAM to_run); - } else { - ultramodern::schedule_running_thread(to_run); - } - } else { - ultramodern::schedule_running_thread(to_run); - } - } - return 0; + return true; } -extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { - OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); - ultramodern::disable_preemption(); - - if (flags == OS_MESG_NOBLOCK) { - // If non-blocking, fail if the queue is full - if (MQ_IS_FULL(mq)) { - ultramodern::enable_preemption(); - return -1; - } - } else { - // Otherwise, yield this thread in a loop until the queue is no longer full - while (MQ_IS_FULL(mq)) { - debug_printf("[Message Queue] Thread %d is blocked on jam\n", TO_PTR(OSThread, ultramodern::this_thread())->id); - thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, ultramodern::this_thread()); - ultramodern::enable_preemption(); - ultramodern::pause_self(PASS_RDRAM1); - ultramodern::disable_preemption(); - } - } - - mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount; - TO_PTR(OSMesg, mq->msg)[mq->first] = msg; - mq->validCount++; - - OSThread *to_run = nullptr; - - if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) { - to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv); - } - - ultramodern::enable_preemption(); - if (to_run) { - debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); - OSThread *self = TO_PTR(OSThread, ultramodern::this_thread()); - if (to_run->priority > self->priority) { - ultramodern::swap_to_thread(PASS_RDRAM to_run); - } else { - ultramodern::schedule_running_thread(to_run); - } - } - return 0; -} - -extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) { - OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); - OSMesg *msg = TO_PTR(OSMesg, msg_); - ultramodern::disable_preemption(); - - if (flags == OS_MESG_NOBLOCK) { +bool do_recv(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, bool block) { + OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_); + if (!block) { // If non-blocking, fail if the queue is empty if (MQ_IS_EMPTY(mq)) { - ultramodern::enable_preemption(); - return -1; + return false; } } else { // Otherwise, yield this thread in a loop until the queue is no longer full while (MQ_IS_EMPTY(mq)) { debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, ultramodern::this_thread())->id); - thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, ultramodern::this_thread()); - ultramodern::enable_preemption(); - ultramodern::pause_self(PASS_RDRAM1); - ultramodern::disable_preemption(); + ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv), ultramodern::this_thread()); + ultramodern::run_next_thread(PASS_RDRAM1); + ultramodern::wait_for_resumed(PASS_RDRAM1); } } if (msg_ != NULLPTR) { - *msg = TO_PTR(OSMesg, mq->msg)[mq->first]; + *TO_PTR(OSMesg, msg_) = TO_PTR(OSMesg, mq->msg)[mq->first]; } mq->first = (mq->first + 1) % mq->msgCount; mq->validCount--; - OSThread *to_run = nullptr; + // If any threads were blocked on sending to this message queue, pop the first one and schedule it. + PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_send); + if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) { + ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue)); + } - if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) { - to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send); + return true; +} + +extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + bool jam = false; + + // Don't directly send to the message queue if this isn't a game thread to avoid contention. + if (!ultramodern::is_game_thread()) { + enqueue_external_message(mq_, msg, jam); + return 0; } - ultramodern::enable_preemption(); - if (to_run) { - debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); - OSThread *self = TO_PTR(OSThread, ultramodern::this_thread()); - if (to_run->priority > self->priority) { - ultramodern::swap_to_thread(PASS_RDRAM to_run); - } else { - ultramodern::schedule_running_thread(to_run); - } - } - return 0; + // Handle any messages that have been received from an external thread. + dequeue_external_messages(PASS_RDRAM1); + + // Try to send the message. + bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK); + + // Check the queue to see if this thread should swap execution to another. + ultramodern::check_running_queue(PASS_RDRAM1); + + return sent ? 0 : -1; +} + +extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) { + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + bool jam = true; + + // Don't directly send to the message queue if this isn't a game thread to avoid contention. + if (!ultramodern::is_game_thread()) { + enqueue_external_message(mq_, msg, jam); + return 0; + } + + // Handle any messages that have been received from an external thread. + dequeue_external_messages(PASS_RDRAM1); + + // Try to send the message. + bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK); + + // Check the queue to see if this thread should swap execution to another. + ultramodern::check_running_queue(PASS_RDRAM1); + + return sent ? 0 : -1; +} + +extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) { + OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); + + assert(ultramodern::is_game_thread() && "RecvMesg not allowed outside of game threads."); + + // Handle any messages that have been received from an external thread. + dequeue_external_messages(PASS_RDRAM1); + + // Try to receive a message. + bool received = do_recv(PASS_RDRAM mq_, msg_, flags == OS_MESG_BLOCK); + + // Check the queue to see if this thread should swap execution to another. + ultramodern::check_running_queue(PASS_RDRAM1); + + return received ? 0 : -1; } diff --git a/ultramodern/scheduler.cpp b/ultramodern/scheduler.cpp deleted file mode 100644 index 3153da9..0000000 --- a/ultramodern/scheduler.cpp +++ /dev/null @@ -1,283 +0,0 @@ -#include -#include -#include -#include - -#include "ultramodern.hpp" - -class OSThreadComparator { -public: - bool operator() (OSThread *a, OSThread *b) const { - return a->priority < b->priority; - } -}; - -class thread_queue_t : public std::priority_queue, OSThreadComparator> { -public: - // TODO comment this - bool remove(const OSThread* value) { - auto it = std::find(this->c.begin(), this->c.end(), value); - - if (it == this->c.end()) { - return false; - } - - if (it == this->c.begin()) { - // deque the top element - this->pop(); - } else { - // remove element and re-heap - this->c.erase(it); - std::make_heap(this->c.begin(), this->c.end(), this->comp); - } - - return true; - } -}; - -static struct { - std::vector to_schedule; - std::vector to_stop; - std::vector to_cleanup; - std::vector> to_reprioritize; - std::mutex mutex; - // OSThread* running_thread; - std::atomic_int notify_count; - std::atomic_int action_count; - - bool can_preempt; - std::mutex premption_mutex; -} scheduler_context{}; - -void handle_thread_queueing(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - if (!scheduler_context.to_schedule.empty()) { - OSThread* to_schedule = scheduler_context.to_schedule.back(); - scheduler_context.to_schedule.pop_back(); - scheduler_context.action_count.fetch_sub(1); - debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id); - running_thread_queue.push(to_schedule); - } -} - -void handle_thread_stopping(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_stop.empty()) { - OSThread* to_stop = scheduler_context.to_stop.back(); - scheduler_context.to_stop.pop_back(); - scheduler_context.action_count.fetch_sub(1); - debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); - running_thread_queue.remove(to_stop); - } -} - -void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_cleanup.empty()) { - OSThread* to_cleanup = scheduler_context.to_cleanup.back(); - scheduler_context.to_cleanup.pop_back(); - scheduler_context.action_count.fetch_sub(1); - - debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); - running_thread_queue.remove(to_cleanup); - // If the cleaned up thread was the running thread, schedule a new one to run. - if (to_cleanup == cur_running_thread) { - // If there's a thread queued to run, set it as the new running thread. - if (!running_thread_queue.empty()) { - cur_running_thread = running_thread_queue.top(); - } - // Otherwise, set the running thread to null so the next thread that can be run gets started. - else { - cur_running_thread = nullptr; - } - } - to_cleanup->context->host_thread.join(); - delete to_cleanup->context; - to_cleanup->context = nullptr; - } -} - -void handle_thread_reprioritization(thread_queue_t& running_thread_queue) { - std::lock_guard lock{scheduler_context.mutex}; - - while (!scheduler_context.to_reprioritize.empty()) { - const std::pair to_reprioritize = scheduler_context.to_reprioritize.back(); - scheduler_context.to_reprioritize.pop_back(); - scheduler_context.action_count.fetch_sub(1); - - debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize.first->id, to_reprioritize.second); - running_thread_queue.remove(to_reprioritize.first); - to_reprioritize.first->priority = to_reprioritize.second; - running_thread_queue.push(to_reprioritize.first); - } -} - -void handle_scheduler_notifications() { - std::lock_guard lock{scheduler_context.mutex}; - int32_t notify_count = scheduler_context.notify_count.exchange(0); - if (notify_count) { - debug_printf("Received %d notifications\n", notify_count); - scheduler_context.action_count.fetch_sub(notify_count); - } -} - -void swap_running_thread(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { - if (running_thread_queue.size() > 0) { - OSThread* new_running_thread = running_thread_queue.top(); - if (cur_running_thread != new_running_thread) { - if (cur_running_thread && cur_running_thread->state == OSThreadState::RUNNING) { - debug_printf("[Scheduler] Need to wait for thread %d to pause itself\n", cur_running_thread->id); - return; - } else { - debug_printf("[Scheduler] Switching execution to thread %d (%d)\n", new_running_thread->id, new_running_thread->priority); - } - ultramodern::resume_thread_impl(new_running_thread); - cur_running_thread = new_running_thread; - } else if (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING) { - ultramodern::resume_thread_impl(cur_running_thread); - } - } else { - cur_running_thread = nullptr; - } -} - -extern std::atomic_bool exited; - -void scheduler_func() { - thread_queue_t running_thread_queue{}; - OSThread* cur_running_thread = nullptr; - - ultramodern::set_native_thread_name("Scheduler Thread"); - ultramodern::set_native_thread_priority(ultramodern::ThreadPriority::Critical); - - while (true) { - OSThread* old_running_thread = cur_running_thread; - scheduler_context.action_count.wait(0); - - std::lock_guard lock{scheduler_context.premption_mutex}; - - // Handle notifications - handle_scheduler_notifications(); - - // Handle stopping threads - handle_thread_stopping(running_thread_queue); - - // Handle cleaning up threads - handle_thread_cleanup(running_thread_queue, cur_running_thread); - - // Handle queueing threads to run - handle_thread_queueing(running_thread_queue); - - // Handle threads that have changed priority - handle_thread_reprioritization(running_thread_queue); - - if (!exited) { - // Determine which thread to run, stopping the current running thread if necessary - swap_running_thread(running_thread_queue, cur_running_thread); - } - else { - return; - } - - std::this_thread::yield(); - if (old_running_thread != cur_running_thread && old_running_thread && cur_running_thread) { - debug_printf("[Scheduler] Swapped from Thread %d (%d) to Thread %d (%d)\n", - old_running_thread->id, old_running_thread->priority, cur_running_thread->id, cur_running_thread->priority); - } - } -} - -extern "C" void do_yield() { - std::this_thread::yield(); -} - -namespace ultramodern { - -void init_scheduler() { - scheduler_context.can_preempt = true; - std::thread scheduler_thread{scheduler_func}; - scheduler_thread.detach(); -} - -void schedule_running_thread(OSThread *t) { - debug_printf("[Scheduler] Queuing Thread %d to be scheduled\n", t->id); - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_schedule.push_back(t); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); -} - -void swap_to_thread(RDRAM_ARG OSThread *to) { - OSThread *self = TO_PTR(OSThread, ultramodern::this_thread()); - debug_printf("[Scheduler] Scheduling swap from thread %d to %d\n", self->id, to->id); - { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_schedule.push_back(to); - ultramodern::set_self_paused(PASS_RDRAM1); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); - } - ultramodern::wait_for_resumed(PASS_RDRAM1); -} - -void reprioritize_thread(OSThread *t, OSPri pri) { - debug_printf("[Scheduler] Adjusting Thread %d priority to %d\n", t->id, pri); - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_reprioritize.emplace_back(t, pri); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); -} - -void pause_self(RDRAM_ARG1) { - OSThread *self = TO_PTR(OSThread, ultramodern::this_thread()); - debug_printf("[Scheduler] Thread %d pausing itself\n", self->id); - { - std::lock_guard lock{scheduler_context.mutex}; - ultramodern::set_self_paused(PASS_RDRAM1); - scheduler_context.to_stop.push_back(self); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); - } - ultramodern::wait_for_resumed(PASS_RDRAM1); -} - -void cleanup_thread(OSThread *t) { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.to_cleanup.push_back(t); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); -} - -void disable_preemption() { - scheduler_context.premption_mutex.lock(); - if (ultramodern::is_game_thread()) { - scheduler_context.can_preempt = false; - } -} - -void enable_preemption() { - if (ultramodern::is_game_thread()) { - scheduler_context.can_preempt = true; - } -#pragma warning(push) -#pragma warning( disable : 26110) - scheduler_context.premption_mutex.unlock(); -#pragma warning( pop ) -} - -void notify_scheduler() { - std::lock_guard lock{scheduler_context.mutex}; - scheduler_context.notify_count.fetch_add(1); - scheduler_context.action_count.fetch_add(1); - scheduler_context.action_count.notify_all(); -} - -} - -extern "C" void pause_self(uint8_t* rdram) { - ultramodern::pause_self(rdram); -} - diff --git a/ultramodern/scheduling.cpp b/ultramodern/scheduling.cpp new file mode 100644 index 0000000..1064cc4 --- /dev/null +++ b/ultramodern/scheduling.cpp @@ -0,0 +1,66 @@ +#include "ultramodern.hpp" + +void ultramodern::run_next_thread(RDRAM_ARG1) { + if (thread_queue_empty(PASS_RDRAM running_queue)) { + throw std::runtime_error("No threads left to run!\n"); + } + + OSThread* to_run = TO_PTR(OSThread, thread_queue_pop(PASS_RDRAM running_queue)); + debug_printf("[Scheduling] Resuming execution of thread %d\n", to_run->id); + to_run->context->running.signal(); +} + +void ultramodern::schedule_running_thread(RDRAM_ARG PTR(OSThread) t_) { + debug_printf("[Scheduling] Adding thread %d to the running queue\n", TO_PTR(OSThread, t_)->id); + thread_queue_insert(PASS_RDRAM running_queue, t_); + TO_PTR(OSThread, t_)->state = OSThreadState::QUEUED; +} + +void ultramodern::check_running_queue(RDRAM_ARG1) { + // Check if there are any threads in the running queue. + if (!thread_queue_empty(PASS_RDRAM running_queue)) { + // Check if the highest priority thread in the queue is higher priority than the current thread. + OSThread* next_thread = TO_PTR(OSThread, ultramodern::thread_queue_peek(PASS_RDRAM running_queue)); + OSThread* self = TO_PTR(OSThread, ultramodern::this_thread()); + if (next_thread->priority > self->priority) { + ultramodern::thread_queue_pop(PASS_RDRAM running_queue); + // Swap to the higher priority thread. + ultramodern::swap_to_thread(PASS_RDRAM next_thread); + } + } +} + +void ultramodern::swap_to_thread(RDRAM_ARG OSThread *to) { + debug_printf("[Scheduling] Thread %d giving execution to thread %d\n", TO_PTR(OSThread, ultramodern::this_thread())->id, to->id); + // Insert this thread in the running queue. + thread_queue_insert(PASS_RDRAM running_queue, ultramodern::this_thread()); + TO_PTR(OSThread, ultramodern::this_thread())->state = OSThreadState::QUEUED; + // Unpause the target thread and wait for this one to be unpaused. + ultramodern::resume_thread(to); + ultramodern::wait_for_resumed(PASS_RDRAM1); +} + +void ultramodern::wait_for_resumed(RDRAM_ARG1) { + TO_PTR(OSThread, ultramodern::this_thread())->context->running.wait(); + // If this thread was marked to be destroyed by another thre, destroy it again from its own context. + // This will actually destroy the thread instead of just marking it to be destroyed. + if (TO_PTR(OSThread, ultramodern::this_thread())->destroyed) { + osDestroyThread(PASS_RDRAM NULLPTR); + } +} + +void ultramodern::resume_thread(OSThread *t) { + if (t->state != OSThreadState::QUEUED) { + assert(false && "Threads should only be resumed from the queued state!"); + } + t->state = OSThreadState::RUNNING; + t->context->running.signal(); +} + +extern "C" void pause_self(RDRAM_ARG1) { + while (true) { + // Wait until an external message arrives, then allow the next thread to run. + ultramodern::wait_for_external_message(PASS_RDRAM1); + ultramodern::check_running_queue(PASS_RDRAM1); + } +} diff --git a/ultramodern/threadqueue.cpp b/ultramodern/threadqueue.cpp new file mode 100644 index 0000000..5869811 --- /dev/null +++ b/ultramodern/threadqueue.cpp @@ -0,0 +1,66 @@ +#include + +#include "ultramodern.hpp" + +static PTR(OSThread) running_queue_impl = NULLPTR; + +static PTR(OSThread)* queue_to_ptr(RDRAM_ARG PTR(PTR(OSThread)) queue) { + if (queue == ultramodern::running_queue) { + return &running_queue_impl; + } + return TO_PTR(PTR(OSThread), queue); +} + +void ultramodern::thread_queue_insert(RDRAM_ARG PTR(PTR(OSThread)) queue_, PTR(OSThread) toadd_) { + PTR(OSThread)* cur = queue_to_ptr(PASS_RDRAM queue_); + OSThread* toadd = TO_PTR(OSThread, toadd_); + debug_printf("[Thread Queue] Inserting thread %d into queue 0x%08X\n", toadd->id, (uintptr_t)queue_); + while (*cur && TO_PTR(OSThread, *cur)->priority > toadd->priority) { + cur = &TO_PTR(OSThread, *cur)->next; + } + toadd->next = (*cur); + toadd->queue = queue_; + *cur = toadd_; + + debug_printf(" Contains:"); + cur = queue_to_ptr(PASS_RDRAM queue_); + while (*cur) { + debug_printf("%d (%d) ", TO_PTR(OSThread, *cur)->id, TO_PTR(OSThread, *cur)->priority); + cur = &TO_PTR(OSThread, *cur)->next; + } + debug_printf("\n"); +} + +PTR(OSThread) ultramodern::thread_queue_pop(RDRAM_ARG PTR(PTR(OSThread)) queue_) { + PTR(OSThread)* queue = queue_to_ptr(PASS_RDRAM queue_); + PTR(OSThread) ret = *queue; + *queue = TO_PTR(OSThread, ret)->next; + TO_PTR(OSThread, ret)->queue = NULLPTR; + debug_printf("[Thread Queue] Popped thread %d from queue 0x%08X\n", TO_PTR(OSThread, ret)->id, (uintptr_t)queue_); + return ret; +} + +bool ultramodern::thread_queue_remove(RDRAM_ARG PTR(PTR(OSThread)) queue_, PTR(OSThread) t_) { + debug_printf("[Thread Queue] Removing thread %d from queue 0x%08X\n", TO_PTR(OSThread, t_)->id, (uintptr_t)queue_); + + PTR(PTR(OSThread)) cur = queue_; + while (cur != NULLPTR) { + PTR(OSThread)* cur_ptr = queue_to_ptr(PASS_RDRAM queue_); + if (*cur_ptr == t_) { + return true; + } + cur = TO_PTR(OSThread, *cur_ptr)->next; + } + + return false; +} + +bool ultramodern::thread_queue_empty(RDRAM_ARG PTR(PTR(OSThread)) queue_) { + PTR(OSThread)* queue = queue_to_ptr(PASS_RDRAM queue_); + return *queue == NULLPTR; +} + +PTR(OSThread) ultramodern::thread_queue_peek(RDRAM_ARG PTR(PTR(OSThread)) queue_) { + PTR(OSThread)* queue = queue_to_ptr(PASS_RDRAM queue_); + return *queue; +} diff --git a/ultramodern/threads.cpp b/ultramodern/threads.cpp index ddba4ff..f2d683a 100644 --- a/ultramodern/threads.cpp +++ b/ultramodern/threads.cpp @@ -5,6 +5,7 @@ #include "ultra64.h" #include "ultramodern.hpp" +#include "blockingconcurrentqueue.h" // Native APIs only used to set thread names for easier debugging #ifdef _WIN32 @@ -41,8 +42,6 @@ void run_thread_function(uint8_t* rdram, uint64_t addr, uint64_t sp, uint64_t ar #define run_thread_function(func, sp, arg) func(arg) #endif -struct thread_terminated : std::exception {}; - #if defined(_WIN32) void ultramodern::set_native_thread_name(const std::string& name) { std::wstring wname{name.begin(), name.end()}; @@ -135,14 +134,12 @@ static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entry permanent_threads.fetch_add(1); } - // Set initialized to false to indicate that this thread can be started. - self->context->initialized.store(true); - self->context->initialized.notify_all(); + // Signal the initialized semaphore to indicate that this thread can be started. + self->context->initialized.signal(); debug_printf("[Thread] Thread waiting to be started: %d\n", self->id); // Wait until the thread is marked as running. - ultramodern::set_self_paused(PASS_RDRAM1); ultramodern::wait_for_resumed(PASS_RDRAM1); debug_printf("[Thread] Thread started: %d\n", self->id); @@ -150,11 +147,13 @@ static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entry try { // Run the thread's function with the provided argument. run_thread_function(PASS_RDRAM entrypoint, self->sp, arg); - } catch (thread_terminated& terminated) { - + // The thread function terminated normally, so mark this thread as destroyed and run the next thread. + self->destroyed = true; + ultramodern::run_next_thread(PASS_RDRAM1); + } catch (ultramodern::thread_terminated& terminated) { } - // Dispose of this thread after it completes. + // Dispose of this thread after it completes and run the next queued thread. ultramodern::cleanup_thread(self); // TODO fix these being hardcoded (this is only used for quicksaving) @@ -175,15 +174,21 @@ extern "C" void osStartThread(RDRAM_ARG PTR(OSThread) t_) { OSThread* t = TO_PTR(OSThread, t_); debug_printf("[os] Start Thread %d\n", t->id); - // Wait until the thread is initialized to indicate that it's action_queued to be started. - t->context->initialized.wait(false); + // Wait until the thread is initialized to indicate that it's ready to be started. + t->context->initialized.wait(); debug_printf("[os] Thread %d is ready to be started\n", t->id); - if (thread_self && (t->priority > TO_PTR(OSThread, thread_self)->priority)) { - ultramodern::swap_to_thread(PASS_RDRAM t); - } else { - ultramodern::schedule_running_thread(t); + // If this is a game thread, insert the new thread into the running queue and then check the running queue. + if (thread_self) { + ultramodern::schedule_running_thread(PASS_RDRAM t_); + ultramodern::check_running_queue(PASS_RDRAM1); + } + // Otherwise, immediately start the thread and terminate this one. + else { + t->state = OSThreadState::QUEUED; + ultramodern::resume_thread(t); + //throw ultramodern::thread_terminated{}; } } @@ -192,17 +197,15 @@ extern "C" void osCreateThread(RDRAM_ARG PTR(OSThread) t_, OSId id, PTR(thread_f OSThread *t = TO_PTR(OSThread, t_); t->next = NULLPTR; + t->queue = NULLPTR; t->priority = pri; t->id = id; - t->state = OSThreadState::PAUSED; + t->state = OSThreadState::STOPPED; t->sp = sp - 0x10; // Set up the first stack frame t->destroyed = false; // Spawn a new thread, which will immediately pause itself and wait until it's been started. t->context = new UltraThreadContext{}; - t->context->initialized.store(false); - t->context->running.store(false); - t->context->host_thread = std::thread{_thread_func, PASS_RDRAM t_, entrypoint, arg}; } @@ -211,33 +214,47 @@ extern "C" void osStopThread(RDRAM_ARG PTR(OSThread) t_) { } extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) { - // Check if the thread is destroying itself (arg is null or thread_self) - if (t_ == NULLPTR || t_ == thread_self) { - throw thread_terminated{}; + if (t_ == NULLPTR) { + t_ = thread_self; } - // Otherwise, mark the target thread as destroyed. Next time it reaches a stopping point, - // it'll check this and terminate itself instead of pausing. - else { - OSThread* t = TO_PTR(OSThread, t_); + OSThread* t = TO_PTR(OSThread, t_); + // Check if the thread is destroying itself (arg is null or thread_self) + if (t_ == thread_self) { + // Check if the thread was destroyed by another thread. If it wasn't, then this thread destroyed itself and a new thread + // needs to be run. + if (!t->destroyed) { + t->destroyed = true; + ultramodern::run_next_thread(PASS_RDRAM1); + } + throw ultramodern::thread_terminated{}; + } + // Otherwise if the thread isn't stopped, remove it from its currrent queue., + if (t->state != OSThreadState::STOPPED) { + ultramodern::thread_queue_remove(PASS_RDRAM t->queue, t_); + } + // Check if the thread has already been destroyed to prevent destroying it again. + if (!t->destroyed) { + // Mark the target thread as destroyed and resume it. When it starts it'll check this and terminate itself instead of resuming. t->destroyed = true; + t->context->running.signal(); } } -extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) { - if (t == NULLPTR) { - t = thread_self; +extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t_, OSPri pri) { + if (t_ == NULLPTR) { + t_ = thread_self; } - bool pause_self = false; - if (pri > TO_PTR(OSThread, thread_self)->priority) { - pause_self = true; - ultramodern::set_self_paused(PASS_RDRAM1); - } else if (t == thread_self && pri < TO_PTR(OSThread, thread_self)->priority) { - pause_self = true; - ultramodern::set_self_paused(PASS_RDRAM1); - } - ultramodern::reprioritize_thread(TO_PTR(OSThread, t), pri); - if (pause_self) { - ultramodern::wait_for_resumed(PASS_RDRAM1); + OSThread* t = TO_PTR(OSThread, t_); + + if (t->priority != pri) { + t->priority = pri; + + if (t_ != ultramodern::this_thread() && t->state != OSThreadState::STOPPED) { + ultramodern::thread_queue_remove(PASS_RDRAM t->queue, t_); + ultramodern::thread_queue_insert(PASS_RDRAM t->queue, t_); + } + + ultramodern::check_running_queue(PASS_RDRAM1); } } @@ -255,42 +272,37 @@ extern "C" OSId osGetThreadId(RDRAM_ARG PTR(OSThread) t) { return TO_PTR(OSThread, t)->id; } -// TODO yield thread, need a stable priority queue in the scheduler - -void ultramodern::set_self_paused(RDRAM_ARG1) { - debug_printf("[Thread] Thread pausing itself: %d\n", TO_PTR(OSThread, thread_self)->id); - TO_PTR(OSThread, thread_self)->state = OSThreadState::PAUSED; - TO_PTR(OSThread, thread_self)->context->running.store(false); - TO_PTR(OSThread, thread_self)->context->running.notify_all(); -} - -void check_destroyed(OSThread* t) { - if (t->destroyed) { - throw thread_terminated{}; - } -} - -void ultramodern::wait_for_resumed(RDRAM_ARG1) { - check_destroyed(TO_PTR(OSThread, thread_self)); - TO_PTR(OSThread, thread_self)->context->running.wait(false); - check_destroyed(TO_PTR(OSThread, thread_self)); -} - -void ultramodern::pause_thread_impl(OSThread* t) { - t->state = OSThreadState::PREEMPTED; - t->context->running.store(false); - t->context->running.notify_all(); -} - -void ultramodern::resume_thread_impl(OSThread *t) { - if (t->state == OSThreadState::PREEMPTED) { - // Nothing to do here - } - t->state = OSThreadState::RUNNING; - t->context->running.store(true); - t->context->running.notify_all(); -} - PTR(OSThread) ultramodern::this_thread() { return thread_self; } + +static std::thread thread_cleaner_thread; +static moodycamel::BlockingConcurrentQueue deleted_threads{}; +extern std::atomic_bool exited; + +void thread_cleaner_func() { + using namespace std::chrono_literals; + while (!exited) { + OSThread* to_delete; + if (deleted_threads.wait_dequeue_timed(to_delete, 10ms)) { + printf("Deleting thread %d\n", to_delete->id); + UltraThreadContext* cur_context = to_delete->context; + to_delete->context = nullptr; + + cur_context->host_thread.join(); + delete cur_context; + } + } +} + +void ultramodern::init_thread_cleanup() { + thread_cleaner_thread = std::thread{thread_cleaner_func}; +} + +void ultramodern::cleanup_thread(OSThread *t) { + deleted_threads.enqueue(t); +} + +void ultramodern::join_thread_cleaner_thread() { + thread_cleaner_thread.join(); +} diff --git a/ultramodern/ultra64.h b/ultramodern/ultra64.h index a056031..312c975 100644 --- a/ultramodern/ultra64.h +++ b/ultramodern/ultra64.h @@ -3,10 +3,6 @@ #include -#ifdef __cplusplus -#include -#endif - #ifdef __GNUC__ #define UNUSED __attribute__((unused)) #define ALIGNED(x) __attribute__((aligned(x))) @@ -24,14 +20,28 @@ typedef uint16_t u16; typedef int8_t s8; typedef uint8_t u8; -#define PTR(x) int32_t -#define RDRAM_ARG uint8_t *rdram, -#define RDRAM_ARG1 uint8_t *rdram -#define PASS_RDRAM rdram, -#define PASS_RDRAM1 rdram -#define TO_PTR(type, var) ((type*)(&rdram[(uint64_t)var - 0xFFFFFFFF80000000])) -#ifdef __cplusplus -#define NULLPTR (PTR(void))0 +#if 0 // For native compilation +# define PTR(x) x* +# define RDRAM_ARG +# define RDRAM_ARG1 +# define PASS_RDRAM +# define PASS_RDRAM1 +# define TO_PTR(type, var) var +# define GET_MEMBER(type, addr, member) (&addr->member) +# ifdef __cplusplus +# define NULLPTR nullptr +# endif +#else +# define PTR(x) int32_t +# define RDRAM_ARG uint8_t *rdram, +# define RDRAM_ARG1 uint8_t *rdram +# define PASS_RDRAM rdram, +# define PASS_RDRAM1 rdram +# define TO_PTR(type, var) ((type*)(&rdram[(uint64_t)var - 0xFFFFFFFF80000000])) +# define GET_MEMBER(type, addr, member) (addr + (intptr_t)&(((type*)nullptr)->member)) +# ifdef __cplusplus +# define NULLPTR (PTR(void))0 +# endif #endif #ifndef NULL @@ -76,18 +86,16 @@ typedef u64 OSTime; typedef struct UltraThreadContext UltraThreadContext; typedef enum { - RUNNING, - PAUSED, STOPPED, - BLOCKED_PAUSED, - BLOCKED_STOPPED, - PREEMPTED + QUEUED, + RUNNING, + BLOCKED } OSThreadState; typedef struct OSThread_t { PTR(struct OSThread_t) next; // Next thread in the given queue OSPri priority; - uint32_t pad1; + PTR(PTR(struct OSThread_t)) queue; // Queue this thread is in, if any uint32_t pad2; uint16_t flags; // These two are swapped to reflect rdram byteswapping uint16_t state; @@ -226,10 +234,6 @@ void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri); OSPri osGetThreadPri(RDRAM_ARG PTR(OSThread) thread); OSId osGetThreadId(RDRAM_ARG PTR(OSThread) t); -s32 MQ_GET_COUNT(RDRAM_ARG PTR(OSMesgQueue)); -s32 MQ_IS_EMPTY(RDRAM_ARG PTR(OSMesgQueue)); -s32 MQ_IS_FULL(RDRAM_ARG PTR(OSMesgQueue)); - void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue), PTR(OSMesg), s32); s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue), OSMesg, s32); s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue), OSMesg, s32); diff --git a/ultramodern/ultrainit.cpp b/ultramodern/ultrainit.cpp index 21bbedd..3e2cfff 100644 --- a/ultramodern/ultrainit.cpp +++ b/ultramodern/ultrainit.cpp @@ -7,8 +7,8 @@ void ultramodern::preinit(uint8_t* rdram, ultramodern::WindowHandle window_handl ultramodern::init_timers(rdram); ultramodern::init_audio(); ultramodern::save_init(); + ultramodern::init_thread_cleanup(); } extern "C" void osInitialize() { - ultramodern::init_scheduler(); } diff --git a/ultramodern/ultramodern.hpp b/ultramodern/ultramodern.hpp index 667c8cb..3ce916b 100644 --- a/ultramodern/ultramodern.hpp +++ b/ultramodern/ultramodern.hpp @@ -2,10 +2,12 @@ #define __ultramodern_HPP__ #include -#include -#include -#include +#include +#include +#undef MOODYCAMEL_DELETE_FUNCTION +#define MOODYCAMEL_DELETE_FUNCTION = delete +#include "lightweightsemaphore.h" #include "ultra64.h" #if defined(_WIN32) @@ -22,8 +24,8 @@ struct UltraThreadContext { std::thread host_thread; - std::atomic_bool running; - std::atomic_bool initialized; + moodycamel::LightweightSemaphore running; + moodycamel::LightweightSemaphore initialized; }; namespace ultramodern { @@ -51,26 +53,36 @@ constexpr int32_t cart_handle = 0x80800000; constexpr int32_t flash_handle = (int32_t)(cart_handle + sizeof(OSPiHandle)); constexpr uint32_t save_size = 1024 * 1024 / 8; // Maximum save size, 1Mbit for flash +// Initialization. void preinit(uint8_t* rdram, WindowHandle window_handle); void save_init(); -void init_scheduler(); void init_events(uint8_t* rdram, WindowHandle window_handle); void init_timers(RDRAM_ARG1); -void set_self_paused(RDRAM_ARG1); -void yield_self(RDRAM_ARG1); -void block_self(RDRAM_ARG1); -void unblock_thread(OSThread* t); +void init_thread_cleanup(); + +// Thread queues. +constexpr PTR(PTR(OSThread)) running_queue = (PTR(PTR(OSThread)))-1; + +void thread_queue_insert(RDRAM_ARG PTR(PTR(OSThread)) queue, PTR(OSThread) toadd); +PTR(OSThread) thread_queue_pop(RDRAM_ARG PTR(PTR(OSThread)) queue); +bool thread_queue_remove(RDRAM_ARG PTR(PTR(OSThread)) queue_, PTR(OSThread) t_); +bool thread_queue_empty(RDRAM_ARG PTR(PTR(OSThread)) queue); +PTR(OSThread) thread_queue_peek(RDRAM_ARG PTR(PTR(OSThread)) queue); + +// Message queues. +void wait_for_external_message(RDRAM_ARG1); + +// Thread scheduling. +void check_running_queue(RDRAM_ARG1); void wait_for_resumed(RDRAM_ARG1); +void run_next_thread(RDRAM_ARG1); void swap_to_thread(RDRAM_ARG OSThread *to); -void pause_thread_impl(OSThread *t); -void resume_thread_impl(OSThread* t); -void schedule_running_thread(OSThread *t); -void pause_self(RDRAM_ARG1); -void halt_self(RDRAM_ARG1); -void stop_thread(OSThread *t); +void resume_thread(OSThread* t); +void schedule_running_thread(RDRAM_ARG PTR(OSThread) t); void cleanup_thread(OSThread *t); uint32_t permanent_thread_count(); uint32_t temporary_thread_count(); +struct thread_terminated : std::exception {}; enum class ThreadPriority { Low, @@ -83,24 +95,24 @@ enum class ThreadPriority { void set_native_thread_name(const std::string& name); void set_native_thread_priority(ThreadPriority pri); PTR(OSThread) this_thread(); -void disable_preemption(); -void enable_preemption(); -void notify_scheduler(); -void reprioritize_thread(OSThread *t, OSPri pri); void set_main_thread(); bool is_game_thread(); void submit_rsp_task(RDRAM_ARG PTR(OSTask) task); void send_si_message(); uint32_t get_speed_multiplier(); + +// Time std::chrono::high_resolution_clock::time_point get_start(); std::chrono::high_resolution_clock::duration time_since_start(); -void get_window_size(uint32_t& width, uint32_t& height); -uint32_t get_target_framerate(uint32_t original); -uint32_t get_display_refresh_rate(); void measure_input_latency(); void sleep_milliseconds(uint32_t millis); void sleep_until(const std::chrono::high_resolution_clock::time_point& time_point); +// Graphics +void get_window_size(uint32_t& width, uint32_t& height); +uint32_t get_target_framerate(uint32_t original); +uint32_t get_display_refresh_rate(); + // Audio void init_audio(); void set_audio_frequency(uint32_t freq); @@ -138,6 +150,7 @@ struct gfx_callbacks_t { bool is_game_started(); void quit(); void join_event_threads(); +void join_thread_cleaner_thread(); } // namespace ultramodern