home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- __docformat__ = 'restructuredtext en'
- __test__ = { }
- import copy
- import sys
- import cPickle as pickle
- from twisted.application import service
- from twisted.internet import defer, reactor
- from twisted.python import log, failure, components
- import zope.interface as zi
- from IPython.kernel.core.interpreter import Interpreter
- from IPython.kernel import newserialized, error
-
class IEngineCore(zi.Interface):
    """Basic engine operations: running code and moving objects in and out.

    The concrete implementation in this module (``EngineService``) returns
    a Deferred from each of these methods.
    """

    id = zi.interface.Attribute('the id of the Engine object')
    properties = zi.interface.Attribute('A dict of properties of the Engine')

    def execute(lines):
        """Execute ``lines`` of code in the engine."""

    def push(namespace):
        """Push the objects in the ``namespace`` mapping into the engine."""

    def pull(keys):
        """Pull the objects named by ``keys`` out of the engine."""

    def push_function(namespace):
        """Push the functions in the ``namespace`` mapping into the engine."""

    def pull_function(keys):
        """Pull the functions named by ``keys`` out of the engine."""

    def get_result(i=None):
        """Fetch the result of command number ``i`` (``None`` by default)."""

    def reset():
        """Reset the engine."""

    def kill():
        """Kill the engine."""

    def keys():
        """Report the names defined in the engine."""
-
-
-
class IEngineSerialized(zi.Interface):
    """Push/pull variants that operate on serialized objects."""

    def push_serialized(namespace):
        """Push a mapping of name -> serialized object into the engine."""

    def pull_serialized(keys):
        """Pull the objects named by ``keys`` in serialized form."""
-
-
-
class IEngineProperties(zi.Interface):
    """Operations on the per-engine ``properties`` dictionary."""

    properties = zi.Attribute('A StrictDict object, containing the properties')

    def set_properties(properties):
        """Merge the ``properties`` mapping into the engine's properties."""

    def get_properties(keys=None):
        """Fetch the properties named by ``keys`` (``None`` by default)."""

    def del_properties(keys):
        """Delete the properties named by ``keys``."""

    def has_properties(keys):
        """Report, for each name in ``keys``, whether that property exists."""

    def clear_properties():
        """Remove every property."""
-
-
-
class IEngineBase(IEngineCore, IEngineSerialized, IEngineProperties):
    """Union of the core, serialized and properties engine interfaces."""
-
-
class IEngineQueued(IEngineBase):
    """``IEngineBase`` extended with command-queue management."""

    def clear_queue():
        """Discard the commands that are waiting in the queue."""

    def queue_status():
        """Report the queued commands and the pending one."""

    def register_failure_observer(obs):
        """Add ``obs`` to the failure observers."""

    def unregister_failure_observer(obs):
        """Remove ``obs`` from the failure observers."""
-
-
-
class IEngineThreaded(zi.Interface):
    """Marker interface; it declares no attributes or methods."""
