home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2012 January / maximum-cd-2012-01.iso / DiscContents / digsby_setup.exe / lib / msn / P2P / P2PData.pyo (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2011-10-05  |  19.9 KB  |  626 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.6)
  3.  
  4. import logging
  5. import random
  6. import sys
  7. import struct
  8. import uuid
  9. import traceback
  10. import util
  11. from util import callsback
  12. from util.primitives.funcs import get
  13. from util.Events import EventMixin
  14.  
  15. try:
  16.     from CStringIO import StringIO
  17. except ImportError:
  18.     from StringIO import StringIO
  19.  
  20. log = logging.getLogger('msn.p2p.data')
  21.  
  22. flagged = lambda v, f: f & v == f
  23.  
  24. randid = lambda : random.randint(4, sys.maxint // 2 - 5)
  25.  
  26. class Flags:
  27.     names = {
  28.         0: 'none',
  29.         1: 'sync',
  30.         2: 'ack',
  31.         4: 'wait',
  32.         8: 'err',
  33.         32: 'data',
  34.         64: 'byea',
  35.         128: 'byem',
  36.         16777264: 'file',
  37.         256: 'dchs' }
  38.     NONE = 0
  39.     ONE = 1
  40.     UNKNOWN = ONE
  41.     SYNC = ONE
  42.     ACK = 2
  43.     WAIT = 4
  44.     ERROR = 8
  45.     DATA = 32
  46.     BYEACK = 64
  47.     BYEMSG = 128
  48.     FILE = 16777264
  49.     HANDSHAKE = 256
  50.     DCHS = HANDSHAKE
  51.  
  52.  
  53. class Header(util.packable.Packable):
  54.     fmt = ('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q')
  55.     byteorder = '<'
  56.  
  57.  
  58. class P2PMessage(object):
  59.     
  60.     def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
  61.         self.sender = get(sender, 'name', sender)
  62.         self.recipient = get(recipient, 'name', recipient)
  63.         self.content = content
  64.         if acked_msg_id is None:
  65.             acked_msg_id = randid()
  66.         
  67.         self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
  68.         self.app_id = self.footer = app_id
  69.         self.transferred = 0
  70.         self._complete = False
  71.  
  72.     
  73.     def reset(self):
  74.         if self.content is not None:
  75.             
  76.             try:
  77.                 self.content.seek(0)
  78.             except ValueError:
  79.                 pass
  80.             except:
  81.                 None<EXCEPTION MATCH>ValueError
  82.             
  83.  
  84.         None<EXCEPTION MATCH>ValueError
  85.  
  86.     
  87.     def write(self, data):
  88.         self.content.write(data)
  89.         self.transferred = self.content.tell()
  90.         if not self._complete:
  91.             self._complete = self.transferred == self.size
  92.         
  93.  
  94.     
  95.     def seek(self, position):
  96.         self.content.seek(position)
  97.  
  98.     
  99.     def read(self, max_size):
  100.         if self.content is not None:
  101.             if self.content.closed:
  102.                 log.error('Read called on a message with closed content! wtf.')
  103.                 return None
  104.             data = self.content.read(max_size)
  105.             self.transferred = self.content.tell()
  106.             if not self._complete:
  107.                 self._complete = self.transferred == self.size
  108.             
  109.         else:
  110.             data = ''
  111.             self._complete = True
  112.         return data
  113.  
  114.     
  115.     def tell(self):
  116.         return self.transferred
  117.  
  118.     
  119.     def size(self):
  120.         return self.header.total
  121.  
  122.     size = property(size)
  123.     
  124.     def complete(self):
  125.         return self._complete
  126.  
  127.     complete = property(complete)
  128.     
  129.     def __hash__(self):
  130.         return hash((self.sender, self.recipient, self.header.pack()))
  131.  
  132.     
  133.     def __repr__(self):
  134.         content = self.content
  135.         if isinstance(content, StringIO):
  136.             content = content.getvalue()
  137.         
  138.         contentstr = ''
  139.         if content:
  140.             
  141.             try:
  142.                 contentstr = 'content=%r' % content[:30]
  143.             except Exception:
  144.                 pass
  145.             except:
  146.                 None<EXCEPTION MATCH>Exception
  147.             
  148.  
  149.         None<EXCEPTION MATCH>Exception
  150.         return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
  151.  
  152.  
  153.  
  154. class P2PTransport(EventMixin):
  155.     events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
  156.     
  157.     def __init__(self, client):
  158.         EventMixin.__init__(self)
  159.         client._p2p_manager._register_transport(self)
  160.         self.p2p_clients = 0
  161.  
  162.     
  163.     def p2p_peers(self):
  164.         raise NotImplementedError
  165.  
  166.     p2p_peers = property(p2p_peers)
  167.     
  168.     def p2p_rating(self):
  169.         raise NotImplementedError
  170.  
  171.     p2p_rating = property(p2p_rating)
  172.     
  173.     def p2p_max_msg_size(self):
  174.         raise NotImplementedError
  175.  
  176.     p2p_max_msg_size = property(p2p_max_msg_size)
  177.     
  178.     def p2p_send(self, data, callback = None):
  179.         raise NotImplementedError
  180.  
  181.     p2p_send = callsback(p2p_send)
  182.     
  183.     def p2p_overhead(self):
  184.         raise NotImplementedError
  185.  
  186.     p2p_overhead = property(p2p_overhead)
  187.     
  188.     def build_data(self, header, body, footer):
  189.         raise NotImplementedError
  190.  
  191.     
  192.     def __repr__(self):
  193.         if hasattr(self, 'state'):
  194.             more_info = ' state=%r' % self.state
  195.         else:
  196.             more_info = ''
  197.         return '<%s p2pclients=%r id=%d%s>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self), more_info)
  198.  
  199.  
  200.  
  201. class P2PManager(EventMixin):
  202.     events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
  203.     
  204.     def register_bridge(self, name, cls):
  205.         self.bridges[name] = cls
  206.  
  207.     
  208.     def unregister_bridge(self, name):
  209.         return self.bridges.pop(name, None) is not None
  210.  
  211.     
  212.     def get_bridge_names(self):
  213.         return self.bridges.keys()
  214.  
  215.     
  216.     def get_bridge_class(self, name):
  217.         return self.bridges.get(name, None)
  218.  
  219.     
  220.     def __init__(self, client):
  221.         EventMixin.__init__(self)
  222.         self.client = client
  223.         self.bridges = { }
  224.         self._transports = []
  225.         self._incoming = { }
  226.         self._outgoing = { }
  227.         self._sent = { }
  228.         self._last_acked = None
  229.         self.sort_transports()
  230.  
  231.     
  232.     def close_all(self):
  233.         for transport in self._transports[:]:
  234.             
  235.             try:
  236.                 log.info('Disconnecting %r', transport)
  237.                 transport.Disconnect()
  238.             continue
  239.             traceback.print_exc()
  240.             continue
  241.  
  242.         
  243.  
  244.     
  245.     def sort_transports(self):
  246.         self._best = { }
  247.         for transport in self._transports:
  248.             rating = transport.p2p_rating
  249.             for peer in transport.p2p_peers:
  250.                 if peer in self._best:
  251.                     prev_rating = self._best[peer].p2p_rating
  252.                     if rating > prev_rating:
  253.                         self._best[peer] = transport
  254.                     
  255.                 rating > prev_rating
  256.                 self._best[peer] = transport
  257.             
  258.         
  259.  
  260.     
  261.     def _register_transport(self, transport):
  262.         
  263.         try:
  264.             v = transport._registered
  265.         except AttributeError:
  266.             v = False
  267.  
  268.         if v:
  269.             log.info('Transport was already registered. Returning from register.')
  270.             return None
  271.         bind = transport.bind
  272.         bind('contacts_changed', self.transport_sorter)
  273.         bind('recv_data', self._on_recv_data)
  274.         bind('send_data', self._on_send_data)
  275.         self._transports.append(transport)
  276.         transport._registered = True
  277.         self.sort_transports()
  278.  
  279.     
  280.     def _unregister_transport(self, transport):
  281.         
  282.         try:
  283.             v = transport._registered
  284.         except AttributeError:
  285.             v = False
  286.  
  287.         if not v:
  288.             log.info('Transport was not registered. Returning from unregister.')
  289.             return None
  290.         log.debug('P2PManager removing transport %r', transport)
  291.         unbind = transport.unbind
  292.         unbind('contacts_changed', self.transport_sorter)
  293.         unbind('recv_data', self._on_recv_data)
  294.         unbind('send_data', self._on_send_data)
  295.         transport._registered = False
  296.         self._transports.remove(transport)
  297.         self.sort_transports()
  298.  
  299.     
  300.     def transport_sorter(self, *a):
  301.         self.sort_transports()
  302.  
  303.     
  304.     def get_best(self, peer, callback = None):
  305.         
  306.         try:
  307.             return self._best[peer]
  308.         except KeyError:
  309.             log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
  310.             return self.client._get_default_p2p_transport(peer, callback = callback)
  311.  
  312.  
  313.     get_best = callsback(get_best)
  314.     
  315.     def _should_notify_send(self):
  316.         return True
  317.         
  318.         try:
  319.             self.notify_threshold
  320.         except AttributeError:
  321.             self.notify_threshold = 5000
  322.  
  323.         setattr(self, '_progress_notify_count', getattr(self, '_progress_notify_count', 0) + 1)
  324.         return self._progress_notify_count % self.notify_threshold == 0
  325.  
  326.     
  327.     def _on_send_data(self):
  328.         if self._should_notify_send():
  329.             self.event('send_data')
  330.         
  331.  
  332.     
  333.     def _on_recv_data(self, transport, sender, data, has_footer = True):
  334.         
  335.         try:
  336.             (header, data) = Header.unpack(data)
  337.         except Exception:
  338.             _e = None
  339.             print repr(data)
  340.             raise 
  341.  
  342.         if has_footer:
  343.             footer = struct.unpack('>L', data[-4:])[0]
  344.             data = data[:-4]
  345.         else:
  346.             footer = 0
  347.         if footer == 0 and data == '\x00\x00\x00\x00':
  348.             footer = 1
  349.         
  350.         
  351.         try:
  352.             pass
  353.         except:
  354.             traceback.print_exc()
  355.             raise 
  356.  
  357.         if header.total == header.offset + header.length:
  358.             log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
  359.         
  360.         if flagged(header.flags, Flags.ACK):
  361.             self._last_acked = header.msgid_ack
  362.             return self._process_ack(header.msgid_ack)
  363.         if flagged(header.flags, Flags.ERROR):
  364.             log.info('Got binary transport error')
  365.             if header.msgid_ack in self._outgoing:
  366.                 self._outgoing[header.msgid_ack].on_done()
  367.             
  368.             self.event('recv_error')
  369.             return None
  370.         if flagged(header.flags, Flags.SYNC):
  371.             
  372.             try:
  373.                 sent = self._outgoing[header.msgid_ack]
  374.             except Exception:
  375.                 flagged(header.flags, Flags.ERROR)
  376.                 _e = flagged(header.flags, Flags.ERROR)
  377.                 flagged(header.flags, Flags.ACK)
  378.                 log.info("can't find old message with msgid = %r", header.msgid_ack)
  379.             except:
  380.                 flagged(header.flags, Flags.ERROR)
  381.  
  382.             log.error('P2P sync received: %s', list(header))
  383.             log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
  384.             sent.msg.seek(max(header.total_ack - sent.msg.header.length, 0))
  385.             return None
  386.         if flagged(header.flags, Flags.HANDSHAKE):
  387.             log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
  388.             their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
  389.             if not transport.info.peer.nonce_plain:
  390.                 pass
  391.             if not transport.info.peer.nonce_hashed:
  392.                 pass
  393.             flagged(header.flags, Flags.SYNC)(log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.info.peer.nonce_plain is None else '', their_nonce, their_nonce)
  394.             log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.info.local.nonce_plain, transport.info.local.nonce_hashed)
  395.             if transport.info.peer.nonce_plain is None:
  396.                 transport.got_nonce(their_nonce, 'plain')
  397.             elif transport.info.peer.nonce_hashed is None:
  398.                 transport.got_nonce(their_nonce, 'hashed')
  399.             else:
  400.                 log.info('Had all the nonces already. (How did that happen?)')
  401.             transport._send_nonce(header.msgid, header.msgid_ack)
  402.             transport.event('on_ready')
  403.             return None
  404.         if flagged(header.flags, Flags.BYEACK):
  405.             log.info('Got ack for BYE message. Going to send waiting flags')
  406.         elif flagged(header.flags, Flags.WAIT):
  407.             log.critical("Got waiting message. Here's the header: %r", list(header))
  408.             msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
  409.             self.send_message(msg)
  410.             return None
  411.         flagged(header.flags, Flags.SYNC)
  412.         id = header.msgid
  413.         
  414.         try:
  415.             if header.offset != msg.tell():
  416.                 pass
  417.             
  418.             if flagged(header.flags, Flags.DATA):
  419.                 msg.seek(header.offset)
  420.             else:
  421.                 log.warning("Did not seek() message because it's not a data message (msg = %r). nullbytes?", msg)
  422.             msg.write(data)
  423.         except:
  424.             flagged(header.flags, Flags.ACK) if id not in self._incoming else flagged(header.flags, Flags.ERROR)
  425.             self.event('recv_error')
  426.             return None
  427.  
  428.         self.event('recv_data')
  429.  
  430.     
  431.     def _process_ack(self, id):
  432.         if id in self._sent:
  433.             msg = self._sent.pop(id)
  434.             log.info('Got ack for message, NOT resetting it: %r', msg)
  435.             self.event('send_msg_end', msg)
  436.         else:
  437.             log.error('got ack for unknown message')
  438.  
  439.     
  440.     def _process_new(self, sender, header, footer):
  441.         content = self.client.slp_call_master._create_message_content(header, footer)
  442.         msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
  443.         self._incoming[header.msgid] = msg
  444.         self.event('recv_msg_start', msg)
  445.         return msg
  446.  
  447.     
  448.     def _process_msg(self, header, footer, msg):
  449.         log.info('P2P message complete (%s)', msg)
  450.         self.send_ack(header, footer, msg)
  451.         msg.reset()
  452.         
  453.         try:
  454.             self.event('recv_msg_end', self, msg)
  455.         except Exception:
  456.             _e = None
  457.             traceback.print_stack()
  458.             traceback.print_exc()
  459.  
  460.         del self._incoming[header.msgid]
  461.  
  462.     
  463.     def send_ack(self, header, footer, msg):
  464.         
  465.         try:
  466.             flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
  467.             log.info('Going to send ack with flags %s', flags)
  468.             
  469.             try:
  470.                 id = self._last_acked + 1
  471.             except:
  472.                 id = randid()
  473.             finally:
  474.                 self._last_acked = None
  475.  
  476.             sender = get(msg, 'sender')
  477.             ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
  478.             self.send_message(ack_msg)
  479.         except Exception:
  480.             e = None
  481.             traceback.print_exc()
  482.             raise e
  483.  
  484.  
  485.     
  486.     def send_with_producer(self, msg, callback = None):
  487.         if msg.header.msgid not in self._outgoing:
  488.             log.warning('Sending producer for msg with id %d', msg.header.msgid)
  489.             self.event('send_msg_start', msg)
  490.             prod = self.make_producer(msg, callback = callback)
  491.             self._outgoing[msg.header.msgid] = prod
  492.             self._send_producer(prod)
  493.         else:
  494.             log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
  495.             return None
  496.         return msg.header.msgid not in self._outgoing
  497.  
  498.     send_with_producer = callsback(send_with_producer)
  499.     send_message = send_with_producer
  500.     
  501.     def _send_producer(self, prod):
  502.         
  503.         try:
  504.             if not prod.transport._registered:
  505.                 self._register_transport(prod.transport)
  506.         except Exception:
  507.             e = None
  508.             traceback.print_exc()
  509.             raise e
  510.  
  511.         
  512.         try:
  513.             prod.push()
  514.         except TypeError:
  515.             e = None
  516.             log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
  517.             traceback.print_exc()
  518.  
  519.  
  520.     
  521.     def make_producer(self, msg, callback = None):
  522.         log.info('P2PData making a producer for %r', msg)
  523.         prod = P2PProducer(self, msg, callback.success, callback.error)
  524.         return prod
  525.  
  526.     make_producer = callsback(make_producer)
  527.     
  528.     def _unqueue(self, msg):
  529.         log.info('unqueuing %r', msg)
  530.         del self._outgoing[msg.header.msgid]
  531.         if not flagged(msg.header.flags, Flags.ACK):
  532.             self._sent[msg.header.msgid] = msg
  533.         
  534.  
  535.  
  536.  
  537. class P2PProducer(object):
  538.     
  539.     def __init__(self, master, msg, whendone = None, on_error = None):
  540.         self.master = master
  541.         self.msg = msg
  542.         self._transport = self.master.get_best(self.msg.recipient)
  543.         self._transport.p2p_clients += 1
  544.         self._finished = False
  545.         
  546.         def donothing():
  547.             pass
  548.  
  549.         self._oncomplete = self._transport if whendone is not None else donothing
  550.         self._onerror = None if on_error is not None else donothing
  551.         self.master.bind('send_msg_end', self.on_ack)
  552.  
  553.     
  554.     def transport(self):
  555.         if self._finished:
  556.             raise AttributeError
  557.         self._finished
  558.         self._transport.p2p_clients -= 1
  559.         old_xport = self._transport
  560.         self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
  561.         self._transport.p2p_clients += 1
  562.         return self._transport
  563.  
  564.     transport = property(transport)
  565.     
  566.     def recipient(self):
  567.         return self.msg.recipient
  568.  
  569.     recipient = property(recipient)
  570.     
  571.     def more(self):
  572.         if self._finished:
  573.             log.info('Message %r complete or cancelled, returning None', self.msg)
  574.             return None
  575.         msg = self.msg
  576.         transport = self.transport
  577.         msg.header.msgid_ack = getattr(transport, '_super_secret_msgid', msg.header.msgid_ack)
  578.         (header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
  579.         if header is None and body is None and footer is None:
  580.             if msg.complete:
  581.                 log.warning('Message %r complete. Returning None')
  582.             else:
  583.                 log.error('Message %r is not complete but had no more data', msg)
  584.             self.on_done()
  585.             return None
  586.         data = transport.build_data(header, body, footer)
  587.         transport.event('send_data')
  588.         return data
  589.  
  590.     
  591.     def on_ack(self, msg):
  592.         if msg.header.msgid == self.msg.header.msgid:
  593.             log.info('Finished producer calling %r', self._oncomplete)
  594.             self._oncomplete()
  595.             self.master.unbind('send_msg_end', self.on_ack)
  596.         
  597.  
  598.     
  599.     def on_done(self):
  600.         log.info('Producer finished')
  601.         self._finished = True
  602.         self.master._unqueue(self.msg)
  603.         self._transport.p2p_clients -= 1
  604.         del self._transport
  605.  
  606.     
  607.     def push(self):
  608.         transport = self.transport
  609.         transport.push_with_producer(self, error = self._onerror)
  610.  
  611.     
  612.     def __repr__(self):
  613.         return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._finished else 'transport=%r' % self._transport)
  614.  
  615.  
  616.  
  617. def _next_msg(msg, size):
  618.     header = msg.header
  619.     header.offset = msg.transferred
  620.     body = msg.read(size)
  621.     if body is None:
  622.         return (None, None, None)
  623.     header.length = len(body)
  624.     return (header.pack(), body, struct.pack('>L', msg.footer))
  625.  
  626.