home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- __docformat__ = 'restructuredtext en'
- from types import FunctionType
- from zope.interface import Interface, implements
- from IPython.kernel.task import MapTask
- from IPython.kernel.twistedutil import DeferredList, gatherBoth
- from IPython.kernel.util import printer
- from IPython.kernel.error import collect_exceptions
-
class IMapper(Interface):
    """Interface for objects that expose a ``map`` method."""

    def map(func, *seqs):
        """Apply ``func`` across the given sequences.

        Declared without ``self`` because this is an interface method
        specification, not an implementation.
        """
-
-
-
class IMultiEngineMapperFactory(Interface):
    """Interface for objects that expose a ``mapper`` factory method
    taking ``dist``, ``targets`` and ``block`` options."""

    def mapper(dist='b', targets='all', block=True):
        """Create a mapper configured with the given options.

        NOTE(review): presumably returns an `IMapper` provider such as
        `MultiEngineMapper`, whose constructor takes the same options --
        confirm against the implementers.
        """
-
-
-
class ITaskMapperFactory(Interface):
    """Interface for objects that expose a ``mapper`` factory method
    taking per-task options."""

    def mapper(clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """Create a mapper configured with the given task options.

        NOTE(review): presumably returns a task-based mapper such as
        `TaskMapper`, whose constructor takes the same options -- confirm
        against the implementers.
        """
-
-
-
class MultiEngineMapper(object):
    """Mapper that forwards ``map`` calls to a multiengine's ``raw_map``."""

    implements(IMapper)

    def __init__(self, multiengine, dist='b', targets='all', block=True):
        """Record the multiengine and the options forwarded by `map`.

        ``dist``, ``targets`` and ``block`` are stored as-is and passed
        unchanged as keyword arguments to ``multiengine.raw_map``.
        """
        self.multiengine = multiengine
        self.dist = dist
        self.targets = targets
        self.block = block

    def map(self, func, *sequences):
        """Apply ``func`` elementwise over ``sequences`` via ``raw_map``.

        Every sequence must support ``len()`` and all must have the same
        length.

        Returns whatever ``self.multiengine.raw_map`` returns.

        Raises ``ValueError`` if the sequences differ in length (``max``
        also raises ``ValueError`` when no sequence is given).
        """
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')

        return self.multiengine.raw_map(
            func, sequences,
            dist=self.dist, targets=self.targets, block=self.block)
-
-
-
class TaskMapper(object):
    """Mapper that submits one ``MapTask`` per element to a task controller.

    The controller's ``run``/``barrier``/``get_task_result`` results are
    chained with ``addCallback``, so `map` returns a deferred-style object.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """Record the task controller and the options applied to every task.

        ``clear_before``, ``clear_after``, ``retries``, ``recovery_task`` and
        ``depend`` are passed unchanged to each ``MapTask`` built by `map`.
        ``block`` selects whether `map` also waits for and fetches results.
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """Apply ``func`` elementwise over ``sequences``, one task per element.

        Every sequence must support ``len()`` and all must have the same
        length. One ``MapTask`` is built for each tuple of ``zip(*sequences)``
        and submitted with ``task_controller.run``.

        Returns the gathered deferred. When ``self.block`` is true, a further
        callback waits on ``task_controller.barrier`` and then gathers
        ``task_controller.get_task_result`` for every submitted task.

        Raises ``ValueError`` if the sequences differ in length (``max``
        also raises ``ValueError`` when no sequence is given).
        """
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')

        dlist = []
        for ta in zip(*sequences):
            task = MapTask(
                func, ta,
                clear_before=self.clear_before,
                clear_after=self.clear_after,
                retries=self.retries,
                recovery_task=self.recovery_task,
                depend=self.depend)
            dlist.append(self.task_controller.run(task))

        dlist = gatherBoth(dlist, consumeErrors=1)
        dlist.addCallback(collect_exceptions, 'map')
        if self.block:

            def get_results(task_ids):
                # Receives the gathered results of the ``run`` calls above,
                # which are used here as task ids.
                d = self.task_controller.barrier(task_ids)
                # NOTE(review): the decompiler lost the callee name on this
                # line; ``gatherBoth`` is reconstructed from the identical
                # ``consumeErrors=1`` call above -- confirm against upstream.
                d.addCallback(lambda _: gatherBoth(
                    [self.task_controller.get_task_result(tid)
                     for tid in task_ids],
                    consumeErrors=1))
                d.addCallback(collect_exceptions, 'map')
                return d

            dlist.addCallback(get_results)

        return dlist
-
-
-
class SynchronousTaskMapper(object):
    """Mapper that submits one ``MapTask`` per element and uses the task
    controller's return values directly (no callbacks are chained)."""

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """Record the task controller and the options applied to every task.

        ``clear_before``, ``clear_after``, ``retries``, ``recovery_task`` and
        ``depend`` are passed unchanged to each ``MapTask`` built by `map`.
        ``block`` selects whether `map` returns results or task ids.
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """Apply ``func`` elementwise over ``sequences``, one task per element.

        Every sequence must support ``len()`` and all must have the same
        length. One ``MapTask`` is built for each tuple of ``zip(*sequences)``
        and submitted with ``task_controller.run``.

        Returns, when ``self.block`` is true, the list of
        ``task_controller.get_task_result(tid)`` values fetched after calling
        ``task_controller.barrier``. Otherwise returns the list of values
        returned by ``task_controller.run`` (the task ids).

        Raises ``ValueError`` if the sequences differ in length (``max``
        also raises ``ValueError`` when no sequence is given).
        """
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')

        task_ids = []
        for ta in zip(*sequences):
            task = MapTask(
                func, ta,
                clear_before=self.clear_before,
                clear_after=self.clear_after,
                retries=self.retries,
                recovery_task=self.recovery_task,
                depend=self.depend)
            task_ids.append(self.task_controller.run(task))

        if self.block:
            # Wait for all submitted tasks before fetching their results.
            self.task_controller.barrier(task_ids)
            return [self.task_controller.get_task_result(tid)
                    for tid in task_ids]
        return task_ids
-
-
-