home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- __docformat__ = 'restructuredtext en'
- __test__ = { }
- import copy
- import time
- from types import FunctionType
- import zope.interface as zi
- import string
- from twisted.internet import defer, reactor
- from twisted.python import components, log, failure
- from IPython.kernel.util import printer
- from IPython.kernel import engineservice as es, error
- from IPython.kernel import controllerservice as cs
- from IPython.kernel.twistedutil import gatherBoth, DeferredList
- from IPython.kernel.pickleutil import can, uncan, CannedFunction
- time_format = '%Y/%m/%d %H:%M:%S'
-
- class ITask(zi.Interface):
- zi.Attribute('retries', 'How many times to retry the task')
- zi.Attribute('recovery_task', 'A task to try if the initial one fails')
- zi.Attribute('taskid', 'the id of the task')
-
- def start_time(result):
- pass
-
-
- def stop_time(result):
- pass
-
-
- def pre_task(d, queued_engine):
- pass
-
-
- def post_task(d, queued_engine):
- pass
-
-
- def submit_task(d, queued_engine):
- pass
-
-
- def process_result(d, result, engine_id):
- pass
-
-
- def check_depend(properties):
- pass
-
-
- def can_task(self):
- pass
-
-
- def uncan_task(self):
- pass
-
-
-
- class BaseTask(object):
- zi.implements(ITask)
-
- def __init__(self, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
- self.clear_before = clear_before
- self.clear_after = clear_after
- self.retries = retries
- self.recovery_task = recovery_task
- self.depend = depend
- self.taskid = None
-
-
- def start_time(self, result):
- self.start = time.time()
- self.start_struct = time.localtime()
- return result
-
-
- def stop_time(self, result):
- self.stop = time.time()
- self.stop_struct = time.localtime()
- self.duration = self.stop - self.start
- self.submitted = time.strftime(time_format, self.start_struct)
- self.completed = time.strftime(time_format)
- return result
-
-
- def pre_task(self, d, queued_engine):
- if self.clear_before:
- (d.addCallback,)((lambda r: queued_engine.reset()))
-
-
-
- def post_task(self, d, queued_engine):
-
- def reseter(result):
- queued_engine.reset()
- return result
-
- if self.clear_after:
- d.addBoth(reseter)
-
-
-
- def submit_task(self, d, queued_engine):
- raise NotImplementedError('submit_task must be implemented in a subclass')
-
-
- def process_result(self, result, engine_id):
- if isinstance(result, failure.Failure):
- return (False, result)
- return (True, result)
-
-
- def check_depend(self, properties):
- if self.depend is not None:
- return self.depend(properties)
- return True
-
-
- def can_task(self):
- self.depend = can(self.depend)
- if isinstance(self.recovery_task, BaseTask):
- self.recovery_task.can_task()
-
-
-
- def uncan_task(self):
- self.depend = uncan(self.depend)
- if isinstance(self.recovery_task, BaseTask):
- self.recovery_task.uncan_task()
-
-
-
-
- class MapTask(BaseTask):
- zi.implements(ITask)
-
- def __init__(self, function, args = None, kwargs = None, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
- BaseTask.__init__(self, clear_before, clear_after, retries, recovery_task, depend)
- if not isinstance(function, FunctionType):
- raise TypeError('a task function must be a FunctionType')
- isinstance(function, FunctionType)
- self.function = function
- if args is None:
- self.args = ()
- else:
- self.args = args
- if not isinstance(self.args, (list, tuple)):
- raise TypeError('a task args must be a list or tuple')
- isinstance(self.args, (list, tuple))
- if kwargs is None:
- self.kwargs = { }
- else:
- self.kwargs = kwargs
- if not isinstance(self.kwargs, dict):
- raise TypeError('a task kwargs must be a dict')
- isinstance(self.kwargs, dict)
-
-
- def submit_task(self, d, queued_engine):
- (None, d.addCallback)((lambda r: queued_engine.push_function(dict(_ipython_task_function = self.function))))
- (None, d.addCallback)((lambda r: queued_engine.push(dict(_ipython_task_args = self.args, _ipython_task_kwargs = self.kwargs))))
- (d.addCallback,)((lambda r: queued_engine.execute('_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')))
- (d.addCallback,)((lambda r: queued_engine.pull('_ipython_task_result')))
-
-
- def can_task(self):
- self.function = can(self.function)
- BaseTask.can_task(self)
-
-
- def uncan_task(self):
- self.function = uncan(self.function)
- BaseTask.uncan_task(self)
-
-
-
- class StringTask(BaseTask):
-
- def __init__(self, expression, pull = None, push = None, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
- if not isinstance(expression, str):
- raise TypeError('a task expression must be a string')
- isinstance(expression, str)
- self.expression = expression
- if pull == None:
- self.pull = ()
- elif isinstance(pull, str):
- self.pull = (pull,)
- elif isinstance(pull, (list, tuple)):
- self.pull = pull
- else:
- raise TypeError('pull must be str or a sequence of strs')
- if pull == None == None:
- self.push = { }
- elif isinstance(push, dict):
- self.push = push
- else:
- raise TypeError('push must be a dict')
- (pull == None == None).__init__(self, clear_before, clear_after, retries, recovery_task, depend)
-
-
- def submit_task(self, d, queued_engine):
- if self.push is not None:
- (None, d.addCallback)((lambda r: queued_engine.push(self.push)))
-
- (None, d.addCallback)((lambda r: queued_engine.execute(self.expression)))
- if self.pull is not None:
- (None, d.addCallback)((lambda r: queued_engine.pull(self.pull)))
- else:
- d.addCallback((lambda r: pass))
-
-
- def process_result(self, result, engine_id):
- if isinstance(result, failure.Failure):
- tr = TaskResult(result, engine_id)
- elif self.pull is None:
- resultDict = { }
- elif len(self.pull) == 1:
- resultDict = {
- self.pull[0]: result }
- else:
- resultDict = dict(zip(self.pull, result))
- tr = TaskResult(resultDict, engine_id)
- tr.submitted = self.submitted
- tr.completed = self.completed
- tr.duration = self.duration
- if hasattr(self, 'taskid'):
- tr.taskid = self.taskid
- else:
- tr.taskid = None
- if isinstance(result, failure.Failure):
- return (False, tr)
- return (True, tr)
-
-
-
- class ResultNS(object):
-
- def __init__(self, dikt):
- for k, v in dikt.iteritems():
- setattr(self, k, v)
-
-
-
- def __repr__(self):
- l = dir(self)
- d = { }
- for k in l:
- if k[:2] != '__' and k[-2:] != '__':
- d[k] = getattr(self, k)
- continue
-
- return 'NS' + repr(d)
-
-
- def __getitem__(self, key):
- return getattr(self, key)
-
-
-
- class TaskResult(object):
- taskid = None
-
- def _getNS(self):
- if isinstance(self.failure, failure.Failure):
- return self.failure.raiseException()
- return self._ns
-
-
- def _setNS(self, v):
- raise Exception('the ns attribute cannot be changed')
-
- ns = property(_getNS, _setNS)
-
- def __init__(self, results, engineid):
- self.engineid = engineid
- if isinstance(results, failure.Failure):
- self.failure = results
- self.results = { }
- else:
- self.results = results
- self.failure = None
- self._ns = ResultNS(self.results)
- self.keys = self.results.keys()
-
-
- def __repr__(self):
- if self.failure is not None:
- contents = self.failure
- else:
- contents = self.results
- return 'TaskResult[ID:%r]:%r' % (self.taskid, contents)
-
-
- def __getitem__(self, key):
- if self.failure is not None:
- self.raise_exception()
-
- return self.results[key]
-
-
- def raise_exception(self):
- if self.failure is not None:
- self.failure.raiseException()
-
-
-
-
- class IWorker(zi.Interface):
- zi.Attribute('workerid', 'the id of the worker')
-
- def run(task):
- pass
-
-
-
- class WorkerFromQueuedEngine(object):
- zi.implements(IWorker)
-
- def __init__(self, qe):
- self.queuedEngine = qe
- self.workerid = None
-
-
- def _get_properties(self):
- return self.queuedEngine.properties
-
- properties = property(_get_properties, (lambda self, _: pass))
-
- def run(self, task):
- d = defer.succeed(None)
- d.addCallback(task.start_time)
- task.pre_task(d, self.queuedEngine)
- task.submit_task(d, self.queuedEngine)
- task.post_task(d, self.queuedEngine)
- d.addBoth(task.stop_time)
- d.addBoth(task.process_result, self.queuedEngine.id)
- return d
-
-
- components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
-
- class IScheduler(zi.Interface):
- zi.Attribute('nworkers', 'the number of unassigned workers')
- zi.Attribute('ntasks', 'the number of unscheduled tasks')
- zi.Attribute('workerids', 'a list of the worker ids')
- zi.Attribute('taskids', 'a list of the task ids')
-
- def add_task(task, **flags):
- pass
-
-
- def pop_task(id = None):
- pass
-
-
- def add_worker(worker, **flags):
- pass
-
-
- def pop_worker(id = None):
- pass
-
-
- def ready():
- pass
-
-
- def schedule():
- pass
-
-
-
- class FIFOScheduler(object):
- zi.implements(IScheduler)
-
- def __init__(self):
- self.tasks = []
- self.workers = []
-
-
- def _ntasks(self):
- return len(self.tasks)
-
-
- def _nworkers(self):
- return len(self.workers)
-
- ntasks = property(_ntasks, (lambda self, _: pass))
- nworkers = property(_nworkers, (lambda self, _: pass))
-
- def _taskids(self):
- return [ t.taskid for t in self.tasks ]
-
-
- def _workerids(self):
- return [ w.workerid for w in self.workers ]
-
- taskids = property(_taskids, (lambda self, _: pass))
- workerids = property(_workerids, (lambda self, _: pass))
-
- def add_task(self, task, **flags):
- self.tasks.append(task)
-
-
- def pop_task(self, id = None):
- if id is None:
- return self.tasks.pop(0)
- for i in range(len(self.tasks)):
- taskid = self.tasks[i].taskid
- if id == taskid:
- return self.tasks.pop(i)
-
- raise IndexError('No task #%i' % id)
-
-
- def add_worker(self, worker, **flags):
- self.workers.append(worker)
-
-
- def pop_worker(self, id = None):
- if id is None:
- return self.workers.pop(0)
- for i in range(len(self.workers)):
- workerid = self.workers[i].workerid
- if id == workerid:
- return self.workers.pop(i)
-
- raise IndexError('No worker #%i' % id)
-
-
- def schedule(self):
- for t in self.tasks:
- for w in self.workers:
-
- try:
- cando = t.check_depend(w.properties)
- except:
- cando = False
-
- if cando:
- return (self.pop_worker(w.workerid), self.pop_task(t.taskid))
-
-
- return (None, None)
-
-
-
- class LIFOScheduler(FIFOScheduler):
-
- def add_task(self, task, **flags):
- self.tasks.insert(0, task)
-
-
- def add_worker(self, worker, **flags):
- self.workers.insert(0, worker)
-
-
-
- class ITaskController(cs.IControllerBase):
-
- def run(task):
- pass
-
-
- def get_task_result(taskid, block = False):
- pass
-
-
- def abort(taskid):
- pass
-
-
- def barrier(taskids):
- pass
-
-
- def spin():
- pass
-
-
- def queue_status(verbose = False):
- pass
-
-
- def clear():
- pass
-
-
-
- class TaskController(cs.ControllerAdapterBase):
- zi.implements(ITaskController)
- SchedulerClass = FIFOScheduler
- timeout = 30
-
- def __init__(self, controller):
- self.controller = controller
- self.controller.on_register_engine_do(self.registerWorker, True)
- self.controller.on_unregister_engine_do(self.unregisterWorker, True)
- self.taskid = 0
- self.failurePenalty = 1
- self.pendingTasks = { }
- self.deferredResults = { }
- self.finishedResults = { }
- self.workers = { }
- self.abortPending = []
- self.idleLater = None
- self.scheduler = self.SchedulerClass()
- for id in self.controller.engines.keys():
- self.workers[id] = IWorker(self.controller.engines[id])
- self.workers[id].workerid = id
- self.schedule.add_worker(self.workers[id])
-
-
-
- def registerWorker(self, id):
- if self.workers.get(id):
- raise ValueError('worker with id %s already exists. This should not happen.' % id)
- self.workers.get(id)
- self.workers[id] = IWorker(self.controller.engines[id])
- self.workers[id].workerid = id
- if not self.pendingTasks.has_key(id):
- self.scheduler.add_worker(self.workers[id])
-
- self.distributeTasks()
-
-
- def unregisterWorker(self, id):
- if self.workers.has_key(id):
-
- try:
- self.scheduler.pop_worker(id)
- except IndexError:
- pass
-
- self.workers.pop(id)
-
-
-
- def _pendingTaskIDs(self):
- return [ t.taskid for t in self.pendingTasks.values() ]
-
-
- def run(self, task):
- task.taskid = self.taskid
- task.start = time.localtime()
- self.taskid += 1
- d = defer.Deferred()
- self.scheduler.add_task(task)
- log.msg('Queuing task: %i' % task.taskid)
- self.deferredResults[task.taskid] = []
- self.distributeTasks()
- return defer.succeed(task.taskid)
-
-
- def get_task_result(self, taskid, block = False):
- log.msg('Getting task result: %i' % taskid)
- if self.finishedResults.has_key(taskid):
- tr = self.finishedResults[taskid]
- return defer.succeed(tr)
- if self.deferredResults.has_key(taskid):
- if block:
- d = defer.Deferred()
- self.deferredResults[taskid].append(d)
- return d
- return defer.succeed(None)
- self.deferredResults.has_key(taskid)
- return defer.fail(IndexError('task ID not registered: %r' % taskid))
-
-
- def abort(self, taskid):
- if not isinstance(taskid, int):
- return defer.fail(failure.Failure(TypeError('an integer task id expected: %r' % taskid)))
-
- try:
- self.scheduler.pop_task(taskid)
- except IndexError:
- isinstance(taskid, int)
- e = isinstance(taskid, int)
- if taskid in self.finishedResults.keys():
- d = defer.fail(IndexError('Task Already Completed'))
- elif taskid in self.abortPending:
- d = defer.fail(IndexError('Task Already Aborted'))
- elif taskid in self._pendingTaskIDs():
- self.abortPending.append(taskid)
- d = defer.succeed(None)
- else:
- d = defer.fail(e)
- except:
- taskid in self.finishedResults.keys()
-
- d = defer.execute(self._doAbort, taskid)
- return d
-
-
- def barrier(self, taskids):
- dList = []
- if isinstance(taskids, int):
- taskids = [
- taskids]
-
- for id in taskids:
- d = self.get_task_result(id, block = True)
- dList.append(d)
-
- d = DeferredList(dList, consumeErrors = 1)
- d.addCallbacks((lambda r: pass))
- return d
-
-
- def spin(self):
- return defer.succeed(self.distributeTasks())
-
-
- def queue_status(self, verbose = False):
- pending = self._pendingTaskIDs()
- failed = []
- succeeded = []
- for k, v in self.finishedResults.iteritems():
- if not isinstance(v, failure.Failure):
- if hasattr(v, 'failure'):
- if v.failure is None:
- succeeded.append(k)
- else:
- failed.append(k)
-
- hasattr(v, 'failure')
-
- scheduled = self.scheduler.taskids
- if verbose:
- result = dict(pending = pending, failed = failed, succeeded = succeeded, scheduled = scheduled)
- else:
- result = dict(pending = len(pending), failed = len(failed), succeeded = len(succeeded), scheduled = len(scheduled))
- return defer.succeed(result)
-
-
- def _doAbort(self, taskid):
- log.msg('Task aborted: %i' % taskid)
- result = failure.Failure(error.TaskAborted())
- self._finishTask(taskid, result)
- if taskid in self.abortPending:
- self.abortPending.remove(taskid)
-
-
-
- def _finishTask(self, taskid, result):
- dlist = self.deferredResults.pop(taskid)
- self.finishedResults[taskid] = result
- for d in dlist:
- d.callback(result)
-
-
-
- def distributeTasks(self):
- log.msg('distributing Tasks')
- (worker, task) = self.scheduler.schedule()
- if not worker and not task:
- if self.idleLater and self.idleLater.called:
- self.idleLater = None
- else:
- self.checkIdle()
- return False
- while worker and task:
- self.pendingTasks[worker.workerid] = task
- d = worker.run(task)
- log.msg('Running task %i on worker %i' % (task.taskid, worker.workerid))
- d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
- (worker, task) = self.scheduler.schedule()
- continue
- not task
- self.checkIdle()
- return True
-
-
- def checkIdle(self):
- if self.idleLater and not (self.idleLater.called):
- self.idleLater.cancel()
-
- if self.scheduler.ntasks and self.workers and self.scheduler.nworkers == len(self.workers):
- self.idleLater = reactor.callLater(self.timeout, self.failIdle)
- else:
- self.idleLater = None
-
-
- def failIdle(self):
- if not self.distributeTasks():
- while self.scheduler.ntasks:
- t = self.scheduler.pop_task()
- msg = 'task %i failed to execute due to unmet dependencies' % t.taskid
- msg += ' for %i seconds' % self.timeout
- log.msg('Task aborted by timeout: %i' % t.taskid)
- f = failure.Failure(error.TaskTimeout(msg))
- self._finishTask(t.taskid, f)
-
- self.idleLater = None
-
-
- def taskCompleted(self, success_and_result, taskid, workerid):
- (success, result) = success_and_result
-
- try:
- task = self.pendingTasks.pop(workerid)
- except:
- log.msg('Tried to pop bad pending task %i from worker %i' % (taskid, workerid))
- log.msg('Result: %r' % result)
- log.msg('Pending tasks: %s' % self.pendingTasks)
- return None
-
- aborted = False
- if taskid in self.abortPending:
- self._doAbort(taskid)
- aborted = True
-
- if not aborted:
- if not success:
- log.msg('Task %i failed on worker %i' % (taskid, workerid))
- if task.retries > 0:
- task.retries -= 1
- self.scheduler.add_task(task)
- s = 'Resubmitting task %i, %i retries remaining' % (taskid, task.retries)
- log.msg(s)
- self.distributeTasks()
- elif isinstance(task.recovery_task, BaseTask) and task.recovery_task.retries > -1:
- task.retries = -1
- task.recovery_task.taskid = taskid
- task = task.recovery_task
- self.scheduler.add_task(task)
- s = 'Recovering task %i, %i retries remaining' % (taskid, task.retries)
- log.msg(s)
- self.distributeTasks()
- else:
- self._finishTask(taskid, result)
- reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
- else:
- log.msg('Task completed: %i' % taskid)
- self._finishTask(taskid, result)
- self.readmitWorker(workerid)
- elif not success:
- reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
- else:
- self.readmitWorker(workerid)
-
-
- def readmitWorker(self, workerid):
- if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
- self.scheduler.add_worker(self.workers[workerid])
- self.distributeTasks()
-
-
-
- def clear(self):
- self.finishedResults = { }
- return defer.succeed(None)
-
-
- components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
-