From bc6bc61cf0e4bca51039cb8b7370a85644bd4353 Mon Sep 17 00:00:00 2001 From: "simon.kagstrom" Date: Sat, 28 Mar 2009 15:18:50 +0000 Subject: [PATCH] Implemented ping / ack between the broker and the peers. Every 5 seconds the broker will try to ping the peers, and if the peer hasn't replied within 15 seconds, it is removed lazily when a client connects. --- Src/Network.cpp | 48 ++++++++++++++++++++-- Src/Network.h | 4 +- Src/network-broker/main.py | 82 +++++++++++++++++++++++++++++++++++--- 3 files changed, 125 insertions(+), 9 deletions(-) diff --git a/Src/Network.cpp b/Src/Network.cpp index dc48ec1..af12312 100644 --- a/Src/Network.cpp +++ b/Src/Network.cpp @@ -539,7 +539,7 @@ bool Network::ReceiveUpdate(NetworkUpdate *dst, size_t total_sz, do { size_t actual_sz = this->ReceiveFrom(p, this->sock, 4096, NULL); - if (actual_sz < 0) + if (actual_sz <= 0) return false; if (this->DeMarshalAllData((NetworkUpdate*)p, actual_sz, @@ -636,6 +636,12 @@ bool Network::MarshalData(NetworkUpdate *p) case CONNECT_TO_PEER: case STOP: break; + case PING: + case ACK: + { + NetworkUpdatePingAck *pa = (NetworkUpdatePingAck *)p->data; + pa->seq = htonl(pa->seq); + } break; case SELECT_PEER: { NetworkUpdateSelectPeer *sp = (NetworkUpdateSelectPeer *)p->data; @@ -719,6 +725,12 @@ bool Network::DeMarshalData(NetworkUpdate *p) case STOP: /* Nothing to do, just bytes */ break; + case PING: + case ACK: + { + NetworkUpdatePingAck *pa = (NetworkUpdatePingAck *)p->data; + pa->seq = ntohl(pa->seq); + } break; case SELECT_PEER: { NetworkUpdateSelectPeer *sp = (NetworkUpdateSelectPeer *)p->data; @@ -802,7 +814,7 @@ bool Network::DecodeUpdate(uint8 *screen, uint8 *js) { } break; case PING: - /* Send an ack */ + /* FIXME! Send an ack */ break; case ACK: /* Should never receive this */ case DISCONNECT: @@ -854,6 +866,22 @@ bool Network::IpToStr(char *dst, uint8 *ip_in) return true; } +/* OK, this is a pretty ugly special case, but it's only used when + * communicating with the broker before a peer connection. */ +void Network::SendPingAck(int seq) +{ + this->ResetNetworkUpdate(); + + NetworkUpdate *ud = InitNetworkUpdate(this->ud, ACK, + sizeof(NetworkUpdate) + sizeof(NetworkUpdatePingAck)); + NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data; + + p->seq = seq; + this->AddNetworkUpdate(ud); + this->SendUpdate(); + this->ResetNetworkUpdate(); +} + bool Network::WaitForPeerAddress() { NetworkUpdateListPeers *pi; @@ -865,7 +893,14 @@ bool Network::WaitForPeerAddress() this->ResetNetworkUpdate(); if (this->ReceiveUpdate(&tv) == false) return false; - if (ud->type != LIST_PEERS) + if (this->ud->type == PING) + { + NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data; + /* Send ack and go back to this state again */ + this->SendPingAck(p->seq); + return false; + } + if (this->ud->type != LIST_PEERS) return false; pi = (NetworkUpdateListPeers *)this->ud->data; @@ -915,6 +950,13 @@ bool Network::WaitForPeerList() this->ResetNetworkUpdate(); if (this->ReceiveUpdate(&tv) == false) return false; + if (this->ud->type == PING) + { + NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data; + /* Send ack and go back to this state again */ + this->SendPingAck(p->seq); + return false; + } if (ud->type != LIST_PEERS) return false; diff --git a/Src/Network.h b/Src/Network.h index 3006337..a802c81 100644 --- a/Src/Network.h +++ b/Src/Network.h @@ -77,7 +77,7 @@ struct NetworkUpdateSelectPeer struct NetworkUpdatePingAck { - uint8 seq; + uint32 seq; }; /* @@ -253,6 +253,8 @@ protected: bool DecodeDisplayRaw(Uint8 *screen, struct NetworkUpdate *src, int x, int y); + void SendPingAck(int seq); + bool ReceiveUpdate(NetworkUpdate *dst, size_t sz, struct timeval *tv); bool ReceiveData(void *dst, int sock, size_t sz); diff --git a/Src/network-broker/main.py b/Src/network-broker/main.py index c879b20..c855aae 100644 --- a/Src/network-broker/main.py +++ b/Src/network-broker/main.py @@ -1,4 +1,4 @@ -import socket, struct, syslog +import socket, struct, syslog, time, thread import SocketServer FRODO_NETWORK_MAGIC = 0x1976 @@ -26,6 +26,9 @@ def log_warn(msg, echo = False): def log_info(msg, echo = False): log(syslog.LOG_INFO, msg, echo) +def cur_time(): + return time.mktime(time.localtime()) + class Packet: def __init__(self): """Create a new packet""" @@ -57,11 +60,35 @@ class StopPacket(Packet): Packet.__init__(self) self.type = STOP + def marshal(self): + return struct.pack(">HHL", self.magic, self.type, self.size) + +class PingAckPacket(Packet): + def __init__(self): + Packet.__init__(self) + self.type = PING + self.seq = 0 + self.size = self.size + 4 + + def set_seq(self, seq): + self.seq = seq + + def demarshal_from_data(self, data): + """Init a new packet from raw data.""" + Packet.demarshal_from_data(self, data) + self.seq = struct.unpack(">L", data[8:12])[0] + + def marshal(self): + """Create data representation of a packet""" + return Packet.marshal(self) + struct.pack(">L", self.seq) + + class SelectPeerPacket(Packet): def __init__(self): Packet.__init__(self) self.type = SELECT_PEER self.server_id = 0 + self.size = self.size + 4 def demarshal_from_data(self, data): """Create a new packet from raw data.""" @@ -141,6 +168,9 @@ class Peer: self.is_master = 0 self.id = id + # Assume it's alive now + self.last_ping = cur_time() + def addr_to_ip_and_port(self, addr): ip = struct.unpack("@L", socket.inet_pton(socket.AF_INET, addr[0]))[0] port = addr[1] @@ -156,6 +186,17 @@ class Peer: if not self.is_master: lp = ListPeersPacket() + # Remove inactive peers + rp = [] + for peer in self.srv.peers.itervalues(): + if peer != self and peer.seconds_since_last_ping() > 15: + rp.append(peer) + for peer in rp: + log_info("Peer %s:%d has been inactive for %d seconds, removing" % (peer.addr[0], + peer.addr[1], + peer.seconds_since_last_ping())) + self.srv.remove_peer(peer) + for peer in self.srv.peers.itervalues(): if peer != self and peer.is_master: lp.add_peer(peer) @@ -178,6 +219,13 @@ class Peer: self.srv.remove_peer(peer) self.srv.remove_peer(self) + if pkt.type == ACK: + self.last_ping = cur_time() + + def seconds_since_last_ping(self): + now = cur_time() + return now - self.last_ping + def send_packet(self, data): self.srv.socket.sendto(data + StopPacket().marshal(), 0, self.addr) @@ -211,8 +259,10 @@ class BrokerPacketHandler(SocketServer.DatagramRequestHandler): log_error("Broken packet: %s" % e) return - log_info("Received packet %d from %s:%d" % (pkt.get_type(), self.client_address[0], - self.client_address[1])) + # Log received packets (except ping ACKs to avoid filling the server) + if pkt.get_type() != ACK: + log_info("Received packet %d from %s:%d" % (pkt.get_type(), self.client_address[0], + self.client_address[1])) peer = srv.get_peer(self.client_address) @@ -232,6 +282,7 @@ class Broker(SocketServer.UDPServer): self.peers = {} self.peers_by_id = {} self.id = 0 + self.ping_seq = 0 def send_data(self, dst, data): self.socket.sendto(data, dst) @@ -256,6 +307,15 @@ class Broker(SocketServer.UDPServer): return v return None + def ping_all_peers(self): + """Ping all peers (to see that they are alive)""" + for k,v in self.peers.iteritems(): + p = PingAckPacket() + p.set_seq(self.ping_seq) + v.send_packet( p.marshal() ) + + self.ping_seq = self.ping_seq + 1 + def remove_peer(self, peer): try: del self.peers[ peer.addr ] @@ -263,15 +323,27 @@ class Broker(SocketServer.UDPServer): except: log_error("Could not remove %s, something is wrong" % (str(peer.addr))) +def ping_thread_fn(broker, time_to_sleep): + """Run as a separate thread to ping all peers""" + + while True: + try: + broker.ping_all_peers() + time.sleep( time_to_sleep ) + except Exception, e: + print e + # Some of the Frodo network packets. There are more packets, but these # are not interesting to the broker (and shouldn't be sent there either!) packet_class_by_type = { CONNECT_TO_BROKER : ConnectToBrokerPacket, SELECT_PEER : SelectPeerPacket, + ACK : PingAckPacket, } if __name__ == "__main__": syslog.openlog("frodo") log_info("Starting Frodo network broker", True) - s = Broker( ("", 46214), BrokerPacketHandler) - s.serve_forever() + broker = Broker( ("", 46214), BrokerPacketHandler) + thread.start_new_thread(ping_thread_fn, (broker, 5)) + broker.serve_forever()