NetPlay: Remove RunOnThread and add SendAsync methods

Add std::unique_ptr<sf::Packet> objects to a queue instead of functions,
makes things easier to read, and avoids headaches while checking the
lifetime of the concerned objects.
This commit is contained in:
mathieui 2015-03-13 02:03:09 +01:00
parent 44d7207a1c
commit 8ee402863d
4 changed files with 77 additions and 98 deletions

View File

@ -479,11 +479,11 @@ void NetPlayClient::Disconnect()
m_server = nullptr; m_server = nullptr;
} }
void NetPlayClient::RunOnThread(std::function<void()> func) void NetPlayClient::SendAsync(sf::Packet* packet)
{ {
{ {
std::lock_guard<std::recursive_mutex> lkq(m_crit.run_queue_write); std::lock_guard<std::recursive_mutex> lkq(m_crit.async_queue_write);
m_run_queue.Push(func); m_async_queue.Push(std::unique_ptr<sf::Packet>(packet));
} }
ENetUtil::WakeupThread(m_client); ENetUtil::WakeupThread(m_client);
} }
@ -498,10 +498,10 @@ void NetPlayClient::ThreadFunc()
if (m_traversal_client) if (m_traversal_client)
m_traversal_client->HandleResends(); m_traversal_client->HandleResends();
net = enet_host_service(m_client, &netEvent, 4); net = enet_host_service(m_client, &netEvent, 4);
while (!m_run_queue.Empty()) while (!m_async_queue.Empty())
{ {
m_run_queue.Front()(); Send(*(m_async_queue.Front().get()));
m_run_queue.Pop(); m_async_queue.Pop();
} }
if (net > 0) if (net > 0)
{ {
@ -586,57 +586,47 @@ void NetPlayClient::GetPlayers(std::vector<const Player *> &player_list)
// called from ---GUI--- thread // called from ---GUI--- thread
void NetPlayClient::SendChatMessage(const std::string& msg) void NetPlayClient::SendChatMessage(const std::string& msg)
{ {
RunOnThread([msg, this]() { sf::Packet* spac = new sf::Packet;
sf::Packet spac; *spac << (MessageId)NP_MSG_CHAT_MESSAGE;
spac << (MessageId)NP_MSG_CHAT_MESSAGE; *spac << msg;
spac << msg; SendAsync(spac);
Send(spac);
});
} }
// called from ---CPU--- thread // called from ---CPU--- thread
void NetPlayClient::SendPadState(const PadMapping in_game_pad, const GCPadStatus& pad) void NetPlayClient::SendPadState(const PadMapping in_game_pad, const GCPadStatus& pad)
{ {
sf::Packet spac; sf::Packet* spac = new sf::Packet;
spac << (MessageId)NP_MSG_PAD_DATA; *spac << (MessageId)NP_MSG_PAD_DATA;
spac << in_game_pad; *spac << in_game_pad;
spac << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight; *spac << pad.button << pad.analogA << pad.analogB << pad.stickX << pad.stickY << pad.substickX << pad.substickY << pad.triggerLeft << pad.triggerRight;
RunOnThread([spac, this]() mutable { SendAsync(spac);
// send to server
Send(spac);
});
} }
// called from ---CPU--- thread // called from ---CPU--- thread
void NetPlayClient::SendWiimoteState(const PadMapping in_game_pad, const NetWiimote& nw) void NetPlayClient::SendWiimoteState(const PadMapping in_game_pad, const NetWiimote& nw)
{ {
RunOnThread([=]() { sf::Packet* spac = new sf::Packet;
// send to server *spac << (MessageId)NP_MSG_WIIMOTE_DATA;
sf::Packet spac; *spac << in_game_pad;
spac << (MessageId)NP_MSG_WIIMOTE_DATA; *spac << (u8)nw.size();
spac << in_game_pad; for (auto it : nw)
spac << (u8)nw.size(); {
for (auto it : nw) *spac << it;
{ }
spac << it; SendAsync(spac);
}
Send(spac);
});
} }
// called from ---GUI--- thread // called from ---GUI--- thread
bool NetPlayClient::StartGame(const std::string &path) bool NetPlayClient::StartGame(const std::string &path)
{ {
RunOnThread([this](){ std::lock_guard<std::recursive_mutex> lkg(m_crit.game);
std::lock_guard<std::recursive_mutex> lkg(m_crit.game); // tell server i started the game
// tell server i started the game sf::Packet* spac = new sf::Packet;
sf::Packet spac; *spac << (MessageId)NP_MSG_START_GAME;
spac << (MessageId)NP_MSG_START_GAME; *spac << m_current_game;
spac << m_current_game; *spac << (char *)&g_NetPlaySettings;
spac << (char *)&g_NetPlaySettings; SendAsync(spac);
Send(spac);
});
if (m_is_running) if (m_is_running)
{ {
@ -986,11 +976,9 @@ void NetPlayClient::Stop()
// tell the server to stop if we have a pad mapped in game. // tell the server to stop if we have a pad mapped in game.
if (isPadMapped) if (isPadMapped)
{ {
RunOnThread([this](){ sf::Packet* spac = new sf::Packet;
sf::Packet spac; *spac << (MessageId)NP_MSG_STOP_GAME;
spac << (MessageId)NP_MSG_STOP_GAME; SendAsync(spac);
Send(spac);
});
} }
} }

View File

@ -48,7 +48,7 @@ class NetPlayClient : public TraversalClientClient
{ {
public: public:
void ThreadFunc(); void ThreadFunc();
void RunOnThread(std::function<void()> func); void SendAsync(sf::Packet* packet);
NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog, const std::string& name, bool traversal, std::string centralServer, u16 centralPort); NetPlayClient(const std::string& address, const u16 port, NetPlayUI* dialog, const std::string& name, bool traversal, std::string centralServer, u16 centralPort);
~NetPlayClient(); ~NetPlayClient();
@ -95,10 +95,10 @@ protected:
std::recursive_mutex game; std::recursive_mutex game;
// lock order // lock order
std::recursive_mutex players; std::recursive_mutex players;
std::recursive_mutex run_queue_write; std::recursive_mutex async_queue_write;
} m_crit; } m_crit;
Common::FifoQueue<std::function<void()>, false> m_run_queue; Common::FifoQueue<std::unique_ptr<sf::Packet>, false> m_async_queue;
Common::FifoQueue<GCPadStatus> m_pad_buffer[4]; Common::FifoQueue<GCPadStatus> m_pad_buffer[4];
Common::FifoQueue<NetWiimote> m_wiimote_buffer[4]; Common::FifoQueue<NetWiimote> m_wiimote_buffer[4];

View File

@ -126,10 +126,13 @@ void NetPlayServer::ThreadFunc()
if (m_traversal_client) if (m_traversal_client)
m_traversal_client->HandleResends(); m_traversal_client->HandleResends();
net = enet_host_service(m_server, &netEvent, 1000); net = enet_host_service(m_server, &netEvent, 1000);
while (!m_run_queue.Empty()) while (!m_async_queue.Empty())
{ {
m_run_queue.Front()(); {
m_run_queue.Pop(); std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(*(m_async_queue.Front().get()));
}
m_async_queue.Pop();
} }
if (net > 0) if (net > 0)
{ {
@ -438,21 +441,18 @@ void NetPlayServer::AdjustPadBufferSize(unsigned int size)
m_target_buffer_size = size; m_target_buffer_size = size;
// tell clients to change buffer size // tell clients to change buffer size
sf::Packet spac; sf::Packet* spac = new sf::Packet;
spac << (MessageId)NP_MSG_PAD_BUFFER; *spac << (MessageId)NP_MSG_PAD_BUFFER;
spac << (u32)m_target_buffer_size; *spac << (u32)m_target_buffer_size;
RunOnThread([spac, this]() mutable { SendAsyncToClients(spac);
std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac);
});
} }
void NetPlayServer::RunOnThread(std::function<void()> func) void NetPlayServer::SendAsyncToClients(sf::Packet* packet)
{ {
{ {
std::lock_guard<std::recursive_mutex> lkq(m_crit.run_queue_write); std::lock_guard<std::recursive_mutex> lkq(m_crit.async_queue_write);
m_run_queue.Push(func); m_async_queue.Push(std::unique_ptr<sf::Packet>(packet));
} }
ENetUtil::WakeupThread(m_server); ENetUtil::WakeupThread(m_server);
} }
@ -598,15 +598,12 @@ void NetPlayServer::OnTraversalStateChanged()
// called from ---GUI--- thread // called from ---GUI--- thread
void NetPlayServer::SendChatMessage(const std::string& msg) void NetPlayServer::SendChatMessage(const std::string& msg)
{ {
sf::Packet spac; sf::Packet* spac = new sf::Packet;
spac << (MessageId)NP_MSG_CHAT_MESSAGE; *spac << (MessageId)NP_MSG_CHAT_MESSAGE;
spac << (PlayerId)0; // server id always 0 *spac << (PlayerId)0; // server id always 0
spac << msg; *spac << msg;
RunOnThread([spac, this]() mutable { SendAsyncToClients(spac);
std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac);
});
} }
// called from ---GUI--- thread // called from ---GUI--- thread
@ -617,14 +614,11 @@ bool NetPlayServer::ChangeGame(const std::string &game)
m_selected_game = game; m_selected_game = game;
// send changed game to clients // send changed game to clients
sf::Packet spac; sf::Packet* spac = new sf::Packet;
spac << (MessageId)NP_MSG_CHANGE_GAME; *spac << (MessageId)NP_MSG_CHANGE_GAME;
spac << game; *spac << game;
RunOnThread([spac, this]() mutable { SendAsyncToClients(spac);
std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac);
});
return true; return true;
} }
@ -647,25 +641,22 @@ bool NetPlayServer::StartGame()
g_netplay_initial_gctime = Common::Timer::GetLocalTimeSinceJan1970(); g_netplay_initial_gctime = Common::Timer::GetLocalTimeSinceJan1970();
// tell clients to start game // tell clients to start game
sf::Packet spac; sf::Packet* spac = new sf::Packet;
spac << (MessageId)NP_MSG_START_GAME; *spac << (MessageId)NP_MSG_START_GAME;
spac << m_current_game; *spac << m_current_game;
spac << m_settings.m_CPUthread; *spac << m_settings.m_CPUthread;
spac << m_settings.m_CPUcore; *spac << m_settings.m_CPUcore;
spac << m_settings.m_DSPEnableJIT; *spac << m_settings.m_DSPEnableJIT;
spac << m_settings.m_DSPHLE; *spac << m_settings.m_DSPHLE;
spac << m_settings.m_WriteToMemcard; *spac << m_settings.m_WriteToMemcard;
spac << m_settings.m_OCEnable; *spac << m_settings.m_OCEnable;
spac << m_settings.m_OCFactor; *spac << m_settings.m_OCFactor;
spac << m_settings.m_EXIDevice[0]; *spac << m_settings.m_EXIDevice[0];
spac << m_settings.m_EXIDevice[1]; *spac << m_settings.m_EXIDevice[1];
spac << (u32)g_netplay_initial_gctime; *spac << (u32)g_netplay_initial_gctime;
spac << (u32)g_netplay_initial_gctime << 32; *spac << (u32)g_netplay_initial_gctime << 32;
RunOnThread([spac, this]() mutable { SendAsyncToClients(spac);
std::lock_guard<std::recursive_mutex> lkp(m_crit.players);
SendToClients(spac);
});
m_is_running = true; m_is_running = true;

View File

@ -21,7 +21,7 @@ class NetPlayServer : public TraversalClientClient
{ {
public: public:
void ThreadFunc(); void ThreadFunc();
void RunOnThread(std::function<void()> func); void SendAsyncToClients(sf::Packet* packet);
NetPlayServer(const u16 port, bool traversal, std::string centralServer, u16 centralPort); NetPlayServer(const u16 port, bool traversal, std::string centralServer, u16 centralPort);
~NetPlayServer(); ~NetPlayServer();
@ -104,12 +104,12 @@ private:
std::recursive_mutex game; std::recursive_mutex game;
// lock order // lock order
std::recursive_mutex players; std::recursive_mutex players;
std::recursive_mutex run_queue_write; std::recursive_mutex async_queue_write;
} m_crit; } m_crit;
std::string m_selected_game; std::string m_selected_game;
std::thread m_thread; std::thread m_thread;
Common::FifoQueue<std::function<void()>, false> m_run_queue; Common::FifoQueue<std::unique_ptr<sf::Packet>, false> m_async_queue;
ENetHost* m_server; ENetHost* m_server;
TraversalClient* m_traversal_client; TraversalClient* m_traversal_client;