home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from __future__ import with_statement
- import socket
- import threading
- import logging
- import collections
- import common
- import util
- import util.Events as Events
- log = logging.getLogger('msn.sock')
- import msn
- import msn.Msnifier as msn
-
def dummy(*a, **k):
    """Accept any arguments, do nothing, and return None.

    The decompiled form ``lambda *a, **k: pass`` is not valid Python
    (``pass`` is a statement, not an expression), so the no-op is written
    as a plain function instead.
    """
    return None
-
def trid(max=2147483647, i=0):
    """Generate an endless stream of transaction ids.

    Each step increments ``i`` and yields it.  After ``max`` has been
    yielded the counter is reset to 0, so the next value produced is 1.

    max -- last value yielded before wrapping (default 2147483647).
    i   -- value the counter starts from; the first id yielded is ``i + 1``.
    """
    while True:
        i += 1
        yield i
        if i == max:
            # Wrap around; the next iteration yields 1 again.
            i = 0
-
-
class MSNSocketBase(Events.EventMixin):
    """Callback, transaction-id and timeout bookkeeping for MSN sockets.

    This class does no network I/O itself.  It relies on a ``send`` method
    (used by the timeout retry handler) that is not defined here and must be
    provided by a subclass.

    NOTE(review): this block was reconstructed from decompiler output.  The
    ``with self._lock`` blocks, the ``msg.trid or msg.cmd`` lookup and the
    ``try/except/else`` shapes replace decompiler artifacts -- confirm
    against the original source if it is available.
    """

    events = Events.EventMixin.events | set((
        'on_connect',
        'on_send',
        'on_conn_error',
        'on_close',
        'on_message',
    ))

    # Terminator appended to / stripped from protocol lines.
    delim = '\r\n'

    # Commands that are followed by a payload; subclasses parse the last
    # token of such a command line as the payload length.
    payload_commands = 'MSG UUX UBX PAG IPG NOT GCF ADL UUN UBN RML FQY 241 508 UBM UUM'.split()

    def __init__(self):
        Events.EventMixin.__init__(self)
        self.trid = trid()
        # Maps a transaction id or a command name to a list of callbacks.
        self.callbacks = collections.defaultdict(list)
        # A sibling base class may already have created the lock.
        if not hasattr(self, '_lock'):
            self._lock = threading.RLock()

        # Maps a transaction id to the running timer for that message.
        self.timeouts = {}

    def set_trid(self, msgobj, trid):
        """Assign the next generated transaction id when ``trid`` is True."""
        if trid is True:
            msgobj.trid = self.trid.next()

    def set_callbacks(self, msgobj, callback):
        """Register ``callback`` for the reply to ``msgobj``.

        Messages with a transaction id are keyed by that id (and get a
        timeout started); all others are keyed by command name.  A
        ``sentinel`` callback is stored as None.
        """
        if callback is sentinel:
            callback = None

        if msgobj.is_trid:
            self.set_timeout(msgobj)
            self.callbacks[msgobj.trid].append(callback)
        else:
            self.callbacks[msgobj.cmd].append(callback)

    def set_timeout(self, msgobj):
        """Start a retry timer for ``msgobj`` if it has a timeout.

        Nothing happens unless the message has a non-None ``timeout``
        attribute and the ``msn.socket.use_timeout`` pref is enabled.
        """
        timeout = getattr(msgobj, 'timeout', None)
        if timeout is None:
            return
        if not common.pref('msn.socket.use_timeout', type = bool, default = False):
            return

        log.info('Starting timeout for %r', msgobj)
        timer = util.Timer(timeout, self.timeout_handler(msgobj))
        self.timeouts[msgobj.trid] = timer
        timer.start()

    def timeout_handler(self, msgobj):
        """Return a zero-argument function that retries ``msgobj``."""

        def handler():
            log.debug('This message has timed out: %r', msgobj)
            msgcopy = msgobj.copy()
            msgcopy.timeout = msgobj.timeout
            msgcopy.retries = msgobj.retries - 1
            msgcopy.trid = 0
            if msgcopy.retries == 0:
                # Out of retries: give up silently.
                return None

            log.debug('Retrying this message that timed out: %r', msgcopy)
            with self._lock:
                # NOTE(review): values in self.callbacks are lists, so the
                # whole list is handed on as ``callback`` -- confirm intended.
                callback = self.callbacks.pop(msgobj.trid, None)

            self.send(msgcopy, trid = True, callback = callback)

        return handler

    def unset_timeout(self, msgobj, include_previous = True):
        """Stop the timer for ``msgobj``.

        With ``include_previous`` the timers of every lower transaction id
        are stopped as well.  Messages without a transaction id are ignored.
        """
        if not msgobj.is_trid:
            return None

        if include_previous:
            for i in range(msgobj.trid):
                self.unset_timeout_single(i)

        self.unset_timeout_single(msgobj.trid)

    def unset_timeout_single(self, key):
        """Remove and stop the timer stored under ``key``, if there is one."""
        try:
            timer = self.timeouts.pop(key, None)
            if timer is not None:
                timer.stop()
        except (IndexError, KeyError):
            pass

    def pause(self):
        """No-op hook."""
        pass

    def unpause(self):
        """No-op hook."""
        pass

    @Events.event
    def on_connect(self):
        return self

    @Events.event
    def on_send(self, data):
        pass

    @Events.event
    def on_conn_error(self, e = None):
        log.info('%r had a connection error: %r', self, e)
        return (self, e)

    @Events.event
    def on_close(self):
        return self

    @util.lock
    def unset_callbacks(self, msg):
        """Find the first callback registered for ``msg`` and clean up.

        The lookup key is the transaction id when it is truthy, otherwise
        the command name.  Returns the callback, or None when nothing is
        registered.

        For a transaction-id message the timers and callback lists of all
        lower ids are discarded; the entry for this id itself is kept.  For
        a command-keyed message the callback is removed from its list.
        """
        try:
            callback = self.callbacks[msg.trid or msg.cmd][0]
        except (KeyError, IndexError):
            return None

        if msg.is_trid:
            self.unset_timeout(msg, include_previous = True)
            for i in range(msg.trid):
                # pop() with a default never raises, so no guard is needed.
                self.callbacks.pop(i, None)
        elif not msg.trid:
            self.callbacks[msg.cmd].pop(0)

        return callback

    def adjust_message(self, msg):
        """Rewrite a QNG message as a PNG with transaction id 0."""
        if msg.cmd == 'QNG':
            msg.cmd = 'PNG'
            msg.trid = 0

        return msg

    def on_message(self, msg):
        """Fire the ``on_message`` event, then run the matching callback.

        The callback's ``error`` attribute is called when ``msg.is_error``
        is true, otherwise its ``success`` attribute.  It receives
        ``(self, msg)``.  Exceptions raised by it are logged, not raised.
        """
        self.event('on_message', msg)
        msg = self.adjust_message(msg)
        callback = self.unset_callbacks(msg)
        if callback is None:
            return None

        try:
            if msg.is_error:
                f = callback.error
            else:
                f = callback.success
        except AttributeError:
            import sys
            log.error('AttributeError in msnsocket.on_message: %r\ncallback was: %r', sys.exc_info()[1], callback)
            # There is nothing to call; going on would hit an unbound name.
            return None

        log.debug('MSNSocket calling %r', f)

        try:
            f(self, msg)
        except Exception:
            log.error('Error in callback')
            import traceback
            traceback.print_exc()

    def close(self):
        """Stop and discard every pending timeout timer."""
        while self.timeouts:
            try:
                (k, v) = self.timeouts.popitem()
            except KeyError:
                # The dict was emptied between the check and popitem().
                break

            if v is not None:
                v.stop()
