diff --git a/Source/Core/Core/NetPlayClient.cpp b/Source/Core/Core/NetPlayClient.cpp index b2012c10ce..2e792233fc 100644 --- a/Source/Core/Core/NetPlayClient.cpp +++ b/Source/Core/Core/NetPlayClient.cpp @@ -416,7 +416,6 @@ unsigned int NetPlayClient::OnData(sf::Packet& packet) spac << (MessageId)NP_MSG_PONG; spac << ping_key; - std::lock_guard lks(m_crit.send); Send(spac); } break; @@ -480,6 +479,33 @@ void NetPlayClient::Disconnect() m_server = nullptr; } +void NetPlayClient::RunOnThread(std::function func) +{ + { + std::lock_guard lkq(m_crit.run_queue_write); + m_run_queue.Push(func); + } + WakeupThread(m_client); +} + +void NetPlayClient::WakeupThread(ENetHost* host) +{ + // Send ourselves a spurious message. This is hackier than it should be. + // comex reported this as https://github.com/lsalzman/enet/issues/23, so + // hopefully there will be a better way to do it in the future. + ENetAddress address; + if (host->address.port != 0) + address.port = host->address.port; + else + enet_socket_get_address(host->socket, &address); + address.host = 0x0100007f; // localhost + u8 byte = 0; + ENetBuffer buf; + buf.data = &byte; + buf.dataLength = 1; + enet_socket_send(host->socket, &address, &buf, 1); +} + // called from ---NETPLAY--- thread void NetPlayClient::ThreadFunc() { @@ -487,11 +513,13 @@ void NetPlayClient::ThreadFunc() { ENetEvent netEvent; int net; + if (m_traversal_client) + m_traversal_client->HandleResends(); + net = enet_host_service(m_client, &netEvent, 4); + while (!m_run_queue.Empty()) { - std::lock_guard lks(m_crit.send); - if (m_traversal_client) - m_traversal_client->HandleResends(); - net = enet_host_service(m_client, &netEvent, 4); + m_run_queue.Front()(); + m_run_queue.Pop(); } if (net > 0) { @@ -517,7 +545,6 @@ void NetPlayClient::ThreadFunc() break; } } - } Disconnect(); @@ -577,57 +604,57 @@ void NetPlayClient::GetPlayers(std::vector &player_list) // called from ---GUI--- thread void NetPlayClient::SendChatMessage(const std::string& msg) { - sf::Packet spac; - spac << (MessageId)NP_MSG_CHAT_MESSAGE; - spac << msg; - - std::lock_guard lks(m_crit.send); - Send(spac); + RunOnThread([msg, this]() { + sf::Packet spac; + spac << (MessageId)NP_MSG_CHAT_MESSAGE; + spac << msg; + Send(spac); + }); } // called from ---CPU--- thread void NetPlayClient::SendPadState(const PadMapping in_game_pad, const GCPadStatus& pad) { - // send to server sf::Packet spac; spac << (MessageId)NP_MSG_PAD_DATA; spac << in_game_pad; spac << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight; - std::lock_guard lks(m_crit.send); - Send(spac); + RunOnThread([spac, this]() mutable { + // send to server + Send(spac); + }); } // called from ---CPU--- thread void NetPlayClient::SendWiimoteState(const PadMapping in_game_pad, const NetWiimote& nw) { - // send to server - sf::Packet spac; - spac << (MessageId)NP_MSG_WIIMOTE_DATA; - spac << in_game_pad; - spac << (u8)nw.size(); - for (auto it : nw) - { - spac << it; - } - - std::lock_guard lks(m_crit.send); - Send(spac); + RunOnThread([=]() { + // send to server + sf::Packet spac; + spac << (MessageId)NP_MSG_WIIMOTE_DATA; + spac << in_game_pad; + spac << (u8)nw.size(); + for (auto it : nw) + { + spac << it; + } + Send(spac); + }); } // called from ---GUI--- thread bool NetPlayClient::StartGame(const std::string &path) { - std::lock_guard lkg(m_crit.game); - - // tell server i started the game - sf::Packet spac; - spac << (MessageId)NP_MSG_START_GAME; - spac << m_current_game; - spac << (char *)&g_NetPlaySettings; - - std::lock_guard lks(m_crit.send); - Send(spac); + RunOnThread([this](){ + std::lock_guard lkg(m_crit.game); + // tell server i started the game + sf::Packet spac; + spac << (MessageId)NP_MSG_START_GAME; + spac << m_current_game; + spac << (char *)&g_NetPlaySettings; + Send(spac); + }); if (m_is_running) { @@ -954,6 +981,7 @@ bool NetPlayClient::StopGame() return true; } +// called from ---GUI--- thread void NetPlayClient::Stop() { if (m_is_running == false) @@ -976,9 +1004,11 @@ void NetPlayClient::Stop() // tell the server to stop if we have a pad mapped in game. if (isPadMapped) { - sf::Packet spac; - spac << (MessageId)NP_MSG_STOP_GAME; - Send(spac); + RunOnThread([this](){ + sf::Packet spac; + spac << (MessageId)NP_MSG_STOP_GAME; + Send(spac); + }); } } diff --git a/Source/Core/Core/NetPlayClient.h b/Source/Core/Core/NetPlayClient.h index ab5763d313..7b2e3e159a 100644 --- a/Source/Core/Core/NetPlayClient.h +++ b/Source/Core/Core/NetPlayClient.h @@ -47,6 +47,8 @@ class NetPlayClient : public TraversalClientClient { public: void ThreadFunc(); + void RunOnThread(std::function func); + void WakeupThread(ENetHost* host); NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog, const std::string& name, bool traversal, std::string centralServer, u16 centralPort); ~NetPlayClient(); @@ -92,9 +94,12 @@ protected: { std::recursive_mutex game; // lock order - std::recursive_mutex players, send; + std::recursive_mutex players; + std::recursive_mutex run_queue_write; } m_crit; + Common::FifoQueue, false> m_run_queue; + Common::FifoQueue m_pad_buffer[4]; Common::FifoQueue m_wiimote_buffer[4]; diff --git a/Source/Core/Core/NetPlayServer.cpp b/Source/Core/Core/NetPlayServer.cpp index 1dcd5c1c8f..7b96b14846 100644 --- a/Source/Core/Core/NetPlayServer.cpp +++ b/Source/Core/Core/NetPlayServer.cpp @@ -92,7 +92,6 @@ NetPlayServer::NetPlayServer(const u16 port, bool traversal, std::string central serverAddr.port = port; m_server = enet_host_create(&serverAddr, 10, 3, 0, 0); } - if (m_server != nullptr) { is_connected = true; @@ -117,7 +116,6 @@ void NetPlayServer::ThreadFunc() spac << (MessageId)NP_MSG_PING; spac << m_ping_key; - std::lock_guard lks(m_crit.send); m_ping_timer.Start(); SendToClients(spac); m_update_pings = false; @@ -125,11 +123,13 @@ void NetPlayServer::ThreadFunc() ENetEvent netEvent; int net; + if (m_traversal_client) + m_traversal_client->HandleResends(); + net = enet_host_service(m_server, &netEvent, 1000); + while (!m_run_queue.Empty()) { - std::lock_guard lks(m_crit.send); - if (m_traversal_client) - m_traversal_client->HandleResends(); - net = enet_host_service(m_server, &netEvent, 4); + m_run_queue.Front()(); + m_run_queue.Pop(); } if (net > 0) { @@ -149,7 +149,6 @@ void NetPlayServer::ThreadFunc() sf::Packet spac; spac << (MessageId)error; // don't need to lock, this client isn't in the client map - std::lock_guard lks(m_crit.send); Send(accept_peer, spac); if (netEvent.peer->data) { @@ -272,52 +271,46 @@ unsigned int NetPlayServer::OnConnect(ENetPeer* socket) } } + // send join message to already connected clients + sf::Packet spac; + spac << (MessageId)NP_MSG_PLAYER_JOIN; + spac << player.pid << player.name << player.revision; + SendToClients(spac); + + // send new client success message with their id + spac.clear(); + spac << (MessageId)0; + spac << player.pid; + Send(player.socket, spac); + + // send new client the selected game + if (m_selected_game != "") { - std::lock_guard lks(m_crit.send); + spac.clear(); + spac << (MessageId)NP_MSG_CHANGE_GAME; + spac << m_selected_game; + Send(player.socket, spac); + } - // send join message to already connected clients - sf::Packet spac; + // send the pad buffer value + spac.clear(); + spac << (MessageId)NP_MSG_PAD_BUFFER; + spac << (u32)m_target_buffer_size; + Send(player.socket, spac); + + // sync values with new client + for (const auto& p : m_players) + { + spac.clear(); spac << (MessageId)NP_MSG_PLAYER_JOIN; - spac << player.pid << player.name << player.revision; - SendToClients(spac); - - // send new client success message with their id - spac.clear(); - spac << (MessageId)0; - spac << player.pid; + spac << p.second.pid << p.second.name << p.second.revision; Send(player.socket, spac); - - // send new client the selected game - if (m_selected_game != "") - { - spac.clear(); - spac << (MessageId)NP_MSG_CHANGE_GAME; - spac << m_selected_game; - Send(player.socket, spac); - } - - // send the pad buffer value - spac.clear(); - spac << (MessageId)NP_MSG_PAD_BUFFER; - spac << (u32)m_target_buffer_size; - Send(player.socket, spac); - - // sync values with new client - for (const auto& p : m_players) - { - spac.clear(); - spac << (MessageId)NP_MSG_PLAYER_JOIN; - spac << p.second.pid << p.second.name << p.second.revision; - Send(player.socket, spac); - } - - } // unlock send + } // add client to the player list { std::lock_guard lkp(m_crit.players); m_players.insert(std::pair(*(PlayerId *)player.socket->data, player)); - std::lock_guard lks(m_crit.send); UpdatePadMapping(); // sync pad mappings with everyone UpdateWiimoteMapping(); } @@ -343,7 +336,6 @@ unsigned int NetPlayServer::OnDisconnect(Client& player) sf::Packet spac; spac << (MessageId)NP_MSG_DISABLE_GAME; // this thread doesn't need players lock - std::lock_guard lks(m_crit.send); SendToClients(spac, 1); break; } @@ -362,7 +354,6 @@ unsigned int NetPlayServer::OnDisconnect(Client& player) m_players.erase(it); // alert other players of disconnect - std::lock_guard lks(m_crit.send); SendToClients(spac); for (PadMapping& mapping : m_pad_map) @@ -451,9 +442,37 @@ void NetPlayServer::AdjustPadBufferSize(unsigned int size) spac << (MessageId)NP_MSG_PAD_BUFFER; spac << (u32)m_target_buffer_size; - std::lock_guard lkp(m_crit.players); - std::lock_guard lks(m_crit.send); - SendToClients(spac); + RunOnThread([spac, this]() mutable { + std::lock_guard lkp(m_crit.players); + SendToClients(spac); + }); +} + +void NetPlayServer::RunOnThread(std::function func) +{ + { + std::lock_guard lkq(m_crit.run_queue_write); + m_run_queue.Push(func); + } + WakeupThread(m_server); +} + +void NetPlayServer::WakeupThread(ENetHost* host) +{ + // Send ourselves a spurious message. This is hackier than it should be. + // comex reported this as https://github.com/lsalzman/enet/issues/23, so + // hopefully there will be a better way to do it in the future. + ENetAddress address; + if (host->address.port != 0) + address.port = host->address.port; + else + enet_socket_get_address(host->socket, &address); + address.host = 0x0100007f; // localhost + u8 byte = 0; + ENetBuffer buf; + buf.data = &byte; + buf.dataLength = 1; + enet_socket_send(host->socket, &address, &buf, 1); } // called from ---NETPLAY--- thread @@ -478,10 +497,7 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player) spac << player.pid; spac << msg; - { - std::lock_guard lks(m_crit.send); - SendToClients(spac, player.pid); - } + SendToClients(spac, player.pid); } break; @@ -505,7 +521,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player) spac << (MessageId)NP_MSG_PAD_DATA; spac << map << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight; - std::lock_guard lks(m_crit.send); SendToClients(spac, player.pid); } break; @@ -538,7 +553,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player) for (const u8& byte : data) spac << byte; - std::lock_guard lks(m_crit.send); SendToClients(spac, player.pid); } break; @@ -559,7 +573,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player) spac << player.pid; spac << player.ping; - std::lock_guard lks(m_crit.send); SendToClients(spac); } break; @@ -577,7 +590,6 @@ unsigned int NetPlayServer::OnData(sf::Packet& packet, Client& player) spac << (MessageId)NP_MSG_STOP_GAME; std::lock_guard lkp(m_crit.players); - std::lock_guard lks(m_crit.send); SendToClients(spac); m_is_running = false; @@ -601,7 +613,7 @@ void NetPlayServer::OnTraversalStateChanged() m_dialog->Update(); } -// called from ---GUI--- thread / and ---NETPLAY--- thread +// called from ---GUI--- thread void NetPlayServer::SendChatMessage(const std::string& msg) { sf::Packet spac; @@ -609,9 +621,10 @@ void NetPlayServer::SendChatMessage(const std::string& msg) spac << (PlayerId)0; // server id always 0 spac << msg; - std::lock_guard lkp(m_crit.players); - std::lock_guard lks(m_crit.send); - SendToClients(spac); + RunOnThread([spac, this]() mutable { + std::lock_guard lkp(m_crit.players); + SendToClients(spac); + }); } // called from ---GUI--- thread @@ -626,9 +639,10 @@ bool NetPlayServer::ChangeGame(const std::string &game) spac << (MessageId)NP_MSG_CHANGE_GAME; spac << game; - std::lock_guard lkp(m_crit.players); - std::lock_guard lks(m_crit.send); - SendToClients(spac); + RunOnThread([spac, this]() mutable { + std::lock_guard lkp(m_crit.players); + SendToClients(spac); + }); return true; } @@ -666,9 +680,10 @@ bool NetPlayServer::StartGame() spac << (u32)g_netplay_initial_gctime; spac << (u32)g_netplay_initial_gctime << 32; - std::lock_guard lkp(m_crit.players); - std::lock_guard lks(m_crit.send); - SendToClients(spac); + RunOnThread([spac, this]() mutable { + std::lock_guard lkp(m_crit.players); + SendToClients(spac); + }); m_is_running = true; diff --git a/Source/Core/Core/NetPlayServer.h b/Source/Core/Core/NetPlayServer.h index 5e8ac76411..21f7ea9ede 100644 --- a/Source/Core/Core/NetPlayServer.h +++ b/Source/Core/Core/NetPlayServer.h @@ -20,6 +20,8 @@ class NetPlayServer : public TraversalClientClient { public: void ThreadFunc(); + void RunOnThread(std::function func); + void WakeupThread(ENetHost* host); NetPlayServer(const u16 port, bool traversal, std::string centralServer, u16 centralPort); ~NetPlayServer(); @@ -101,11 +103,13 @@ private: { std::recursive_mutex game; // lock order - std::recursive_mutex players, send; + std::recursive_mutex players; + std::recursive_mutex run_queue_write; } m_crit; std::string m_selected_game; std::thread m_thread; + Common::FifoQueue, false> m_run_queue; ENetHost* m_server; TraversalClient* m_traversal_client;