home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2009 June / maximum-cd-2009-06.iso / DiscContents / digsby_setup.exe / lib / msn / P2P / P2PData.pyo (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-02-26  |  18.9 KB  |  610 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. import logging
  5. import random
  6. import sys
  7. import struct
  8. import uuid
  9. import util
  10. from util import callsback, get, call_later
  11. from util.Events import EventMixin
  12.  
  13. try:
  14.     from CStringIO import StringIO
  15. except ImportError:
  16.     from StringIO import StringIO
  17.  
  18. log = logging.getLogger('msn.p2p.data')
  19.  
  20. flagged = lambda v, f: f & v == f
  21.  
  22. randid = lambda : random.randint(4, sys.maxint // 2 - 5)
  23.  
  24. class Flags:
  25.     names = {
  26.         0: 'none',
  27.         1: 'sync',
  28.         2: 'ack',
  29.         4: 'wait',
  30.         8: 'err',
  31.         32: 'data',
  32.         64: 'byea',
  33.         128: 'byem',
  34.         16777264: 'file',
  35.         256: 'dchs' }
  36.     NONE = 0
  37.     ONE = 1
  38.     UNKNOWN = ONE
  39.     SYNC = ONE
  40.     ACK = 2
  41.     WAIT = 4
  42.     ERROR = 8
  43.     DATA = 32
  44.     BYEACK = 64
  45.     BYEMSG = 128
  46.     FILE = 16777264
  47.     HANDSHAKE = 256
  48.     DCHS = HANDSHAKE
  49.  
  50. Header = util.new_packable(('session', 'I', 'msgid', 'I', 'offset', 'Q', 'total', 'Q', 'length', 'I', 'flags', 'I', 'msgid_ack', 'I', 'msgid_ackack', 'I', 'total_ack', 'Q'), byteorder = '<')
  51.  
  52. class P2PMessage(object):
  53.     
  54.     def __init__(self, sender, recipient, id, flags, session_id, app_id, size, content, acked_msg_id = None, prev_acked_msg_id = 0, acked_data_size = 0):
  55.         self.sender = get(sender, 'name', sender)
  56.         self.recipient = get(recipient, 'name', recipient)
  57.         self.content = content
  58.         if acked_msg_id is None:
  59.             acked_msg_id = randid()
  60.         
  61.         self.header = Header(session = session_id, msgid = id, offset = 0, total = size, length = 0, flags = flags, msgid_ack = acked_msg_id, msgid_ackack = prev_acked_msg_id, total_ack = acked_data_size)
  62.         self.app_id = self.footer = app_id
  63.         self.transferred = 0
  64.  
  65.     
  66.     def reset(self):
  67.         if self.content is not None:
  68.             
  69.             try:
  70.                 self.content.seek(0)
  71.             except ValueError:
  72.                 pass
  73.             except:
  74.                 None<EXCEPTION MATCH>ValueError
  75.             
  76.  
  77.         None<EXCEPTION MATCH>ValueError
  78.  
  79.     
  80.     def write(self, data):
  81.         self.content.write(data)
  82.         self.transferred = self.content.tell()
  83.  
  84.     
  85.     def seek(self, position):
  86.         self.content.seek(position)
  87.  
  88.     
  89.     def read(self, max_size):
  90.         if self.content is not None:
  91.             if self.content.closed:
  92.                 log.error('Read called on a message with closed content! wtf.')
  93.                 return None
  94.             
  95.             data = self.content.read(max_size)
  96.             self.transferred = self.content.tell()
  97.         else:
  98.             data = ''
  99.         return data
  100.  
  101.     
  102.     def tell(self):
  103.         return self.transferred
  104.  
  105.     
  106.     def size(self):
  107.         return self.header.total
  108.  
  109.     size = property(size)
  110.     
  111.     def complete(self):
  112.         return self.transferred == self.size
  113.  
  114.     complete = property(complete)
  115.     
  116.     def __hash__(self):
  117.         return hash((self.sender, self.recipient, self.header.pack()))
  118.  
  119.     
  120.     def __repr__(self):
  121.         content = self.content
  122.         if isinstance(content, StringIO):
  123.             content = content.getvalue()
  124.         
  125.         contentstr = ''
  126.         if content:
  127.             
  128.             try:
  129.                 contentstr = 'content=%r' % content[:30]
  130.             except Exception:
  131.                 pass
  132.             except:
  133.                 None<EXCEPTION MATCH>Exception
  134.             
  135.  
  136.         None<EXCEPTION MATCH>Exception
  137.         return '<%s session=%d msgid=%d size=%d offset=%d total=%d flags=%d type(content)=%r %s>' % (type(self).__name__, self.header.session, self.header.msgid, self.header.length, self.tell(), self.size, self.header.flags, type(content).__name__, contentstr)
  138.  
  139.  
  140.  
  141. class P2PTransport(EventMixin):
  142.     events = EventMixin.events | set(('contacts_changed', 'recv_data', 'send_data'))
  143.     
  144.     def __init__(self, client):
  145.         EventMixin.__init__(self)
  146.         client._p2p_manager._register_transport(self)
  147.         self.p2p_clients = 0
  148.  
  149.     
  150.     def p2p_peers(self):
  151.         raise NotImplementedError
  152.  
  153.     p2p_peers = property(p2p_peers)
  154.     
  155.     def p2p_rating(self):
  156.         raise NotImplementedError
  157.  
  158.     p2p_rating = property(p2p_rating)
  159.     
  160.     def p2p_max_msg_size(self):
  161.         raise NotImplementedError
  162.  
  163.     p2p_max_msg_size = property(p2p_max_msg_size)
  164.     
  165.     def p2p_send(self, data, callback = None):
  166.         raise NotImplementedError
  167.  
  168.     p2p_send = callsback(p2p_send)
  169.     
  170.     def p2p_overhead(self):
  171.         raise NotImplementedError
  172.  
  173.     p2p_overhead = property(p2p_overhead)
  174.     
  175.     def build_data(self, header, body, footer):
  176.         raise NotImplementedError
  177.  
  178.     
  179.     def __repr__(self):
  180.         if hasattr(self, 'state'):
  181.             more_info = ' state=%r' % self.state
  182.         else:
  183.             more_info = ''
  184.         return '<%s p2pclients=%r id=%d%s>' % (type(self).__name__, get(self, 'p2p_clients', None), id(self), more_info)
  185.  
  186.  
  187.  
  188. class P2PManager(EventMixin):
  189.     events = EventMixin.events | set(('recv_msg_start', 'recv_msg_end', 'recv_error', 'send_msg_start', 'send_msg_end', 'send_error', 'recv_data', 'send_data'))
  190.     
  191.     def __init__(self, client):
  192.         EventMixin.__init__(self)
  193.         self.client = client
  194.         self._transports = []
  195.         self._P2PManager__incoming = { }
  196.         self._P2PManager__outgoing = { }
  197.         self._P2PManager__sent = { }
  198.         self._P2PManager__last_acked = None
  199.         self.sort_transports()
  200.  
  201.     
  202.     def close_all(self):
  203.         for transport in self._transports:
  204.             
  205.             try:
  206.                 transport.Disconnect()
  207.             continue
  208.             import traceback
  209.             traceback.print_exc()
  210.             continue
  211.  
  212.         
  213.  
  214.     
  215.     def _outgoing(self):
  216.         return self._P2PManager__outgoing
  217.  
  218.     _outgoing = property(_outgoing)
  219.     
  220.     def sort_transports(self):
  221.         self._best = { }
  222.         for transport in self._transports:
  223.             rating = transport.p2p_rating
  224.             for peer in transport.p2p_peers:
  225.                 if peer in self._best:
  226.                     prev_rating = self._best[peer].p2p_rating
  227.                     if rating > prev_rating:
  228.                         self._best[peer] = transport
  229.                     
  230.                 rating > prev_rating
  231.                 self._best[peer] = transport
  232.             
  233.         
  234.  
  235.     
  236.     def _register_transport(self, transport):
  237.         
  238.         try:
  239.             v = transport._P2PManager__registered
  240.         except AttributeError:
  241.             v = False
  242.  
  243.         if v:
  244.             log.info('Transport was already registered. Returning from register.')
  245.             return None
  246.         
  247.         bind = transport.bind
  248.         bind('contacts_changed', self.transport_sorter)
  249.         bind('recv_data', self._on_recv_data)
  250.         bind('send_data', self._on_send_data)
  251.         self._transports.append(transport)
  252.         transport._P2PManager__registered = True
  253.         self.sort_transports()
  254.  
  255.     
  256.     def _unregister_transport(self, transport):
  257.         
  258.         try:
  259.             v = transport._P2PManager__registered
  260.         except AttributeError:
  261.             v = False
  262.  
  263.         if not v:
  264.             log.info('Transport was not registered. Returning from unregister.')
  265.             return None
  266.         
  267.         log.debug('P2PManager removing transport %r', transport)
  268.         unbind = transport.unbind
  269.         unbind('contacts_changed', self.transport_sorter)
  270.         unbind('recv_data', self._on_recv_data)
  271.         unbind('send_data', self._on_send_data)
  272.         transport._P2PManager__registered = False
  273.         self._transports.remove(transport)
  274.         self.sort_transports()
  275.  
  276.     
  277.     def transport_sorter(self, *a):
  278.         self.sort_transports()
  279.  
  280.     
  281.     def get_best(self, peer, callback = None):
  282.         
  283.         try:
  284.             return self._best[peer]
  285.         except KeyError:
  286.             log.info('No transport found for %s, returning default (bests: %r)', peer, self._best)
  287.             return self.client._get_default_p2p_transport(peer, callback = callback)
  288.  
  289.  
  290.     get_best = callsback(get_best)
  291.     
  292.     def _on_send_data(self):
  293.         self.event('send_data')
  294.  
  295.     
  296.     def _on_recv_data(self, transport, sender, data, has_footer = True):
  297.         
  298.         try:
  299.             header = Header.unpack(data[:48])
  300.             data = data[48:]
  301.         except Exception:
  302.             e = None
  303.             print repr(data)
  304.             raise 
  305.  
  306.         log.debug('P2PManager got message(flags=%d) from %r via %r', header.flags, sender, transport)
  307.         if has_footer:
  308.             footer = struct.unpack('>L', data[-4:])[0]
  309.             data = data[:-4]
  310.         else:
  311.             footer = 0
  312.         if footer == 0 and data == '\x00\x00\x00\x00':
  313.             footer = 1
  314.         
  315.         
  316.         try:
  317.             pass
  318.         except:
  319.             import traceback
  320.             traceback.print_exc()
  321.             raise 
  322.  
  323.         if header.total == header.offset + header.length:
  324.             log.critical('Got completed P2PMessage with flags %d (%s)', header.flags, get(Flags.names, header.flags, 'Super duper unknown flags %d' % (header.flags,)))
  325.         
  326.         if flagged(header.flags, Flags.ACK):
  327.             self._P2PManager__last_acked = header.msgid_ack
  328.             return self._process_ack(header.msgid_ack)
  329.         elif flagged(header.flags, Flags.ERROR):
  330.             log.info('Got binary transport error')
  331.             if header.msgid_ack in self._P2PManager__outgoing:
  332.                 self._P2PManager__outgoing[header.msgid_ack].on_done()
  333.             
  334.             self.event('recv_error')
  335.             return None
  336.         elif flagged(header.flags, Flags.SYNC):
  337.             
  338.             try:
  339.                 sent = self._P2PManager__outgoing[header.msgid_ack]
  340.             except Exception:
  341.                 e = None
  342.  
  343.             log.error('P2P sync received: %s', list(header))
  344.             log.error('Error acks total %d, my message says offset %d', header.total_ack, sent.msg.header.offset)
  345.             sent.msg.seek(header.total_ack - sent.msg.header.length)
  346.             return None
  347.         elif flagged(header.flags, Flags.HANDSHAKE):
  348.             log.warning('Got Direct Connect Handshake Message (DCHS) header=<%r>', list(header))
  349.             their_nonce = uuid.UUID(bytes_le = header.pack()[-16:])
  350.             msn_hash = msn_hash
  351.             import SLPCalls
  352.             if not transport.in_key:
  353.                 pass
  354.             if not transport.in_hkey:
  355.                 pass
  356.             None(log.warning, 'Their %shashed Nonce (unhashed: %s, hashed: %s)' if transport.in_key is None else '', their_nonce, their_nonce)
  357.             log.warning('My unhashed Nonce (unhashed: %s, hashed: %s)', transport.out_key, transport.out_hkey)
  358.             if transport.in_key is None:
  359.                 transport.in_key = their_nonce
  360.             elif transport.in_hkey is None:
  361.                 transport.in_hkey = their_nonce
  362.             else:
  363.                 log.info('Had all the nonces already. (How did that happen?)')
  364.             transport._send_nonce(header.msgid, header.msgid_ack)
  365.             transport.event('on_ready')
  366.             return None
  367.         elif flagged(header.flags, Flags.BYEACK):
  368.             log.info('Got ack for BYE message. Going to send waiting flags')
  369.         elif flagged(header.flags, Flags.WAIT):
  370.             log.critical("Got waiting message. Here's the header: %r", list(header))
  371.             msg = P2PMessage(None, sender, header.msgid, 6, header.session, footer, 0, None)
  372.             self.send_message(msg)
  373.             return None
  374.         
  375.         id = header.msgid
  376.         if id not in self._P2PManager__incoming:
  377.             log.info('Got new message')
  378.             msg = self._process_new(sender, header, footer)
  379.             transport.p2p_clients += 1
  380.         else:
  381.             log.debug('Continuing message with id=(%r)', id)
  382.             msg = self._P2PManager__incoming[id]
  383.         
  384.         try:
  385.             if header.offset != msg.tell():
  386.                 log.warning('Header offset does not match file offset')
  387.             
  388.             if flagged(header.flags, Flags.DATA):
  389.                 msg.seek(header.offset)
  390.             else:
  391.                 log.warning("Did not seek() message because it's not a data message (msg = %r). nullbytes?", msg)
  392.             msg.write(data)
  393.         except:
  394.             self.event('recv_error')
  395.             return None
  396.  
  397.         if msg.complete:
  398.             log.info('Received message')
  399.             transport.p2p_clients -= 1
  400.             self._process_msg(header, footer, msg)
  401.         
  402.         self.event('recv_data')
  403.  
  404.     
  405.     def _process_ack(self, id):
  406.         if id in self._P2PManager__sent:
  407.             msg = self._P2PManager__sent.pop(id)
  408.             log.info('Got ack for message, NOT resetting it: %r', msg)
  409.             self.event('send_msg_end', msg)
  410.         else:
  411.             log.error('got ack for unknown message')
  412.  
  413.     
  414.     def _process_new(self, sender, header, footer):
  415.         content = self.client.slp_call_master._create_message_content(header, footer)
  416.         msg = P2PMessage(sender, None, header.msgid, header.flags, header.session, footer, header.total, content)
  417.         self._P2PManager__incoming[header.msgid] = msg
  418.         self.event('recv_msg_start', msg)
  419.         return msg
  420.  
  421.     
  422.     def _process_msg(self, header, footer, msg):
  423.         log.info('P2P message complete (%s)', msg)
  424.         self.send_ack(header, footer, msg)
  425.         msg.reset()
  426.         
  427.         try:
  428.             self.event('recv_msg_end', self, msg)
  429.         except Exception:
  430.             e = None
  431.             import traceback
  432.             traceback.print_stack()
  433.             traceback.print_exc()
  434.  
  435.         del self._P2PManager__incoming[header.msgid]
  436.  
  437.     
  438.     def send_ack(self, header, footer, msg):
  439.         
  440.         try:
  441.             flags = None if flagged(header.flags, Flags.BYEMSG) else Flags.ACK
  442.             log.info('Going to send ack with flags %s', flags)
  443.             
  444.             try:
  445.                 id = self._P2PManager__last_acked + 1
  446.             except:
  447.                 id = randid()
  448.             finally:
  449.                 self._P2PManager__last_acked = None
  450.  
  451.             sender = get(msg, 'sender')
  452.             ack_msg = P2PMessage(None, sender, id, flags, header.session, footer, header.total, None, header.msgid, header.msgid_ack, header.total)
  453.             self.send_message(ack_msg)
  454.         except Exception:
  455.             e = None
  456.             import traceback
  457.             traceback.print_exc()
  458.             raise e
  459.  
  460.  
  461.     
  462.     def send_with_producer(self, msg, callback = None):
  463.         if msg.header.msgid not in self._P2PManager__outgoing:
  464.             log.warning('Sending producer for msg with id %d', msg.header.msgid)
  465.             self.event('send_msg_start', msg)
  466.             prod = self.make_producer(msg, callback = callback)
  467.             self._P2PManager__outgoing[msg.header.msgid] = prod
  468.             self._send_producer(prod)
  469.         else:
  470.             log.warning('Got producer for %d again (???). Not sending it.', msg.header.msgid)
  471.             return None
  472.  
  473.     send_with_producer = callsback(send_with_producer)
  474.     send_message = send_with_producer
  475.     
  476.     def _send_producer(self, prod):
  477.         
  478.         try:
  479.             if not prod.transport._P2PManager__registered:
  480.                 self._register_transport(prod.transport)
  481.         except Exception:
  482.             e = None
  483.             import traceback
  484.             traceback.print_exc()
  485.             raise e
  486.  
  487.         
  488.         try:
  489.             prod.push()
  490.         except TypeError:
  491.             e = None
  492.             log.info("Can't send this producer (%r) on its transport. Need to find a new transport.", prod)
  493.             import traceback
  494.             traceback.print_exc()
  495.  
  496.  
  497.     
  498.     def make_producer(self, msg, callback = None):
  499.         log.info('P2PData making a producer for %r', msg)
  500.         prod = P2PProducer(self, msg, callback.success, callback.error)
  501.         return prod
  502.  
  503.     make_producer = callsback(make_producer)
  504.     
  505.     def _unqueue(self, msg):
  506.         del self._P2PManager__outgoing[msg.header.msgid]
  507.         if not flagged(msg.header.flags, Flags.ACK):
  508.             self._P2PManager__sent[msg.header.msgid] = msg
  509.         
  510.  
  511.  
  512.  
  513. class P2PProducer(object):
  514.     
  515.     def __init__(self, master, msg, whendone = None, on_error = None):
  516.         self.master = master
  517.         self.msg = msg
  518.         self._transport = self.master.get_best(self.msg.recipient)
  519.         self._transport.p2p_clients += 1
  520.         self._P2PProducer__finished = False
  521.         
  522.         def donothing():
  523.             pass
  524.  
  525.         self._oncomplete = self._transport if whendone is not None else donothing
  526.         self._onerror = None if on_error is not None else donothing
  527.         self.master.bind('send_msg_end', self.on_ack)
  528.  
  529.     
  530.     def transport(self):
  531.         if self._P2PProducer__finished:
  532.             raise AttributeError
  533.         
  534.         self._transport.p2p_clients -= 1
  535.         old_xport = self._transport
  536.         self._transport = self.master.get_best(self.msg.recipient, error = self._onerror)
  537.         self._transport.p2p_clients += 1
  538.         return self._transport
  539.  
  540.     transport = property(transport)
  541.     
  542.     def recipient(self):
  543.         return self.msg.recipient
  544.  
  545.     recipient = property(recipient)
  546.     
  547.     def more(self):
  548.         if self._P2PProducer__finished:
  549.             log.info('Message %r complete or cancelled, returning None', self.msg)
  550.             return None
  551.         
  552.         msg = self.msg
  553.         transport = self.transport
  554.         if hasattr(transport, '_super_secret_msgid'):
  555.             msg.header.msgid_ack = transport._super_secret_msgid
  556.         
  557.         (header, body, footer) = _next_msg(msg, transport.p2p_max_msg_size - transport.p2p_overhead)
  558.         if body == body and footer == footer:
  559.             pass
  560.         elif footer == None:
  561.             if not msg.complete:
  562.                 log.error('Message %r is not complete but had no more data', msg)
  563.             else:
  564.                 log.warning('Message %r complete. Returning None')
  565.             self.on_done()
  566.             return None
  567.         
  568.         data = transport.build_data(header, body, footer)
  569.         transport.event('send_data')
  570.         return data
  571.  
  572.     
  573.     def on_ack(self, msg):
  574.         if msg.header.msgid == self.msg.header.msgid:
  575.             log.info('Finished producer calling %r', self._oncomplete)
  576.             self._oncomplete()
  577.             self.master.unbind('send_msg_end', self.on_ack)
  578.         
  579.  
  580.     
  581.     def on_done(self):
  582.         log.info('Producer finished')
  583.         self._P2PProducer__finished = True
  584.         self.master._unqueue(self.msg)
  585.         self._transport.p2p_clients -= 1
  586.         del self._transport
  587.  
  588.     
  589.     def push(self):
  590.         transport = self.transport
  591.         transport.push_with_producer(self, error = self._onerror)
  592.  
  593.     
  594.     def __repr__(self):
  595.         return None % ('<%s message=%r, %s>', type(self).__name__, self.msg if self._P2PProducer__finished else 'transport=%r' % self._transport)
  596.  
  597.  
  598.  
  599. def _next_msg(msg, size):
  600.     header = msg.header
  601.     header.offset = msg.transferred
  602.     body = msg.read(size)
  603.     if body is None:
  604.         return (None, None, None)
  605.     
  606.     blen = len(body)
  607.     header.length = blen
  608.     return (header.pack(), body, struct.pack('>L', msg.footer))
  609.  
  610.