home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from util import callsback, get, PriorityProducer, RoundRobinProducer, try_this
- from util.Events import EventMixin, event
- import struct
- import common
- import msn
- from msn.P2P.P2PData import P2PTransport, randid, Header, Flags
- import logging
- log = logging.getLogger('msn.dc')
- sock_in = logging.getLogger('msn.dc.sock.in')
- sock_out = logging.getLogger('msn.dc.sock.out')
-
class MSNDCProtocol(P2PTransport):
    """P2P transport that exchanges length-prefixed data over one socket.

    The socket is obtained asynchronously from a "connecter" (MSNDC_Server
    when serving, MSNDC_Client otherwise) and then wrapped in an MSNDCSocket.
    Note that P2PTransport.__init__ is not run at construction time; it is
    deferred until the socket is established (see _connected).

    NOTE(review): this class was recovered from decompiled bytecode. Three
    conditional expressions whose first operand the decompiler rendered as
    ``None`` have been restored from context (MSNDC_Server, ``()`` and
    ``'my'``) -- confirm against the original source if it is available.
    """

    events = P2PTransport.events | set(('on_close', 'on_error', 'on_ready', 'on_connect', 'recv_data'))

    def __init__(self, client, peer, serving, mynonces, theirnonces):
        """Store connection parameters; no network activity happens here.

        client      -- owner object; must expose ``_p2p_manager``.
        peer        -- the remote party; reported by ``p2p_peers`` and passed
                       along with every 'recv_data' event.
        serving     -- true to accept a connection, false to connect out.
        mynonces    -- pair ``(unhashed, hashed)`` of our nonces.
        theirnonces -- pair ``(unhashed, hashed)`` of the peer's nonces.
        """
        EventMixin.__init__(self)
        log.info('MSNDCProtocol created')
        self.client = client
        self.peer = peer
        self.socket = None
        # Decompiled as "None if serving else MSNDC_Client", which would make
        # Connect() call None on the serving side.
        self.__connecter_class = MSNDC_Server if serving else MSNDC_Client
        (self.out_key, self.out_hkey) = mynonces
        (self.in_key, self.in_hkey) = theirnonces
        self.hashed = False
        self.mine = True
        self.__sentnonce = False
        self.rrobin = None
        log.critical('MSNDC created: %r', self)

    def purge_messages(self):
        """Return messages still queued on the socket to the P2P manager.

        Producers carrying a ``msg`` attribute have that message reset and
        re-submitted through ``client._p2p_manager``; any other non-None
        producer is logged and dropped.
        """
        if get(self, 'socket', None) is None:
            return

        log.info('Purging P2P messages back to P2P manager. Data in out buffer? %r', self.socket.ac_out_buffer)
        while self.socket.producer_fifo:
            (_flag, prod) = self.socket.producer_fifo.pop()
            if hasattr(prod, 'msg'):
                prod.msg.reset()
                self.client._p2p_manager.send_message(prod.msg)
            elif prod is not None:
                log.error('Not re-sending the following producer: %r', prod)

    def Connect(self, ips=None):
        """Create the connecter and start connecting (or accepting).

        ips -- addresses handed to the connecter's constructor; omit it for
               the serving side, whose connecter takes no arguments.

        May only be called once: the stored connecter class is deleted
        afterwards.
        """
        # Decompiled as "None if ips is None else (ips,)"; unpacking None
        # with * raises TypeError, so the empty tuple is restored.
        args = () if ips is None else (ips,)
        self._connecter = self.__connecter_class(*args)
        self._connecter.connect(success=self._connected, error=self._timeout)
        del self.__connecter_class

    def _connected(self, sock):
        """Connecter success callback: wrap ``sock`` and start the transport."""
        self._connecter.cleanup()
        # Bytes the connecter may already have buffered are replayed into
        # the new socket's input buffer.
        old_data = get(sock, 'data', '')
        del self._connecter
        self.socket = MSNDCSocket(sock, old_data)

        # EventMixin.bind is called explicitly rather than self.socket.bind.
        # NOTE(review): presumably because the socket class has its own
        # (network) bind -- confirm.
        bind = lambda n, f: EventMixin.bind(self.socket, n, f)
        bind('on_message', self.incoming)
        bind('on_error', self._sck_closed)
        bind('on_close', self._sck_closed)
        bind('on_send', self.send_data)
        self.rrobin = RoundRobinProducer(self.socket)
        P2PTransport.__init__(self, self.client)
        self.event('on_connect')
        self.rrobin.unqueue()
        self.rrobin.queue()

    def incoming(self, data):
        """Forward one received frame as a 'recv_data' event.

        Empty frames and the 'foo' greeting sent by MSNDC_Client are ignored.
        """
        if not data:
            return None

        if data.startswith('foo'):
            log.warning('Got "foo"')
            return None

        self.event('recv_data', self, self.peer, data, False)

    def _send_nonce(self, msgid, msgidack):
        """Send the handshake nonce once, then fire 'on_ready'.

        Which nonce goes out is selected by ``self.hashed`` (hashed or
        unhashed) and ``self.mine`` (ours or the peer's). Nothing is sent if
        a nonce was already sent or the selected nonce is None.
        """
        self._super_secret_msgid = msgidack
        if self.__sentnonce:
            log.warning('not sending nonce')
            return None

        # The nonce is only ever sent once, always with session id 0. (The
        # decompiled code carried an unreachable resend path using
        # self.sessid after the return above; it has been dropped, as has a
        # "msgid += <False>" no-op.)
        sessid = 0
        owner = 'my' if self.mine else 'their'
        if self.hashed:
            log.warning('Sending %s hashed-nonce', owner)
            nonce = self.out_hkey if self.mine else self.in_hkey
        else:
            log.warning('Sending %s unhashed-nonce', owner)
            nonce = self.out_key if self.mine else self.in_key
        if nonce is None:
            return None

        self.__sentnonce = True
        # The last 16 bytes of the packed header are replaced by the nonce.
        header = Header(session=sessid, msgid=msgid, flags=Flags.DCHS).pack()
        data = header[:-16] + nonce.bytes_le
        log.warning('Their nonces: unhashed=%r, hashed=%r', self.in_key, self.in_hkey)
        log.warning('my nonces : unhashed=%r, hashed=%r', self.out_key, self.out_hkey)

        try:
            self.rrobin.unqueue()
        except AttributeError:
            # rrobin is still None until _connected() has run.
            pass

        self.socket._send(data)
        self.event('on_ready')
        self.rrobin.queue()

    @event
    def send_data(self):
        """Event hook bound to the socket's 'on_send'; has no body of its own."""
        pass

    def _sck_closed(self):
        """Tear down after the socket closed or failed.

        Unregisters this transport from the P2P manager, detaches the socket
        handlers, hands unsent messages back via purge_messages(), fires
        'on_error' and clears ``self.socket``.
        """
        if not hasattr(self, 'socket'):
            return None

        log.critical('MSNDC socket has been closed.')
        self.client._p2p_manager._unregister_transport(self)
        if self.socket is not None:
            unbind = lambda n, f: EventMixin.unbind(self.socket, n, f)
            unbind('on_message', self.incoming)
            unbind('on_error', self._sck_closed)
            unbind('on_close', self._sck_closed)
            unbind('on_send', self.send_data)
            self.socket.close_when_done()

        self.purge_messages()
        self.event('on_error')
        self.socket = None

    def _timeout(self):
        """Connecter error callback: clean up and report 'on_error'.

        NOTE(review): _sck_closed() already fires 'on_error', so listeners
        see the event twice on this path -- kept as found.
        """
        if hasattr(self, 'socket'):
            self._sck_closed()

        self.event('on_error')

    def Disconnect(self):
        """Close the socket (after pending output) and tear the transport down.

        Does nothing when there is no live socket, i.e. before a connection
        was established or after it has already been closed.
        """
        # self.socket starts out as None and is reset to None by
        # _sck_closed(); calling close_when_done() on it would raise.
        if getattr(self, 'socket', None) is None:
            return

        log.critical('MSNDC closing socket now.')
        self.socket.close_when_done()
        self._sck_closed()

    @property
    def localport(self):
        """Local port of the connecter, else of the socket, else 0."""
        try:
            return self._connecter.localport
        except Exception:
            # No connecter (yet / any more), or it exposes no localport.
            pass

        try:
            return self.socket.localport
        except Exception:
            return 0

    @property
    def p2p_peers(self):
        """Single-element list holding the peer given at construction."""
        return [self.peer]

    @property
    def p2p_rating(self):
        """Constant 100. NOTE(review): presumably a transport preference score."""
        return 100

    @property
    def p2p_max_msg_size(self):
        """Constant 1400."""
        return 1400

    @property
    def p2p_overhead(self):
        """Constant 52."""
        return 52

    @callsback
    def p2p_send(self, recvr, data, callback=None):
        """Send ``data`` as one frame; invoke ``callback.error()`` on failure.

        ``recvr`` is accepted for interface compatibility and not used.
        """
        try:
            self.socket._send(data)
        except Exception:
            callback.error()

    @callsback
    def push_with_producer(self, prod, callback=None):
        """Add ``prod`` to the round-robin producer and re-queue it.

        ``callback`` is accepted but not used here.
        """
        self.rrobin.add(prod)
        log.info('Push with producer: %r, %r', self, prod)
        self.rrobin.unqueue()
        self.rrobin.queue()

    def build_data(self, header, body, footer):
        """Return ``header + body`` prefixed with its 4-byte little-endian length.

        ``footer`` is ignored by this transport.
        """
        data = header + body
        return struct.pack('<I', len(data)) + data
