#!/usr/bin/env python
"""Frodo network broker.

A UDP server that Frodo clients contact in order to find each other.  A
client announces itself with a CONNECT_TO_BROKER packet, receives a
LIST_PEERS packet describing the other registered clients, and picks one
with SELECT_PEER.  A background thread pings every registered peer and peers
that stop acknowledging are dropped the next time somebody connects.

This module targets Python 2 (it uses the ``thread`` and ``SocketServer``
modules and treats packet data as ``str``).
"""
import SocketServer
import random
import socket
import struct
import sys
import syslog
import thread
import time

# IP-to-country lookup (not part of the standard library)
import ip2country

# Local imports
import image
import stats

FRODO_NETWORK_PROTOCOL_VERSION = 4
FRODO_NETWORK_MAGIC = 0x1976

CONNECT_TO_BROKER = 99  # Hello, broker
LIST_PEERS = 98         # List of peers
CONNECT_TO_PEER = 97    # A peer wants to connect
SELECT_PEER = 93        # The client selects who to connect to
DISCONNECT = 96         # Disconnect from a peer
PING = 95               # Are you alive?
ACK = 94                # Yep
STOP = 55               # No more packets

pkt_type_to_str = {
    CONNECT_TO_BROKER: "connect-to-broker",
    LIST_PEERS: "list-peers",
    CONNECT_TO_PEER: "connect-to-peer",
    SELECT_PEER: "select-peer",
    DISCONNECT: "disconnect",
    PING: "ping",
    ACK: "ack",
    STOP: "stop",
}

# UDP port the broker listens on
BROKER_PORT = 46214
# A peer that has not ACKed a ping for this long is dropped
PEER_TIMEOUT_SECONDS = 15
# Seconds between two rounds of pings
PING_INTERVAL_SECONDS = 5
# Number of screenshot files (0.png .. 11.png) that are recycled
SCREENSHOT_SLOTS = 12


def log(pri, msg, echo):
    """Send msg to syslog with priority pri, and print it if echo is true."""
    syslog.syslog(pri, msg)
    if echo:
        print(msg)


def log_error(msg, echo=False):
    """Log msg with syslog priority LOG_ERR."""
    log(syslog.LOG_ERR, msg, echo)


def log_warn(msg, echo=False):
    """Log msg with syslog priority LOG_WARNING."""
    log(syslog.LOG_WARNING, msg, echo)


def log_info(msg, echo=False):
    """Log msg with syslog priority LOG_INFO."""
    log(syslog.LOG_INFO, msg, echo)


def cur_time():
    """Return the current time in seconds since the epoch (whole seconds)."""
    return time.mktime(time.localtime())


class Packet(object):
    """Common 8-byte packet header: magic (16 bit), type (16 bit), size (32 bit).

    All fields are transferred in network byte order.
    """

    def __init__(self):
        """Create a new packet"""
        self.magic = FRODO_NETWORK_MAGIC
        self.type = 0
        self.size = 8

    def demarshal_from_data(self, data):
        """Fill in the header fields from raw data.

        Data should always be in network byte order.  Raises struct.error
        if data is shorter than the 8-byte header.
        """
        self.magic, self.type, self.size = struct.unpack(">HHL", data[0:8])

    def get_magic(self):
        return self.magic

    def get_type(self):
        return self.type

    def get_size(self):
        return self.size

    def marshal(self):
        """Return the wire representation of the header."""
        return struct.pack(">HHL", self.magic, self.type, self.size)


class StopPacket(Packet):
    """Header-only packet of type STOP, appended after every datagram payload."""

    def __init__(self):
        Packet.__init__(self)
        self.type = STOP


class PingAckPacket(Packet):
    """Header followed by a 32-bit sequence number.

    The broker sends it with type PING; the same class is used to parse the
    ACK packets that come back (see packet_class_by_type).
    """

    def __init__(self):
        Packet.__init__(self)
        self.type = PING
        self.seq = 0
        self.size = self.size + 4

    def set_seq(self, seq):
        self.seq = seq

    def demarshal_from_data(self, data):
        """Init a new packet from raw data."""
        Packet.demarshal_from_data(self, data)
        self.seq = struct.unpack(">L", data[8:12])[0]

    def marshal(self):
        """Create data representation of a packet"""
        return Packet.marshal(self) + struct.pack(">L", self.seq)