-
-
class StrictDict(dict):
    """A dict whose items are validated on the way in and copied both ways.

    * Item assignment (and therefore ``update``) requires that both key and
      value pickle with protocol 2 and that the value can be deep-copied;
      otherwise ``error.InvalidProperty`` is raised.  A deep copy of the
      value is what gets stored.
    * Item lookup returns a deep copy, so callers cannot mutate stored
      values in place.
    * ``modified`` is set to True by the constructor and by every mutating
      method defined here.

    NOTE: arguments given to the constructor go straight to
    ``dict.__init__`` and are neither validated nor copied.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.modified = True

    def __getitem__(self, key):
        """Return a deep copy of the value stored under ``key``."""
        return copy.deepcopy(dict.__getitem__(self, key))

    def __setitem__(self, key, value):
        """Store a deep copy of ``value`` under ``key``.

        Raises ``error.InvalidProperty`` if the key or the value cannot be
        pickled, or the value cannot be deep-copied.
        """
        try:
            pickle.dumps(key, 2)
            pickle.dumps(value, 2)
            newvalue = copy.deepcopy(value)
        except Exception:
            # Wrap ``value`` in a 1-tuple: a bare tuple on the right of ``%``
            # would be unpacked as format arguments and raise TypeError
            # instead of the intended InvalidProperty.
            raise error.InvalidProperty("can't be a value: %r" % (value,))
        dict.__setitem__(self, key, newvalue)
        self.modified = True

    def __delitem__(self, key):
        dict.__delitem__(self, key)
        self.modified = True

    def update(self, dikt):
        """Assign every item of the mapping ``dikt`` through ``__setitem__``."""
        # ``items()`` (rather than ``iteritems()``) is available on every
        # mapping type and keeps this working across Python versions.
        for k, v in dikt.items():
            self[k] = v

    def pop(self, key):
        """Remove ``key`` and return its stored value (KeyError if absent)."""
        self.modified = True
        return dict.pop(self, key)

    def popitem(self):
        """Remove and return an arbitrary ``(key, value)`` pair."""
        self.modified = True
        return dict.popitem(self)

    def clear(self):
        """Remove every item."""
        self.modified = True
        dict.clear(self)

    def subDict(self, *keys):
        """Return a plain dict holding copies of the values for ``keys``."""
        d = {}
        for key in keys:
            d[key] = self[key]
        return d
-
-
-
class EngineAPI(object):
    """Holder for an engine's ``id`` and ``properties`` that locks itself.

    ``__init__`` sets ``id``, ``properties`` (a fresh ``StrictDict``) and
    finally the ``_fix`` flag.  Once ``_fix`` is true, any attribute
    assignment raises ``error.KernelError``; attribute deletion always does.
    """

    # Class-level default so that assignments made inside __init__ are allowed.
    _fix = False

    def __init__(self, id):
        self.id = id
        self.properties = StrictDict()
        # Must be last: from here on __setattr__ refuses every assignment.
        self._fix = True

    def __setattr__(self, k, v):
        if self._fix:
            raise error.KernelError('I am protected!')
        object.__setattr__(self, k, v)

    def __delattr__(self, key):
        raise error.KernelError('I am protected!')
-
-
# Module-level registry mapping an engine id to its EngineAPI object.
_apiDict = {}


def get_engine(id):
    """Return the ``EngineAPI`` registered for ``id``, creating it if needed."""
    engine = _apiDict.get(id)
    if not engine:
        engine = _apiDict[id] = EngineAPI(id)
    return engine


def drop_engine(id):
    """Remove the registry entry for ``id``; do nothing if there is none."""
    # pop() with a default replaces the deprecated has_key() + del pair.
    _apiDict.pop(id, None)
-
-
-
class EngineService(object, service.Service):
    """Twisted service that exposes an interpreter shell as an engine.

    Each public method calls the matching operation on ``self.shell`` (or
    on ``self.properties``) and returns a Deferred.  Exceptions raised by
    those calls are passed through ``shell.formatTraceback`` and delivered
    through the Deferred's errback (see ``executeAndRaise``).
    """

    zi.implements(IEngineBase)

    name = 'EngineService'

    def __init__(self, shellClass=Interpreter, mpi=None):
        """Create the shell and seed its namespace.

        :param shellClass: callable used to build ``self.shell`` (also
            reused by ``reset``).
        :param mpi: optional object with ``rank`` and ``size`` attributes;
            when given, ``rank`` becomes the engine id.
        """
        self.shellClass = shellClass
        self.shell = self.shellClass()
        self.mpi = mpi
        # Assigning ``id`` goes through _setID, which also binds
        # ``properties`` and pushes the id into the shell.
        self.id = None
        self.properties = get_engine(self.id).properties
        if self.mpi is not None:
            log.msg('MPI started with rank = %i and size = %i' % (self.mpi.rank, self.mpi.size))
            self.id = self.mpi.rank
        self._seedNamespace()

    def _setID(self, id):
        """Store the id, rebind ``properties`` and push the id to the shell."""
        self._id = id
        self.properties = get_engine(id).properties
        self.shell.push({'id': id})

    def _getID(self):
        return self._id

    id = property(_getID, _setID)

    def _seedNamespace(self):
        """Push ``mpi`` and ``id`` into the shell's namespace."""
        self.shell.push({'mpi': self.mpi, 'id': self.id})

    def _makeMsg(self, method, *args):
        """Build the info dict that is attached to formatted tracebacks."""
        return {'engineid': self.id, 'method': method, 'args': list(args)}

    def executeAndRaise(self, msg, callable, *args, **kwargs):
        """Call ``callable(*args, **kwargs)`` and report through a Deferred.

        :param msg: info dict handed to ``shell.formatTraceback`` and stored
            on the exception as ``_ipython_engine_info`` on failure.
        :returns: a Deferred that has already fired, with the call's result
            on success or with a Failure wrapping the exception otherwise.
        """
        d = defer.Deferred()
        try:
            result = callable(*args, **kwargs)
        except:
            # Deliberately catch-all: every error is routed to the errback.
            et, ev, tb = sys.exc_info()
            et, ev, tb = self.shell.formatTraceback(et, ev, tb, msg)
            ev._ipython_engine_info = msg
            d.errback(failure.Failure(ev, et, None))
        else:
            # Only fire the callback when the call succeeded; firing it
            # after errback would reference an unbound ``result`` and fire
            # the Deferred twice.
            d.callback(result)
        return d

    def execute(self, lines):
        """Run ``lines`` in the shell; the result dict gains an ``'id'`` key."""
        msg = self._makeMsg('execute', lines)
        d = self.executeAndRaise(msg, self.shell.execute, lines)
        d.addCallback(self.addIDToResult)
        return d

    def addIDToResult(self, result):
        """Callback: tag ``result`` with this engine's id and pass it on."""
        result['id'] = self.id
        return result

    def push(self, namespace):
        """Push the ``namespace`` mapping into the shell."""
        msg = self._makeMsg('push', repr(namespace.keys()))
        return self.executeAndRaise(msg, self.shell.push, namespace)

    def pull(self, keys):
        """Pull ``keys`` from the shell."""
        msg = self._makeMsg('pull', repr(keys))
        return self.executeAndRaise(msg, self.shell.pull, keys)

    def push_function(self, namespace):
        """Push the ``namespace`` mapping via ``shell.push_function``."""
        msg = self._makeMsg('push_function', repr(namespace.keys()))
        return self.executeAndRaise(msg, self.shell.push_function, namespace)

    def pull_function(self, keys):
        """Pull ``keys`` via ``shell.pull_function``."""
        msg = self._makeMsg('pull_function', repr(keys))
        return self.executeAndRaise(msg, self.shell.pull_function, keys)

    def get_result(self, i=None):
        """Return ``shell.getCommand(i)`` with an ``'id'`` key added."""
        msg = self._makeMsg('get_result', repr(i))
        d = self.executeAndRaise(msg, self.shell.getCommand, i)
        d.addCallback(self.addIDToResult)
        return d

    def reset(self):
        """Replace the shell with a fresh one, clear properties and reseed."""
        msg = self._makeMsg('reset')
        del self.shell
        self.shell = self.shellClass()
        self.properties.clear()
        return self.executeAndRaise(msg, self._seedNamespace)

    def kill(self):
        """Drop this engine from the registry and stop the reactor.

        :returns: a succeeded Deferred, or a failed one if ``reactor.stop``
            raised RuntimeError.
        """
        drop_engine(self.id)
        try:
            reactor.stop()
        except RuntimeError:
            log.msg('The reactor was not running apparently.')
            # No argument: the Failure is built from the active exception.
            return defer.fail()
        else:
            return defer.succeed(None)

    def keys(self):
        """Return (via Deferred) the shell's user names, minus built-in ones."""
        hidden = ('__name__', '_ih', '_oh', '__builtins__', 'In', 'Out',
                  '_', '__', '___', '__IP', 'input', 'raw_input')
        remotes = [k for k in self.shell.user_ns.iterkeys() if k not in hidden]
        return defer.succeed(remotes)

    def set_properties(self, properties):
        """Merge the ``properties`` mapping into ``self.properties``."""
        msg = self._makeMsg('set_properties', repr(properties.keys()))
        return self.executeAndRaise(msg, self.properties.update, properties)

    def get_properties(self, keys=None):
        """Return a dict of the requested properties (all when ``keys`` is None)."""
        # msg records the argument as given, before the default is expanded.
        msg = self._makeMsg('get_properties', repr(keys))
        if keys is None:
            keys = self.properties.keys()
        return self.executeAndRaise(msg, self.properties.subDict, *keys)

    def _doDel(self, keys):
        for key in keys:
            del self.properties[key]

    def del_properties(self, keys):
        """Delete each property named in ``keys``."""
        msg = self._makeMsg('del_properties', repr(keys))
        return self.executeAndRaise(msg, self._doDel, keys)

    def _doHas(self, keys):
        return [self.properties.has_key(key) for key in keys]

    def has_properties(self, keys):
        """Return a list of booleans, one per name in ``keys``."""
        msg = self._makeMsg('has_properties', repr(keys))
        return self.executeAndRaise(msg, self._doHas, keys)

    def clear_properties(self):
        """Remove every property."""
        msg = self._makeMsg('clear_properties')
        return self.executeAndRaise(msg, self.properties.clear)

    def push_serialized(self, sNamespace):
        """Unserialize every value of ``sNamespace`` and push the result.

        :returns: the Deferred from pushing, or a failed Deferred as soon as
            one value cannot be unserialized.
        """
        msg = self._makeMsg('push_serialized', repr(sNamespace.keys()))
        ns = {}
        for k, v in sNamespace.iteritems():
            try:
                unserialized = newserialized.IUnSerialized(v)
                ns[k] = unserialized.getObject()
            except:
                # NOTE(review): the handler clause was missing from the
                # decompiled text; a catch-all is assumed because the only
                # statement left in its place was this failure return.
                return defer.fail()
        return self.executeAndRaise(msg, self.shell.push, ns)

    def pull_serialized(self, keys):
        """Pull ``keys`` from the shell and serialize what comes back.

        A single ``str`` is treated as a one-element list.  With one key the
        pulled value is passed to ``newserialized.serialize``; with several
        keys each pulled value is serialized and a list is produced.

        NOTE: an empty ``keys`` falls through both branches and returns
        ``None`` rather than a Deferred.
        """
        msg = self._makeMsg('pull_serialized', repr(keys))
        if isinstance(keys, str):
            keys = [keys]
        if len(keys) == 1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)
            d.addCallback(newserialized.serialize)
            return d
        if len(keys) > 1:
            d = self.executeAndRaise(msg, self.shell.pull, keys)

            def packThemUp(values):
                serials = []
                for v in values:
                    try:
                        serials.append(newserialized.serialize(v))
                    except:
                        # NOTE(review): handler clause reconstructed, as in
                        # push_serialized.
                        return defer.fail(failure.Failure())
                return serials

            # addCallback returns the Deferred itself.
            return d.addCallback(packThemUp)
