home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import sys
- import threading
- from IPython.ultraTB import AutoFormattedTB
- from IPython.genutils import warn, error
-
class BackgroundJobManager:
    """Launch jobs in background threads and keep track of them.

    Jobs are numbered in creation order starting at 0 and can be fetched
    with ``manager[num]``.  Besides the master dict ``jobs_all``, the manager
    keeps three lists (``jobs_run``, ``jobs_comp``, ``jobs_dead``).  These
    lists are only brought up to date when ``_update_status()`` runs, which
    the status/reporting methods do for you.

    Calling the manager itself is a shortcut for ``status()``.
    """

    def __init__(self):
        # Jobs grouped by state; refreshed lazily by _update_status().
        self.jobs_run = []
        self.jobs_comp = []
        self.jobs_dead = []
        # Master dict of jobs keyed by job number.
        self.jobs_all = {}
        # Jobs that finished (or died) since the last report was printed.
        self._comp_report = []
        self._dead_report = []
        # Local copies of the numeric status codes used for comparisons.
        self._s_created = BackgroundJobBase.stat_created_c
        self._s_running = BackgroundJobBase.stat_running_c
        self._s_completed = BackgroundJobBase.stat_completed_c
        self._s_dead = BackgroundJobBase.stat_dead_c

    def new(self, func_or_exp, *args, **kwargs):
        """Create a job, start it in its own thread and return it.

        ``func_or_exp`` selects the kind of job:

        * a callable -- it will be invoked as ``func_or_exp(*args, **kw)``,
          where ``kw`` is the dict given through this method's ``kw``
          keyword argument (no keyword arguments when ``kw`` is omitted);
        * a string -- it is handed to ``BackgroundJobExpr`` as an
          expression.  The namespaces come from ``args``: with no extra
          argument the caller's globals and locals are used, one argument
          serves as both, two arguments are ``(globals, locals)``.

        Raises ValueError when an expression job gets more than two extra
        arguments, and TypeError when ``func_or_exp`` is neither a callable
        nor a string.
        """
        if callable(func_or_exp):
            job = BackgroundJobFunc(func_or_exp, *args, **kwargs.get('kw', {}))
        elif isinstance(func_or_exp, basestring):
            nargs = len(args)
            if nargs == 0:
                # Frame 1 is whoever called new(): borrow its namespaces.
                caller = sys._getframe(1)
                glob, loc = caller.f_globals, caller.f_locals
            elif nargs == 1:
                glob = loc = args[0]
            elif nargs == 2:
                glob, loc = args
            else:
                raise ValueError('Expression jobs take at most 2 args (globals,locals)')
            job = BackgroundJobExpr(func_or_exp, glob, loc)
        else:
            raise TypeError('invalid args for new job')

        # Next free number: one past the highest number still registered.
        if self.jobs_all:
            job.num = max(self.jobs_all) + 1
        else:
            job.num = 0
        self.jobs_run.append(job)
        self.jobs_all[job.num] = job
        print('Starting job # %s in a separate thread.' % job.num)
        job.start()
        return job

    def __getitem__(self, key):
        """Return the job registered under number ``key`` (KeyError if none)."""
        return self.jobs_all[key]

    def __call__(self):
        """Shortcut for ``status()``."""
        return self.status()

    def _update_status(self):
        """Move finished jobs out of ``jobs_run`` into the proper list.

        Completed jobs go to ``jobs_comp`` and dead ones to ``jobs_dead``;
        each is also queued in the matching ``*_report`` list so that the
        next ``_status_new()`` can announce it.  Jobs in any other state
        stay in ``jobs_run``.
        """
        comp = self._s_completed
        dead = self._s_dead
        still_running = []
        for job in self.jobs_run:
            stat = job.stat_code
            if stat == comp:
                self.jobs_comp.append(job)
                self._comp_report.append(job)
            elif stat == dead:
                self.jobs_dead.append(job)
                self._dead_report.append(job)
            else:
                still_running.append(job)
        self.jobs_run = still_running

    def _group_report(self, group, name):
        """Print the jobs in ``group`` under the heading ``name``.

        Returns True when something was printed and None for an empty group.
        """
        if not group:
            return None
        print('%s jobs:' % name)
        for job in group:
            print('%s : %s' % (job.num, job))
        print('')
        return True

    def _group_flush(self, group, name):
        """Empty the list ``group`` in place, announcing how many jobs went.

        Returns True when the group held jobs and None when it was empty.
        """
        njobs = len(group)
        if not njobs:
            return None
        if njobs == 1:
            plural = ''
        else:
            plural = 's'
        print('Flushing %s %s job%s.' % (njobs, name, plural))
        group[:] = []
        return True

    def _status_new(self):
        """Report jobs that completed or died since the last report.

        Returns a true value if any such job was reported, a false value
        otherwise.  The pending-report lists are cleared either way.
        """
        self._update_status()
        new_comp = self._group_report(self._comp_report, 'Completed')
        new_dead = self._group_report(self._dead_report, 'Dead, call jobs.traceback() for details')
        self._comp_report[:] = []
        self._dead_report[:] = []
        return new_comp or new_dead

    def status(self, verbose = 0):
        """Print all running, completed and dead jobs.

        ``verbose`` is accepted but not used by this method.
        """
        self._update_status()
        self._group_report(self.jobs_run, 'Running')
        self._group_report(self.jobs_comp, 'Completed')
        self._group_report(self.jobs_dead, 'Dead')
        # Everything has just been shown, so nothing is pending any more.
        self._comp_report[:] = []
        self._dead_report[:] = []

    def remove(self, num):
        """Drop finished job ``num`` from the completed/dead list.

        An unknown number or a job that is still running is reported through
        ``error()`` and nothing is removed.  This method does not delete the
        job from ``jobs_all``.
        """
        try:
            job = self.jobs_all[num]
        except KeyError:
            error('Job #%s not found' % num)
            return None

        stat_code = job.stat_code
        if stat_code == self._s_running:
            error('Job #%s is still running, it can not be removed.' % num)
            return None
        if stat_code == self._s_completed:
            self.jobs_comp.remove(job)
        elif stat_code == self._s_dead:
            self.jobs_dead.remove(job)

    def flush_finished(self):
        """Forget every completed and dead job.

        ``_status_new()`` is called first; if it reports jobs that finished
        since the previous report, the flush is aborted so that those jobs
        are not discarded unseen.  Running jobs are not touched.
        """
        if self._status_new():
            error('New jobs completed since last _status_new(), aborting flush.')
            return None

        # Unregister the finished jobs from the master dict ...
        jobs_all = self.jobs_all
        for job in self.jobs_comp + self.jobs_dead:
            del jobs_all[job.num]

        # ... then empty the per-state lists.
        fl_comp = self._group_flush(self.jobs_comp, 'Completed')
        fl_dead = self._group_flush(self.jobs_dead, 'Dead')
        if not (fl_comp or fl_dead):
            print('No jobs to flush.')

    def result(self, num):
        """Return the ``result`` attribute of job ``num``.

        An unknown number is reported through ``error()`` and None is
        returned.
        """
        try:
            job = self.jobs_all[num]
        except KeyError:
            error('Job #%s not found' % num)
            return None
        return job.result

    def traceback(self, num):
        """Call ``traceback()`` on job ``num``.

        An unknown number is reported through ``error()``.
        """
        try:
            job = self.jobs_all[num]
        except KeyError:
            error('Job #%s not found' % num)
        else:
            job.traceback()
