home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_1887 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  27.8 KB  |  759 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __docformat__ = 'restructuredtext en'
  5. __test__ = { }
  6. import copy
  7. import time
  8. from types import FunctionType
  9. import zope.interface as zi
  10. import string
  11. from twisted.internet import defer, reactor
  12. from twisted.python import components, log, failure
  13. from IPython.kernel.util import printer
  14. from IPython.kernel import engineservice as es, error
  15. from IPython.kernel import controllerservice as cs
  16. from IPython.kernel.twistedutil import gatherBoth, DeferredList
  17. from IPython.kernel.pickleutil import can, uncan, CannedFunction
  18. time_format = '%Y/%m/%d %H:%M:%S'
  19.  
  20. class ITask(zi.Interface):
  21.     zi.Attribute('retries', 'How many times to retry the task')
  22.     zi.Attribute('recovery_task', 'A task to try if the initial one fails')
  23.     zi.Attribute('taskid', 'the id of the task')
  24.     
  25.     def start_time(result):
  26.         pass
  27.  
  28.     
  29.     def stop_time(result):
  30.         pass
  31.  
  32.     
  33.     def pre_task(d, queued_engine):
  34.         pass
  35.  
  36.     
  37.     def post_task(d, queued_engine):
  38.         pass
  39.  
  40.     
  41.     def submit_task(d, queued_engine):
  42.         pass
  43.  
  44.     
  45.     def process_result(d, result, engine_id):
  46.         pass
  47.  
  48.     
  49.     def check_depend(properties):
  50.         pass
  51.  
  52.     
  53.     def can_task(self):
  54.         pass
  55.  
  56.     
  57.     def uncan_task(self):
  58.         pass
  59.  
  60.  
  61.  
  62. class BaseTask(object):
  63.     zi.implements(ITask)
  64.     
  65.     def __init__(self, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
  66.         self.clear_before = clear_before
  67.         self.clear_after = clear_after
  68.         self.retries = retries
  69.         self.recovery_task = recovery_task
  70.         self.depend = depend
  71.         self.taskid = None
  72.  
  73.     
  74.     def start_time(self, result):
  75.         self.start = time.time()
  76.         self.start_struct = time.localtime()
  77.         return result
  78.  
  79.     
  80.     def stop_time(self, result):
  81.         self.stop = time.time()
  82.         self.stop_struct = time.localtime()
  83.         self.duration = self.stop - self.start
  84.         self.submitted = time.strftime(time_format, self.start_struct)
  85.         self.completed = time.strftime(time_format)
  86.         return result
  87.  
  88.     
  89.     def pre_task(self, d, queued_engine):
  90.         if self.clear_before:
  91.             (d.addCallback,)((lambda r: queued_engine.reset()))
  92.         
  93.  
  94.     
  95.     def post_task(self, d, queued_engine):
  96.         
  97.         def reseter(result):
  98.             queued_engine.reset()
  99.             return result
  100.  
  101.         if self.clear_after:
  102.             d.addBoth(reseter)
  103.         
  104.  
  105.     
  106.     def submit_task(self, d, queued_engine):
  107.         raise NotImplementedError('submit_task must be implemented in a subclass')
  108.  
  109.     
  110.     def process_result(self, result, engine_id):
  111.         if isinstance(result, failure.Failure):
  112.             return (False, result)
  113.         return (True, result)
  114.  
  115.     
  116.     def check_depend(self, properties):
  117.         if self.depend is not None:
  118.             return self.depend(properties)
  119.         return True
  120.  
  121.     
  122.     def can_task(self):
  123.         self.depend = can(self.depend)
  124.         if isinstance(self.recovery_task, BaseTask):
  125.             self.recovery_task.can_task()
  126.         
  127.  
  128.     
  129.     def uncan_task(self):
  130.         self.depend = uncan(self.depend)
  131.         if isinstance(self.recovery_task, BaseTask):
  132.             self.recovery_task.uncan_task()
  133.         
  134.  
  135.  
  136.  
  137. class MapTask(BaseTask):
  138.     zi.implements(ITask)
  139.     
  140.     def __init__(self, function, args = None, kwargs = None, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
  141.         BaseTask.__init__(self, clear_before, clear_after, retries, recovery_task, depend)
  142.         if not isinstance(function, FunctionType):
  143.             raise TypeError('a task function must be a FunctionType')
  144.         isinstance(function, FunctionType)
  145.         self.function = function
  146.         if args is None:
  147.             self.args = ()
  148.         else:
  149.             self.args = args
  150.         if not isinstance(self.args, (list, tuple)):
  151.             raise TypeError('a task args must be a list or tuple')
  152.         isinstance(self.args, (list, tuple))
  153.         if kwargs is None:
  154.             self.kwargs = { }
  155.         else:
  156.             self.kwargs = kwargs
  157.         if not isinstance(self.kwargs, dict):
  158.             raise TypeError('a task kwargs must be a dict')
  159.         isinstance(self.kwargs, dict)
  160.  
  161.     
  162.     def submit_task(self, d, queued_engine):
  163.         (None, d.addCallback)((lambda r: queued_engine.push_function(dict(_ipython_task_function = self.function))))
  164.         (None, d.addCallback)((lambda r: queued_engine.push(dict(_ipython_task_args = self.args, _ipython_task_kwargs = self.kwargs))))
  165.         (d.addCallback,)((lambda r: queued_engine.execute('_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')))
  166.         (d.addCallback,)((lambda r: queued_engine.pull('_ipython_task_result')))
  167.  
  168.     
  169.     def can_task(self):
  170.         self.function = can(self.function)
  171.         BaseTask.can_task(self)
  172.  
  173.     
  174.     def uncan_task(self):
  175.         self.function = uncan(self.function)
  176.         BaseTask.uncan_task(self)
  177.  
  178.  
  179.  
  180. class StringTask(BaseTask):
  181.     
  182.     def __init__(self, expression, pull = None, push = None, clear_before = False, clear_after = False, retries = 0, recovery_task = None, depend = None):
  183.         if not isinstance(expression, str):
  184.             raise TypeError('a task expression must be a string')
  185.         isinstance(expression, str)
  186.         self.expression = expression
  187.         if pull == None:
  188.             self.pull = ()
  189.         elif isinstance(pull, str):
  190.             self.pull = (pull,)
  191.         elif isinstance(pull, (list, tuple)):
  192.             self.pull = pull
  193.         else:
  194.             raise TypeError('pull must be str or a sequence of strs')
  195.         if pull == None == None:
  196.             self.push = { }
  197.         elif isinstance(push, dict):
  198.             self.push = push
  199.         else:
  200.             raise TypeError('push must be a dict')
  201.         (pull == None == None).__init__(self, clear_before, clear_after, retries, recovery_task, depend)
  202.  
  203.     
  204.     def submit_task(self, d, queued_engine):
  205.         if self.push is not None:
  206.             (None, d.addCallback)((lambda r: queued_engine.push(self.push)))
  207.         
  208.         (None, d.addCallback)((lambda r: queued_engine.execute(self.expression)))
  209.         if self.pull is not None:
  210.             (None, d.addCallback)((lambda r: queued_engine.pull(self.pull)))
  211.         else:
  212.             d.addCallback((lambda r: pass))
  213.  
  214.     
  215.     def process_result(self, result, engine_id):
  216.         if isinstance(result, failure.Failure):
  217.             tr = TaskResult(result, engine_id)
  218.         elif self.pull is None:
  219.             resultDict = { }
  220.         elif len(self.pull) == 1:
  221.             resultDict = {
  222.                 self.pull[0]: result }
  223.         else:
  224.             resultDict = dict(zip(self.pull, result))
  225.         tr = TaskResult(resultDict, engine_id)
  226.         tr.submitted = self.submitted
  227.         tr.completed = self.completed
  228.         tr.duration = self.duration
  229.         if hasattr(self, 'taskid'):
  230.             tr.taskid = self.taskid
  231.         else:
  232.             tr.taskid = None
  233.         if isinstance(result, failure.Failure):
  234.             return (False, tr)
  235.         return (True, tr)
  236.  
  237.  
  238.  
  239. class ResultNS(object):
  240.     
  241.     def __init__(self, dikt):
  242.         for k, v in dikt.iteritems():
  243.             setattr(self, k, v)
  244.         
  245.  
  246.     
  247.     def __repr__(self):
  248.         l = dir(self)
  249.         d = { }
  250.         for k in l:
  251.             if k[:2] != '__' and k[-2:] != '__':
  252.                 d[k] = getattr(self, k)
  253.                 continue
  254.         
  255.         return 'NS' + repr(d)
  256.  
  257.     
  258.     def __getitem__(self, key):
  259.         return getattr(self, key)
  260.  
  261.  
  262.  
  263. class TaskResult(object):
  264.     taskid = None
  265.     
  266.     def _getNS(self):
  267.         if isinstance(self.failure, failure.Failure):
  268.             return self.failure.raiseException()
  269.         return self._ns
  270.  
  271.     
  272.     def _setNS(self, v):
  273.         raise Exception('the ns attribute cannot be changed')
  274.  
  275.     ns = property(_getNS, _setNS)
  276.     
  277.     def __init__(self, results, engineid):
  278.         self.engineid = engineid
  279.         if isinstance(results, failure.Failure):
  280.             self.failure = results
  281.             self.results = { }
  282.         else:
  283.             self.results = results
  284.             self.failure = None
  285.         self._ns = ResultNS(self.results)
  286.         self.keys = self.results.keys()
  287.  
  288.     
  289.     def __repr__(self):
  290.         if self.failure is not None:
  291.             contents = self.failure
  292.         else:
  293.             contents = self.results
  294.         return 'TaskResult[ID:%r]:%r' % (self.taskid, contents)
  295.  
  296.     
  297.     def __getitem__(self, key):
  298.         if self.failure is not None:
  299.             self.raise_exception()
  300.         
  301.         return self.results[key]
  302.  
  303.     
  304.     def raise_exception(self):
  305.         if self.failure is not None:
  306.             self.failure.raiseException()
  307.         
  308.  
  309.  
  310.  
  311. class IWorker(zi.Interface):
  312.     zi.Attribute('workerid', 'the id of the worker')
  313.     
  314.     def run(task):
  315.         pass
  316.  
  317.  
  318.  
  319. class WorkerFromQueuedEngine(object):
  320.     zi.implements(IWorker)
  321.     
  322.     def __init__(self, qe):
  323.         self.queuedEngine = qe
  324.         self.workerid = None
  325.  
  326.     
  327.     def _get_properties(self):
  328.         return self.queuedEngine.properties
  329.  
  330.     properties = property(_get_properties, (lambda self, _: pass))
  331.     
  332.     def run(self, task):
  333.         d = defer.succeed(None)
  334.         d.addCallback(task.start_time)
  335.         task.pre_task(d, self.queuedEngine)
  336.         task.submit_task(d, self.queuedEngine)
  337.         task.post_task(d, self.queuedEngine)
  338.         d.addBoth(task.stop_time)
  339.         d.addBoth(task.process_result, self.queuedEngine.id)
  340.         return d
  341.  
  342.  
  343. components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
  344.  
  345. class IScheduler(zi.Interface):
  346.     zi.Attribute('nworkers', 'the number of unassigned workers')
  347.     zi.Attribute('ntasks', 'the number of unscheduled tasks')
  348.     zi.Attribute('workerids', 'a list of the worker ids')
  349.     zi.Attribute('taskids', 'a list of the task ids')
  350.     
  351.     def add_task(task, **flags):
  352.         pass
  353.  
  354.     
  355.     def pop_task(id = None):
  356.         pass
  357.  
  358.     
  359.     def add_worker(worker, **flags):
  360.         pass
  361.  
  362.     
  363.     def pop_worker(id = None):
  364.         pass
  365.  
  366.     
  367.     def ready():
  368.         pass
  369.  
  370.     
  371.     def schedule():
  372.         pass
  373.  
  374.  
  375.  
  376. class FIFOScheduler(object):
  377.     zi.implements(IScheduler)
  378.     
  379.     def __init__(self):
  380.         self.tasks = []
  381.         self.workers = []
  382.  
  383.     
  384.     def _ntasks(self):
  385.         return len(self.tasks)
  386.  
  387.     
  388.     def _nworkers(self):
  389.         return len(self.workers)
  390.  
  391.     ntasks = property(_ntasks, (lambda self, _: pass))
  392.     nworkers = property(_nworkers, (lambda self, _: pass))
  393.     
  394.     def _taskids(self):
  395.         return [ t.taskid for t in self.tasks ]
  396.  
  397.     
  398.     def _workerids(self):
  399.         return [ w.workerid for w in self.workers ]
  400.  
  401.     taskids = property(_taskids, (lambda self, _: pass))
  402.     workerids = property(_workerids, (lambda self, _: pass))
  403.     
  404.     def add_task(self, task, **flags):
  405.         self.tasks.append(task)
  406.  
  407.     
  408.     def pop_task(self, id = None):
  409.         if id is None:
  410.             return self.tasks.pop(0)
  411.         for i in range(len(self.tasks)):
  412.             taskid = self.tasks[i].taskid
  413.             if id == taskid:
  414.                 return self.tasks.pop(i)
  415.         
  416.         raise IndexError('No task #%i' % id)
  417.  
  418.     
  419.     def add_worker(self, worker, **flags):
  420.         self.workers.append(worker)
  421.  
  422.     
  423.     def pop_worker(self, id = None):
  424.         if id is None:
  425.             return self.workers.pop(0)
  426.         for i in range(len(self.workers)):
  427.             workerid = self.workers[i].workerid
  428.             if id == workerid:
  429.                 return self.workers.pop(i)
  430.         
  431.         raise IndexError('No worker #%i' % id)
  432.  
  433.     
  434.     def schedule(self):
  435.         for t in self.tasks:
  436.             for w in self.workers:
  437.                 
  438.                 try:
  439.                     cando = t.check_depend(w.properties)
  440.                 except:
  441.                     cando = False
  442.  
  443.                 if cando:
  444.                     return (self.pop_worker(w.workerid), self.pop_task(t.taskid))
  445.             
  446.         
  447.         return (None, None)
  448.  
  449.  
  450.  
  451. class LIFOScheduler(FIFOScheduler):
  452.     
  453.     def add_task(self, task, **flags):
  454.         self.tasks.insert(0, task)
  455.  
  456.     
  457.     def add_worker(self, worker, **flags):
  458.         self.workers.insert(0, worker)
  459.  
  460.  
  461.  
  462. class ITaskController(cs.IControllerBase):
  463.     
  464.     def run(task):
  465.         pass
  466.  
  467.     
  468.     def get_task_result(taskid, block = False):
  469.         pass
  470.  
  471.     
  472.     def abort(taskid):
  473.         pass
  474.  
  475.     
  476.     def barrier(taskids):
  477.         pass
  478.  
  479.     
  480.     def spin():
  481.         pass
  482.  
  483.     
  484.     def queue_status(verbose = False):
  485.         pass
  486.  
  487.     
  488.     def clear():
  489.         pass
  490.  
  491.  
  492.  
  493. class TaskController(cs.ControllerAdapterBase):
  494.     zi.implements(ITaskController)
  495.     SchedulerClass = FIFOScheduler
  496.     timeout = 30
  497.     
  498.     def __init__(self, controller):
  499.         self.controller = controller
  500.         self.controller.on_register_engine_do(self.registerWorker, True)
  501.         self.controller.on_unregister_engine_do(self.unregisterWorker, True)
  502.         self.taskid = 0
  503.         self.failurePenalty = 1
  504.         self.pendingTasks = { }
  505.         self.deferredResults = { }
  506.         self.finishedResults = { }
  507.         self.workers = { }
  508.         self.abortPending = []
  509.         self.idleLater = None
  510.         self.scheduler = self.SchedulerClass()
  511.         for id in self.controller.engines.keys():
  512.             self.workers[id] = IWorker(self.controller.engines[id])
  513.             self.workers[id].workerid = id
  514.             self.schedule.add_worker(self.workers[id])
  515.         
  516.  
  517.     
  518.     def registerWorker(self, id):
  519.         if self.workers.get(id):
  520.             raise ValueError('worker with id %s already exists.  This should not happen.' % id)
  521.         self.workers.get(id)
  522.         self.workers[id] = IWorker(self.controller.engines[id])
  523.         self.workers[id].workerid = id
  524.         if not self.pendingTasks.has_key(id):
  525.             self.scheduler.add_worker(self.workers[id])
  526.         
  527.         self.distributeTasks()
  528.  
  529.     
  530.     def unregisterWorker(self, id):
  531.         if self.workers.has_key(id):
  532.             
  533.             try:
  534.                 self.scheduler.pop_worker(id)
  535.             except IndexError:
  536.                 pass
  537.  
  538.             self.workers.pop(id)
  539.         
  540.  
  541.     
  542.     def _pendingTaskIDs(self):
  543.         return [ t.taskid for t in self.pendingTasks.values() ]
  544.  
  545.     
  546.     def run(self, task):
  547.         task.taskid = self.taskid
  548.         task.start = time.localtime()
  549.         self.taskid += 1
  550.         d = defer.Deferred()
  551.         self.scheduler.add_task(task)
  552.         log.msg('Queuing task: %i' % task.taskid)
  553.         self.deferredResults[task.taskid] = []
  554.         self.distributeTasks()
  555.         return defer.succeed(task.taskid)
  556.  
  557.     
  558.     def get_task_result(self, taskid, block = False):
  559.         log.msg('Getting task result: %i' % taskid)
  560.         if self.finishedResults.has_key(taskid):
  561.             tr = self.finishedResults[taskid]
  562.             return defer.succeed(tr)
  563.         if self.deferredResults.has_key(taskid):
  564.             if block:
  565.                 d = defer.Deferred()
  566.                 self.deferredResults[taskid].append(d)
  567.                 return d
  568.             return defer.succeed(None)
  569.         self.deferredResults.has_key(taskid)
  570.         return defer.fail(IndexError('task ID not registered: %r' % taskid))
  571.  
  572.     
  573.     def abort(self, taskid):
  574.         if not isinstance(taskid, int):
  575.             return defer.fail(failure.Failure(TypeError('an integer task id expected: %r' % taskid)))
  576.         
  577.         try:
  578.             self.scheduler.pop_task(taskid)
  579.         except IndexError:
  580.             isinstance(taskid, int)
  581.             e = isinstance(taskid, int)
  582.             if taskid in self.finishedResults.keys():
  583.                 d = defer.fail(IndexError('Task Already Completed'))
  584.             elif taskid in self.abortPending:
  585.                 d = defer.fail(IndexError('Task Already Aborted'))
  586.             elif taskid in self._pendingTaskIDs():
  587.                 self.abortPending.append(taskid)
  588.                 d = defer.succeed(None)
  589.             else:
  590.                 d = defer.fail(e)
  591.         except:
  592.             taskid in self.finishedResults.keys()
  593.  
  594.         d = defer.execute(self._doAbort, taskid)
  595.         return d
  596.  
  597.     
  598.     def barrier(self, taskids):
  599.         dList = []
  600.         if isinstance(taskids, int):
  601.             taskids = [
  602.                 taskids]
  603.         
  604.         for id in taskids:
  605.             d = self.get_task_result(id, block = True)
  606.             dList.append(d)
  607.         
  608.         d = DeferredList(dList, consumeErrors = 1)
  609.         d.addCallbacks((lambda r: pass))
  610.         return d
  611.  
  612.     
  613.     def spin(self):
  614.         return defer.succeed(self.distributeTasks())
  615.  
  616.     
  617.     def queue_status(self, verbose = False):
  618.         pending = self._pendingTaskIDs()
  619.         failed = []
  620.         succeeded = []
  621.         for k, v in self.finishedResults.iteritems():
  622.             if not isinstance(v, failure.Failure):
  623.                 if hasattr(v, 'failure'):
  624.                     if v.failure is None:
  625.                         succeeded.append(k)
  626.                     else:
  627.                         failed.append(k)
  628.                 
  629.             hasattr(v, 'failure')
  630.         
  631.         scheduled = self.scheduler.taskids
  632.         if verbose:
  633.             result = dict(pending = pending, failed = failed, succeeded = succeeded, scheduled = scheduled)
  634.         else:
  635.             result = dict(pending = len(pending), failed = len(failed), succeeded = len(succeeded), scheduled = len(scheduled))
  636.         return defer.succeed(result)
  637.  
  638.     
  639.     def _doAbort(self, taskid):
  640.         log.msg('Task aborted: %i' % taskid)
  641.         result = failure.Failure(error.TaskAborted())
  642.         self._finishTask(taskid, result)
  643.         if taskid in self.abortPending:
  644.             self.abortPending.remove(taskid)
  645.         
  646.  
  647.     
  648.     def _finishTask(self, taskid, result):
  649.         dlist = self.deferredResults.pop(taskid)
  650.         self.finishedResults[taskid] = result
  651.         for d in dlist:
  652.             d.callback(result)
  653.         
  654.  
  655.     
  656.     def distributeTasks(self):
  657.         log.msg('distributing Tasks')
  658.         (worker, task) = self.scheduler.schedule()
  659.         if not worker and not task:
  660.             if self.idleLater and self.idleLater.called:
  661.                 self.idleLater = None
  662.             else:
  663.                 self.checkIdle()
  664.             return False
  665.         while worker and task:
  666.             self.pendingTasks[worker.workerid] = task
  667.             d = worker.run(task)
  668.             log.msg('Running task %i on worker %i' % (task.taskid, worker.workerid))
  669.             d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
  670.             (worker, task) = self.scheduler.schedule()
  671.             continue
  672.             not task
  673.         self.checkIdle()
  674.         return True
  675.  
  676.     
  677.     def checkIdle(self):
  678.         if self.idleLater and not (self.idleLater.called):
  679.             self.idleLater.cancel()
  680.         
  681.         if self.scheduler.ntasks and self.workers and self.scheduler.nworkers == len(self.workers):
  682.             self.idleLater = reactor.callLater(self.timeout, self.failIdle)
  683.         else:
  684.             self.idleLater = None
  685.  
  686.     
  687.     def failIdle(self):
  688.         if not self.distributeTasks():
  689.             while self.scheduler.ntasks:
  690.                 t = self.scheduler.pop_task()
  691.                 msg = 'task %i failed to execute due to unmet dependencies' % t.taskid
  692.                 msg += ' for %i seconds' % self.timeout
  693.                 log.msg('Task aborted by timeout: %i' % t.taskid)
  694.                 f = failure.Failure(error.TaskTimeout(msg))
  695.                 self._finishTask(t.taskid, f)
  696.         
  697.         self.idleLater = None
  698.  
  699.     
  700.     def taskCompleted(self, success_and_result, taskid, workerid):
  701.         (success, result) = success_and_result
  702.         
  703.         try:
  704.             task = self.pendingTasks.pop(workerid)
  705.         except:
  706.             log.msg('Tried to pop bad pending task %i from worker %i' % (taskid, workerid))
  707.             log.msg('Result: %r' % result)
  708.             log.msg('Pending tasks: %s' % self.pendingTasks)
  709.             return None
  710.  
  711.         aborted = False
  712.         if taskid in self.abortPending:
  713.             self._doAbort(taskid)
  714.             aborted = True
  715.         
  716.         if not aborted:
  717.             if not success:
  718.                 log.msg('Task %i failed on worker %i' % (taskid, workerid))
  719.                 if task.retries > 0:
  720.                     task.retries -= 1
  721.                     self.scheduler.add_task(task)
  722.                     s = 'Resubmitting task %i, %i retries remaining' % (taskid, task.retries)
  723.                     log.msg(s)
  724.                     self.distributeTasks()
  725.                 elif isinstance(task.recovery_task, BaseTask) and task.recovery_task.retries > -1:
  726.                     task.retries = -1
  727.                     task.recovery_task.taskid = taskid
  728.                     task = task.recovery_task
  729.                     self.scheduler.add_task(task)
  730.                     s = 'Recovering task %i, %i retries remaining' % (taskid, task.retries)
  731.                     log.msg(s)
  732.                     self.distributeTasks()
  733.                 else:
  734.                     self._finishTask(taskid, result)
  735.                 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
  736.             else:
  737.                 log.msg('Task completed: %i' % taskid)
  738.                 self._finishTask(taskid, result)
  739.                 self.readmitWorker(workerid)
  740.         elif not success:
  741.             reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
  742.         else:
  743.             self.readmitWorker(workerid)
  744.  
  745.     
  746.     def readmitWorker(self, workerid):
  747.         if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
  748.             self.scheduler.add_worker(self.workers[workerid])
  749.             self.distributeTasks()
  750.         
  751.  
  752.     
  753.     def clear(self):
  754.         self.finishedResults = { }
  755.         return defer.succeed(None)
  756.  
  757.  
  758. components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
  759.