class SelectPeerPacket(Packet):
    """Header followed by the 32-bit id of the peer the sender wants."""

    def __init__(self):
        Packet.__init__(self)
        self.type = SELECT_PEER
        self.server_id = 0
        self.size = self.size + 4

    def demarshal_from_data(self, data):
        """Create a new packet from raw data."""
        Packet.demarshal_from_data(self, data)
        self.server_id = struct.unpack(">L", data[8:12])[0]

    def get_id(self):
        return self.server_id


class ConnectToBrokerPacket(Packet):
    """Registration packet sent by a client when it contacts the broker."""

    def __init__(self):
        # Initialise magic/type/size like every other packet class does
        Packet.__init__(self)
        self.type = CONNECT_TO_BROKER
        self.key = 0
        self._is_master = 0
        self.private_port = 0
        self.public_port = 0
        self.private_ip = ""
        self.public_ip = ""
        self.name = ""
        self.server_id = 0
        # Set for real by demarshal_from_data; defined here so that the
        # attribute always exists
        self.version = 0
        self.avatar = 0
        self.screenshot = ""

    def demarshal_from_data(self, data):
        """Parse the packet from raw data (network byte order).

        Bytes 8..43 are not interpreted by the broker.  Raises struct.error
        if data is too short for the fields that are read.
        """
        Packet.demarshal_from_data(self, data)
        # Offsets: key 44, is_master 46, name 48 (32 bytes), id 80, version 84
        (self.key, self._is_master, raw_name,
         self.server_id, self.version) = struct.unpack_from(">HH32sLL", data, 44)
        # Keep everything up to the first NUL.  A name filling all 32 bytes
        # has no terminator and is kept whole.
        self.name = raw_name.split('\0', 1)[0]

        # Avatar and screenshot were added in protocol version 4
        if self.version >= 4:
            self.avatar = struct.unpack_from(">L", data, 88)[0]
            self.screenshot = struct.unpack_from(
                ">%ds" % (image.SCREENSHOT_SIZE), data, 92)[0]

    def get_key(self):
        return self.key

    def get_avatar(self):
        return self.avatar

    def get_screenshot(self):
        return self.screenshot

    def get_name(self):
        return self.name

    def is_master(self):
        return self._is_master


class ListPeersPacket(Packet):
    """Packet describing a list of peers, laid out for protocol `version`."""

    def __init__(self, version):
        Packet.__init__(self)
        self.n_peers = 0
        self.peers = []
        self.type = LIST_PEERS
        # Fixed part after the header: see the first struct.pack in marshal()
        self.size = self.size + 24
        self.version = version

    def add_peer(self, peer):
        """Append peer to the list and account for its size."""
        self.peers.append(peer)
        self.n_peers = self.n_peers + 1
        self.size = self.size + 80
        # Add avatar and screenshot size
        if self.version >= 4:
            self.size = self.size + image.SCREENSHOT_SIZE + 4

    def marshal(self):
        """Return the wire representation: header, fixed part, one record per peer."""
        parts = [struct.pack(">L16sHxx", self.n_peers, "", 0)]

        for peer in self.peers:
            if peer.country == "":
                name = peer.name
            else:
                name = "%s (%s)" % (peer.name, peer.country)
            parts.append(struct.pack(">HH16s16sHH31sBLL", 0, peer.public_port,
                                     "", peer.public_ip, peer.key,
                                     peer.is_master, name, 0, peer.id,
                                     self.version))
            if self.version >= 4:
                parts.append(struct.pack(">L%ds" % (image.SCREENSHOT_SIZE),
                                         peer.avatar, peer.screenshot))

        return Packet.marshal(self) + "".join(parts)


class DummyPeer(object):
    """Fake peer whose name is a line of text to show in the client's peer list."""

    def __init__(self, name):
        self.name = name
        self.country = ""
        self.public_port = 0
        self.public_ip = "0.0.0.0"
        self.key = 0
        self.is_master = 0
        self.id = 0
        # ListPeersPacket.marshal reads these when the list version is >= 4
        self.avatar = 0
        self.screenshot = ""


