frodo-wii/network-broker/network-broker

768 lines
24 KiB
Python
Executable File

#!/usr/bin/env python
import socket, struct, syslog, time, thread, ip2country, sys
import SocketServer, random
# Local imports
import stats, image
DATA_KEY_RANGE = 1000
FRODO_NETWORK_PROTOCOL_VERSION = 4
FRODO_NETWORK_MAGIC = 0x1976
CONNECT_TO_BROKER = 99 # Hello, broker
LIST_PEERS = 98 # List of peers
CONNECT_TO_PEER = 97 # A peer wants to connect
SELECT_PEER = 93 # The client selects who to connect to
DISCONNECT = 96 # Disconnect from a peer
PING = 95 # Are you alive?
ACK = 94 # Yep
REGISTER_DATA = 90
STOP = 55 # No more packets
TEXT_MESSAGE = 9
# Network regions
REGION_UNKNOWN = 0
REGION_EUROPE = 1
REGION_AFRICA = 2
REGION_NORTH_AMERICA = 3
REGION_SOUTH_AMERICA = 4
REGION_MIDDLE_EAST = 5
REGION_SOUTH_ASIA = 6
REGION_EAST_ASIA = 7
REGION_OCEANIA = 8
REGION_ANTARTICA = 9
# Flags in packets
NETWORK_UPDATE_TEXT_MESSAGE_BROADCAST = 1
NETWORK_UPDATE_LIST_PEERS_IS_CONNECT = 1
pkt_type_to_str = {
CONNECT_TO_BROKER : "connect-to-broker",
LIST_PEERS : "list-peers",
CONNECT_TO_PEER : "connect-to-peer",
SELECT_PEER : "select-peer",
DISCONNECT : "disconnect",
PING : "ping",
ACK : "ack",
STOP : "stop",
REGISTER_DATA : "register-data",
}
region_to_str = {
REGION_UNKNOWN : "Unknown",
REGION_EUROPE : "Europe",
REGION_AFRICA : "Africa",
REGION_NORTH_AMERICA : "North america",
REGION_SOUTH_AMERICA : "South america",
REGION_MIDDLE_EAST : "Middle east",
REGION_SOUTH_ASIA : "South asia",
REGION_EAST_ASIA : "East asia",
REGION_OCEANIA : "Oceania",
REGION_ANTARTICA : "Antartica",
}
def log(pri, msg, echo):
syslog.syslog(pri, msg)
if True:
print msg
def log_error(msg, echo = False):
log(syslog.LOG_ERR, msg, echo)
def log_warn(msg, echo = False):
log(syslog.LOG_WARNING, msg, echo)
def log_info(msg, echo = False):
log(syslog.LOG_INFO, msg, echo)
def cur_time():
return time.mktime(time.localtime())
class DataStore:
def __init__(self):
self.data = {}
def add_entry(self, peer, key, entry):
entry.peer = peer
entry.key = key
self.data[key] = entry
def get_entry(self, key):
return self.data[key]
def remove_entry(self, key):
del self.data[key]
def remove_peer(self, peer):
try:
for entry in self.data.iteritems():
if entry == peer:
self.remove_entry(entry.key)
except:
log_info("Cannot delete peer %s" % (peer))
class DataEntry:
def __init__(self, key, metadata, data):
self.key = key
self.metadata = metadata
self.data = data
def get_key(self):
return self.key
def get_metadata(self):
return self.metadata
def get_data(self):
return self.data
class Packet:
def __init__(self):
"""Create a new packet"""
self.magic = FRODO_NETWORK_MAGIC
self.type = 0
self.size = 8
def demarshal_from_data(self, data):
"""Create a new packet from raw data. Data should always be in network
byte order"""
self.magic = struct.unpack(">H", data[0:2])[0]
self.type = struct.unpack(">H", data[2:4])[0]
self.size = struct.unpack(">L", data[4:8])[0]
def get_magic(self):
return self.magic
def get_type(self):
return self.type
def get_size(self):
return self.size
def marshal(self):
if self.type != STOP and self.type != PING:
log_info("Sending packet %d (%d bytes)" % (self.type, self.size))
return struct.pack(">HHL", self.magic, self.type, self.size)
class StopPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = STOP
class RegisterDataPacket(Packet):
def __init__(self, key = -1, metadata = 0, data = ""):
Packet.__init__(self)
self.type = REGISTER_DATA
self.key = key
self.metadata = metadata
self.data = data
self.size = self.size + 8 + len(data)
def get_key(self):
return self.key
def get_entry(self):
return DataEntry(self.key, self.metadata, self.data)
def demarshal_from_data(self, data):
Packet.demarshal_from_data(self, data)
self.key = struct.unpack(">L", data[8:12])[0]
self.metadata = struct.unpack(">L", data[12:16])[0]
self.data = data[16:]
def marshal(self):
return Packet.marshal(self) + struct.pack(">L", self.key) + \
struct.pack(">L", self.metadata) + self.data
class PingAckPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = PING
self.seq = 0
self.size = self.size + 4
def set_seq(self, seq):
self.seq = seq
def demarshal_from_data(self, data):
"""Init a new packet from raw data."""
Packet.demarshal_from_data(self, data)
self.seq = struct.unpack(">L", data[8:12])[0]
def marshal(self):
"""Create data representation of a packet"""
return Packet.marshal(self) + struct.pack(">L", self.seq)
class DisconnectPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = DISCONNECT
class TextMessagePacket(Packet):
def __init__(self, message = ""):
Packet.__init__(self)
msg_len = len(message) + 1 # NULL
# Used only by the server
self.timestamp = time.mktime(time.localtime())
self.type = TEXT_MESSAGE
self.message = message
self.flags = NETWORK_UPDATE_TEXT_MESSAGE_BROADCAST # Always here
self.size = self.size + 1 + msg_len
def get_timestamp(self):
return self.timestamp
def demarshal_from_data(self, data):
Packet.demarshal_from_data(self, data)
strlen = self.size - 8 - 1
# Flags is always broadcast
self.message = struct.unpack(">%ds" % (strlen), data[9:])[0]
self.size = 8 + 1 + len(self.message) + 1 # NULL
def marshal(self):
to_pad = len(self.message) % 3
return Packet.marshal(self) + struct.pack(">B%dsx" % len(self.message),
self.flags, self.message)
class SelectPeerPacket(Packet):
def __init__(self):
Packet.__init__(self)
self.type = SELECT_PEER
self.server_id = 0
self.size = self.size + 4
def demarshal_from_data(self, data):
"""Create a new packet from raw data."""
Packet.demarshal_from_data(self, data)
self.server_id = struct.unpack(">L", data[8:12])[0]
def get_id(self):
return self.server_id
class ConnectToBrokerPacket(Packet):
def __init__(self):
self.key = 0
self._is_master = 0
self.private_port = 0
self.public_port = 0
self.private_ip = ""
self.public_ip = ""
self.type = CONNECT_TO_BROKER
self.name = ""
self.server_id = 0
self.avatar = 0
self.region = REGION_UNKNOWN
self.screenshot_key = -1
def demarshal_from_data(self, data):
Packet.demarshal_from_data(self, data)
self.key = struct.unpack(">H", data[44:46])[0]
self._is_master = struct.unpack(">H", data[46:48])[0]
self.name = struct.unpack("32s", data[48:48+32])[0]
self.server_id = struct.unpack(">L", data[80:84])[0]
self.version = struct.unpack(">L", data[84:88])[0]
self.name = self.name[0:self.name.find('\0')]
if self.version >= 4:
self.region = struct.unpack(">B", data[88:89])[0]
self.avatar = struct.unpack(">H", data[90:92])[0]
self.screenshot_key = struct.unpack(">L", data[92:96])[0]
def get_key(self):
return self.key
def get_avatar(self):
return self.avatar
def get_region(self):
return self.region
def get_screenshot_key(self):
return self.screenshot_key
def get_name(self):
return self.name
def is_master(self):
return self._is_master
class ListPeersPacket(Packet):
def __init__(self, version = FRODO_NETWORK_PROTOCOL_VERSION, flags = 0):
Packet.__init__(self)
self.n_peers = 0
self.peers = []
self.type = LIST_PEERS
self.size = self.size + 24
self.version = version
self.flags = flags
self.region = REGION_UNKNOWN
self.avatar = 0
def add_peer(self, peer):
self.peers.append(peer)
self.n_peers = self.n_peers + 1
self.size = self.size + 80
# Add avatar and screenshot key size
if self.version >= 4:
self.size = self.size + 8
def marshal(self):
out = struct.pack(">L16sHBx", self.n_peers, "", 0, self.flags)
for peer in self.peers:
name = "%s (%s)" % (peer.name, peer.country)
if peer.country == "":
name = peer.name
out = out + struct.pack(">HH16s16sHH31sBLL",
0, peer.public_port, "",
peer.public_ip, peer.key,
peer.is_master, name,
0, peer.id, self.version)
if self.version >= 4:
out = out + struct.pack(">BxHL", peer.region, peer.avatar, peer.screenshot_key)
return Packet.marshal(self) + out
class DummyPeer:
def __init__(self, name):
self.name = name
self.country = ""
self.public_port = 0
self.public_ip = "0.0.0.0"
self.key = 0
self.is_master = 0
self.id = 0
self.avatar = 0
self.region = 0
class Peer:
def __init__(self, addr, srv, id):
self.srv = srv
self.addr = addr
self.public_ip, self.public_port = self.addr_to_ip_and_port(addr)
# Lookup which country this guy is from
try:
self.country = srv.ip2c.lookup( addr[0] )[1]
except ValueError, e:
self.country = None
if self.country == None:
self.country = "Unknown location"
# These will be set by the CONNECT_TO_BROKER packet below
self.key = 0
self.name = ""
self.is_master = 0
self.id = id
self.avatar = 0
self.region = 0
self.screenshot_key = -1
self.data_key = srv.get_data_key()
# Assume it's alive now
self.last_ping = cur_time()
def addr_to_ip_and_port(self, addr):
ip = struct.unpack("@L", socket.inet_pton(socket.AF_INET, addr[0]))[0]
port = addr[1]
return "%08x" % (ip), port
def handle_packet(self, pkt):
if pkt.type == CONNECT_TO_BROKER:
self.key = pkt.get_key()
self.name = pkt.get_name()
self.is_master = pkt.is_master()
# If an old Frodo tries to connect, give a helpful message
if pkt.version != FRODO_NETWORK_PROTOCOL_VERSION:
lp = ListPeersPacket(pkt.version)
lp.add_peer(DummyPeer("Your frodo is too old."))
lp.add_peer(DummyPeer("download a new version at"))
lp.add_peer(DummyPeer("http://www.c64-network.org"))
log_info("Version too old, sending upgrade notice to %s:%d" %
(self.addr[0], self.addr[1]) )
self.send_packet(lp.marshal())
return
self.avatar = pkt.get_avatar()
self.region = pkt.get_region()
if self.region != REGION_UNKNOWN and self.country == "Unknown location":
try:
self.country = region_to_str[self.region]
except KeyError, e:
self.country = "Unknown location"
self.screenshot_key = pkt.get_screenshot_key() + self.data_key
self.srv.log_connection(self.name, self.country)
# Send list of peers if this is not a master
registered_data = []
lp = ListPeersPacket()
for peer in self.srv.waiting_peers.itervalues():
if peer == self:
continue
# Don't add peers which haven't sent their screenshots yet
# (yes, it's quite unlikely, but anyway)
try:
entry = self.srv.data_store.get_entry(peer.screenshot_key)
registered_data.append(entry)
except KeyError, e:
log_info("Peer %s hasn't sent it's screenshot yet: %s" % (peer.name, str(e)))
continue
lp.add_peer(peer)
# First send the registry data
for entry in registered_data:
rp = RegisterDataPacket(entry.get_key(), entry.get_metadata(), entry.get_data())
self.send_packet(rp.marshal())
# And send the packet to this peer
log_info("Sending list of peers (%d) to %s:%d" % (lp.n_peers,
self.addr[0], self.addr[1]) )
self.send_packet(lp.marshal())
# Send all current messages
for msg in self.srv.messages:
self.send_packet(msg.marshal())
if pkt.type == REGISTER_DATA:
# Save screenshot (maybe) if this is the screenshot key
entry = pkt.get_entry()
self.srv.data_store.add_entry(self, entry.get_key() + self.data_key, entry)
if entry.get_key() == self.screenshot_key:
try:
which = self.srv.next_image_nr
self.srv.next_image_nr = (self.srv.next_image_nr + 1) % 9
f = open("%s%d.png" % (self.srv.image_dir, which), "w")
f.write(entry.get_data())
f.close()
except Exception, e:
log_info("Could not convert image data" + str(e))
if pkt.type == SELECT_PEER:
peer = self.srv.get_peer_by_id( pkt.get_id() )
# Tell the peer that we have connected
lp = ListPeersPacket( flags = NETWORK_UPDATE_LIST_PEERS_IS_CONNECT )
lp.add_peer(self)
log_info("Sending list of peers for peer selected to %s:%d" % (
self.addr[0], self.addr[1]))
peer.send_packet( lp.marshal() )
# These two are no longer needed
self.srv.make_peer_active(peer)
self.srv.make_peer_active(self)
if pkt.type == ACK:
self.last_ping = cur_time()
if pkt.type == TEXT_MESSAGE:
self.srv.enqueue_message(self, pkt)
def seconds_since_last_ping(self):
now = cur_time()
return now - self.last_ping
def send_packet(self, data):
all_data = data + StopPacket().marshal()
for i in range(0, len(all_data), 4096):
cur = all_data[i : min(i + 4096, len(all_data))]
self.srv.socket.sendto(cur,
0, self.addr)
def __str__(self):
return '%s:%d "%s" %d %d' % (self.public_ip, self.public_port,
self.name, self.key, self.is_master)
class BrokerPacketHandler(SocketServer.DatagramRequestHandler):
def get_packets_from_data(self, data):
pkts = []
off = 0
while True:
if off >= len(data):
break
magic = struct.unpack(">H", data[off:off + 2])[0]
type = struct.unpack(">H", data[off + 2:off + 4])[0]
size = struct.unpack(">L", data[off + 4:off + 8])[0]
if type == STOP:
break
pkt_data = data[off:off + size]
off = off + size
if magic != FRODO_NETWORK_MAGIC:
raise Exception("Packet magic does not match: %4x vs %4x\n" % (magic,
FRODO_NETWORK_MAGIC) )
try:
out = packet_class_by_type[type]()
out.demarshal_from_data(pkt_data)
pkts.append(out)
except KeyError, e:
raise Exception("Unknown packet type %d" % (type))
return pkts
def handle(self):
srv = self.server
data = self.rfile.read()
try:
pkts = self.get_packets_from_data(data)
except Exception, e:
log_error("Broken packets: %s" % e)
return
for pkt in pkts:
# Log received packets (except ping ACKs to avoid filling the server)
if pkt.get_type() != ACK:
t = pkt.get_type()
s = "%d" % (t)
try:
s = pkt_type_to_str[t]
except KeyError, e:
pass
log_info("Received packet %s from %s:%d" % (s, self.client_address[0],
self.client_address[1]))
peer = srv.get_peer(self.client_address)
# Handle disconnects by removing the peer and ignoring the rest
if pkt.get_type() == DISCONNECT:
log_info("Peer disconnected, removing")
self.server.remove_peer(peer)
return
try:
peer.handle_packet(pkt)
except Exception, e:
# Sends crap, let's remove it
log_error("Handling packets failed, removing peer: %s" % e)
srv.remove_peer(peer)
class Broker(SocketServer.UDPServer):
def __init__(self, host, req_handler, ip2c, stat_data, stat_html, image_dir):
SocketServer.UDPServer.__init__(self, host, req_handler)
# Instead of setsockopt( ... REUSEADDR ... )
self.allow_reuse_address = True
self.peers = {}
self.peers_by_id = {}
self.waiting_peers = {}
self.active_peers = {}
self.data_store = DataStore()
self.id = 0
self.ping_seq = 0
self.ip2c = ip2c
self.messages = []
self.stat_html = stat_html
self.stat_data = stat_data
self.image_dir = image_dir
self.data_key = DATA_KEY_RANGE
self.next_image_nr = random.randrange(0,9)
stats.load(self.stat_data)
try:
stats.generate_html(self.stat_html)
except:
# Don't care if it fails
pass
def get_data_key(self):
out = self.data_key
self.data_key = (self.data_key + DATA_KEY_RANGE) & 0xffffffff
# Start at DATA_KEY_RANGE
if self.data_key < DATA_KEY_RANGE:
self.data_key = self.data_key + DATA_KEY_RANGE
return out
def log_connection(self, who, country):
stats.add_connection(who, country)
stats.update_peer_nr(len(self.waiting_peers), len(self.active_peers))
try:
stats.save(self.stat_data)
except Exception, e:
log_error("saving stats failed with %s" % str(e) )
try:
stats.generate_html(self.stat_html)
except Exception, e:
log_error("generating HTML failed with %s" % str(e) )
def send_data(self, dst, data):
self.socket.sendto(data, dst)
def get_peer(self, key):
"Return the peer for a certain key, or a new one if it doesn't exist"
try:
peer = self.peers[key]
except KeyError, e:
peer = Peer(key, self, self.id)
self.peers[key] = peer
self.waiting_peers[key] = peer
self.peers_by_id[self.id] = peer
self.id = self.id + 1
return peer
def get_peer_by_id(self, id):
return self.peers_by_id[id]
def get_peer_by_name_key(self, name, key):
for k,v in self.peers.iteritems():
if name == v.get_name() and key == v.get_key():
return v
return None
def enqueue_message(self, sending_peer, message):
# Store last 10 messages
self.messages = self.messages[-9:] + [message]
all_msgs = []
for addr, peer in self.peers.iteritems():
all_msgs.append(message.message)
if peer != sending_peer:
peer.send_packet(message.marshal())
stats.set_messages(all_msgs)
try:
stats.generate_html(self.stat_html)
except Exception, e:
pass
def dequeue_old_messages(self):
now = time.mktime(time.localtime())
to_delete = 0
all_msgs = []
for msg in self.messages:
diff = now - msg.get_timestamp()
# Older than one day?
if diff > 24 * 60 * 60:
to_delete = to_delete + 1
log_info("Deleting old message")
self.messages = self.messages[ to_delete : ]
for msg in self.messages:
all_msgs.append(msg.message)
stats.set_messages(all_msgs)
try:
stats.generate_html(self.stat_html)
except Exception, e:
pass
def ping_all_peers(self):
"""Ping all peers (to see that they are alive)"""
for k,v in self.peers.iteritems():
p = PingAckPacket()
p.set_seq(self.ping_seq)
v.send_packet( p.marshal() )
self.ping_seq = self.ping_seq + 1
def make_peer_active(self, peer):
try:
del self.waiting_peers[ peer.addr ]
self.active_peers[ peer.addr ] = peer
except Exception, e:
log_error("Moving peer %s to active failed: %s" % (str(peer.addr), str(e)))
stats.update_peer_nr(len(self.waiting_peers), len(self.active_peers))
try:
stats.generate_html(self.stat_html)
except Exception, e:
pass
def remove_peer(self, peer):
try:
del self.peers[ peer.addr ]
del self.peers_by_id[ peer.id ]
# The peer is on one of these two lists
if self.active_peers.has_key( peer.addr ):
del self.active_peers[peer.addr]
if self.waiting_peers.has_key( peer.addr ):
del self.waiting_peers[peer.addr]
self.data_store.remove_peer(peer)
stats.update_peer_nr(len(self.waiting_peers), len(self.active_peers))
stats.generate_html(self.stat_html)
except Exception, e:
log_error("Could not remove %s (probably wrong version): %s" %
(str(peer.addr), str(e)))
def ping_thread_fn(broker, time_to_sleep):
"""Run as a separate thread to ping all peers"""
while True:
try:
broker.dequeue_old_messages()
broker.ping_all_peers()
time.sleep( time_to_sleep )
# Remove inactive peers
rp = []
for peer in broker.peers.itervalues():
if peer.seconds_since_last_ping() > 15:
rp.append(peer)
for peer in rp:
log_info("Peer %s:%d has been inactive for %d seconds, removing" % (peer.addr[0],
peer.addr[1],
peer.seconds_since_last_ping()))
broker.remove_peer(peer)
except Exception, e:
print e
# Some of the Frodo network packets. There are more packets, but these
# are not interesting to the broker (and shouldn't be sent there either!)
packet_class_by_type = {
CONNECT_TO_BROKER : ConnectToBrokerPacket,
SELECT_PEER : SelectPeerPacket,
REGISTER_DATA : RegisterDataPacket,
DISCONNECT : DisconnectPacket,
TEXT_MESSAGE : TextMessagePacket,
ACK : PingAckPacket,
}
def usage():
print "Usage: network-broker stat-data-file stat-html-file image-dir"
sys.exit(1)
if __name__ == "__main__":
if len(sys.argv) != 4:
usage()
random.seed(time.time())
ip2c = ip2country.IP2Country(verbose=0)
syslog.openlog("frodo")
log_info("Starting Frodo network broker", True)
broker = Broker( ("", 46214), BrokerPacketHandler,
ip2c, sys.argv[1], sys.argv[2], sys.argv[3])
thread.start_new_thread(ping_thread_fn, (broker, 5))
broker.serve_forever()