home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- import logging
- import random
- import sys
- import struct
- import uuid
- import util
- from util import callsback, get, call_later
- from util.Events import EventMixin
-
- try:
- from CStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
-
- log = logging.getLogger('msn.p2p.data')
-
- flagged = lambda v, f: f & v == f
-
- randid = lambda : random.randint(4, sys.maxint // 2 - 5)
-
- class Flags:
- names = {
- 0: 'none',
- 1: 'sync',
- 2: 'ack',
- 4: 'wait',
- 8: 'err',
- 32: 'data',
- 64: 'byea',
- 128: 'byem',
- 16777264: 'file',
- 256: 'dchs' }
- NONE = 0
- ONE = 1
- UNKNOWN = ONE
- SYNC = ONE
- ACK = 2
- WAIT = 4
- ERROR = 8
- DATA = 32
- BYEACK = 64
- BYEMSG = 128
- FILE = 16777264
- HANDSHAKE = 256
- DCHS = HANDSHAKE
-
- Header = util.new_packable(('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q'), byteorder = '<')
-
- class P2PMessage(object):
-
- def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
- self.sender = get(sender, 'name', sender)
- self.recipient = get(recipient, 'name', recipient)
- self.content = content
- if acked_msg_id is None:
- acked_msg_id = randid()
-
- self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
- self.app_id = self.footer = app_id
- self.transferred = 0
-
-
- def reset(self):
- if self.content is not None:
-
- try:
- self.content.seek(0)
- except ValueError:
- pass
- except:
- None<EXCEPTION MATCH>ValueError
-
-
- None<EXCEPTION MATCH>ValueError
-
-
- def write(self, data):
- self.content.write(data)
- self.transferred = self.content.tell()
-
-
- def seek(self, position):
- self.content.seek(position)
-
-
- def read(self, max_size):
- if self.content is not None:
- if self.content.closed:
- log.error('Read called on a message with closed content! wtf.')
- return None
-
- data = self.content.read(max_size)
- self.transferred = self.content.tell()
- else:
- data = ''
- return data
-
-
- def tell(self):
- return self.transferred
-
-
- def size(self):
- return self.header.total
-
- size = property(size)
-
- def complete(self):
- return self.transferred == self.size
-
- complete = property(complete)
-
- def __hash__(self):
- return hash((self.sender, self.recipient, self.header.pack()))
-
-
- def __repr__(self):
- content = self.content
- if isinstance(content, StringIO):
- content = content.getvalue()
-
- contentstr = ''
- if content:
-
- try:
- contentstr = 'content=%r' % content[:30]
- except Exception:
- pass
- except:
- None<EXCEPTION MATCH>Exception
-
-
- None<EXCEPTION MATCH>Exception
- return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
-
-
-
- class P2PTransport(EventMixin):
- events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
-
- def __init__(self, client):
- EventMixin.__init__(self)
- client._p2p_manager._register_transport(self)
- self.p2p_clients = 0
-
-
- def p2p_peers(self):
- raise NotImplementedError
-
- p2p_peers = property(p2p_peers)
-
- def p2p_rating(self):
- raise NotImplementedError
-
- p2p_rating = property(p2p_rating)
-
- def p2p_max_msg_size(self):
- raise NotImplementedError
-
- p2p_max_msg_size = property(p2p_max_msg_size)
-
- def p2p_send(self, data, callback = None):
- raise NotImplementedError
-
- p2p_send = callsback(p2p_send)
-
- def p2p_overhead(self):
- raise NotImplementedError
-
- p2p_overhead = property(p2p_overhead)
-
- def build_data(self, header, body, footer):
- raise NotImplementedError
-
-
- def __repr__(self):
- if hasattr(self, 'state'):
- more_info = ' state=%r' % self.state
- else:
- more_info = ''
- return '<%s p2pclients=%r id=%d%s>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self), more_info)
-
-
-
- class P2PManager(EventMixin):
- events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
-
- def __init__(self, client):
- EventMixin.__init__(self)
- self.client = client
- self._transports = []
- self._P2PManager__incoming = { }
- self._P2PManager__outgoing = { }
- self._P2PManager__sent = { }
- self._P2PManager__last_acked = None
- self.sort_transports()
-
-
- def close_all(self):
- for transport in self._transports:
-
- try:
- transport.Disconnect()
- continue
- import traceback
- traceback.print_exc()
- continue
-
-
-
-
- def _outgoing(self):
- return self._P2PManager__outgoing
-
- _outgoing = property(_outgoing)
-
- def sort_transports(self):
- self._best = { }
- for transport in self._transports:
- rating = transport.p2p_rating
- for peer in transport.p2p_peers:
- if peer in self._best:
- prev_rating = self._best[peer].p2p_rating
- if rating > prev_rating:
- self._best[peer] = transport
-
- rating > prev_rating
- self._best[peer] = transport
-
-
-
-
- def _register_transport(self, transport):
-
- try:
- v = transport._P2PManager__registered
- except AttributeError:
- v = False
-
- if v:
- log.info('Transport was already registered. Returning from register.')
- return None
-
- bind = transport.bind
- bind('contacts_changed', self.transport_sorter)
- bind('recv_data', self._on_recv_data)
- bind('send_data', self._on_send_data)
- self._transports.append(transport)
- transport._P2PManager__registered = True
- self.sort_transports()
-
-
- def _unregister_transport(self, transport):
-
- try:
- v = transport._P2PManager__registered
- except AttributeError:
- v = False
-
- if not v:
- log.info('Transport was not registered. Returning from unregister.')
- return None
-
- log.debug('P2PManager removing transport %r', transport)
- unbind = transport.unbind
- unbind('contacts_changed', self.transport_sorter)
- unbind('recv_data', self._on_recv_data)
- unbind('send_data', self._on_send_data)
- transport._P2PManager__registered = False
- self._transports.remove(transport)
- self.sort_transports()
-
-
- def transport_sorter(self, *a):
- self.sort_transports()
-
-
- def get_best(self, peer, callback = None):
-
- try:
- return self._best[peer]
- except KeyError:
- log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
- return self.client._get_default_p2p_transport(peer, callback = callback)
-
-
- get_best = callsback(get_best)
-
- def _on_send_data(self):
- self.event('send_data')
-
-
- def _on_recv_data(self, transport, sender, data, has_footer = True):
-
- try:
- header = Header.unpack(data[:48])
- data = data[48:]
- except Exception:
- e = None
- print repr(data)
- raise
-
- log.debug('P2PManager got message(flags=%d) from %r via %r', header.flags, sender, transport)
- if has_footer:
- footer = struct.unpack('>L', data[-4:])[0]
- data = data[:-4]
- else:
- footer = 0
- if footer == 0 and data == '\x00\x00\x00\x00':
- footer = 1
-
-
- try:
- pass
- except:
- import traceback
- traceback.print_exc()
- raise
-
- if header.total == header.offset + header.length:
- log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
-
- if flagged(header.flags, Flags.ACK):
- self._P2PManager__last_acked = header.msgid_ack
- return self._process_ack(header.msgid_ack)
- elif flagged(header.flags, Flags.ERROR):
- log.info('Got binary transport error')
- if header.msgid_ack in self._P2PManager__outgoing:
- self._P2PManager__outgoing[header.msgid_ack].on_done()
-
- self.event('recv_error')
- return None
- elif flagged(header.flags, Flags.SYNC):
-
- try:
- sent = self._P2PManager__outgoing[header.msgid_ack]
- except Exception:
- e = None
-
- log.error('P2P sync received: %s', list(header))
- log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
- sent.msg.seek(header.total_ack - sent.msg.header.length)
- return None
- elif flagged(header.flags, Flags.HANDSHAKE):
- log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
- their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
- msn_hash = msn_hash
- import SLPCalls
- if not transport.in_key:
- pass
- if not transport.in_hkey:
- pass
- None(log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.in_key is None else '', their_nonce, their_nonce)
- log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.out_key, transport.out_hkey)
- if transport.in_key is None:
- transport.in_key = their_nonce
- elif transport.in_hkey is None:
- transport.in_hkey = their_nonce
- else:
- log.info('Had all the nonces already. (How did that happen?)')
- transport._send_nonce(header.msgid, header.msgid_ack)
- transport.event('on_ready')
- return None
- elif flagged(header.flags, Flags.BYEACK):
- log.info('Got ack for BYE message. Going to send waiting flags')
- elif flagged(header.flags, Flags.WAIT):
- log.critical("Got waiting message. Here's the header: %r", list(header))
- msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
- self.send_message(msg)
- return None
-
- id = header.msgid
- if id not in self._P2PManager__incoming:
- log.info('Got new message')
- msg = self._process_new(sender, header, footer)
- transport.p2p_clients += 1
- else:
- log.debug('Continuing message with id=(%r)', id)
- msg = self._P2PManager__incoming[id]
-
- try:
- if header.offset != msg.tell():
- log.warning('Header offset does not match file offset')
-
- if flagged(header.flags, Flags.DATA):
- msg.seek(header.offset)
- else:
- log.warning("Did not seek() message because it's not a data message (msg = %r). nullbytes?", msg)
- msg.write(data)
- except:
- self.event('recv_error')
- return None
-
- if msg.complete:
- log.info('Received message')
- transport.p2p_clients -= 1
- self._process_msg(header, footer, msg)
-
- self.event('recv_data')
-
-
- def _process_ack(self, id):
- if id in self._P2PManager__sent:
- msg = self._P2PManager__sent.pop(id)
- log.info('Got ack for message, NOT resetting it: %r', msg)
- self.event('send_msg_end', msg)
- else:
- log.error('got ack for unknown message')
-
-
- def _process_new(self, sender, header, footer):
- content = self.client.slp_call_master._create_message_content(header, footer)
- msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
- self._P2PManager__incoming[header.msgid] = msg
- self.event('recv_msg_start', msg)
- return msg
-
-
- def _process_msg(self, header, footer, msg):
- log.info('P2P message complete (%s)', msg)
- self.send_ack(header, footer, msg)
- msg.reset()
-
- try:
- self.event('recv_msg_end', self, msg)
- except Exception:
- e = None
- import traceback
- traceback.print_stack()
- traceback.print_exc()
-
- del self._P2PManager__incoming[header.msgid]
-
-
- def send_ack(self, header, footer, msg):
-
- try:
- flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
- log.info('Going to send ack with flags %s', flags)
-
- try:
- id = self._P2PManager__last_acked + 1
- except:
- id = randid()
- finally:
- self._P2PManager__last_acked = None
-
- sender = get(msg, 'sender')
- ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
- self.send_message(ack_msg)
- except Exception:
- e = None
- import traceback
- traceback.print_exc()
- raise e
-
-
-
- def send_with_producer(self, msg, callback = None):
- if msg.header.msgid not in self._P2PManager__outgoing:
- log.warning('Sending producer for msg with id %d', msg.header.msgid)
- self.event('send_msg_start', msg)
- prod = self.make_producer(msg, callback = callback)
- self._P2PManager__outgoing[msg.header.msgid] = prod
- self._send_producer(prod)
- else:
- log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
- return None
-
- send_with_producer = callsback(send_with_producer)
- send_message = send_with_producer
-
- def _send_producer(self, prod):
-
- try:
- if not prod.transport._P2PManager__registered:
- self._register_transport(prod.transport)
- except Exception:
- e = None
- import traceback
- traceback.print_exc()
- raise e
-
-
- try:
- prod.push()
- except TypeError:
- e = None
- log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
- import traceback
- traceback.print_exc()
-
-
-
- def make_producer(self, msg, callback = None):
- log.info('P2PData making a producer for %r', msg)
- prod = P2PProducer(self, msg, callback.success, callback.error)
- return prod
-
- make_producer = callsback(make_producer)
-
- def _unqueue(self, msg):
- del self._P2PManager__outgoing[msg.header.msgid]
- if not flagged(msg.header.flags, Flags.ACK):
- self._P2PManager__sent[msg.header.msgid] = msg
-
-
-
-
- class P2PProducer(object):
-
- def __init__(self, master, msg, whendone = None, on_error = None):
- self.master = master
- self.msg = msg
- self._transport = self.master.get_best(self.msg.recipient)
- self._transport.p2p_clients += 1
- self._P2PProducer__finished = False
-
- def donothing():
- pass
-
- self._oncomplete = self._transport if whendone is not None else donothing
- self._onerror = None if on_error is not None else donothing
- self.master.bind('send_msg_end', self.on_ack)
-
-
- def transport(self):
- if self._P2PProducer__finished:
- raise AttributeError
-
- self._transport.p2p_clients -= 1
- old_xport = self._transport
- self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
- self._transport.p2p_clients += 1
- return self._transport
-
- transport = property(transport)
-
- def recipient(self):
- return self.msg.recipient
-
- recipient = property(recipient)
-
- def more(self):
- if self._P2PProducer__finished:
- log.info('Message %r complete or cancelled, returning None', self.msg)
- return None
-
- msg = self.msg
- transport = self.transport
- if hasattr(transport, '_super_secret_msgid'):
- msg.header.msgid_ack = transport._super_secret_msgid
-
- (header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
- if body == body and footer == footer:
- pass
- elif footer == None:
- if not msg.complete:
- log.error('Message %r is not complete but had no more data', msg)
- else:
- log.warning('Message %r complete. Returning None')
- self.on_done()
- return None
-
- data = transport.build_data(header, body, footer)
- transport.event('send_data')
- return data
-
-
- def on_ack(self, msg):
- if msg.header.msgid == self.msg.header.msgid:
- log.info('Finished producer calling %r', self._oncomplete)
- self._oncomplete()
- self.master.unbind('send_msg_end', self.on_ack)
-
-
-
- def on_done(self):
- log.info('Producer finished')
- self._P2PProducer__finished = True
- self.master._unqueue(self.msg)
- self._transport.p2p_clients -= 1
- del self._transport
-
-
- def push(self):
- transport = self.transport
- transport.push_with_producer(self, error = self._onerror)
-
-
- def __repr__(self):
- return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._P2PProducer__finished else 'transport=%r' % self._transport)
-
-
-
- def _next_msg(msg, size):
- header = msg.header
- header.offset = msg.transferred
- body = msg.read(size)
- if body is None:
- return (None, None, None)
-
- blen = len(body)
- header.length = blen
- return (header.pack(), body, struct.pack('>L', msg.footer))
-
-