-
-
-
-
class BackgroundJobBase(threading.Thread):
    """Shared machinery for jobs that execute in their own thread.

    This class cannot be instantiated directly.  A subclass must, in its
    own ``__init__``:

    * set ``self.strform``, the text used by ``str()`` and ``repr()``;
    * provide a ``call()`` method that does the work and returns the result;
    * finish by calling ``self._init()``.
    """

    # Human-readable status strings, each paired with a numeric code.
    stat_created = 'Created'
    stat_created_c = 0
    stat_running = 'Running'
    stat_running_c = 1
    stat_completed = 'Completed'
    stat_completed_c = 2
    stat_dead = 'Dead (Exception), call jobs.traceback() for details'
    stat_dead_c = -1

    def __init__(self):
        raise NotImplementedError('This class can not be instantiated directly.')

    def _init(self):
        """Set up the common job state; subclasses call this last."""
        # str()/repr() read self.strform and run() calls self.call(), so a
        # subclass that forgot either one is caught here rather than later.
        for attr in ('call', 'strform'):
            assert hasattr(self, attr), 'Missing attribute <%s>' % attr

        self.num = None
        self.status = BackgroundJobBase.stat_created
        self.stat_code = BackgroundJobBase.stat_created_c
        # False until run() ends; then True on success, None on failure.
        self.finished = False
        self.result = '<BackgroundJob has not completed>'

        # Use the traceback formatter of __IPYTHON__ when that name is
        # available, otherwise build a standalone one.
        try:
            self._make_tb = __IPYTHON__.InteractiveTB.text
        except (NameError, AttributeError):
            self._make_tb = AutoFormattedTB(mode = 'Context', color_scheme = 'NoColor', tb_offset = 1).text

        # Formatted traceback text; stays None unless the job dies.
        self._tb = None
        threading.Thread.__init__(self)

    def __str__(self):
        return self.strform

    def __repr__(self):
        return '<BackgroundJob: %s>' % self.strform

    def traceback(self):
        """Print the traceback stored for this job (None if it did not die)."""
        print(self._tb)

    def run(self):
        """Thread body: execute ``call()`` and record how it ended.

        On success the return value is stored in ``result`` and the job is
        marked completed.  If ``call()`` raises, the job is marked dead and
        the formatted traceback is kept in ``_tb``.
        """
        try:
            self.status = BackgroundJobBase.stat_running
            self.stat_code = BackgroundJobBase.stat_running_c
            self.result = self.call()
        except:
            # Deliberately catch everything: whatever the job raises must be
            # recorded here instead of escaping from the thread.
            self.status = BackgroundJobBase.stat_dead
            self.stat_code = BackgroundJobBase.stat_dead_c
            self.finished = None
            self.result = '<BackgroundJob died, call jobs.traceback() for details>'
            self._tb = self._make_tb()
        else:
            # Only a job that did not raise may be flagged as completed.
            self.status = BackgroundJobBase.stat_completed
            self.stat_code = BackgroundJobBase.stat_completed_c
            self.finished = True