-
-
-
def queue(methodToQueue):
    """Turn a placeholder method into one that enqueues a ``Command``.

    The returned function ignores the body of ``methodToQueue``; it wraps
    the method's name and the call arguments in a ``Command`` and hands that
    to ``this.submitCommand``, returning whatever ``submitCommand`` returns.
    """

    def queuedMethod(this, *args, **kwargs):
        command = Command(methodToQueue.__name__, *args, **kwargs)
        return this.submitCommand(command)

    return queuedMethod
-
-
class QueuedEngine(object):
    """Adapter that serializes calls to an engine through a FIFO queue.

    At most one command runs on the wrapped engine at a time; commands
    submitted while one is pending wait in ``self.queued``.  If the running
    command fails, the waiting commands are errbacked with
    ``error.QueueCleared``.
    """

    zi.implements(IEngineQueued)

    def __init__(self, engine):
        """Wrap ``engine`` and start with an empty queue and history."""
        self.engine = engine
        self.id = engine.id
        self.queued = []
        self.history = {}
        self.engineStatus = {}
        self.currentCommand = None
        self.failureObservers = []

    def _get_properties(self):
        return self.engine.properties

    # Read-through to the wrapped engine; assignments are silently ignored.
    properties = property(_get_properties, lambda self, _: None)

    def submitCommand(self, cmd):
        """Run ``cmd`` now if the engine is idle, otherwise queue it.

        :returns: the Deferred attached to ``cmd``.
        """
        d = defer.Deferred()
        cmd.setDeferred(d)
        current = self.currentCommand
        if current is not None and not current.finished:
            self.queued.append(cmd)
        else:
            self.currentCommand = cmd
            self.runCurrentCommand()
        return d

    def runCurrentCommand(self):
        """Invoke the engine method named by the current command."""
        cmd = self.currentCommand
        f = getattr(self.engine, cmd.remoteMethod, None)
        if not f:
            return defer.fail(AttributeError(cmd.remoteMethod))
        d = f(*cmd.args, **cmd.kwargs)
        # Compare by value: identity of equal strings is not guaranteed.
        if cmd.remoteMethod == 'execute':
            d.addCallback(self.saveResult)
        d.addCallback(self.finishCommand)
        d.addErrback(self.abortCommand)
        # NOTE(review): returning the bound method looks unintended, but the
        # callers in this class ignore the value, so it is kept unchanged.
        return f

    def _flushQueue(self):
        """Start the next queued command, if any."""
        if len(self.queued) > 0:
            self.currentCommand = self.queued.pop(0)
            self.runCurrentCommand()

    def saveResult(self, result):
        """Callback: record ``result`` in the history under its number."""
        self.history[result['number']] = result
        return result

    def finishCommand(self, result):
        """Callback: deliver ``result`` to the command and advance the queue."""
        self.currentCommand.handleResult(result)
        self.currentCommand.finished = True
        self._flushQueue()
        return result

    def abortCommand(self, reason):
        """Errback: clear the queue, then deliver ``reason`` to the command."""
        self.currentCommand.finished = True
        s = '%r %r %r' % (self.currentCommand.remoteMethod, self.currentCommand.args, self.currentCommand.kwargs)
        self.clear_queue(msg=s)
        self.currentCommand.handleError(reason)

    # The bodies below are placeholders: ``queue`` replaces each method with
    # one that submits a Command named after it.

    @queue
    def execute(self, lines):
        pass

    @queue
    def push(self, namespace):
        pass

    @queue
    def pull(self, keys):
        pass

    @queue
    def push_function(self, namespace):
        pass

    @queue
    def pull_function(self, keys):
        pass

    def get_result(self, i=None):
        """Return a stored result from the history, or ask the engine.

        With ``i`` omitted, the highest number in the history is used.
        """
        if i is None:
            # Equivalent to the Python 2 idiom max(keys + [None]) without
            # relying on list-returning keys() or ordering against None.
            i = max(self.history) if self.history else None
        cmd = self.history.get(i, None)
        if cmd is None:
            return self.submitCommand(Command('get_result', i))
        return defer.succeed(cmd)

    def reset(self):
        """Clear the queue and the history, then submit ``reset``."""
        self.clear_queue()
        self.history = {}
        return self.submitCommand(Command('reset'))

    def kill(self):
        """Clear the queue, then submit ``kill``."""
        self.clear_queue()
        return self.submitCommand(Command('kill'))

    @queue
    def keys(self):
        pass

    @queue
    def push_serialized(self, namespace):
        pass

    @queue
    def pull_serialized(self, keys):
        pass

    @queue
    def set_properties(self, namespace):
        pass

    @queue
    def get_properties(self, keys=None):
        pass

    @queue
    def del_properties(self, keys):
        pass

    @queue
    def has_properties(self, keys):
        pass

    @queue
    def clear_properties(self):
        pass

    def clear_queue(self, msg=''):
        """Errback every waiting command with ``QueueCleared`` and empty the queue."""
        for cmd in self.queued:
            cmd.deferred.errback(failure.Failure(error.QueueCleared(msg)))
        self.queued = []
        return defer.succeed(None)

    def queue_status(self):
        """Return (via Deferred) reprs of the queued and the pending command."""
        current = self.currentCommand
        if current is not None and not current.finished:
            pending = repr(current)
        else:
            pending = repr(None)
        dikt = {'queue': map(repr, self.queued), 'pending': pending}
        return defer.succeed(dikt)

    def register_failure_observer(self, obs):
        self.failureObservers.append(obs)

    def unregister_failure_observer(self, obs):
        self.failureObservers.remove(obs)


