# home | CD-ROM | disk | FTP | other | search  (leftover page-navigation text, not part of the module)
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
-
__docformat__ = 'restructuredtext en'

import cPickle as pickle
import functools
from types import FunctionType
import __main__

from zope.interface import Interface, implements
from twisted.internet import defer
from twisted.python import components, failure, log
from foolscap import Referenceable

from IPython.kernel import error
from IPython.kernel.util import printer
from IPython.kernel import map as Map
from IPython.kernel.parallelfunction import ParallelFunction
from IPython.kernel.mapper import MultiEngineMapper, IMultiEngineMapperFactory, IMapper
from IPython.kernel.twistedutil import gatherBoth
from IPython.kernel.multiengine import MultiEngine, IMultiEngine, IFullSynchronousMultiEngine, ISynchronousMultiEngine
from IPython.kernel.multiengineclient import wrapResultList
from IPython.kernel.pendingdeferred import PendingDeferredManager
from IPython.kernel.pickleutil import can, canDict, canSequence, uncan, uncanDict, uncanSequence
from IPython.kernel.clientinterfaces import IFCClientInterfaceProvider, IBlockingClientAdaptor
-
def packageResult(wrappedMethod):
    """Decorate a method so its deferred result is packaged by ``self``.

    The wrapped method must return an object with ``addCallback`` and
    ``addErrback`` (a deferred).  The wrapper attaches
    ``self.packageSuccess`` as a callback and ``self.packageFailure`` as an
    errback, in that order, and returns the same deferred.

    :param wrappedMethod: the method to wrap; called as
        ``wrappedMethod(self, *args, **kwargs)``.
    :returns: the wrapping function, carrying the wrapped method's name and
        docstring.
    """

    @functools.wraps(wrappedMethod)
    def wrappedPackageResult(self, *args, **kwargs):
        d = wrappedMethod(self, *args, **kwargs)
        d.addCallback(self.packageSuccess)
        d.addErrback(self.packageFailure)
        return d

    return wrappedPackageResult
-
-
class IFCSynchronousMultiEngine(Interface):
    """Marker interface with no declared methods.

    In this module it is implemented by
    ``FCSynchronousMultiEngineFromMultiEngine`` and used as the target
    interface when that class is registered as an adapter for
    ``IMultiEngine``.
    """
    pass
