home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_1875 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  25.0 KB  |  617 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __docformat__ = 'restructuredtext en'
  5. import cPickle as pickle
  6. from types import FunctionType
  7. from zope.interface import Interface, implements
  8. from twisted.internet import defer
  9. from twisted.python import components, failure, log
  10. from foolscap import Referenceable
  11. from IPython.kernel import error
  12. from IPython.kernel.util import printer
  13. from IPython.kernel import map as Map
  14. from IPython.kernel.parallelfunction import ParallelFunction
  15. from IPython.kernel.mapper import MultiEngineMapper, IMultiEngineMapperFactory, IMapper
  16. from IPython.kernel.twistedutil import gatherBoth
  17. from IPython.kernel.multiengine import MultiEngine, IMultiEngine, IFullSynchronousMultiEngine, ISynchronousMultiEngine
  18. from IPython.kernel.multiengineclient import wrapResultList
  19. from IPython.kernel.pendingdeferred import PendingDeferredManager
  20. from IPython.kernel.pickleutil import can, canDict, canSequence, uncan, uncanDict, uncanSequence
  21. from IPython.kernel.clientinterfaces import IFCClientInterfaceProvider, IBlockingClientAdaptor
  22. import __main__
  23.  
  24. def packageResult(wrappedMethod):
  25.     
  26.     def wrappedPackageResult(self, *args, **kwargs):
  27.         d = wrappedMethod(self, *args, **kwargs)
  28.         d.addCallback(self.packageSuccess)
  29.         d.addErrback(self.packageFailure)
  30.         return d
  31.  
  32.     return wrappedPackageResult
  33.  
  34.  
  35. class IFCSynchronousMultiEngine(Interface):
  36.     pass
  37.  
  38.  
  39. class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
  40.     implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
  41.     addSlash = True
  42.     
  43.     def __init__(self, multiengine):
  44.         self.smultiengine = ISynchronousMultiEngine(multiengine)
  45.         self._deferredIDCallbacks = { }
  46.  
  47.     
  48.     def packageFailure(self, f):
  49.         f.cleanFailure()
  50.         return self.packageSuccess(f)
  51.  
  52.     
  53.     def packageSuccess(self, obj):
  54.         serial = pickle.dumps(obj, 2)
  55.         return serial
  56.  
  57.     
  58.     def remote_get_pending_deferred(self, deferredID, block):
  59.         d = self.smultiengine.get_pending_deferred(deferredID, block)
  60.         
  61.         try:
  62.             callback = self._deferredIDCallbacks.pop(deferredID)
  63.         except KeyError:
  64.             callback = None
  65.  
  66.         if callback is not None:
  67.             d.addCallback(callback[0], *callback[1], **callback[2])
  68.         
  69.         return d
  70.  
  71.     remote_get_pending_deferred = packageResult(remote_get_pending_deferred)
  72.     
  73.     def remote_clear_pending_deferreds(self):
  74.         return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
  75.  
  76.     remote_clear_pending_deferreds = packageResult(remote_clear_pending_deferreds)
  77.     
  78.     def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
  79.         self._deferredIDCallbacks[did] = (callback, args, kwargs)
  80.         return did
  81.  
  82.     
  83.     def remote_execute(self, lines, targets, block):
  84.         return self.smultiengine.execute(lines, targets = targets, block = block)
  85.  
  86.     remote_execute = packageResult(remote_execute)
  87.     
  88.     def remote_push(self, binaryNS, targets, block):
  89.         
  90.         try:
  91.             namespace = pickle.loads(binaryNS)
  92.         except:
  93.             d = defer.fail(failure.Failure())
  94.  
  95.         d = self.smultiengine.push(namespace, targets = targets, block = block)
  96.         return d
  97.  
  98.     remote_push = packageResult(remote_push)
  99.     
  100.     def remote_pull(self, keys, targets, block):
  101.         d = self.smultiengine.pull(keys, targets = targets, block = block)
  102.         return d
  103.  
  104.     remote_pull = packageResult(remote_pull)
  105.     
  106.     def remote_push_function(self, binaryNS, targets, block):
  107.         
  108.         try:
  109.             namespace = pickle.loads(binaryNS)
  110.         except:
  111.             d = defer.fail(failure.Failure())
  112.  
  113.         namespace = uncanDict(namespace)
  114.         d = self.smultiengine.push_function(namespace, targets = targets, block = block)
  115.         return d
  116.  
  117.     remote_push_function = packageResult(remote_push_function)
  118.     
  119.     def _canMultipleKeys(self, result):
  120.         return [ canSequence(r) for r in result ]
  121.  
  122.     
  123.     def remote_pull_function(self, keys, targets, block):
  124.         
  125.         def can_functions(r, keys):
  126.             if len(keys) == 1 or isinstance(keys, str):
  127.                 result = canSequence(r)
  128.             elif len(keys) > 1:
  129.                 result = [ canSequence(s) for s in r ]
  130.             
  131.             return result
  132.  
  133.         d = self.smultiengine.pull_function(keys, targets = targets, block = block)
  134.         if block:
  135.             d.addCallback(can_functions, keys)
  136.         else:
  137.             (None, None, d.addCallback)((lambda did: self._addDeferredIDCallback(did, can_functions, keys)))
  138.         return d
  139.  
  140.     remote_pull_function = packageResult(remote_pull_function)
  141.     
  142.     def remote_push_serialized(self, binaryNS, targets, block):
  143.         
  144.         try:
  145.             namespace = pickle.loads(binaryNS)
  146.         except:
  147.             d = defer.fail(failure.Failure())
  148.  
  149.         d = self.smultiengine.push_serialized(namespace, targets = targets, block = block)
  150.         return d
  151.  
  152.     remote_push_serialized = packageResult(remote_push_serialized)
  153.     
  154.     def remote_pull_serialized(self, keys, targets, block):
  155.         d = self.smultiengine.pull_serialized(keys, targets = targets, block = block)
  156.         return d
  157.  
  158.     remote_pull_serialized = packageResult(remote_pull_serialized)
  159.     
  160.     def remote_get_result(self, i, targets, block):
  161.         if i == 'None':
  162.             i = None
  163.         
  164.         return self.smultiengine.get_result(i, targets = targets, block = block)
  165.  
  166.     remote_get_result = packageResult(remote_get_result)
  167.     
  168.     def remote_reset(self, targets, block):
  169.         return self.smultiengine.reset(targets = targets, block = block)
  170.  
  171.     remote_reset = packageResult(remote_reset)
  172.     
  173.     def remote_keys(self, targets, block):
  174.         return self.smultiengine.keys(targets = targets, block = block)
  175.  
  176.     remote_keys = packageResult(remote_keys)
  177.     
  178.     def remote_kill(self, controller, targets, block):
  179.         return self.smultiengine.kill(controller, targets = targets, block = block)
  180.  
  181.     remote_kill = packageResult(remote_kill)
  182.     
  183.     def remote_clear_queue(self, targets, block):
  184.         return self.smultiengine.clear_queue(targets = targets, block = block)
  185.  
  186.     remote_clear_queue = packageResult(remote_clear_queue)
  187.     
  188.     def remote_queue_status(self, targets, block):
  189.         return self.smultiengine.queue_status(targets = targets, block = block)
  190.  
  191.     remote_queue_status = packageResult(remote_queue_status)
  192.     
  193.     def remote_set_properties(self, binaryNS, targets, block):
  194.         
  195.         try:
  196.             ns = pickle.loads(binaryNS)
  197.         except:
  198.             d = defer.fail(failure.Failure())
  199.  
  200.         d = self.smultiengine.set_properties(ns, targets = targets, block = block)
  201.         return d
  202.  
  203.     remote_set_properties = packageResult(remote_set_properties)
  204.     
  205.     def remote_get_properties(self, keys, targets, block):
  206.         if keys == 'None':
  207.             keys = None
  208.         
  209.         return self.smultiengine.get_properties(keys, targets = targets, block = block)
  210.  
  211.     remote_get_properties = packageResult(remote_get_properties)
  212.     
  213.     def remote_has_properties(self, keys, targets, block):
  214.         return self.smultiengine.has_properties(keys, targets = targets, block = block)
  215.  
  216.     remote_has_properties = packageResult(remote_has_properties)
  217.     
  218.     def remote_del_properties(self, keys, targets, block):
  219.         return self.smultiengine.del_properties(keys, targets = targets, block = block)
  220.  
  221.     remote_del_properties = packageResult(remote_del_properties)
  222.     
  223.     def remote_clear_properties(self, targets, block):
  224.         return self.smultiengine.clear_properties(targets = targets, block = block)
  225.  
  226.     remote_clear_properties = packageResult(remote_clear_properties)
  227.     
  228.     def remote_get_ids(self):
  229.         return self.smultiengine.get_ids()
  230.  
  231.     
  232.     def remote_get_client_name(self):
  233.         return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
  234.  
  235.  
