Refactor Condition Variable Waiting/Signalling

The way we handled waking and timeouts of condition variables was fairly inaccurate to HOS, as we had moved locking of the mutex onto the waiter thread, which could change the order of operations and caused what were functionally spurious wakeups for all awoken threads.

This commit fixes that by performing all mutex locking on the waker thread and only awakening the waiter thread once the condition variable has been signalled and the mutex has been acquired on its behalf. In addition, it fixes races between a timeout and a signal that could lead to double-insertion of a thread into the scheduler, stemming from the refactor of how timeouts work in the new system.
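At its core, the signaller now acquires the contended mutex on behalf of the waiter before making it runnable. Below is a minimal standalone sketch of that handover, assuming a userspace mutex word that holds the owner's handle plus a waiters bit; names such as Waiter, SignalOne and kHandleWaitersBit are illustrative and not Skyline's actual API.

#include <atomic>
#include <cstdint>
#include <deque>
#include <iostream>

constexpr uint32_t kHandleWaitersBit{1u << 30}; // stand-in for HandleWaitersBit

struct Waiter {
    uint32_t tag;          // handle of the thread waiting on the condition variable
    bool scheduled{false}; // set once the waker has handed it the mutex
};

// Signal one waiter: lock the mutex on its behalf, then (and only then) wake it
void SignalOne(std::atomic<uint32_t> &mutex, std::deque<Waiter *> &queue) {
    if (queue.empty())
        return;
    Waiter *waiter{queue.front()};
    queue.pop_front();

    uint32_t expected{0};
    if (mutex.compare_exchange_strong(expected, waiter->tag)) {
        waiter->scheduled = true; // fast path: the mutex was free and now belongs to the waiter
    } else {
        // Contended: set the waiters bit so the current owner knows to hand the
        // mutex over (and schedule the waiter) when it unlocks
        mutex.fetch_or(kHandleWaitersBit);
    }
}

int main() {
    std::atomic<uint32_t> mutex{0};
    std::deque<Waiter *> queue;
    Waiter waiter{0x42};
    queue.push_back(&waiter);

    SignalOne(mutex, queue);
    std::cout << std::hex << "owner: 0x" << mutex.load() << ", scheduled: " << waiter.scheduled << '\n';
}

Compared to the old flow, the waiter never wakes up just to discover that the mutex is still held and go back to sleep.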
PixelyIon 2022-11-26 01:43:42 +05:30 committed by Mark Collins
parent 1eb4eec103
commit f487d81769
2 changed files with 137 additions and 27 deletions


@@ -218,6 +218,16 @@ namespace skyline::kernel::type {
Result KProcess::ConditionalVariableWait(u32 *key, u32 *mutex, KHandle tag, i64 timeout) {
TRACE_EVENT_FMT("kernel", "ConditionalVariableWait 0x{:X} (0x{:X})", key, mutex);
{
// Update all waiter information
std::unique_lock lock{state.thread->waiterMutex};
state.thread->waitKey = mutex;
state.thread->waitTag = tag;
state.thread->waitConditionVariable = key;
state.thread->waitSignalled = false;
state.thread->waitResult = {};
}
{
std::scoped_lock lock{syncWaiterMutex};
auto queue{syncWaiters.equal_range(key)};
@@ -230,15 +240,72 @@
}
if (timeout > 0 && !state.scheduler->TimedWaitSchedule(std::chrono::nanoseconds(timeout))) {
bool inQueue{true};
{
// Attempt to remove ourselves from the queue so we cannot be signalled
std::unique_lock syncLock{syncWaiterMutex};
auto queue{syncWaiters.equal_range(key)};
auto iterator{std::find(queue.first, queue.second, SyncWaiters::value_type{key, state.thread})};
if (iterator != queue.second)
syncWaiters.erase(iterator);
else
inQueue = false;
}
bool shouldWait{false};
if (!inQueue) {
// If we weren't in the queue then we need to check if we were signalled already
while (true) {
std::unique_lock lock{state.thread->waiterMutex};
if (state.thread->waitSignalled) {
if (state.thread->waitKey) {
auto waitThread{state.thread->waitThread};
std::unique_lock waitLock{waitThread->waiterMutex, std::try_to_lock};
if (!waitLock) {
// If we can't lock the waitThread's waiterMutex then we need to wait without holding the current thread's waiterMutex to avoid a deadlock
lock.unlock();
waitLock.lock();
continue;
}
auto &waiters{waitThread->waiters};
auto it{std::find(waiters.begin(), waiters.end(), state.thread)};
if (it != waiters.end()) {
// If we were signalled but are waiting on locking the associated mutex then we need to cancel our wait
waiters.erase(it);
state.thread->UpdatePriorityInheritance();
state.thread->waitKey = nullptr;
state.thread->waitTag = 0;
state.thread->waitThread = nullptr;
} else {
// If we were signalled and are no longer waiting on the associated mutex then we're already scheduled
shouldWait = true;
}
} else {
// If the waitKey is null then we were signalled and are no longer waiting on the associated mutex
shouldWait = true;
}
} else {
// If we were in the process of being signalled but prior to the mutex being locked then we can just cancel our wait
state.thread->waitConditionVariable = nullptr;
state.thread->waitSignalled = true;
}
break;
}
} else {
// If we were in the queue then we can just cancel our wait
state.thread->waitConditionVariable = nullptr;
state.thread->waitSignalled = true;
}
if (shouldWait) {
// Wait if we've been signalled in the meantime as it would be problematic to double insert a thread into the scheduler
state.scheduler->WaitSchedule();
return state.thread->waitResult;
}
state.scheduler->InsertThread(state.thread);
state.scheduler->WaitSchedule();
@@ -247,38 +314,78 @@ namespace skyline::kernel::type {
state.scheduler->WaitSchedule();
}
return state.thread->waitResult;
}
void KProcess::ConditionalVariableSignal(u32 *key, i32 amount) {
TRACE_EVENT_FMT("kernel", "ConditionalVariableSignal 0x{:X}", key);
i32 waiterCount{amount};
while (amount <= 0 || waiterCount) {
std::shared_ptr<type::KThread> thread;
void *conditionVariable{};
{
// Try to find a thread to signal
std::scoped_lock lock{syncWaiterMutex};
auto queue{syncWaiters.equal_range(key)};
if (queue.first != queue.second) {
// If we found a thread then we need to remove it from the queue
thread = queue.first->second;
conditionVariable = thread->waitConditionVariable;
#ifndef NDEBUG
if (conditionVariable != key)
Logger::Warn("Condition variable mismatch: 0x{:X} != 0x{:X}", conditionVariable, key);
#endif
syncWaiters.erase(queue.first);
waiterCount--;
} else if (queue.first == queue.second) {
// If we didn't find a thread then we need to clear the boolean flag denoting that there are no more threads waiting on this conditional variable
__atomic_store_n(key, false, __ATOMIC_SEQ_CST);
break;
}
}
std::scoped_lock lock{thread->waiterMutex};
if (thread->waitConditionVariable == conditionVariable) {
// If the thread is still waiting on the same condition variable then we can signal it (It could no longer be waiting due to a timeout)
u32 *mutex{thread->waitKey};
KHandle tag{thread->waitTag};
while (true) {
// We need to lock the mutex before the thread can be scheduled
KHandle value{};
if (__atomic_compare_exchange_n(mutex, &value, tag, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
// A quick CAS to lock the mutex for the thread, we can just schedule the thread if we succeed
state.scheduler->InsertThread(thread);
break;
}
if ((value & HandleWaitersBit) == 0)
// Set the waiters bit in the mutex if it wasn't already set
if (!__atomic_compare_exchange_n(mutex, &value, value | HandleWaitersBit, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
continue; // If we failed to set the waiters bit due to an outdated value then try again
// If we couldn't CAS the lock then we need to let the mutex holder schedule the thread instead of us during an unlock
auto result{MutexLock(thread, mutex, value & ~HandleWaitersBit, tag, true)};
if (result == result::InvalidCurrentMemory) {
continue;
} else if (result == result::InvalidHandle) {
thread->waitResult = result::InvalidState;
state.scheduler->InsertThread(thread);
} else if (result != Result{}) {
throw exception("Failed to lock mutex: 0x{:X}", result);
}
break;
}
// Update the thread's wait state to avoid incorrect timeout cancellation behavior
thread->waitConditionVariable = nullptr;
thread->waitSignalled = true;
thread->waitResult = {};
}
}
}
Result KProcess::WaitForAddress(u32 *address, u32 value, i64 timeout, ArbitrationType type) {

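The timed-out waiter above needs to hold two per-thread waiterMutex instances (its own and waitThread's) without a fixed global lock order, which it does with std::try_to_lock plus a retry. Here is a standalone sketch of that pattern under assumed, simplified types; ThreadState and LockBoth are hypothetical names, not Skyline's.

#include <iostream>
#include <mutex>
#include <thread>

struct ThreadState {
    std::recursive_mutex waiterMutex; // per-thread lock, standing in for KThread::waiterMutex
};

// Lock our own state, then try the other thread's state without blocking; if that
// fails, release everything, wait for the other lock on its own, and retry so no
// thread ever blocks while still holding a lock (which is what causes deadlocks).
void LockBoth(ThreadState &self, ThreadState &other) {
    while (true) {
        std::unique_lock selfLock{self.waiterMutex};
        std::unique_lock otherLock{other.waiterMutex, std::try_to_lock};
        if (!otherLock) {
            selfLock.unlock();
            otherLock.lock(); // back off until the other lock is free
            continue;         // both unique_locks are released at the end of this iteration, then we retry
        }
        // ... both waiterMutexes held: safe to mutate both threads' waiter lists ...
        return;
    }
}

int main() {
    ThreadState a, b;
    std::thread first{[&] { LockBoth(a, b); }};
    std::thread second{[&] { LockBoth(b, a); }};
    first.join();
    second.join();
    std::cout << "no deadlock\n";
}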

@@ -62,11 +62,14 @@ namespace skyline {
bool pendingYield{}; //!< If the thread has been yielded and the yield hasn't been acted upon yet
bool forceYield{}; //!< If the thread has been forcefully yielded by another thread
std::recursive_mutex waiterMutex; //!< Synchronizes operations on mutation of the waiter members
u32 *waitKey; //!< The key of the mutex which this thread is waiting on
KHandle waitTag; //!< The handle of the thread which requested the mutex lock
std::shared_ptr<KThread> waitThread; //!< The thread which this thread is waiting on
std::list<std::shared_ptr<type::KThread>> waiters; //!< A queue of threads waiting on this thread sorted by priority
void *waitConditionVariable; //!< The condition variable which this thread is waiting on
bool waitSignalled{}; //!< If the conditional variable has been signalled already
Result waitResult; //!< The result of the wait operation
bool isCancellable{false}; //!< If the thread is currently in a position where it's cancellable
bool cancelSync{false}; //!< Whether to cancel the SvcWaitSynchronization call this thread currently is in/the next one it joins
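Note the waiterMutex change above from std::mutex to std::recursive_mutex, which lets a code path that already holds a thread's waiterMutex re-enter it safely. A minimal, generic illustration of that property (not Skyline code):

#include <iostream>
#include <mutex>

std::recursive_mutex waiterMutex; // the owning thread may lock this again without deadlocking

void UpdateWaiterState() {
    std::scoped_lock lock{waiterMutex}; // nested acquisition by the same thread
    std::cout << "waiter state updated\n";
}

int main() {
    std::scoped_lock lock{waiterMutex};
    UpdateWaiterState(); // with a plain std::mutex, relocking here would be undefined behaviour
}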