home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) __docformat__ = 'restructuredtext en' import sys import cPickle as pickle from types import FunctionType import linecache import warnings from twisted.internet import reactor from twisted.python import components, log from twisted.python.failure import Failure from zope.interface import Interface, implements, Attribute from IPython.ColorANSI import TermColors from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import error from IPython.kernel.parallelfunction import ParallelFunction from IPython.kernel.mapper import MultiEngineMapper, IMultiEngineMapperFactory, IMapper from IPython.kernel import map as Map from IPython.kernel import multiengine as me from IPython.kernel.multiengine import IFullMultiEngine, IFullSynchronousMultiEngine class IPendingResult(Interface): result_id = Attribute('ID of the deferred on the other side') client = Attribute('A client that I came from') r = Attribute('An attribute that is a property that calls and returns get_result') def get_result(default = None, block = True): pass def add_callback(f, *args, **kwargs): pass class PendingResult(object): def __init__(self, client, result_id): self.client = client self.result_id = result_id self.called = False self.raised = False self.callbacks = [] def get_result(self, default = None, block = True): if self.called: if self.raised: raise self.result[0], self.result[1], self.result[2] self.raised return self.result self.called try: result = self.client.get_pending_deferred(self.result_id, block) except error.ResultNotCompleted: return default self.result = sys.exc_info() self.called = True self.raised = True raise for cb in self.callbacks: result = cb[0](result, *cb[1], **cb[2]) self.result = result self.called = True return result def add_callback(self, f, *args, **kwargs): self.callbacks.append((f, args, kwargs)) def __cmp__(self, other): if self.result_id < other.result_id: return -1 return 1 def _get_r(self): return self.get_result(block = True) r = property(_get_r) class ResultList(list): def __repr__(self): output = [] if sys.platform == 'win32': blue = normal = red = green = '' else: blue = TermColors.Blue normal = TermColors.Normal red = TermColors.Red green = TermColors.Green output.append('<Results List>\n') for cmd in self: if isinstance(cmd, Failure): output.append(cmd) continue target = cmd.get('id', None) cmd_num = cmd.get('number', None) cmd_stdin = cmd.get('input', { }).get('translated', 'No Input') cmd_stdout = cmd.get('stdout', None) cmd_stderr = cmd.get('stderr', None) output.append('%s[%i]%s In [%i]:%s %s\n' % (green, target, blue, cmd_num, normal, cmd_stdin)) if cmd_stdout: output.append('%s[%i]%s Out[%i]:%s %s\n' % (green, target, red, cmd_num, normal, cmd_stdout)) if cmd_stderr: output.append('%s[%i]%s Err[%i]:\n%s %s' % (green, target, red, cmd_num, normal, cmd_stderr)) continue return ''.join(output) def wrapResultList(result): if len(result) == 0: result = [ result] return ResultList(result) class QueueStatusList(list): def __repr__(self): output = [] output.append('<Queue Status List>\n') for e in self: output.append('Engine: %s\n' % repr(e[0])) output.append(' Pending: %s\n' % repr(e[1]['pending'])) for q in e[1]['queue']: output.append(' Command: %s\n' % repr(q)) return ''.join(output) class InteractiveMultiEngineClient(object): def activate(self): try: __IPYTHON__.activeController = self except NameError: print 'The IPython Controller magics only work within IPython.' def __setitem__(self, key, value): (targets, block) = self._findTargetsAndBlock() return self.push({ key: value }, targets = targets, block = block) def __getitem__(self, key): if isinstance(key, str): (targets, block) = self._findTargetsAndBlock() return self.pull(key, targets = targets, block = block) raise TypeError('__getitem__ only takes strs') def __len__(self): return len(self.get_ids()) def findsource_file(self, f): linecache.checkcache() s = findsource(f.f_code) lnum = f.f_lineno wsource = s[0][f.f_lineno:] return strip_whitespace(wsource) def findsource_ipython(self, f): ipapi = ipapi import IPython self.ip = ipapi.get() wsource = [ l + '\n' for l in self.ip.IP.input_hist_raw[-1].splitlines()[1:] ] return strip_whitespace(wsource) def __enter__(self): f = sys._getframe(1) local_ns = f.f_locals global_ns = f.f_globals if f.f_code.co_filename == '<ipython console>': s = self.findsource_ipython(f) else: s = self.findsource_file(f) self._with_context_result = self.execute(s) def __exit__(self, etype, value, tb): if issubclass(etype, error.StopLocalExecution): return True def remote(): m = 'Special exception to stop local execution of parallel code.' raise error.StopLocalExecution(m) def strip_whitespace(source): wsource = [ l.expandtabs(4) for l in source ] done = False for line in wsource: for col, char in enumerate(line): if char != ' ': done = True break continue None if line.isspace() else [] if done: break continue for lno, line in enumerate(wsource): lead = line[:col] if lead.isspace(): continue continue if not lead.lstrip().startswith('#'): break continue src_lines = [ l[col:] for l in wsource[:lno + 1] ] for nline, line in enumerate(src_lines): if 'remote()' in line: break continue None if line.isspace() or line.startswith('#') else [] raise ValueError('remote() call missing at the start of code') src = ''.join(src_lines[nline + 1:]) return src _prop_warn = '\nWe are currently refactoring the task dependency system. This might\ninvolve the removal of this method and other methods related to engine\nproperties. Please see the docstrings for IPython.kernel.TaskRejectError \nfor more information.' class IFullBlockingMultiEngineClient(Interface): pass class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): implements(IFullBlockingMultiEngineClient, IMultiEngineMapperFactory, IMapper) def __init__(self, smultiengine): self.smultiengine = smultiengine self.block = True self.targets = 'all' def _findBlock(self, block = None): if block is None: return self.block if block in (True, False): return block raise ValueError('block must be True or False') def _findTargets(self, targets = None): if targets is None: return self.targets if not isinstance(targets, (str, list, tuple, int)): raise ValueError('targets must be a str, list, tuple or int') isinstance(targets, (str, list, tuple, int)) return targets def _findTargetsAndBlock(self, targets = None, block = None): return (self._findTargets(targets), self._findBlock(block)) def _blockFromThread(self, function, *args, **kwargs): block = kwargs.get('block', None) if block is None: raise error.MissingBlockArgument("'block' keyword argument is missing") block is None result = blockingCallFromThread(function, *args, **kwargs) if not block: result = PendingResult(self, result) return result def get_pending_deferred(self, deferredID, block): return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block) def barrier(self, pendingResults): prList = list(pendingResults) for pr in prList: if not isinstance(pr, PendingResult): raise error.NotAPendingResult('Objects passed to barrier must be PendingResult instances') isinstance(pr, PendingResult) prList.sort() for pr in prList: try: result = pr.get_result(block = True) continue except Exception: continue def flush(self): r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds) return r clear_pending_results = flush def execute(self, lines, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) result = blockingCallFromThread(self.smultiengine.execute, lines, targets = targets, block = block) if block: result = ResultList(result) else: result = PendingResult(self, result) result.add_callback(wrapResultList) return result def push(self, namespace, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.push, namespace, targets = targets, block = block) def pull(self, keys, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.pull, keys, targets = targets, block = block) def push_function(self, namespace, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.push_function, namespace, targets = targets, block = block) def pull_function(self, keys, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.pull_function, keys, targets = targets, block = block) def push_serialized(self, namespace, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets = targets, block = block) def pull_serialized(self, keys, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets = targets, block = block) def get_result(self, i = None, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) result = blockingCallFromThread(self.smultiengine.get_result, i, targets = targets, block = block) if block: result = ResultList(result) else: result = PendingResult(self, result) result.add_callback(wrapResultList) return result def reset(self, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.reset, targets = targets, block = block) def keys(self, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.keys, targets = targets, block = block) def kill(self, controller = False, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.kill, controller, targets = targets, block = block) def clear_queue(self, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.clear_queue, targets = targets, block = block) def queue_status(self, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.queue_status, targets = targets, block = block) def set_properties(self, properties, targets = None, block = None): warnings.warn(_prop_warn) (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.set_properties, properties, targets = targets, block = block) def get_properties(self, keys = None, targets = None, block = None): warnings.warn(_prop_warn) (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.get_properties, keys, targets = targets, block = block) def has_properties(self, keys, targets = None, block = None): warnings.warn(_prop_warn) (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.has_properties, keys, targets = targets, block = block) def del_properties(self, keys, targets = None, block = None): warnings.warn(_prop_warn) (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.del_properties, keys, targets = targets, block = block) def clear_properties(self, targets = None, block = None): warnings.warn(_prop_warn) (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.clear_properties, targets = targets, block = block) def get_ids(self): result = blockingCallFromThread(self.smultiengine.get_ids) return result def scatter(self, key, seq, dist = 'b', flatten = False, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.scatter, key, seq, dist, flatten, targets = targets, block = block) def gather(self, key, dist = 'b', targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.gather, key, dist, targets = targets, block = block) def raw_map(self, func, seq, dist = 'b', targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.raw_map, func, seq, dist, targets = targets, block = block) def map(self, func, *sequences): return self.mapper().map(func, *sequences) def mapper(self, dist = 'b', targets = 'all', block = None): return MultiEngineMapper(self, dist, targets, block) def parallel(self, dist = 'b', targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) mapper = self.mapper(dist, targets, block) pf = ParallelFunction(mapper) return pf def zip_pull(self, keys, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.zip_pull, keys, targets = targets, block = block) def run(self, filename, targets = None, block = None): (targets, block) = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.run, filename, targets = targets, block = block) def benchmark(self, push_size = 10000): import timeit import __builtin__ __builtin__._mec_self = self benchmarks = { } repeat = 3 count = 10 timer = timeit.Timer('_mec_self.execute("pass",0)') result = 1000 * min(timer.repeat(repeat, count)) / count benchmarks['single_engine_latency'] = (result, 'msec') timer = timeit.Timer('_mec_self.execute("pass")') result = 1000 * min(timer.repeat(repeat, count)) / count benchmarks['all_engine_latency'] = (result, 'msec') try: import numpy as np except: pass timer = timeit.Timer('_mec_self.push(d)', "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size) result = min(timer.repeat(repeat, count)) / count benchmarks['all_engine_push'] = (1e-06 * push_size * 8 / result, 'MB/sec') try: import numpy as np except: pass timer = timeit.Timer('_mec_self.push(d,0)', "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size) result = min(timer.repeat(repeat, count)) / count benchmarks['single_engine_push'] = (1e-06 * push_size * 8 / result, 'MB/sec') return benchmarks components.registerAdapter(FullBlockingMultiEngineClient, IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)