-
-
-
class MSNDCSocket(common.socket, EventMixin):
    """Socket that frames traffic as <4-byte little-endian length><payload>.

    Incoming payloads are delivered through the 'on_message' event; outgoing
    data is framed by _send().
    """

    hdr_size = 4
    events = EventMixin.events | set(('on_message', 'on_close', 'on_error', 'on_send'))

    def __init__(self, conn, prev_data=''):
        """Wrap ``conn``; ``prev_data`` seeds the input buffer with bytes
        that were already read from the connection."""
        common.socket.__init__(self, conn)
        self.set_terminator(self.hdr_size)
        self.ac_in_buffer = prev_data
        EventMixin.__init__(self)
        self.data = ''
        # True while the next terminator hit will complete a length header.
        self.getting_hdr = True

    def collect_incoming_data(self, data):
        """Accumulate bytes of the header or payload currently being read."""
        self.data += data

    def found_terminator(self):
        """Alternate between parsing a length header and emitting a payload."""
        data = self.data
        self.data = ''
        self.getting_hdr = not self.getting_hdr
        if not self.getting_hdr:
            # ``data`` is a complete header: read the payload length.
            (next_term,) = struct.unpack('<I', data)
            if next_term:
                self.set_terminator(next_term)
            else:
                # Zero-length payload: complete it right away, which emits
                # an empty 'on_message' and re-arms header reading.
                self.found_terminator()
        else:
            # ``data`` is a complete payload.
            self.set_terminator(self.hdr_size)
            self.event('on_message', data)

    def handle_close(self):
        """Fire 'on_close', run the base class handler, then close."""
        self.event('on_close')
        common.socket.handle_close(self)
        self.close()

    def handle_expt(self):
        """Fire 'on_error', then run the base class handler."""
        self.event('on_error')
        common.socket.handle_expt(self)

    def handle_error(self, e=None):
        """Print the current traceback, fire 'on_error', close, then delegate."""
        import traceback
        traceback.print_exc()
        self.event('on_error')
        self.close()
        common.socket.handle_error(self, e)

    def _send(self, data):
        """Queue ``data`` for sending, prefixed with its 4-byte length."""
        sock_out.log(5, ' MSNDCSocket Data out: %r', data[:100])
        real_data = struct.pack('<I', len(data)) + data
        return common.socket.push(self, real_data)

    def __repr__(self):
        """Describe the socket by its peer address (None when unavailable)."""
        # Previously a ``return`` inside ``finally`` did this job, which also
        # swallowed KeyboardInterrupt/SystemExit; only ordinary errors are
        # suppressed now.
        try:
            pn = self.socket.getpeername()
        except Exception:
            pn = None
        return '<%s connected to %r>' % (self.__class__.__name__, pn)

    @property
    def localport(self):
        """Local port number of the underlying socket, or 0 if unavailable."""
        try:
            return self.socket.getsockname()[1]
        except Exception:
            return 0