class Peer(object):
    """A client known to the broker, identified by its (ip, port) address."""

    def __init__(self, addr, srv, id):
        """Create the peer for address `addr`, owned by broker `srv`, with id `id`."""
        self.srv = srv
        self.addr = addr
        self.public_ip, self.public_port = self.addr_to_ip_and_port(addr)

        # Lookup which country this guy is from
        try:
            self.country = srv.ip2c.lookup(addr[0])[1]
        except ValueError:
            self.country = None
        if self.country is None:
            self.country = "Unknown location"

        # These will be set by the CONNECT_TO_BROKER packet below
        self.key = 0
        self.name = ""
        self.is_master = 0
        self.id = id
        self.avatar = 0
        self.screenshot = ""

        # Assume it's alive now
        self.last_ping = cur_time()

    def addr_to_ip_and_port(self, addr):
        """Return (ip, port) where ip is the IPv4 address as 8 hex digits.

        The four address bytes are read as one unsigned 32-bit integer in
        native byte order.  "=L" is used rather than "@L" because the native
        size of "L" is 8 bytes on LP64 platforms, which makes unpacking the
        4-byte address fail.
        """
        ip = struct.unpack("=L", socket.inet_pton(socket.AF_INET, addr[0]))[0]
        port = addr[1]
        return "%08x" % (ip), port

    def handle_packet(self, pkt):
        """Act on a packet received from this peer.

        CONNECT_TO_BROKER registers the peer's details and answers with the
        peer list, SELECT_PEER introduces this peer to the selected one, and
        ACK refreshes the liveness timestamp.  Other types are ignored.
        Exceptions propagate to the caller.
        """
        if pkt.type == CONNECT_TO_BROKER:
            self._handle_connect_to_broker(pkt)
        elif pkt.type == SELECT_PEER:
            self._handle_select_peer(pkt)
        elif pkt.type == ACK:
            self.last_ping = cur_time()

    def _handle_connect_to_broker(self, pkt):
        """Record the peer's details and reply with the list of other peers."""
        self.key = pkt.get_key()
        self.name = pkt.get_name()
        self.is_master = pkt.is_master()

        # If an old Frodo tries to connect, give a helpful message
        if pkt.version != FRODO_NETWORK_PROTOCOL_VERSION:
            self._send_upgrade_notice(pkt.version)
            return

        self.avatar = pkt.get_avatar()
        self.screenshot = pkt.get_screenshot()

        self.srv.log_connection(self.name, self.country)
        self._maybe_save_screenshot()
        self._remove_inactive_peers()

        # The list is sent to every connecting peer and contains all other
        # peers, masters or not
        lp = ListPeersPacket(FRODO_NETWORK_PROTOCOL_VERSION)
        for peer in self.srv.peers.values():
            if peer != self:
                lp.add_peer(peer)

        # And send the packet to this peer
        log_info("Sending list of peers (%d) to %s:%d" % (
            lp.n_peers, self.addr[0], self.addr[1]))
        self.send_packet(lp.marshal())

    def _send_upgrade_notice(self, version):
        """Send a peer list whose entries spell out an upgrade message."""
        lp = ListPeersPacket(version)
        lp.add_peer(DummyPeer("Your frodo is too old."))
        lp.add_peer(DummyPeer("download a new version at"))
        lp.add_peer(DummyPeer("http://frodo-wii.googlecode.com"))
        log_info("Version too old, sending upgrade notice to %s:%d" % (
            self.addr[0], self.addr[1]))
        self.send_packet(lp.marshal())

    def _maybe_save_screenshot(self):
        """Save the peer's screenshot into a random slot; failures are only logged."""
        try:
            # Save as one of the last few screenshots
            which = random.randrange(0, SCREENSHOT_SLOTS)
            img = image.image_from_data(self.screenshot)
            # Keep it if it scores above 0.4, otherwise keep it 2 times in 10
            if img.interestingness > 0.4 or random.randrange(0, 10) < 2:
                img.save("%s%d.png" % (self.srv.image_dir, which))
        except Exception as e:
            log_info("Could not convert image data" + str(e))

    def _remove_inactive_peers(self):
        """Drop every other peer that has not ACKed within the timeout."""
        # Collect first: remove_peer mutates the dict being inspected
        inactive = [peer for peer in self.srv.peers.values()
                    if peer != self
                    and peer.seconds_since_last_ping() > PEER_TIMEOUT_SECONDS]
        for peer in inactive:
            log_info("Peer %s:%d has been inactive for %d seconds, removing" % (
                peer.addr[0], peer.addr[1], peer.seconds_since_last_ping()))
            self.srv.remove_peer(peer)

    def _handle_select_peer(self, pkt):
        """Tell the selected peer about this one, then forget both."""
        # An unknown id raises KeyError, which propagates to the caller
        peer = self.srv.get_peer_by_id(pkt.get_id())

        # Tell the peer that we have connected
        lp = ListPeersPacket(FRODO_NETWORK_PROTOCOL_VERSION)
        lp.add_peer(self)
        log_info("Sending list of peers for peer selected to %s:%d" % (
            self.addr[0], self.addr[1]))
        peer.send_packet(lp.marshal())

        # These two are no longer needed
        self.srv.remove_peer(peer)
        self.srv.remove_peer(self)

    def seconds_since_last_ping(self):
        """Return the number of seconds since this peer last ACKed (or was created)."""
        return cur_time() - self.last_ping

    def send_packet(self, data):
        """Send data followed by a STOP packet to this peer in one datagram."""
        self.srv.socket.sendto(data + StopPacket().marshal(), 0, self.addr)

    def __str__(self):
        return '%s:%d "%s" %d %d' % (self.public_ip, self.public_port,
                                     self.name, self.key, self.is_master)