-
-
-
class BackgroundJobExpr(BackgroundJobBase):
    """Background job that evaluates an expression given as a string."""

    def __init__(self, expression, glob = None, loc = None):
        """Compile ``expression`` and remember the namespaces to use.

        ``glob`` and ``loc`` are the globals and locals passed to ``eval``;
        each defaults to a fresh empty dict.  The expression is compiled
        right away, so invalid syntax raises here rather than in the thread.
        """
        # Compile first: nothing else is set up if the source is invalid.
        self.code = compile(expression, '<BackgroundJob compilation>', 'eval')
        self.glob = {} if glob is None else glob
        self.loc = {} if loc is None else loc
        self.expression = expression
        self.strform = expression
        self._init()

    def call(self):
        """Evaluate the compiled expression and return its value."""
        return eval(self.code, self.glob, self.loc)
-
-
-
class BackgroundJobFunc(BackgroundJobBase):
    """Background job that calls a function with stored arguments."""

    def __init__(self, func, *args, **kwargs):
        """Remember ``func`` and the arguments it will be called with.

        ``args`` and ``kwargs`` are forwarded unchanged to ``func`` when the
        job runs.  ``str(func)`` is used as the job's display text.
        """
        # *args / **kwargs always arrive as a tuple and a dict (possibly
        # empty), never None, so they can be stored without further checks.
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.strform = str(func)
        self._init()

    def call(self):
        """Invoke the stored function and return what it returns."""
        return self.func(*self.args, **self.kwargs)
-
-
if __name__ == '__main__':
    # Small demonstration, executed only when the file is run as a script.
    import time

    def sleepfunc(interval = 2, *a, **kw):
        """Sleep ``interval`` seconds, then return the arguments received."""
        received = dict(interval = interval, args = a, kwargs = kw)
        time.sleep(interval)
        return received

    def diefunc(interval = 2, *a, **kw):
        """Sleep ``interval`` seconds, then fail."""
        time.sleep(interval)
        # 'die' is not defined anywhere in this file, so evaluating it
        # raises NameError and the job running this function dies.
        die

    def printfunc(interval = 1, reps = 5):
        """Print a message ``reps`` times, pausing ``interval`` seconds before each."""
        for _ in range(reps):
            time.sleep(interval)
            print('In the background...')

    jobs = BackgroundJobManager()
    # Jobs are numbered from 0 in the order they are started.
    jobs.new(sleepfunc, 4)
    jobs.new(sleepfunc, kw = {'reps': 2})
    jobs.new(diefunc, 1)
    jobs.new('printfunc(1,3)')
    # The jobs were only just started, so the lines below show whatever
    # state they have reached at this instant.
    print(jobs[1].status)
    jobs[1].traceback()
    print('The result of job #0 is: %s' % (jobs[0].result,))
-
-