-
-
class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
    """Expose a multiengine to remote callers through ``remote_*`` methods.

    The constructor adapts its argument to ``ISynchronousMultiEngine`` and
    every ``remote_*`` method forwards to that adapted object.  Methods
    decorated with ``packageResult`` return a deferred whose result, or
    cleaned failure, is pickled with protocol 2.  ``remote_get_ids`` and
    ``remote_get_client_name`` are not decorated and return their values
    unpickled.
    """

    implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)

    addSlash = True

    def __init__(self, multiengine):
        """Adapt ``multiengine`` to ``ISynchronousMultiEngine`` and store it."""
        self.smultiengine = ISynchronousMultiEngine(multiengine)
        # Maps deferredID -> (callback, args, kwargs); the entry is consumed
        # by remote_get_pending_deferred when that id is fetched.
        self._deferredIDCallbacks = {}

    # -- packaging helpers used by the packageResult decorator --------------

    def packageFailure(self, f):
        """Clean failure ``f`` and pickle it like a normal result."""
        f.cleanFailure()
        return self.packageSuccess(f)

    def packageSuccess(self, obj):
        """Return ``obj`` pickled with protocol 2."""
        return pickle.dumps(obj, 2)

    # -- pending deferred handling -------------------------------------------

    @packageResult
    def remote_get_pending_deferred(self, deferredID, block):
        """Fetch a pending deferred, applying any callback saved for its id."""
        d = self.smultiengine.get_pending_deferred(deferredID, block)
        callback = self._deferredIDCallbacks.pop(deferredID, None)
        if callback is not None:
            func, args, kwargs = callback
            d.addCallback(func, *args, **kwargs)
        return d

    @packageResult
    def remote_clear_pending_deferreds(self):
        """Clear pending deferreds; the call is wrapped by ``maybeDeferred``."""
        return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Remember ``callback`` (with its arguments) for ``did``; return ``did``."""
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    # -- execute / push / pull -----------------------------------------------

    @packageResult
    def remote_execute(self, lines, targets, block):
        """Forward to ``smultiengine.execute``."""
        return self.smultiengine.execute(lines, targets=targets, block=block)

    @packageResult
    def remote_push(self, binaryNS, targets, block):
        """Unpickle ``binaryNS`` and push the resulting namespace.

        If unpickling raises, a failed deferred carrying that exception is
        returned and nothing is pushed.
        """
        # NOTE(security): pickle.loads on bytes supplied by the remote peer
        # can execute arbitrary code; presumably the transport restricts who
        # may call this -- verify.
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push(namespace, targets=targets, block=block)
        return d

    @packageResult
    def remote_pull(self, keys, targets, block):
        """Forward to ``smultiengine.pull``."""
        return self.smultiengine.pull(keys, targets=targets, block=block)

    @packageResult
    def remote_push_function(self, binaryNS, targets, block):
        """Unpickle ``binaryNS``, ``uncanDict`` it and push the functions.

        If unpickling raises, a failed deferred carrying that exception is
        returned and nothing is pushed.
        """
        # NOTE(security): see remote_push -- peer-supplied pickle data.
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            namespace = uncanDict(namespace)
            d = self.smultiengine.push_function(namespace, targets=targets, block=block)
        return d

    def _canMultipleKeys(self, result):
        """Apply ``canSequence`` to each element of ``result``."""
        return [canSequence(r) for r in result]

    @packageResult
    def remote_pull_function(self, keys, targets, block):
        """Pull functions and pass the result through ``canSequence``.

        When ``block`` is true the canning callback is attached directly.
        Otherwise the deferred fires with a deferred id, and the canning
        callback is saved for that id so it runs when the pending result is
        fetched via ``remote_get_pending_deferred``.
        """
        def can_functions(r, keys):
            if len(keys) == 1 or isinstance(keys, str):
                result = canSequence(r)
            elif len(keys) > 1:
                result = [canSequence(s) for s in r]
            return result

        d = self.smultiengine.pull_function(keys, targets=targets, block=block)
        if block:
            d.addCallback(can_functions, keys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
        return d

    @packageResult
    def remote_push_serialized(self, binaryNS, targets, block):
        """Unpickle ``binaryNS`` and forward to ``push_serialized``.

        If unpickling raises, a failed deferred carrying that exception is
        returned and nothing is pushed.
        """
        # NOTE(security): see remote_push -- peer-supplied pickle data.
        try:
            namespace = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
        return d

    @packageResult
    def remote_pull_serialized(self, keys, targets, block):
        """Forward to ``smultiengine.pull_serialized``."""
        return self.smultiengine.pull_serialized(keys, targets=targets, block=block)

    # -- results, reset, keys, kill, queue -----------------------------------

    @packageResult
    def remote_get_result(self, i, targets, block):
        """Forward to ``get_result``; the string ``'None'`` stands for ``None``."""
        if i == 'None':
            i = None
        return self.smultiengine.get_result(i, targets=targets, block=block)

    @packageResult
    def remote_reset(self, targets, block):
        """Forward to ``smultiengine.reset``."""
        return self.smultiengine.reset(targets=targets, block=block)

    @packageResult
    def remote_keys(self, targets, block):
        """Forward to ``smultiengine.keys``."""
        return self.smultiengine.keys(targets=targets, block=block)

    @packageResult
    def remote_kill(self, controller, targets, block):
        """Forward to ``smultiengine.kill``."""
        return self.smultiengine.kill(controller, targets=targets, block=block)

    @packageResult
    def remote_clear_queue(self, targets, block):
        """Forward to ``smultiengine.clear_queue``."""
        return self.smultiengine.clear_queue(targets=targets, block=block)

    @packageResult
    def remote_queue_status(self, targets, block):
        """Forward to ``smultiengine.queue_status``."""
        return self.smultiengine.queue_status(targets=targets, block=block)

    # -- properties ------------------------------------------------------------

    @packageResult
    def remote_set_properties(self, binaryNS, targets, block):
        """Unpickle ``binaryNS`` and forward to ``set_properties``.

        If unpickling raises, a failed deferred carrying that exception is
        returned and no properties are set.
        """
        # NOTE(security): see remote_push -- peer-supplied pickle data.
        try:
            ns = pickle.loads(binaryNS)
        except Exception:
            d = defer.fail(failure.Failure())
        else:
            d = self.smultiengine.set_properties(ns, targets=targets, block=block)
        return d

    @packageResult
    def remote_get_properties(self, keys, targets, block):
        """Forward to ``get_properties``; the string ``'None'`` stands for ``None``."""
        if keys == 'None':
            keys = None
        return self.smultiengine.get_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_has_properties(self, keys, targets, block):
        """Forward to ``smultiengine.has_properties``."""
        return self.smultiengine.has_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_del_properties(self, keys, targets, block):
        """Forward to ``smultiengine.del_properties``."""
        return self.smultiengine.del_properties(keys, targets=targets, block=block)

    @packageResult
    def remote_clear_properties(self, targets, block):
        """Forward to ``smultiengine.clear_properties``."""
        return self.smultiengine.clear_properties(targets=targets, block=block)

    # -- undecorated methods: results are returned without pickling -----------

    def remote_get_ids(self):
        """Return whatever ``smultiengine.get_ids()`` returns."""
        return self.smultiengine.get_ids()

    def remote_get_client_name(self):
        """Return the dotted name of the client class for this interface."""
        return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
-
-
# Register the class as the adapter from IMultiEngine to
# IFCSynchronousMultiEngine.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine, IMultiEngine, IFCSynchronousMultiEngine)
-
class FCFullSynchronousMultiEngineClient(object):
    """Client that drives a multiengine through a remote reference.

    Most methods call the same-named remote method via
    ``remote_reference.callRemote`` and unpickle the reply.  ``scatter`` and
    ``gather`` combine several remote calls and, in non-blocking mode, track
    the combined result in a local ``PendingDeferredManager`` (``self.pdm``).
    """

    implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor,
               IMultiEngineMapperFactory, IMapper)

    def __init__(self, remote_reference):
        """Store ``remote_reference`` and create the local bookkeeping."""
        self.remote_reference = remote_reference
        # Maps deferredID -> (callback, args, kwargs); the entry is consumed
        # by get_pending_deferred when that id is fetched remotely.
        self._deferredIDCallbacks = {}
        self.pdm = PendingDeferredManager()

    # -- helpers ---------------------------------------------------------------

    def unpackage(self, r):
        """Unpickle a reply received from the remote side."""
        return pickle.loads(r)

    def _callRemoteUnpackaged(self, name, *args):
        """Call remote method ``name`` and unpickle its reply.

        :returns: the deferred from ``callRemote`` with ``self.unpackage``
            added as a callback.
        """
        d = self.remote_reference.callRemote(name, *args)
        d.addCallback(self.unpackage)
        return d

    def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
        """Remember ``callback`` (with its arguments) for ``did``; return ``did``."""
        self._deferredIDCallbacks[did] = (callback, args, kwargs)
        return did

    # -- pending deferred handling -------------------------------------------

    def get_pending_deferred(self, deferredID, block=True):
        """Fetch the result for ``deferredID``.

        Ids known to the local manager (``self.pdm``) are resolved locally;
        any other id is requested from the remote side, and a callback saved
        for that id, if any, is applied to the unpickled reply.
        """
        if self.pdm.quick_has_id(deferredID):
            return self.pdm.get_pending_deferred(deferredID, block)
        d = self._callRemoteUnpackaged('get_pending_deferred', deferredID, block)
        callback = self._deferredIDCallbacks.pop(deferredID, None)
        if callback is not None:
            func, args, kwargs = callback
            d.addCallback(func, *args, **kwargs)
        return d

    def clear_pending_deferreds(self):
        """Clear the local pending deferreds, then the remote ones."""
        self.pdm.clear_pending_deferreds()
        return self._callRemoteUnpackaged('clear_pending_deferreds')

    # -- execute / push / pull -----------------------------------------------

    def execute(self, lines, targets='all', block=True):
        """Call remote ``execute``."""
        return self._callRemoteUnpackaged('execute', lines, targets, block)

    def push(self, namespace, targets='all', block=True):
        """Pickle ``namespace`` (protocol 2) and call remote ``push``."""
        serial = pickle.dumps(namespace, 2)
        return self._callRemoteUnpackaged('push', serial, targets, block)

    def pull(self, keys, targets='all', block=True):
        """Call remote ``pull``."""
        return self._callRemoteUnpackaged('pull', keys, targets, block)

    def push_function(self, namespace, targets='all', block=True):
        """``canDict`` and pickle ``namespace``, then call remote ``push_function``."""
        serial = pickle.dumps(canDict(namespace), 2)
        return self._callRemoteUnpackaged('push_function', serial, targets, block)

    def pull_function(self, keys, targets='all', block=True):
        """Call remote ``pull_function`` and uncan the pulled functions.

        When ``block`` is true the uncanning callback is attached directly.
        Otherwise the reply is a deferred id, and the uncanning callback is
        saved for that id so it runs when ``get_pending_deferred`` fetches
        the result.
        """
        def uncan_functions(r, keys):
            if len(keys) == 1 or isinstance(keys, str):
                return uncanSequence(r)
            elif len(keys) > 1:
                return [uncanSequence(s) for s in r]

        d = self._callRemoteUnpackaged('pull_function', keys, targets, block)
        if block:
            d.addCallback(uncan_functions, keys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
        return d

    def push_serialized(self, namespace, targets='all', block=True):
        """``canDict`` and pickle ``namespace``, then call remote ``push_serialized``."""
        serial = pickle.dumps(canDict(namespace), 2)
        return self._callRemoteUnpackaged('push_serialized', serial, targets, block)

    def pull_serialized(self, keys, targets='all', block=True):
        """Call remote ``pull_serialized``."""
        return self._callRemoteUnpackaged('pull_serialized', keys, targets, block)

    # -- results, reset, keys, kill, queue -----------------------------------

    def get_result(self, i=None, targets='all', block=True):
        """Call remote ``get_result``; ``None`` is sent as the string ``'None'``."""
        if i is None:
            i = 'None'
        return self._callRemoteUnpackaged('get_result', i, targets, block)

    def reset(self, targets='all', block=True):
        """Call remote ``reset``."""
        return self._callRemoteUnpackaged('reset', targets, block)

    def keys(self, targets='all', block=True):
        """Call remote ``keys``."""
        return self._callRemoteUnpackaged('keys', targets, block)

    def kill(self, controller=False, targets='all', block=True):
        """Call remote ``kill``."""
        return self._callRemoteUnpackaged('kill', controller, targets, block)

    def clear_queue(self, targets='all', block=True):
        """Call remote ``clear_queue``."""
        return self._callRemoteUnpackaged('clear_queue', targets, block)

    def queue_status(self, targets='all', block=True):
        """Call remote ``queue_status``."""
        return self._callRemoteUnpackaged('queue_status', targets, block)

    # -- properties ------------------------------------------------------------

    def set_properties(self, properties, targets='all', block=True):
        """Pickle ``properties`` (protocol 2) and call remote ``set_properties``."""
        serial = pickle.dumps(properties, 2)
        return self._callRemoteUnpackaged('set_properties', serial, targets, block)

    def get_properties(self, keys=None, targets='all', block=True):
        """Call remote ``get_properties``; ``None`` is sent as the string ``'None'``."""
        if keys is None:
            keys = 'None'
        return self._callRemoteUnpackaged('get_properties', keys, targets, block)

    def has_properties(self, keys, targets='all', block=True):
        """Call remote ``has_properties``."""
        return self._callRemoteUnpackaged('has_properties', keys, targets, block)

    def del_properties(self, keys, targets='all', block=True):
        """Call remote ``del_properties``."""
        return self._callRemoteUnpackaged('del_properties', keys, targets, block)

    def clear_properties(self, targets='all', block=True):
        """Call remote ``clear_properties``."""
        return self._callRemoteUnpackaged('clear_properties', targets, block)

    def get_ids(self):
        """Call remote ``get_ids``; the reply is returned without unpickling."""
        return self.remote_reference.callRemote('get_ids')

    # -- scatter / gather / map ------------------------------------------------

    def _process_targets(self, targets):
        """Resolve ``targets`` to a list of engine ids checked against ``get_ids``.

        :param targets: an int, the string ``'all'``, or a list/tuple of ids.
        :returns: a deferred firing with the engine list.  It fails with
            ``error.InvalidEngineID`` when an id is not among the ids
            returned by ``get_ids`` or when ``targets`` has an unsupported
            form.
        """
        def create_targets(ids):
            if isinstance(targets, int):
                engines = [targets]
            elif targets == 'all':
                engines = ids
            elif isinstance(targets, (list, tuple)):
                engines = targets
            else:
                # Previously this case fell through to an unbound local.
                raise error.InvalidEngineID(
                    "targets argument is not an int, list of ints or 'all': %r" % (targets,))
            for t in engines:
                if t not in ids:
                    raise error.InvalidEngineID('engine with id %r does not exist' % t)
            return engines

        d = self.get_ids()
        d.addCallback(create_targets)
        return d

    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition ``seq`` and push one partition to each target as ``key``.

        The partitioner is ``Map.dists[dist]()``.  Each push is issued
        non-blocking, so the first stage yields a list of deferred ids.

        :param flatten: when true, a partition of length 1 is pushed as its
            single element instead of as a sequence.
        :returns: a deferred.  With ``block`` true it fires with the
            collected push results; otherwise it fires with a local deferred
            id whose result is saved in ``self.pdm``.
        """
        def do_scatter(engines):
            nEngines = len(engines)
            mapObject = Map.dists[dist]()
            d_list = []
            for index, engineid in enumerate(engines):
                partition = mapObject.getPartition(seq, index, nEngines)
                if flatten and len(partition) == 1:
                    d = self.push({key: partition[0]}, targets=engineid, block=False)
                else:
                    d = self.push({key: partition}, targets=engineid, block=False)
                d_list.append(d)

            d = gatherBoth(d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0)
            d.addCallback(error.collect_exceptions, 'scatter')

            def process_did_list(did_list):
                # Resolve every deferred id (blocking) and keep element 0 of
                # each individual result.
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'scatter')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                return final_d

            if block:
                d.addCallback(process_did_list)
                return d

            deferred_id = self.pdm.get_deferred_id()
            d_to_return = defer.Deferred()

            def do_it(did_list):
                # Hand the local id to the caller first, then continue
                # towards the final result.
                d_to_return.callback(deferred_id)
                return process_did_list(did_list)

            d.addCallback(do_it)
            self.pdm.save_pending_deferred(d, deferred_id)
            return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_scatter)
        return d

    def gather(self, key, dist='b', targets='all', block=True):
        """Pull ``key`` from each target and join the pieces.

        The pieces are joined with ``Map.dists[dist]().joinPartitions``.
        Each pull is issued non-blocking, so the first stage yields a list of
        deferred ids.

        :returns: a deferred.  With ``block`` true it fires with the joined
            result; otherwise it fires with a local deferred id whose result
            is saved in ``self.pdm``.
        """
        def do_gather(engines):
            mapObject = Map.dists[dist]()
            d_list = [self.pull(key, targets=engineid, block=False) for engineid in engines]

            d = gatherBoth(d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0)
            # Label corrected from 'scatter': these errors come from gather.
            d.addCallback(error.collect_exceptions, 'gather')

            def process_did_list(did_list):
                # Resolve every deferred id (blocking), keep element 0 of
                # each individual result, then join the partitions.
                new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
                final_d = gatherBoth(new_d_list, fireOnOneErrback=0, consumeErrors=1, logErrors=0)
                final_d.addCallback(error.collect_exceptions, 'gather')
                final_d.addCallback(lambda lop: [i[0] for i in lop])
                final_d.addCallback(mapObject.joinPartitions)
                return final_d

            if block:
                d.addCallback(process_did_list)
                return d

            deferred_id = self.pdm.get_deferred_id()
            d_to_return = defer.Deferred()

            def do_it(did_list):
                # Hand the local id to the caller first, then continue
                # towards the final result.
                d_to_return.callback(deferred_id)
                return process_did_list(did_list)

            d.addCallback(do_it)
            self.pdm.save_pending_deferred(d, deferred_id)
            return d_to_return

        d = self._process_targets(targets)
        d.addCallback(do_gather)
        return d

    def raw_map(self, func, sequences, dist='b', targets='all', block=True):
        """Map ``func`` over ``sequences`` on the target engines.

        The steps are chained on one deferred: push ``func`` (when it is a
        function object), scatter the zipped sequences as
        ``_ipython_map_seq``, execute the ``map`` statement, then gather
        ``_ipython_map_seq_result``.

        :param func: a function object, or a string that is interpolated
            into the executed source as the callable.
        :param sequences: a list or tuple of equal-length sequences.
        :raises TypeError: if ``sequences`` is not a list or tuple, or
            ``func`` is neither a function nor a ``str``.
        :raises ValueError: if the sequences differ in length.
        """
        if not isinstance(sequences, (list, tuple)):
            raise TypeError('sequences must be a list or tuple')
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')

        if isinstance(func, FunctionType):
            d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
            d.addCallback(lambda did: self.get_pending_deferred(did, True))
            sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
        elif isinstance(func, str):
            d = defer.succeed(None)
            sourceToRun = '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
        else:
            raise TypeError('func must be a function or str')

        d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
        d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
        d.addCallback(lambda did: self.get_pending_deferred(did, True))
        d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
        return d

    def map(self, func, *sequences):
        """Map using a mapper built with the default arguments."""
        return self.mapper().map(func, *sequences)

    def mapper(self, dist='b', targets='all', block=True):
        """Return a ``MultiEngineMapper`` bound to this client."""
        return MultiEngineMapper(self, dist, targets, block)

    def parallel(self, dist='b', targets='all', block=True):
        """Return a ``ParallelFunction`` built on ``self.mapper(...)``."""
        return ParallelFunction(self.mapper(dist, targets, block))

    # -- zip_pull / run --------------------------------------------------------

    def _transformPullResult(self, pushResult, multitargets, lenKeys):
        """Reshape a pull result according to target and key counts.

        Without multiple targets the first element is returned; with several
        keys the result is transposed with ``zip``; with exactly one key it
        is copied into a list.
        """
        if not multitargets:
            result = pushResult[0]
        elif lenKeys > 1:
            result = zip(*pushResult)
        elif lenKeys == 1:
            result = list(pushResult)
        return result

    def zip_pull(self, keys, targets='all', block=True):
        """Pull ``keys`` and reshape the result with ``_transformPullResult``.

        In non-blocking mode the reshaping is saved for the returned
        deferred id and applied when ``get_pending_deferred`` fetches it.
        """
        # An int target is a single engine; len() is only taken of non-ints.
        multitargets = not isinstance(targets, int) and len(targets) > 1
        lenKeys = len(keys)
        d = self.pull(keys, targets=targets, block=block)
        if block:
            d.addCallback(self._transformPullResult, multitargets, lenKeys)
        else:
            d.addCallback(lambda did: self._addDeferredIDCallback(
                did, self._transformPullResult, multitargets, lenKeys))
        return d

    def run(self, fname, targets='all', block=True):
        """Read file ``fname`` and execute its source on the targets.

        The source is compiled locally first; if that raises, a failed
        deferred carrying the exception is returned and nothing is sent.
        """
        with open(fname, 'r') as fileobj:
            source = fileobj.read()
        try:
            # Only checks that the source compiles; the code object is unused.
            compile(source, fname, 'exec')
        except Exception:
            return defer.fail(failure.Failure())
        return self.execute(source, targets=targets, block=block)

    # -- blocking adaptor --------------------------------------------------------

    def adapt_to_blocking_client(self):
        """Return ``IFullBlockingMultiEngineClient(self)``."""
        # Imported here rather than at module level, as in the decompiled
        # original; presumably to avoid a circular import -- verify.
        from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
        return IFullBlockingMultiEngineClient(self)
-
-
-