# Register FCSynchronousMultiEngineFromMultiEngine as the adapter that turns
# an IMultiEngine provider into an IFCSynchronousMultiEngine provider.
components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine, IMultiEngine, IFCSynchronousMultiEngine)
  237.  
  238. class FCFullSynchronousMultiEngineClient(object):
  239.     implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor, IMultiEngineMapperFactory, IMapper)
  240.     
  241.     def __init__(self, remote_reference):
  242.         self.remote_reference = remote_reference
  243.         self._deferredIDCallbacks = { }
  244.         self.pdm = PendingDeferredManager()
  245.  
  246.     
  247.     def unpackage(self, r):
  248.         return pickle.loads(r)
  249.  
  250.     
  251.     def get_pending_deferred(self, deferredID, block = True):
  252.         if self.pdm.quick_has_id(deferredID):
  253.             d = self.pdm.get_pending_deferred(deferredID, block)
  254.             return d
  255.         d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
  256.         d.addCallback(self.unpackage)
  257.         
  258.         try:
  259.             callback = self._deferredIDCallbacks.pop(deferredID)
  260.         except KeyError:
  261.             self.pdm.quick_has_id(deferredID)
  262.             self.pdm.quick_has_id(deferredID)
  263.             callback = None
  264.         except:
  265.             self.pdm.quick_has_id(deferredID)
  266.  
  267.         if callback is not None:
  268.             d.addCallback(callback[0], *callback[1], **callback[2])
  269.         
  270.         return d
  271.  
  272.     
  273.     def clear_pending_deferreds(self):
  274.         self.pdm.clear_pending_deferreds()
  275.         d2 = self.remote_reference.callRemote('clear_pending_deferreds')
  276.         d2.addCallback(self.unpackage)
  277.         return d2
  278.  
  279.     
  280.     def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
  281.         self._deferredIDCallbacks[did] = (callback, args, kwargs)
  282.         return did
  283.  
  284.     
  285.     def execute(self, lines, targets = 'all', block = True):
  286.         d = self.remote_reference.callRemote('execute', lines, targets, block)
  287.         d.addCallback(self.unpackage)
  288.         return d
  289.  
  290.     
  291.     def push(self, namespace, targets = 'all', block = True):
  292.         serial = pickle.dumps(namespace, 2)
  293.         d = self.remote_reference.callRemote('push', serial, targets, block)
  294.         d.addCallback(self.unpackage)
  295.         return d
  296.  
  297.     
  298.     def pull(self, keys, targets = 'all', block = True):
  299.         d = self.remote_reference.callRemote('pull', keys, targets, block)
  300.         d.addCallback(self.unpackage)
  301.         return d
  302.  
  303.     
  304.     def push_function(self, namespace, targets = 'all', block = True):
  305.         cannedNamespace = canDict(namespace)
  306.         serial = pickle.dumps(cannedNamespace, 2)
  307.         d = self.remote_reference.callRemote('push_function', serial, targets, block)
  308.         d.addCallback(self.unpackage)
  309.         return d
  310.  
  311.     
  312.     def pull_function(self, keys, targets = 'all', block = True):
  313.         
  314.         def uncan_functions(r, keys):
  315.             if len(keys) == 1 or isinstance(keys, str):
  316.                 return uncanSequence(r)
  317.             if len(keys) > 1:
  318.                 return [ uncanSequence(s) for s in r ]
  319.  
  320.         d = self.remote_reference.callRemote('pull_function', keys, targets, block)
  321.         if block:
  322.             d.addCallback(self.unpackage)
  323.             d.addCallback(uncan_functions, keys)
  324.         else:
  325.             d.addCallback(self.unpackage)
  326.             (None, None, d.addCallback)((lambda did: self._addDeferredIDCallback(did, uncan_functions, keys)))
  327.         return d
  328.  
  329.     
  330.     def push_serialized(self, namespace, targets = 'all', block = True):
  331.         cannedNamespace = canDict(namespace)
  332.         serial = pickle.dumps(cannedNamespace, 2)
  333.         d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
  334.         d.addCallback(self.unpackage)
  335.         return d
  336.  
  337.     
  338.     def pull_serialized(self, keys, targets = 'all', block = True):
  339.         d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
  340.         d.addCallback(self.unpackage)
  341.         return d
  342.  
  343.     
  344.     def get_result(self, i = None, targets = 'all', block = True):
  345.         if i is None:
  346.             i = 'None'
  347.         
  348.         d = self.remote_reference.callRemote('get_result', i, targets, block)
  349.         d.addCallback(self.unpackage)
  350.         return d
  351.  
  352.     
  353.     def reset(self, targets = 'all', block = True):
  354.         d = self.remote_reference.callRemote('reset', targets, block)
  355.         d.addCallback(self.unpackage)
  356.         return d
  357.  
  358.     
  359.     def keys(self, targets = 'all', block = True):
  360.         d = self.remote_reference.callRemote('keys', targets, block)
  361.         d.addCallback(self.unpackage)
  362.         return d
  363.  
  364.     
  365.     def kill(self, controller = False, targets = 'all', block = True):
  366.         d = self.remote_reference.callRemote('kill', controller, targets, block)
  367.         d.addCallback(self.unpackage)
  368.         return d
  369.  
  370.     
  371.     def clear_queue(self, targets = 'all', block = True):
  372.         d = self.remote_reference.callRemote('clear_queue', targets, block)
  373.         d.addCallback(self.unpackage)
  374.         return d
  375.  
  376.     
  377.     def queue_status(self, targets = 'all', block = True):
  378.         d = self.remote_reference.callRemote('queue_status', targets, block)
  379.         d.addCallback(self.unpackage)
  380.         return d
  381.  
  382.     
  383.     def set_properties(self, properties, targets = 'all', block = True):
  384.         serial = pickle.dumps(properties, 2)
  385.         d = self.remote_reference.callRemote('set_properties', serial, targets, block)
  386.         d.addCallback(self.unpackage)
  387.         return d
  388.  
  389.     
  390.     def get_properties(self, keys = None, targets = 'all', block = True):
  391.         if keys == None:
  392.             keys = 'None'
  393.         
  394.         d = self.remote_reference.callRemote('get_properties', keys, targets, block)
  395.         d.addCallback(self.unpackage)
  396.         return d
  397.  
  398.     
  399.     def has_properties(self, keys, targets = 'all', block = True):
  400.         d = self.remote_reference.callRemote('has_properties', keys, targets, block)
  401.         d.addCallback(self.unpackage)
  402.         return d
  403.  
  404.     
  405.     def del_properties(self, keys, targets = 'all', block = True):
  406.         d = self.remote_reference.callRemote('del_properties', keys, targets, block)
  407.         d.addCallback(self.unpackage)
  408.         return d
  409.  
  410.     
  411.     def clear_properties(self, targets = 'all', block = True):
  412.         d = self.remote_reference.callRemote('clear_properties', targets, block)
  413.         d.addCallback(self.unpackage)
  414.         return d
  415.  
  416.     
  417.     def get_ids(self):
  418.         d = self.remote_reference.callRemote('get_ids')
  419.         return d
  420.  
  421.     
  422.     def _process_targets(self, targets):
  423.         
  424.         def create_targets(ids):
  425.             if isinstance(targets, int):
  426.                 engines = [
  427.                     targets]
  428.             elif targets == 'all':
  429.                 engines = ids
  430.             elif isinstance(targets, (list, tuple)):
  431.                 engines = targets
  432.             
  433.             for t in engines:
  434.                 if t not in ids:
  435.                     raise error.InvalidEngineID('engine with id %r does not exist' % t)
  436.                 t not in ids
  437.             
  438.             return engines
  439.  
  440.         d = self.get_ids()
  441.         d.addCallback(create_targets)
  442.         return d
  443.  
  444.     
  445.     def scatter(self, key, seq, dist = 'b', flatten = False, targets = 'all', block = True):
  446.         
  447.         def do_scatter(engines):
  448.             nEngines = len(engines)
  449.             mapClass = Map.dists[dist]
  450.             mapObject = mapClass()
  451.             d_list = []
  452.             for index, engineid in enumerate(engines):
  453.                 partition = mapObject.getPartition(seq, index, nEngines)
  454.                 if flatten and len(partition) == 1:
  455.                     d = self.push({
  456.                         key: partition[0] }, targets = engineid, block = False)
  457.                 else:
  458.                     d = self.push({
  459.                         key: partition }, targets = engineid, block = False)
  460.                 d_list.append(d)
  461.             
  462.             d = gatherBoth(d_list, fireOnOneErrback = 0, consumeErrors = 1, logErrors = 0)
  463.             d.addCallback(error.collect_exceptions, 'scatter')
  464.             
  465.             def process_did_list(did_list):
  466.                 new_d_list = [ self.get_pending_deferred(did, True) for did in did_list ]
  467.                 final_d = gatherBoth(new_d_list, fireOnOneErrback = 0, consumeErrors = 1, logErrors = 0)
  468.                 final_d.addCallback(error.collect_exceptions, 'scatter')
  469.                 final_d.addCallback((lambda lop: [ i[0] for i in lop ]))
  470.                 return final_d
  471.  
  472.             if block:
  473.                 d.addCallback(process_did_list)
  474.                 return d
  475.             deferred_id = self.pdm.get_deferred_id()
  476.             d_to_return = defer.Deferred()
  477.             
  478.             def do_it(did_list):
  479.                 d_to_return.callback(deferred_id)
  480.                 return process_did_list(did_list)
  481.  
  482.             d.addCallback(do_it)
  483.             self.pdm.save_pending_deferred(d, deferred_id)
  484.             return d_to_return
  485.  
  486.         d = self._process_targets(targets)
  487.         d.addCallback(do_scatter)
  488.         return d
  489.  
  490.     
  491.     def gather(self, key, dist = 'b', targets = 'all', block = True):
  492.         
  493.         def do_gather(engines):
  494.             nEngines = len(engines)
  495.             mapClass = Map.dists[dist]
  496.             mapObject = mapClass()
  497.             d_list = []
  498.             for index, engineid in enumerate(engines):
  499.                 d = self.pull(key, targets = engineid, block = False)
  500.                 d_list.append(d)
  501.             
  502.             d = gatherBoth(d_list, fireOnOneErrback = 0, consumeErrors = 1, logErrors = 0)
  503.             d.addCallback(error.collect_exceptions, 'scatter')
  504.             
  505.             def process_did_list(did_list):
  506.                 new_d_list = [ self.get_pending_deferred(did, True) for did in did_list ]
  507.                 final_d = gatherBoth(new_d_list, fireOnOneErrback = 0, consumeErrors = 1, logErrors = 0)
  508.                 final_d.addCallback(error.collect_exceptions, 'gather')
  509.                 final_d.addCallback((lambda lop: [ i[0] for i in lop ]))
  510.                 final_d.addCallback(mapObject.joinPartitions)
  511.                 return final_d
  512.  
  513.             if block:
  514.                 d.addCallback(process_did_list)
  515.                 return d
  516.             deferred_id = self.pdm.get_deferred_id()
  517.             d_to_return = defer.Deferred()
  518.             
  519.             def do_it(did_list):
  520.                 d_to_return.callback(deferred_id)
  521.                 return process_did_list(did_list)
  522.  
  523.             d.addCallback(do_it)
  524.             self.pdm.save_pending_deferred(d, deferred_id)
  525.             return d_to_return
  526.  
  527.         d = self._process_targets(targets)
  528.         d.addCallback(do_gather)
  529.         return d
  530.  
  531.     
  532.     def raw_map(self, func, sequences, dist = 'b', targets = 'all', block = True):
  533.         if not isinstance(sequences, (list, tuple)):
  534.             raise TypeError('sequences must be a list or tuple')
  535.         isinstance(sequences, (list, tuple))
  536.         max_len = max((lambda .0: for s in .0:
  537. len(s))(sequences))
  538.         for s in sequences:
  539.             if len(s) != max_len:
  540.                 raise ValueError('all sequences must have equal length')
  541.             len(s) != max_len
  542.         
  543.         if isinstance(func, FunctionType):
  544.             d = self.push_function(dict(_ipython_map_func = func), targets = targets, block = False)
  545.             (d.addCallback,)((lambda did: self.get_pending_deferred(did, True)))
  546.             sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
  547.         elif isinstance(func, str):
  548.             d = defer.succeed(None)
  549.             sourceToRun = '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
  550.         else:
  551.             raise TypeError('func must be a function or str')
  552.         (None, None, None, isinstance(func, FunctionType).addCallback)((lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets = targets)))
  553.         (None, None, d.addCallback)((lambda _: self.execute(sourceToRun, targets = targets, block = False)))
  554.         (d.addCallback,)((lambda did: self.get_pending_deferred(did, True)))
  555.         (None, None, None, d.addCallback)((lambda _: self.gather('_ipython_map_seq_result', dist, targets = targets, block = block)))
  556.         return d
  557.  
  558.     
  559.     def map(self, func, *sequences):
  560.         return self.mapper().map(func, *sequences)
  561.  
  562.     
  563.     def mapper(self, dist = 'b', targets = 'all', block = True):
  564.         return MultiEngineMapper(self, dist, targets, block)
  565.  
  566.     
  567.     def parallel(self, dist = 'b', targets = 'all', block = True):
  568.         mapper = self.mapper(dist, targets, block)
  569.         pf = ParallelFunction(mapper)
  570.         return pf
  571.  
  572.     
  573.     def _transformPullResult(self, pushResult, multitargets, lenKeys):
  574.         if not multitargets:
  575.             result = pushResult[0]
  576.         elif lenKeys > 1:
  577.             result = zip(*pushResult)
  578.         elif lenKeys is 1:
  579.             result = list(pushResult)
  580.         
  581.         return result
  582.  
  583.     
  584.     def zip_pull(self, keys, targets = 'all', block = True):
  585.         if not isinstance(targets, int):
  586.             pass
  587.         multitargets = len(targets) > 1
  588.         lenKeys = len(keys)
  589.         d = self.pull(keys, targets = targets, block = block)
  590.         if block:
  591.             d.addCallback(self._transformPullResult, multitargets, lenKeys)
  592.         else:
  593.             (None, None, d.addCallback)((lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys)))
  594.         return d
  595.  
  596.     
  597.     def run(self, fname, targets = 'all', block = True):
  598.         fileobj = open(fname, 'r')
  599.         source = fileobj.read()
  600.         fileobj.close()
  601.         
  602.         try:
  603.             code = compile(source, fname, 'exec')
  604.         except:
  605.             return defer.fail(failure.Failure())
  606.  
  607.         d = self.execute(source, targets = targets, block = block)
  608.         return d
  609.  
  610.     
  611.     def adapt_to_blocking_client(self):
  612.         IFullBlockingMultiEngineClient = IFullBlockingMultiEngineClient
  613.         import IPython.kernel.multiengineclient
  614.         return IFullBlockingMultiEngineClient(self)
  615.  
  616.  
  617.