components.registerAdapter(QueuedEngine, IEngineBase, IEngineQueued)
-
class Command(object):
    """A pending method call: a method name, its arguments and a Deferred.

    ``finished`` starts out False.  The Deferred is attached later with
    ``setDeferred`` and fired through ``handleResult`` / ``handleError``.
    """

    def __init__(self, remoteMethod, *args, **kwargs):
        self.remoteMethod = remoteMethod
        self.args = args
        self.kwargs = kwargs
        self.finished = False

    def setDeferred(self, d):
        """Attach the Deferred that will receive the outcome."""
        self.deferred = d

    def __repr__(self):
        """Render the command as a call expression, e.g. ``push(1, k=2)``."""
        # Build the argument list piece by piece.  Slicing str(self.args)
        # only works for 1-tuples ("('a',)"); with several positional
        # arguments it chops the final character off the last one.
        parts = [repr(a) for a in self.args]
        parts.extend('%s=%r' % (k, v) for k, v in self.kwargs.items())
        return '%s(%s)' % (self.remoteMethod, ', '.join(parts))

    def handleResult(self, result):
        """Fire the attached Deferred's callback with ``result``."""
        self.deferred.callback(result)

    def handleError(self, reason):
        """Fire the attached Deferred's errback with ``reason``."""
        self.deferred.errback(reason)
