home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
- import logging
- import random
- import sys
- import struct
- import uuid
- import traceback
- import util
- from util import callsback
- from util.primitives.funcs import get
- from util.Events import EventMixin
-
- try:
- from CStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
-
- log = logging.getLogger('msn.p2p.data')
-
- flagged = lambda v, f: f & v == f
-
- randid = lambda : random.randint(4, sys.maxint // 2 - 5)
-
- class Flags:
- names = {
- 0: 'none',
- 1: 'sync',
- 2: 'ack',
- 4: 'wait',
- 8: 'err',
- 32: 'data',
- 64: 'byea',
- 128: 'byem',
- 16777264: 'file',
- 256: 'dchs' }
- NONE = 0
- ONE = 1
- UNKNOWN = ONE
- SYNC = ONE
- ACK = 2
- WAIT = 4
- ERROR = 8
- DATA = 32
- BYEACK = 64
- BYEMSG = 128
- FILE = 16777264
- HANDSHAKE = 256
- DCHS = HANDSHAKE
-
-
- class Header(util.packable.Packable):
- fmt = ('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q')
- byteorder = '<'
-
-
- class P2PMessage(object):
-
- def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
- self.sender = get(sender, 'name', sender)
- self.recipient = get(recipient, 'name', recipient)
- self.content = content
- if acked_msg_id is None:
- acked_msg_id = randid()
-
- self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
- self.app_id = self.footer = app_id
- self.transferred = 0
- self._complete = False
-
-
- def reset(self):
- if self.content is not None:
-
- try:
- self.content.seek(0)
- except ValueError:
- pass
- except:
- None<EXCEPTION MATCH>ValueError
-
-
- None<EXCEPTION MATCH>ValueError
-
-
- def write(self, data):
- self.content.write(data)
- self.transferred = self.content.tell()
- if not self._complete:
- self._complete = self.transferred == self.size
-
-
-
- def seek(self, position):
- self.content.seek(position)
-
-
- def read(self, max_size):
- if self.content is not None:
- if self.content.closed:
- log.error('Read called on a message with closed content! wtf.')
- return None
- data = self.content.read(max_size)
- self.transferred = self.content.tell()
- if not self._complete:
- self._complete = self.transferred == self.size
-
- else:
- data = ''
- self._complete = True
- return data
-
-
- def tell(self):
- return self.transferred
-
-
- def size(self):
- return self.header.total
-
- size = property(size)
-
- def complete(self):
- return self._complete
-
- complete = property(complete)
-
- def __hash__(self):
- return hash((self.sender, self.recipient, self.header.pack()))
-
-
- def __repr__(self):
- content = self.content
- if isinstance(content, StringIO):
- content = content.getvalue()
-
- contentstr = ''
- if content:
-
- try:
- contentstr = 'content=%r' % content[:30]
- except Exception:
- pass
- except:
- None<EXCEPTION MATCH>Exception
-
-
- None<EXCEPTION MATCH>Exception
- return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
-
-
-
- class P2PTransport(EventMixin):
- events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
-
- def __init__(self, client):
- EventMixin.__init__(self)
- client._p2p_manager._register_transport(self)
- self.p2p_clients = 0
-
-
- def p2p_peers(self):
- raise NotImplementedError
-
- p2p_peers = property(p2p_peers)
-
- def p2p_rating(self):
- raise NotImplementedError
-
- p2p_rating = property(p2p_rating)
-
- def p2p_max_msg_size(self):
- raise NotImplementedError
-
- p2p_max_msg_size = property(p2p_max_msg_size)
-
- def p2p_send(self, data, callback = None):
- raise NotImplementedError
-
- p2p_send = callsback(p2p_send)
-
- def p2p_overhead(self):
- raise NotImplementedError
-
- p2p_overhead = property(p2p_overhead)
-
- def build_data(self, header, body, footer):
- raise NotImplementedError
-
-
- def __repr__(self):
- if hasattr(self, 'state'):
- more_info = ' state=%r' % self.state
- else:
- more_info = ''
- return '<%s p2pclients=%r id=%d%s>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self), more_info)
-
-
-
- class P2PManager(EventMixin):
- events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
-
- def register_bridge(self, name, cls):
- self.bridges[name] = cls
-
-
- def unregister_bridge(self, name):
- return self.bridges.pop(name, None) is not None
-
-
- def get_bridge_names(self):
- return self.bridges.keys()
-
-
- def get_bridge_class(self, name):
- return self.bridges.get(name, None)
-
-
- def __init__(self, client):
- EventMixin.__init__(self)
- self.client = client
- self.bridges = { }
- self._transports = []
- self._incoming = { }
- self._outgoing = { }
- self._sent = { }
- self._last_acked = None
- self.sort_transports()
-
-
- def close_all(self):
- for transport in self._transports[:]:
-
- try:
- log.info('Disconnecting %r', transport)
- transport.Disconnect()
- continue
- traceback.print_exc()
- continue
-
-
-
-
- def sort_transports(self):
- self._best = { }
- for transport in self._transports:
- rating = transport.p2p_rating
- for peer in transport.p2p_peers:
- if peer in self._best:
- prev_rating = self._best[peer].p2p_rating
- if rating > prev_rating:
- self._best[peer] = transport
-
- rating > prev_rating
- self._best[peer] = transport
-
-
-
-
- def _register_transport(self, transport):
-
- try:
- v = transport._registered
- except AttributeError:
- v = False
-
- if v:
- log.info('Transport was already registered. Returning from register.')
- return None
- bind = transport.bind
- bind('contacts_changed', self.transport_sorter)
- bind('recv_data', self._on_recv_data)
- bind('send_data', self._on_send_data)
- self._transports.append(transport)
- transport._registered = True
- self.sort_transports()
-
-
- def _unregister_transport(self, transport):
-
- try:
- v = transport._registered
- except AttributeError:
- v = False
-
- if not v:
- log.info('Transport was not registered. Returning from unregister.')
- return None
- log.debug('P2PManager removing transport %r', transport)
- unbind = transport.unbind
- unbind('contacts_changed', self.transport_sorter)
- unbind('recv_data', self._on_recv_data)
- unbind('send_data', self._on_send_data)
- transport._registered = False
- self._transports.remove(transport)
- self.sort_transports()
-
-
- def transport_sorter(self, *a):
- self.sort_transports()
-
-
- def get_best(self, peer, callback = None):
-
- try:
- return self._best[peer]
- except KeyError:
- log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
- return self.client._get_default_p2p_transport(peer, callback = callback)
-
-
- get_best = callsback(get_best)
-
- def _should_notify_send(self):
- return True
-
- try:
- self.notify_threshold
- except AttributeError:
- self.notify_threshold = 5000
-
- setattr(self, '_progress_notify_count', getattr(self, '_progress_notify_count', 0) + 1)
- return self._progress_notify_count % self.notify_threshold == 0
-
-
- def _on_send_data(self):
- if self._should_notify_send():
- self.event('send_data')
-
-
-
- def _on_recv_data(self, transport, sender, data, has_footer = True):
-
- try:
- (header, data) = Header.unpack(data)
- except Exception:
- _e = None
- print repr(data)
- raise
-
- if has_footer:
- footer = struct.unpack('>L', data[-4:])[0]
- data = data[:-4]
- else:
- footer = 0
- if footer == 0 and data == '\x00\x00\x00\x00':
- footer = 1
-
-
- try:
- pass
- except:
- traceback.print_exc()
- raise
-
- if header.total == header.offset + header.length:
- log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
-
- if flagged(header.flags, Flags.ACK):
- self._last_acked = header.msgid_ack
- return self._process_ack(header.msgid_ack)
- if flagged(header.flags, Flags.ERROR):
- log.info('Got binary transport error')
- if header.msgid_ack in self._outgoing:
- self._outgoing[header.msgid_ack].on_done()
-
- self.event('recv_error')
- return None
- if flagged(header.flags, Flags.SYNC):
-
- try:
- sent = self._outgoing[header.msgid_ack]
- except Exception:
- flagged(header.flags, Flags.ERROR)
- _e = flagged(header.flags, Flags.ERROR)
- flagged(header.flags, Flags.ACK)
- log.info("can't find old message with msgid = %r", header.msgid_ack)
- except:
- flagged(header.flags, Flags.ERROR)
-
- log.error('P2P sync received: %s', list(header))
- log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
- sent.msg.seek(max(header.total_ack - sent.msg.header.length, 0))
- return None
- if flagged(header.flags, Flags.HANDSHAKE):
- log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
- their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
- if not transport.info.peer.nonce_plain:
- pass
- if not transport.info.peer.nonce_hashed:
- pass
- flagged(header.flags, Flags.SYNC)(log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.info.peer.nonce_plain is None else '', their_nonce, their_nonce)
- log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.info.local.nonce_plain, transport.info.local.nonce_hashed)
- if transport.info.peer.nonce_plain is None:
- transport.got_nonce(their_nonce, 'plain')
- elif transport.info.peer.nonce_hashed is None:
- transport.got_nonce(their_nonce, 'hashed')
- else:
- log.info('Had all the nonces already. (How did that happen?)')
- transport._send_nonce(header.msgid, header.msgid_ack)
- transport.event('on_ready')
- return None
- if flagged(header.flags, Flags.BYEACK):
- log.info('Got ack for BYE message. Going to send waiting flags')
- elif flagged(header.flags, Flags.WAIT):
- log.critical("Got waiting message. Here's the header: %r", list(header))
- msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
- self.send_message(msg)
- return None
- flagged(header.flags, Flags.SYNC)
- id = header.msgid
-
- try:
- if header.offset != msg.tell():
- pass
-
- if flagged(header.flags, Flags.DATA):
- msg.seek(header.offset)
- else:
- log.warning("Did not seek() message because it's not a data message (msg = %r). nullbytes?", msg)
- msg.write(data)
- except:
- flagged(header.flags, Flags.ACK) if id not in self._incoming else flagged(header.flags, Flags.ERROR)
- self.event('recv_error')
- return None
-
- self.event('recv_data')
-
-
- def _process_ack(self, id):
- if id in self._sent:
- msg = self._sent.pop(id)
- log.info('Got ack for message, NOT resetting it: %r', msg)
- self.event('send_msg_end', msg)
- else:
- log.error('got ack for unknown message')
-
-
- def _process_new(self, sender, header, footer):
- content = self.client.slp_call_master._create_message_content(header, footer)
- msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
- self._incoming[header.msgid] = msg
- self.event('recv_msg_start', msg)
- return msg
-
-
- def _process_msg(self, header, footer, msg):
- log.info('P2P message complete (%s)', msg)
- self.send_ack(header, footer, msg)
- msg.reset()
-
- try:
- self.event('recv_msg_end', self, msg)
- except Exception:
- _e = None
- traceback.print_stack()
- traceback.print_exc()
-
- del self._incoming[header.msgid]
-
-
- def send_ack(self, header, footer, msg):
-
- try:
- flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
- log.info('Going to send ack with flags %s', flags)
-
- try:
- id = self._last_acked + 1
- except:
- id = randid()
- finally:
- self._last_acked = None
-
- sender = get(msg, 'sender')
- ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
- self.send_message(ack_msg)
- except Exception:
- e = None
- traceback.print_exc()
- raise e
-
-
-
- def send_with_producer(self, msg, callback = None):
- if msg.header.msgid not in self._outgoing:
- log.warning('Sending producer for msg with id %d', msg.header.msgid)
- self.event('send_msg_start', msg)
- prod = self.make_producer(msg, callback = callback)
- self._outgoing[msg.header.msgid] = prod
- self._send_producer(prod)
- else:
- log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
- return None
- return msg.header.msgid not in self._outgoing
-
- send_with_producer = callsback(send_with_producer)
- send_message = send_with_producer
-
- def _send_producer(self, prod):
-
- try:
- if not prod.transport._registered:
- self._register_transport(prod.transport)
- except Exception:
- e = None
- traceback.print_exc()
- raise e
-
-
- try:
- prod.push()
- except TypeError:
- e = None
- log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
- traceback.print_exc()
-
-
-
- def make_producer(self, msg, callback = None):
- log.info('P2PData making a producer for %r', msg)
- prod = P2PProducer(self, msg, callback.success, callback.error)
- return prod
-
- make_producer = callsback(make_producer)
-
- def _unqueue(self, msg):
- log.info('unqueuing %r', msg)
- del self._outgoing[msg.header.msgid]
- if not flagged(msg.header.flags, Flags.ACK):
- self._sent[msg.header.msgid] = msg
-
-
-
-
- class P2PProducer(object):
-
- def __init__(self, master, msg, whendone = None, on_error = None):
- self.master = master
- self.msg = msg
- self._transport = self.master.get_best(self.msg.recipient)
- self._transport.p2p_clients += 1
- self._finished = False
-
- def donothing():
- pass
-
- self._oncomplete = self._transport if whendone is not None else donothing
- self._onerror = None if on_error is not None else donothing
- self.master.bind('send_msg_end', self.on_ack)
-
-
- def transport(self):
- if self._finished:
- raise AttributeError
- self._finished
- self._transport.p2p_clients -= 1
- old_xport = self._transport
- self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
- self._transport.p2p_clients += 1
- return self._transport
-
- transport = property(transport)
-
- def recipient(self):
- return self.msg.recipient
-
- recipient = property(recipient)
-
- def more(self):
- if self._finished:
- log.info('Message %r complete or cancelled, returning None', self.msg)
- return None
- msg = self.msg
- transport = self.transport
- msg.header.msgid_ack = getattr(transport, '_super_secret_msgid', msg.header.msgid_ack)
- (header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
- if header is None and body is None and footer is None:
- if msg.complete:
- log.warning('Message %r complete. Returning None')
- else:
- log.error('Message %r is not complete but had no more data', msg)
- self.on_done()
- return None
- data = transport.build_data(header, body, footer)
- transport.event('send_data')
- return data
-
-
- def on_ack(self, msg):
- if msg.header.msgid == self.msg.header.msgid:
- log.info('Finished producer calling %r', self._oncomplete)
- self._oncomplete()
- self.master.unbind('send_msg_end', self.on_ack)
-
-
-
- def on_done(self):
- log.info('Producer finished')
- self._finished = True
- self.master._unqueue(self.msg)
- self._transport.p2p_clients -= 1
- del self._transport
-
-
- def push(self):
- transport = self.transport
- transport.push_with_producer(self, error = self._onerror)
-
-
- def __repr__(self):
- return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._finished else 'transport=%r' % self._transport)
-
-
-
- def _next_msg(msg, size):
- header = msg.header
- header.offset = msg.transferred
- body = msg.read(size)
- if body is None:
- return (None, None, None)
- header.length = len(body)
- return (header.pack(), body, struct.pack('>L', msg.footer))
-
-