class BrokerPacketHandler(SocketServer.DatagramRequestHandler):
    """Request handler: parses one datagram and hands it to the sending peer."""

    def get_packet_from_data(self, data):
        """Return the packet object for raw data.

        Raises Exception if the magic is wrong or the packet type is not one
        the broker handles, and struct.error if data is truncated.
        """
        magic = struct.unpack(">H", data[0:2])[0]
        pkt_type = struct.unpack(">H", data[2:4])[0]

        if magic != FRODO_NETWORK_MAGIC:
            raise Exception("Packet magic does not match: %4x vs %4x\n" % (
                magic, FRODO_NETWORK_MAGIC))
        try:
            pkt_class = packet_class_by_type[pkt_type]
        except KeyError:
            raise Exception("Unknown packet type %d" % (pkt_type))

        out = pkt_class()
        out.demarshal_from_data(data)
        return out

    def handle(self):
        """Parse the received datagram and let the sending peer act on it."""
        srv = self.server
        data = self.rfile.read()

        try:
            pkt = self.get_packet_from_data(data)
        except Exception as e:
            log_error("Broken packet: %s" % e)
            return

        # Log received packets (except ping ACKs to avoid filling the server)
        if pkt.get_type() != ACK:
            t = pkt.get_type()
            s = pkt_type_to_str.get(t, "%d" % (t))
            log_info("Received packet %s from %s:%d" % (
                s, self.client_address[0], self.client_address[1]))

        peer = srv.get_peer(self.client_address)
        try:
            peer.handle_packet(pkt)
        except Exception as e:
            # Sends crap, let's remove it
            log_error("Handling packet failed, removing peer: %s" % e)
            srv.remove_peer(peer)