-
-
-
class ThreadedEngineService(EngineService):
    """``EngineService`` variant whose ``execute`` runs in a worker thread."""

    zi.implements(IEngineBase)

    def __init__(self, shellClass=Interpreter, mpi=None):
        EngineService.__init__(self, shellClass, mpi)

    def wrapped_execute(self, msg, lines):
        """Run ``shell.execute(lines)``, re-raising errors with extra info.

        On failure a new exception of the same class is built from the
        formatted traceback text, tagged with ``msg`` as
        ``_ipython_engine_info``, and raised in place of the original.
        """
        try:
            result = self.shell.execute(lines)
        except Exception:
            et, ev, tb = sys.exc_info()
            et, ev, tb = self.shell.formatTraceback(et, ev, tb, msg)
            e = et(ev._ipython_traceback_text)
            e._ipython_engine_info = msg
            raise e
        return result

    def execute(self, lines):
        """Run ``lines`` via ``deferToThread``; the result gains an ``'id'`` key."""
        # Function-scope import, as in the code this was decompiled from
        # (the decompiler had turned it into ``threads = threads``).
        from twisted.internet import threads
        msg = {
            'engineid': self.id,
            'method': 'execute',
            'args': [lines],
        }
        d = threads.deferToThread(self.wrapped_execute, msg, lines)
        d.addCallback(self.addIDToResult)
        return d
-
-
-