Implemented ping / ack between the broker and the peers. Every 5 seconds

the broker will try to ping the peers, and if the peer hasn't replied
within 15 seconds, it is removed lazily when a client connects.
This commit is contained in:
simon.kagstrom 2009-03-28 15:18:50 +00:00
parent c79f6195ad
commit bc6bc61cf0
3 changed files with 125 additions and 9 deletions

View File

@ -539,7 +539,7 @@ bool Network::ReceiveUpdate(NetworkUpdate *dst, size_t total_sz,
do {
size_t actual_sz = this->ReceiveFrom(p, this->sock,
4096, NULL);
if (actual_sz < 0)
if (actual_sz <= 0)
return false;
if (this->DeMarshalAllData((NetworkUpdate*)p, actual_sz,
@ -636,6 +636,12 @@ bool Network::MarshalData(NetworkUpdate *p)
case CONNECT_TO_PEER:
case STOP:
break;
case PING:
case ACK:
{
NetworkUpdatePingAck *pa = (NetworkUpdatePingAck *)p->data;
pa->seq = htonl(pa->seq);
} break;
case SELECT_PEER:
{
NetworkUpdateSelectPeer *sp = (NetworkUpdateSelectPeer *)p->data;
@ -719,6 +725,12 @@ bool Network::DeMarshalData(NetworkUpdate *p)
case STOP:
/* Nothing to do, just bytes */
break;
case PING:
case ACK:
{
NetworkUpdatePingAck *pa = (NetworkUpdatePingAck *)p->data;
pa->seq = ntohl(pa->seq);
} break;
case SELECT_PEER:
{
NetworkUpdateSelectPeer *sp = (NetworkUpdateSelectPeer *)p->data;
@ -802,7 +814,7 @@ bool Network::DecodeUpdate(uint8 *screen, uint8 *js)
{
} break;
case PING:
/* Send an ack */
/* FIXME! Send an ack */
break;
case ACK: /* Should never receive this */
case DISCONNECT:
@ -854,6 +866,22 @@ bool Network::IpToStr(char *dst, uint8 *ip_in)
return true;
}
/* OK, this is a pretty ugly special case, but it's only used when
* communicating with the broker before a peer connection. */
void Network::SendPingAck(int seq)
{
this->ResetNetworkUpdate();
NetworkUpdate *ud = InitNetworkUpdate(this->ud, ACK,
sizeof(NetworkUpdate) + sizeof(NetworkUpdatePingAck));
NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data;
p->seq = seq;
this->AddNetworkUpdate(ud);
this->SendUpdate();
this->ResetNetworkUpdate();
}
bool Network::WaitForPeerAddress()
{
NetworkUpdateListPeers *pi;
@ -865,7 +893,14 @@ bool Network::WaitForPeerAddress()
this->ResetNetworkUpdate();
if (this->ReceiveUpdate(&tv) == false)
return false;
if (ud->type != LIST_PEERS)
if (this->ud->type == PING)
{
NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data;
/* Send ack and go back to this state again */
this->SendPingAck(p->seq);
return false;
}
if (this->ud->type != LIST_PEERS)
return false;
pi = (NetworkUpdateListPeers *)this->ud->data;
@ -915,6 +950,13 @@ bool Network::WaitForPeerList()
this->ResetNetworkUpdate();
if (this->ReceiveUpdate(&tv) == false)
return false;
if (this->ud->type == PING)
{
NetworkUpdatePingAck *p = (NetworkUpdatePingAck*)ud->data;
/* Send ack and go back to this state again */
this->SendPingAck(p->seq);
return false;
}
if (ud->type != LIST_PEERS)
return false;

View File

@ -77,7 +77,7 @@ struct NetworkUpdateSelectPeer
struct NetworkUpdatePingAck
{
uint8 seq;
uint32 seq;
};
/*
@ -253,6 +253,8 @@ protected:
bool DecodeDisplayRaw(Uint8 *screen, struct NetworkUpdate *src,
int x, int y);
void SendPingAck(int seq);
bool ReceiveUpdate(NetworkUpdate *dst, size_t sz, struct timeval *tv);
bool ReceiveData(void *dst, int sock, size_t sz);

View File

@ -1,4 +1,4 @@
import socket, struct, syslog
import socket, struct, syslog, time, thread
import SocketServer
FRODO_NETWORK_MAGIC = 0x1976
@ -26,6 +26,9 @@ def log_warn(msg, echo = False):
def log_info(msg, echo = False):
log(syslog.LOG_INFO, msg, echo)
def cur_time():
return time.mktime(time.localtime())
class Packet:
def __init__(self):
"""Create a new packet"""
@ -57,11 +60,35 @@ class StopPacket(Packet):
Packet.__init__(self)
self.type = STOP
def marshal(self):
return struct.pack(">HHL", self.magic, self.type, self.size)
class PingAckPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = PING
self.seq = 0
self.size = self.size + 4
def set_seq(self, seq):
self.seq = seq
def demarshal_from_data(self, data):
"""Init a new packet from raw data."""
Packet.demarshal_from_data(self, data)
self.seq = struct.unpack(">L", data[8:12])[0]
def marshal(self):
"""Create data representation of a packet"""
return Packet.marshal(self) + struct.pack(">L", self.seq)
class SelectPeerPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = SELECT_PEER
self.server_id = 0
self.size = self.size + 4
def demarshal_from_data(self, data):
"""Create a new packet from raw data."""
@ -141,6 +168,9 @@ class Peer:
self.is_master = 0
self.id = id
# Assume it's alive now
self.last_ping = cur_time()
def addr_to_ip_and_port(self, addr):
ip = struct.unpack("@L", socket.inet_pton(socket.AF_INET, addr[0]))[0]
port = addr[1]
@ -156,6 +186,17 @@ class Peer:
if not self.is_master:
lp = ListPeersPacket()
# Remove inactive peers
rp = []
for peer in self.srv.peers.itervalues():
if peer != self and peer.seconds_since_last_ping() > 15:
rp.append(peer)
for peer in rp:
log_info("Peer %s:%d has been inactive for %d seconds, removing" % (peer.addr[0],
peer.addr[1],
peer.seconds_since_last_ping()))
self.srv.remove_peer(peer)
for peer in self.srv.peers.itervalues():
if peer != self and peer.is_master:
lp.add_peer(peer)
@ -178,6 +219,13 @@ class Peer:
self.srv.remove_peer(peer)
self.srv.remove_peer(self)
if pkt.type == ACK:
self.last_ping = cur_time()
def seconds_since_last_ping(self):
now = cur_time()
return now - self.last_ping
def send_packet(self, data):
self.srv.socket.sendto(data + StopPacket().marshal(),
0, self.addr)
@ -211,8 +259,10 @@ class BrokerPacketHandler(SocketServer.DatagramRequestHandler):
log_error("Broken packet: %s" % e)
return
log_info("Received packet %d from %s:%d" % (pkt.get_type(), self.client_address[0],
self.client_address[1]))
# Log received packets (except ping ACKs to avoid filling the server)
if pkt.get_type() != ACK:
log_info("Received packet %d from %s:%d" % (pkt.get_type(), self.client_address[0],
self.client_address[1]))
peer = srv.get_peer(self.client_address)
@ -232,6 +282,7 @@ class Broker(SocketServer.UDPServer):
self.peers = {}
self.peers_by_id = {}
self.id = 0
self.ping_seq = 0
def send_data(self, dst, data):
self.socket.sendto(data, dst)
@ -256,6 +307,15 @@ class Broker(SocketServer.UDPServer):
return v
return None
def ping_all_peers(self):
"""Ping all peers (to see that they are alive)"""
for k,v in self.peers.iteritems():
p = PingAckPacket()
p.set_seq(self.ping_seq)
v.send_packet( p.marshal() )
self.ping_seq = self.ping_seq + 1
def remove_peer(self, peer):
try:
del self.peers[ peer.addr ]
@ -263,15 +323,27 @@ class Broker(SocketServer.UDPServer):
except:
log_error("Could not remove %s, something is wrong" % (str(peer.addr)))
def ping_thread_fn(broker, time_to_sleep):
"""Run as a separate thread to ping all peers"""
while True:
try:
broker.ping_all_peers()
time.sleep( time_to_sleep )
except Exception, e:
print e
# Some of the Frodo network packets. There are more packets, but these
# are not interesting to the broker (and shouldn't be sent there either!)
packet_class_by_type = {
CONNECT_TO_BROKER : ConnectToBrokerPacket,
SELECT_PEER : SelectPeerPacket,
ACK : PingAckPacket,
}
if __name__ == "__main__":
syslog.openlog("frodo")
log_info("Starting Frodo network broker", True)
s = Broker( ("", 46214), BrokerPacketHandler)
s.serve_forever()
broker = Broker( ("", 46214), BrokerPacketHandler)
thread.start_new_thread(ping_thread_fn, (broker, 5))
broker.serve_forever()