home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from __future__ import with_statement
- import asyncore
- import errno
- import select
- import time
- import traceback
- import sys
- from select import error as select_error
- from Queue import Queue, Empty
- from sys import exc_clear
- from threading import RLock, currentThread
- from traceback import print_exc
- from asyncore import ExitNow
- from logging import getLogger
- log = getLogger('asyncore.thread')
- INTERRUPT_sentinel = 57005
- NO_SOCKET_SLEEP = 0.2
-
- def read(obj):
-
- try:
- obj.handle_read_event()
- except ExitNow:
- raise
- except Exception:
- e = None
- obj.handle_error(e)
-
-
-
- def write(obj):
-
- try:
- obj.handle_write_event()
- except ExitNow:
- raise
- except Exception:
- e = None
- obj.handle_error(e)
-
-
-
- def exc(obj):
-
- try:
- obj.handle_expt_event()
- except ExitNow:
- raise
- except Exception:
- e = None
- obj.handle_error(e)
-
-
-
- def readwrite(obj, flags):
-
- try:
- if flags & (select.POLLIN | select.POLLPRI):
- obj.handle_read_event()
-
- if flags & select.POLLOUT:
- obj.handle_write_event()
-
- if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
- obj.handle_expt_event()
- except ExitNow:
- raise
- except Exception:
- e = None
- obj.handle_error(e)
-
-
-
- def kpoll(timeout = 0, map = None, amap = asyncore.socket_map):
- if map is None:
- map = amap
-
- if map:
- mapget = map.get
- r = []
- w = []
- e = []
- for fd, obj in map.items():
- is_r = obj.readable()
- is_w = obj.writable()
- if is_r:
- r.append(fd)
-
- if is_w:
- w.append(fd)
-
- if is_r or is_w:
- e.append(fd)
- continue
-
- if r == r and w == w:
- pass
- elif w == e:
- time.sleep(max(timeout, NO_SOCKET_SLEEP))
- return None
-
-
- try:
- (r, w, e) = select.select(r, w, e, timeout)
- except select_error:
- []
- err = []
- if err[0] == errno.EINTR:
- return INTERRUPT_sentinel
- elif err[0] == errno.ENOTSOCK:
- BAD = None
- for _list in (r, w, e):
- if BAD is not None:
- break
-
- for _fd in _list:
-
- try:
- real_fn = mapget(_fd).fileno()
- if real_fn != _fd:
- raise Exception
- continue
- BAD = _fd
- break
- continue
-
-
-
- del _fd
- del _list
- None if BAD is None else None<EXCEPTION MATCH>KeyError
- raise err
- except:
- r
-
- for fd in r:
- obj = mapget(fd)
- read(obj)
-
- for fd in w:
- obj = mapget(fd)
- if obj is None:
- continue
-
- write(obj)
-
- for fd in e:
- obj = mapget(fd)
- if obj is None:
- continue
-
- exc(obj)
-
- if r == []:
- return True
-
-
-
-
- def callback_call(callable, callback):
-
- try:
- callable()
- except Exception:
- e = None
- print_exc()
-
- try:
- callback.error(e)
- print_exc()
-
-
-
- try:
- callback.success()
- except:
- print_exc()
-
-
- import util.threads.bgthread as util
-
- class AsyncoreThread(util.threads.bgthread.BackgroundThread):
-
- def __init__(self, timeout = 0.1, use_poll = 0, map = None):
- self.flag = True
- self.timeout = timeout
- self.use_poll = use_poll
- self.map = map
- self.timeouts = { }
- self.hooks = []
- util.threads.bgthread.BackgroundThread.__init__(self, None, None, 'AsyncoreThread')
-
-
- def run(self):
- self.BeforeRun()
-
- try:
- util.use_profiler(self, self.loop)
- except:
- traceback.print_exc()
- raise
- finally:
- self.AfterRun()
-
-
-
- def set_timeout(self, sock, secs):
- self.timeouts[sock] = secs
-
-
- def shortest_timeout(self):
- if not self.timeouts.values():
- pass
- return min([
- self.timeout])
-
-
- def join(self, timeout = None):
- self.flag = False
- util.threads.bgthread.BackgroundThread.join(self, timeout)
-
-
- def loop(self):
- if self.map is None:
- self.map = asyncore.socket_map
-
- last = 0
- fastcount = 0
- while self.flag:
- now = time.clock()
- if now - last < 0.1:
- fastcount += 1
- else:
- fastcount = 0
- last = now
- if fastcount and not (fastcount % 5000):
-
- try:
- log.critical('Asyncorethread may be spinning, fastcount %r socket map is: %r', (fastcount, self.map))
- except Exception:
-
- try:
- log.critical('Asyncorethread may be spinning, failed to print socket map')
- except Exception:
- pass
- except:
- None<EXCEPTION MATCH>Exception
-
-
- None<EXCEPTION MATCH>Exception
-
-
- None<EXCEPTION MATCH>Exception
- setattr(self, 'loopcount', getattr(self, 'loopcount', 0) + 1)
-
- try:
- (tocall, callback) = to_call.get_nowait()
- except Empty:
- e = None
- (tocall, callback) = (None, None)
- exc_clear()
-
- if tocall:
- callback_call(tocall, callback)
-
- empty = to_call.empty()
- timeout = self.shortest_timeout() * empty
- if self.map or not empty:
-
- try:
- kret = kpoll(timeout, self.map)
- except:
- print repr(self.map)
- raise
-
- if kret == INTERRUPT_sentinel:
- setattr(self, 'interrupt_count', getattr(self, 'interrupt_count', 0) + 1)
- if not self.interrupt_count % 5000:
- log.critical('interrupt count is high: %r', self.interrupt_count)
-
- else:
- setattr(self, 'interrupt_count', 0)
- else:
- time.sleep(0.3)
- exc_clear()
- if self.map:
- for sock in self.map.values():
- log.info('closing socket %r', sock)
- sock.close_when_done()
-
-
- log.info('Asyncore Thread is done.')
-
-
- def end(self):
- log.info('stopping the network thread. obtaining lock...')
- net_lock.__enter__()
-
- try:
- log.info('Ending asyncore loop...')
- self.flag = False
- for sock in self.map.values():
- log.info('closing socket %r', sock)
- sock.close_when_done()
- finally:
- pass
-
-
-
- def force_exit(self):
- net_lock.__enter__()
-
- try:
- if self.map is None:
- return None
-
- for sock in self.map.values():
-
- try:
- peername = sock.getpeername()
- except:
- peername = None
-
- log.critical('socket %s connected to %s', str(sock), peername)
- sock.close()
-
- self.map.clear()
- self.map = None
- self.flag = False
- del self.hooks
- finally:
- pass
-
-
-
- def add_hook(self, cb):
- self.hooks.append(cb)
-
-
- ref_count = 0
- net_thread = None
- net_lock = RLock()
- running = False
- to_call = Queue()
-
- def __start():
- global net_thread
-
- try:
- net_thread
- except NameError:
- return None
-
- if not ref_count <= 1:
- if net_thread:
- pass
- if not net_thread.isAlive():
- net_thread = AsyncoreThread()
- log.info('AsyncoreThread.start %s', ref_count)
- net_thread.start()
-
-
-
- def start():
- global running, ref_count
- running = True
- net_lock.__enter__()
-
- try:
- ref_count += 1
- __start()
- finally:
- pass
-
-
-
- def end():
- log.critical('AsyncoreThread.end called!')
- traceback.print_stack()
-
-
- def end_thread():
- net_lock.__enter__()
-
- try:
- log.info('AsyncoreThread.end %s', ref_count)
- if net_thread:
- net_thread.end()
- finally:
- pass
-
-
-
- def timeout(sck, secs):
- net_lock.__enter__()
-
- try:
- net_thread.set_timeout(sck, secs)
- finally:
- pass
-
-
-
- def join(timeout = 1.5):
- global net_thread, call_later
- if net_thread:
- log.critical('Joining with network thread, timeout is %s...', timeout)
- net_thread.join(timeout)
- if net_thread.isAlive():
- log.critical(' forcing critical exit.')
- net_thread.force_exit()
- net_thread.join()
- del net_thread
-
- log.critical('...done joining.')
-
- call_later = lambda call, callback = None: (None, call() if callback else None)
-
-
-
- def call_later(call, callback = None, callnow = True):
- if not callable(call):
- raise TypeError, 'argument must be callable'
-
- if callback is None:
- import util.callbacks as util
- callback = util.callbacks.EMPTY_CALLBACK
-
-
- import util.callbacks as util
- util.callbacks.register_call_later('AsyncoreThread', call_later)
-