-
-
class MSNDCConnecter(EventMixin):
    """Base class for objects that establish the direct-connection socket.

    Subclasses implement connect().
    """

    events = EventMixin.events | set(('timeout', 'connected'))

    def __init__(self, ips):
        """Remember the candidate addresses and start with an empty buffer."""
        EventMixin.__init__(self)
        self._ips = ips
        self.data = ''

    def connect(self):
        """Start connecting; must be overridden."""
        raise NotImplementedError

    def collect_incoming_data(self, data):
        """Buffer bytes that arrive while the connection is being set up."""
        self.data += data

    def bind(self, *a, **k):
        """Event binding, routed explicitly to EventMixin.bind.

        NOTE(review): presumably so that subclasses which also inherit a
        socket class do not resolve ``bind`` to the socket's -- confirm.
        """
        return EventMixin.bind(self, *a, **k)

    @property
    def _timeout(self):
        """Value of the 'msn.direct.timeout' preference as int, default 5.

        NOTE(review): presumably seconds -- confirm against the callers.
        """
        # The decompiler emitted "pref = pref; import common" for this
        # statement, which raises UnboundLocalError when executed.
        from common import pref
        return pref('msn.direct.timeout', type=int, default=5)
-
-
class MSNDC_Server(common.TimeoutSocket, MSNDCConnecter):
    """Connecter for the serving side: waits for the peer to connect to us."""

    def __init__(self):
        common.TimeoutSocket.__init__(self, False)
        # The serving side has no addresses to try.
        MSNDCConnecter.__init__(self, ())
        self.set_terminator(0)

    @callsback
    def connect(self, callback=None):
        """Start accepting on address ``('', 0)``.

        ``callback.success`` / ``callback.error`` are handed to tryaccept()
        together with the configured timeout.
        NOTE(review): ('', 0) presumably means all interfaces and an
        OS-chosen port -- confirm against tryaccept().
        """
        self.tryaccept(('', 0), callback.success, callback.error, self._timeout)

    @property
    def localport(self):
        """Local port number of the underlying socket, or 0 if unavailable."""
        try:
            return self.socket.getsockname()[1]
        except Exception:
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return 0

    def cleanup(self):
        """Remove this socket from the channel map and close it."""
        self.del_channel()
        self.close()
