Zelda64Recomp/ultramodern/mesgqueue.cpp

189 lines
6.4 KiB
C++

#include <thread>
#include "blockingconcurrentqueue.h"
#include "ultra64.h"
#include "ultramodern.hpp"
#include "recomp.h"
struct QueuedMessage {
PTR(OSMesgQueue) mq;
OSMesg mesg;
bool jam;
};
static moodycamel::BlockingConcurrentQueue<QueuedMessage> external_messages {};
void enqueue_external_message(PTR(OSMesgQueue) mq, OSMesg msg, bool jam) {
external_messages.enqueue({mq, msg, jam});
}
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block);
void dequeue_external_messages(RDRAM_ARG1) {
QueuedMessage to_send;
while (external_messages.try_dequeue(to_send)) {
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
}
}
void ultramodern::wait_for_external_message(RDRAM_ARG1) {
QueuedMessage to_send;
external_messages.wait_dequeue(to_send);
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
}
extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
mq->blocked_on_recv = NULLPTR;
mq->blocked_on_send = NULLPTR;
mq->msgCount = count;
mq->msg = msg;
mq->validCount = 0;
mq->first = 0;
}
s32 MQ_GET_COUNT(OSMesgQueue *mq) {
return mq->validCount;
}
s32 MQ_IS_EMPTY(OSMesgQueue *mq) {
return mq->validCount == 0;
}
s32 MQ_IS_FULL(OSMesgQueue* mq) {
return MQ_GET_COUNT(mq) >= mq->msgCount;
}
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block) {
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
if (!block) {
// If non-blocking, fail if the queue is full.
if (MQ_IS_FULL(mq)) {
return false;
}
}
else {
// Otherwise, yield this thread until the queue has room.
while (MQ_IS_FULL(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_send), ultramodern::this_thread());
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
}
}
if (jam) {
// Jams insert at the head of the message queue's buffer.
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
mq->validCount++;
}
else {
// Sends insert at the tail of the message queue's buffer.
s32 last = (mq->first + mq->validCount) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[last] = msg;
mq->validCount++;
}
// If any threads were blocked on receiving from this message queue, pop the first one and schedule it.
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv);
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
}
return true;
}
bool do_recv(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, bool block) {
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
if (!block) {
// If non-blocking, fail if the queue is empty
if (MQ_IS_EMPTY(mq)) {
return false;
}
} else {
// Otherwise, yield this thread in a loop until the queue is no longer full
while (MQ_IS_EMPTY(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv), ultramodern::this_thread());
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
}
}
if (msg_ != NULLPTR) {
*TO_PTR(OSMesg, msg_) = TO_PTR(OSMesg, mq->msg)[mq->first];
}
mq->first = (mq->first + 1) % mq->msgCount;
mq->validCount--;
// If any threads were blocked on sending to this message queue, pop the first one and schedule it.
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_send);
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
}
return true;
}
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
bool jam = false;
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
if (!ultramodern::is_game_thread()) {
enqueue_external_message(mq_, msg, jam);
return 0;
}
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to send the message.
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return sent ? 0 : -1;
}
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
bool jam = true;
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
if (!ultramodern::is_game_thread()) {
enqueue_external_message(mq_, msg, jam);
return 0;
}
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to send the message.
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return sent ? 0 : -1;
}
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
assert(ultramodern::is_game_thread() && "RecvMesg not allowed outside of game threads.");
// Handle any messages that have been received from an external thread.
dequeue_external_messages(PASS_RDRAM1);
// Try to receive a message.
bool received = do_recv(PASS_RDRAM mq_, msg_, flags == OS_MESG_BLOCK);
// Check the queue to see if this thread should swap execution to another.
ultramodern::check_running_queue(PASS_RDRAM1);
return received ? 0 : -1;
}