class Broker(SocketServer.UDPServer):
    """UDP server that keeps the registry of peers and the connection stats."""

    # Instead of setsockopt( ... REUSEADDR ... ).  This has to be a class
    # attribute: the socket is bound inside UDPServer.__init__, so setting
    # it on the instance afterwards comes too late.
    allow_reuse_address = True

    def __init__(self, host, req_handler, ip2c, stat_data, stat_html, image_dir):
        """Bind to `host` and load the statistics.

        host        -- (address, port) tuple to listen on
        req_handler -- request handler class
        ip2c        -- object whose lookup() is used to find a peer's country
        stat_data   -- file the statistics are loaded from and saved to
        stat_html   -- file the statistics HTML page is written to
        image_dir   -- prefix put in front of "<n>.png" for saved screenshots
        """
        SocketServer.UDPServer.__init__(self, host, req_handler)
        self.peers = {}        # address -> Peer
        self.peers_by_id = {}  # id -> Peer
        self.id = 0            # id handed to the next new peer
        self.ping_seq = 0
        self.ip2c = ip2c
        self.stat_html = stat_html
        self.stat_data = stat_data
        self.image_dir = image_dir

        stats.load(self.stat_data)
        try:
            stats.generate_html(self.stat_html)
        except Exception as e:
            # Don't care if it fails, but leave a trace
            log_warn("generating HTML failed with %s" % str(e))

    def log_connection(self, who, country):
        """Record a connection in the statistics and refresh the saved files."""
        stats.add_connection(who, country)
        try:
            stats.save(self.stat_data)
        except Exception as e:
            log_error("saving stats failed with %s" % str(e))
        try:
            stats.generate_html(self.stat_html)
        except Exception as e:
            log_error("generating HTML failed with %s" % str(e))

    def send_data(self, dst, data):
        """Send raw data to address dst."""
        self.socket.sendto(data, dst)

    def get_peer(self, key):
        "Return the peer for a certain key, or a new one if it doesn't exist"
        try:
            return self.peers[key]
        except KeyError:
            pass

        peer = Peer(key, self, self.id)
        self.peers[key] = peer
        self.peers_by_id[self.id] = peer
        self.id = self.id + 1
        return peer

    def get_peer_by_id(self, id):
        """Return the peer with the given id; raises KeyError if there is none."""
        return self.peers_by_id[id]

    def get_peer_by_name_key(self, name, key):
        """Return the first peer with this name and key, or None."""
        for peer in self.peers.values():
            # Peer exposes these as plain attributes, it has no getters
            if name == peer.name and key == peer.key:
                return peer
        return None

    def ping_all_peers(self):
        """Ping all peers (to see that they are alive)"""
        # Iterate over a snapshot: this runs in the ping thread while the
        # server thread adds and removes peers
        for peer in list(self.peers.values()):
            p = PingAckPacket()
            p.set_seq(self.ping_seq)
            peer.send_packet(p.marshal())
        self.ping_seq = self.ping_seq + 1

    def remove_peer(self, peer):
        """Forget peer; logs an error if it was not (fully) registered."""
        # Pop both indexes independently so that neither keeps a stale entry
        had_addr = self.peers.pop(peer.addr, None) is not None
        had_id = self.peers_by_id.pop(peer.id, None) is not None
        if not (had_addr and had_id):
            log_error("Could not remove %s (probably wrong version)" % (
                str(peer.addr)))


def ping_thread_fn(broker, time_to_sleep):
    """Run as a separate thread to ping all peers"""
    while True:
        try:
            broker.ping_all_peers()
        except Exception as e:
            print(e)
        # Sleep after failures as well, so a persistent error cannot turn
        # this into a busy loop
        time.sleep(time_to_sleep)


# Some of the Frodo network packets. There are more packets, but these
# are not interesting to the broker (and shouldn't be sent there either!)
packet_class_by_type = {
    CONNECT_TO_BROKER: ConnectToBrokerPacket,
    SELECT_PEER: SelectPeerPacket,
    ACK: PingAckPacket,
}


def usage():
    """Print the command line synopsis and exit with status 1."""
    print("Usage: network-broker stat-data-file stat-html-file image-dir")
    sys.exit(1)


def main():
    """Start the broker and the ping thread, then serve requests forever."""
    if len(sys.argv) != 4:
        usage()

    random.seed(time.time())
    ip2c = ip2country.IP2Country(verbose=0)
    syslog.openlog("frodo")
    log_info("Starting Frodo network broker", True)
    broker = Broker(("", BROKER_PORT), BrokerPacketHandler, ip2c,
                    sys.argv[1], sys.argv[2], sys.argv[3])
    thread.start_new_thread(ping_thread_fn, (broker, PING_INTERVAL_SECONDS))
    broker.serve_forever()


if __name__ == "__main__":
    main()