-
-
-
class MSNDC_Client(common.HydraSocket, MSNDCConnecter):
    """Connecter for the connecting side: dials the peer's addresses."""

    def __init__(self, ips):
        common.HydraSocket.__init__(self)
        MSNDCConnecter.__init__(self, ips)

    @callsback
    def connect(self, callback=None):
        """Try the stored addresses, using BufferedTimeoutSocket instances.

        The callback is kept so that connected() can invoke it once the
        greeting has gone out; failures go to ``callback.error``.
        """
        self.__callback = callback
        self.tryconnect(self._ips, self.connected, callback.error, self._timeout, cls=BufferedTimeoutSocket)

    def connected(self, sck):
        """Send the length-prefixed 'foo' greeting on a fresh connection.

        On a complete send the stored callback is called with ``sck``;
        otherwise the socket is closed and on_fail() is invoked.
        """
        payload = 'foo\x00'
        packet = struct.pack('<I', len(payload)) + payload
        sent = sck.send(packet)

        if sent == 4 + len(payload):
            log.warning('Sent "foo"')
            self.__callback(sck)
            return

        # Short or failed write: give up on this connection.
        sck.close()
        self.on_fail()
        log.warning('Send of "foo" failed')
        return None

    def cleanup(self):
        """Nothing to release on the client side."""
        pass
-
-
-
class BufferedTimeoutSocket(common.TimeoutSocket):
    """TimeoutSocket whose recv() drains internally collected data first."""

    def __init__(self, *a, **k):
        common.TimeoutSocket.__init__(self, *a, **k)
        self.set_terminator(0)
        self.__data = ''

    def collect_incoming_data(self, data):
        """Append incoming bytes to the private buffer."""
        self.__data = self.__data + data

    def recv(self, bytes):
        """Return up to ``bytes`` bytes, preferring already-buffered data.

        The underlying socket is read only when the buffer is empty.
        """
        buffered = self.__data
        if not buffered:
            return self.socket.recv(bytes)

        # Serve from the buffer and keep whatever is left over.
        self.__data = buffered[bytes:]
        return buffered[:bytes]

    def handle_close(self):
        """Close the underlying socket."""
        self.socket.close()
-
-
-