-
-
-
class MSNSocket(MSNSocketBase, common.socket):
    """MSN protocol socket built on ``common.socket``.

    Incoming data is split into command lines and payloads and passed to
    ``on_message``.  Outgoing messages are handed to an ``Msnifier``
    instance (``self.rater``), and ``time_to_send`` reports how long a
    sender should wait to stay under the byte-rate limit.

    NOTE(review): this block was reconstructed from decompiler output.  The
    ``with`` blocks, the data swap in ``found_terminator`` and the
    ``try/except/else`` shapes replace decompiler artifacts -- confirm
    against the original source if it is available.
    """

    # time_to_send() asks for a delay once the bytes sent within the last
    # ``speed_window`` seconds reach ``speed_limit``.
    speed_limit = 2000
    speed_window = 0.25

    def __init__(self):
        common.socket.__init__(self)
        MSNSocketBase.__init__(self)
        self.set_terminator(self.delim)
        self.data = ''
        # Either 'command' or 'payload'; see found_terminator().
        self.expecting = 'command'
        self._server = None
        self.rater = msn.Msnifier.Msnifier(self)
        self.rater.start()
        self._bc_lock = threading.RLock()
        # List of (byte count, timestamp) pairs for recent sends.
        self.bytecount = [(0, util.default_timer())]
        log.debug('%r created', self)

    def get_local_sockname(self):
        """Return ``getsockname()`` of the underlying socket."""
        return self.socket.getsockname()

    def connect_args_for(self, type, addr):
        """Return the ``(type, addr)`` pair to pass to ``connect``."""
        return (type, addr)

    def connect(self, type, host_port):
        """Connect to ``host_port`` and remember the socket ``type``.

        type      -- socket type label; ``'NS'`` is the only value this
                     class treats specially (see ``test_connection``).
        host_port -- a ``(host, port)`` pair.

        Raises TypeError if ``host_port`` cannot be unpacked into two items
        and ValueError if a server has already been set.  Exceptions from
        the underlying connect are reported through ``on_conn_error``.
        """
        self._scktype = type

        # ``type`` shadows the builtin inside this method, so class names
        # are taken from ``__class__`` instead of calling ``type(...)``.
        try:
            (host, port) = host_port
        except (ValueError, TypeError):
            raise TypeError("%r address must be <type 'tuple'> (host, port) not %r (%r)"
                            % (self.__class__.__name__, host_port.__class__, host_port))

        if self._server is not None:
            raise ValueError("Don't know which server to use! self._server = %r, host_port = %r."
                             % (self._server, host_port))

        self._server = host_port
        log.info('connecting socket to %r', self._server)

        try:
            common.socket.connect(self, self._server, error = self.on_conn_error)
        except Exception:
            import sys
            self.on_conn_error(sys.exc_info()[1])
            return None

        self.bind_event('on_message', (lambda msg: log.debug('Received %r', msg)))

    # Alias captured at class-creation time.
    _connect = connect

    def _disconnect(self):
        self.close_when_done()

    @property
    def _closed(self):
        """True unless the underlying socket reports ``connected``."""
        return not getattr(self.socket, 'connected', False)

    def __repr__(self):
        try:
            s = 'connected to %r' % (self.socket.getpeername(),)
        except socket.error:
            s = 'not connected'

        return '<%s %s>' % (type(self).__name__, s)

    @util.callsback
    def test_connection(self, callback = None):
        """Ping the server on an NS socket; otherwise report success."""
        if self._scktype == 'NS':
            self.send(msn.Message('PNG'), callback = callback)
        else:
            log.info('Not testing connection because this is not an NS socket.')
            callback.success()

    def handle_connect(self):
        log.debug('connection established')
        self.on_connect()

    def handle_expt(self):
        log.warning('OOB data. self.data = %r', self.data)
        self.close()

    @util.lock
    def collect_incoming_data(self, data):
        """Append received ``data`` to the pending buffer."""
        self.data += data

    def set_terminator(self, term):
        common.socket.set_terminator(self, term)

    def found_terminator(self):
        """Handle one complete command line or payload from the buffer.

        A command listed in ``payload_commands`` is kept in the buffer and
        the terminator is set to the payload length taken from its last
        token (0 if that token is not an integer).  The following call then
        parses command and payload together.  Any other command is parsed
        straight away.  A parsed message goes to ``on_message``; a parsing
        error triggers a connection test instead.
        """
        self.data += self.delim

        try:
            with self._lock:
                # Take the buffered text and reset the buffer in one step.
                data, self.data = self.data, ''
                log.debug_s('IN : %r', data)
                dlist = data.split(' ')
                cmd = dlist[0]
                if self.expecting == 'command' and cmd in self.payload_commands:
                    self.expecting = 'payload'
                    # Keep the command line so the payload is appended to it.
                    self.data = data

                    try:
                        new_term = int(dlist[-1])
                    except ValueError:
                        new_term = 0

                    return self.set_terminator(new_term)
                elif self.expecting == 'payload':
                    self.expecting = 'command'
                    # Drop the delimiter that was appended above.
                    data = data[:-len(self.delim)]
                    payload = True
                else:
                    payload = False

                self.set_terminator(self.delim)
                msg = msn.Message.from_net(data, payload)
        except Exception:
            import sys
            log.info('error parsing message, testing connection\nError was %r', sys.exc_info()[1])
            self.test_connection(success = self.conn_ok, error = self.conn_error)
            import traceback
            traceback.print_exc()
        else:
            # Dispatch only when a message was actually parsed.
            self.on_message(msg)

    def handle_close(self):
        log.warning('socket closed, self.data = %r', self.data)
        self.rater.stop()
        self.close()

    def close(self):
        """Stop pending timers, close the socket and fire ``on_close``."""
        log.warning('socket closing, self.data = %r', self.data)
        MSNSocketBase.close(self)
        common.socket.close(self)
        self.on_close()

    def send_gen(self, gen, priority = 5):
        self.rater.send_pkt(gen, priority)

    @util.callsback
    def send(self, msgobj, trid = sentinel, callback = None, **kw):
        """Queue ``msgobj`` for sending through the rate limiter.

        A transaction id is assigned when ``trid`` is True, and
        ``callback`` is registered for the reply.  Extra keyword arguments
        are passed to ``self.rater.send_pkt``.
        """
        self.set_trid(msgobj, trid)
        self.set_callbacks(msgobj, callback)
        log.debug('Sending %r', msgobj)
        self.rater.send_pkt(str(msgobj), **kw)

    def conn_ok(self):
        log.info('connection test passed')

    def conn_error(self):
        log.warning('connection test failed')
        self.close_when_done()
        self.on_conn_error()

    def _send(self, data, *a, **k):
        """Write ``data`` to the socket and record its size for rate limiting."""
        log.log_s(0, 'sent: %s' % data)
        with self._lock:
            log.debug_s('OUT : %r' % (data,))
            if not common.socket.send(self, data, *a, **k):
                log.critical('Message dropped in MSNSocket: <%s>' % data)

        self.on_send(data)
        now = util.default_timer()
        with self._bc_lock:
            self.bytecount.append((len(data), now))

    def time_to_send(self, data):
        """Return how many seconds to wait before sending ``data``.

        Sends older than ``speed_window`` seconds are dropped from
        ``bytecount`` first.  Returns 0 when the bytes still in the window
        total less than ``speed_limit``.
        """
        now = util.default_timer()
        with self._bc_lock:
            recent = [entry for entry in self.bytecount
                      if now - entry[1] < self.speed_window]
            self.bytecount = recent

        send_rate = sum(entry[0] for entry in recent)
        if send_rate < self.speed_limit:
            return 0

        log.debug('sending too fast')
        pending = len(data)
        interval = 0
        # Walk back from the newest send until the rate exceeds the limit.
        for size, tstamp in reversed(recent):
            pending += size
            interval = now - tstamp
            # A zero interval means an unbounded rate; stop here rather
            # than dividing by zero.
            if interval <= 0 or (pending / interval) * self.speed_window > self.speed_limit:
                break

        tts = (pending / self.speed_limit) * self.speed_window + interval
        log.log(5, 'currently sending at %d bytes/sec', send_rate)
        log.debug('sleeping for %r seconds' % tts)
        return tts

    def close_when_done(self):
        """Send OUT, stop the rate limiter, then close via ``common.socket``."""
        self.send(msn.Message('OUT'))
        self.rater.stop()
        common.socket.close_when_done(self)
-
-
-