home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_645 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  16.2 KB  |  589 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'Pool']
  6. import threading
  7. import Queue
  8. import itertools
  9. import collections
  10. import time
  11. from multiprocessing import Process, cpu_count, TimeoutError
  12. from multiprocessing.util import Finalize, debug
  13. RUN = 0
  14. CLOSE = 1
  15. TERMINATE = 2
  16. job_counter = itertools.count()
  17.  
  18. def mapstar(args):
  19.     return map(*args)
  20.  
  21.  
  22. def worker(inqueue, outqueue, initializer = None, initargs = ()):
  23.     put = outqueue.put
  24.     get = inqueue.get
  25.     if hasattr(inqueue, '_writer'):
  26.         inqueue._writer.close()
  27.         outqueue._reader.close()
  28.     
  29.     if initializer is not None:
  30.         initializer(*initargs)
  31.     
  32.     while None:
  33.         
  34.         try:
  35.             task = get()
  36.         except (EOFError, IOError):
  37.             debug('worker got EOFError or IOError -- exiting')
  38.             break
  39.  
  40.         if task is None:
  41.             debug('worker got sentinel -- exiting')
  42.             break
  43.         
  44.         (job, i, func, args, kwds) = task
  45.         
  46.         try:
  47.             result = (True, func(*args, **kwds))
  48.         except Exception:
  49.             e = None
  50.             result = (False, e)
  51.  
  52.         continue
  53.         return None
  54.  
  55.  
  56. class Pool(object):
  57.     Process = Process
  58.     
  59.     def __init__(self, processes = None, initializer = None, initargs = ()):
  60.         self._setup_queues()
  61.         self._taskqueue = Queue.Queue()
  62.         self._cache = { }
  63.         self._state = RUN
  64.         if processes is None:
  65.             
  66.             try:
  67.                 processes = cpu_count()
  68.             except NotImplementedError:
  69.                 processes = 1
  70.             except:
  71.                 None<EXCEPTION MATCH>NotImplementedError
  72.             
  73.  
  74.         None<EXCEPTION MATCH>NotImplementedError
  75.         self._pool = []
  76.         for i in range(processes):
  77.             w = self.Process(target = worker, args = (self._inqueue, self._outqueue, initializer, initargs))
  78.             self._pool.append(w)
  79.             w.name = w.name.replace('Process', 'PoolWorker')
  80.             w.daemon = True
  81.             w.start()
  82.         
  83.         self._task_handler = threading.Thread(target = Pool._handle_tasks, args = (self._taskqueue, self._quick_put, self._outqueue, self._pool))
  84.         self._task_handler.daemon = True
  85.         self._task_handler._state = RUN
  86.         self._task_handler.start()
  87.         self._result_handler = threading.Thread(target = Pool._handle_results, args = (self._outqueue, self._quick_get, self._cache))
  88.         self._result_handler.daemon = True
  89.         self._result_handler._state = RUN
  90.         self._result_handler.start()
  91.         self._terminate = Finalize(self, self._terminate_pool, args = (self._taskqueue, self._inqueue, self._outqueue, self._pool, self._task_handler, self._result_handler, self._cache), exitpriority = 15)
  92.  
  93.     
  94.     def _setup_queues(self):
  95.         SimpleQueue = SimpleQueue
  96.         import queues
  97.         self._inqueue = SimpleQueue()
  98.         self._outqueue = SimpleQueue()
  99.         self._quick_put = self._inqueue._writer.send
  100.         self._quick_get = self._outqueue._reader.recv
  101.  
  102.     
  103.     def apply(self, func, args = (), kwds = { }):
  104.         return self.apply_async(func, args, kwds).get()
  105.  
  106.     
  107.     def map(self, func, iterable, chunksize = None):
  108.         return self.map_async(func, iterable, chunksize).get()
  109.  
  110.     
  111.     def imap(self, func, iterable, chunksize = 1):
  112.         if chunksize == 1:
  113.             result = IMapIterator(self._cache)
  114.             (None, self._taskqueue.put)(((lambda .0: for i, x in .0:
  115. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  116.             return result
  117.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  118.         result = IMapIterator(self._cache)
  119.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  120. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  121.         return (lambda .0: for chunk in .0:
  122. for item in chunk:
  123. item)(result)
  124.  
  125.     
  126.     def imap_unordered(self, func, iterable, chunksize = 1):
  127.         if chunksize == 1:
  128.             result = IMapUnorderedIterator(self._cache)
  129.             (None, self._taskqueue.put)(((lambda .0: for i, x in .0:
  130. (result._job, i, func, (x,), { }))(enumerate(iterable)), result._set_length))
  131.             return result
  132.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  133.         result = IMapUnorderedIterator(self._cache)
  134.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  135. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), result._set_length))
  136.         return (lambda .0: for chunk in .0:
  137. for item in chunk:
  138. item)(result)
  139.  
  140.     
  141.     def apply_async(self, func, args = (), kwds = { }, callback = None):
  142.         result = ApplyResult(self._cache, callback)
  143.         self._taskqueue.put(([
  144.             (result._job, None, func, args, kwds)], None))
  145.         return result
  146.  
  147.     
  148.     def map_async(self, func, iterable, chunksize = None, callback = None):
  149.         if not hasattr(iterable, '__len__'):
  150.             iterable = list(iterable)
  151.         
  152.         if chunksize is None:
  153.             (chunksize, extra) = divmod(len(iterable), len(self._pool) * 4)
  154.             if extra:
  155.                 chunksize += 1
  156.             
  157.         
  158.         task_batches = Pool._get_tasks(func, iterable, chunksize)
  159.         result = MapResult(self._cache, chunksize, len(iterable), callback)
  160.         (self._taskqueue.put,)(((lambda .0: for i, x in .0:
  161. (result._job, i, mapstar, (x,), { }))(enumerate(task_batches)), None))
  162.         return result
  163.  
  164.     
  165.     def _handle_tasks(taskqueue, put, outqueue, pool):
  166.         thread = threading.current_thread()
  167.         for taskseq, set_length in iter(taskqueue.get, None):
  168.             i = -1
  169.             for i, task in enumerate(taskseq):
  170.                 if thread._state:
  171.                     debug('task handler found thread._state != RUN')
  172.                     break
  173.                 
  174.                 
  175.                 try:
  176.                     put(task)
  177.                 continue
  178.                 except IOError:
  179.                     debug('could not put task on queue')
  180.                     break
  181.                     continue
  182.                 
  183.  
  184.             elif set_length:
  185.                 debug('doing set_length()')
  186.                 set_length(i + 1)
  187.                 continue
  188.             None<EXCEPTION MATCH>IOError
  189.         
  190.         
  191.         try:
  192.             debug('task handler sending sentinel to result handler')
  193.             outqueue.put(None)
  194.             debug('task handler sending sentinel to workers')
  195.             for p in pool:
  196.                 put(None)
  197.         except IOError:
  198.             debug('task handler got IOError when sending sentinels')
  199.  
  200.         debug('task handler exiting')
  201.  
  202.     _handle_tasks = staticmethod(_handle_tasks)
  203.     
  204.     def _handle_results(outqueue, get, cache):
  205.         thread = threading.current_thread()
  206.         while None:
  207.             
  208.             try:
  209.                 task = get()
  210.             except (IOError, EOFError):
  211.                 debug('result handler got EOFError/IOError -- exiting')
  212.                 return None
  213.  
  214.             if thread._state:
  215.                 debug('result handler found thread._state=TERMINATE')
  216.                 break
  217.             
  218.             if task is None:
  219.                 debug('result handler got sentinel')
  220.                 break
  221.             
  222.             (job, i, obj) = task
  223.             
  224.             try:
  225.                 cache[job]._set(i, obj)
  226.             continue
  227.             except KeyError:
  228.                 continue
  229.             
  230.  
  231.             while cache and thread._state != TERMINATE:
  232.                 
  233.                 try:
  234.                     task = get()
  235.                 except (IOError, EOFError):
  236.                     None<EXCEPTION MATCH>KeyError
  237.                     None<EXCEPTION MATCH>KeyError
  238.                     debug('result handler got EOFError/IOError -- exiting')
  239.                     return None
  240.  
  241.                 if task is None:
  242.                     debug('result handler ignoring extra sentinel')
  243.                     continue
  244.                 
  245.                 (job, i, obj) = task
  246.                 
  247.                 try:
  248.                     cache[job]._set(i, obj)
  249.                 continue
  250.                 except KeyError:
  251.                     continue
  252.                 
  253.  
  254.                 None<EXCEPTION MATCH>KeyError
  255.             if hasattr(outqueue, '_reader'):
  256.                 debug('ensuring that outqueue is not full')
  257.                 
  258.                 try:
  259.                     for i in range(10):
  260.                         if not outqueue._reader.poll():
  261.                             break
  262.                         
  263.                         get()
  264.                 except (IOError, EOFError):
  265.                     pass
  266.                 except:
  267.                     None<EXCEPTION MATCH>(IOError, EOFError)
  268.                 
  269.  
  270.         None<EXCEPTION MATCH>(IOError, EOFError)
  271.         debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state)
  272.  
  273.     _handle_results = staticmethod(_handle_results)
  274.     
  275.     def _get_tasks(func, it, size):
  276.         it = iter(it)
  277.         while None:
  278.             x = tuple(itertools.islice(it, size))
  279.             if not x:
  280.                 return None
  281.             yield (func, x)
  282.             continue
  283.             return None
  284.  
  285.     _get_tasks = staticmethod(_get_tasks)
  286.     
  287.     def __reduce__(self):
  288.         raise NotImplementedError('pool objects cannot be passed between processes or pickled')
  289.  
  290.     
  291.     def close(self):
  292.         debug('closing pool')
  293.         if self._state == RUN:
  294.             self._state = CLOSE
  295.             self._taskqueue.put(None)
  296.         
  297.  
  298.     
  299.     def terminate(self):
  300.         debug('terminating pool')
  301.         self._state = TERMINATE
  302.         self._terminate()
  303.  
  304.     
  305.     def join(self):
  306.         debug('joining pool')
  307.         self._task_handler.join()
  308.         self._result_handler.join()
  309.         for p in self._pool:
  310.             p.join()
  311.         
  312.  
  313.     
  314.     def _help_stuff_finish(inqueue, task_handler, size):
  315.         debug('removing tasks from inqueue until task handler finished')
  316.         inqueue._rlock.acquire()
  317.         while task_handler.is_alive() and inqueue._reader.poll():
  318.             inqueue._reader.recv()
  319.             time.sleep(0)
  320.  
  321.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  322.     
  323.     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, task_handler, result_handler, cache):
  324.         debug('finalizing pool')
  325.         task_handler._state = TERMINATE
  326.         taskqueue.put(None)
  327.         debug('helping task handler/workers to finish')
  328.         cls._help_stuff_finish(inqueue, task_handler, len(pool))
  329.         result_handler._state = TERMINATE
  330.         outqueue.put(None)
  331.         if pool and hasattr(pool[0], 'terminate'):
  332.             debug('terminating workers')
  333.             for p in pool:
  334.                 p.terminate()
  335.             
  336.         
  337.         debug('joining task handler')
  338.         task_handler.join(1e+100)
  339.         debug('joining result handler')
  340.         result_handler.join(1e+100)
  341.         if pool and hasattr(pool[0], 'terminate'):
  342.             debug('joining pool workers')
  343.             for p in pool:
  344.                 p.join()
  345.             
  346.         
  347.  
  348.     _terminate_pool = classmethod(_terminate_pool)
  349.  
  350.  
  351. class ApplyResult(object):
  352.     
  353.     def __init__(self, cache, callback):
  354.         self._cond = threading.Condition(threading.Lock())
  355.         self._job = job_counter.next()
  356.         self._cache = cache
  357.         self._ready = False
  358.         self._callback = callback
  359.         cache[self._job] = self
  360.  
  361.     
  362.     def ready(self):
  363.         return self._ready
  364.  
  365.     
  366.     def successful(self):
  367.         return self._success
  368.  
  369.     
  370.     def wait(self, timeout = None):
  371.         self._cond.acquire()
  372.         
  373.         try:
  374.             if not self._ready:
  375.                 self._cond.wait(timeout)
  376.         finally:
  377.             self._cond.release()
  378.  
  379.  
  380.     
  381.     def get(self, timeout = None):
  382.         self.wait(timeout)
  383.         if not self._ready:
  384.             raise TimeoutError
  385.         self._ready
  386.         if self._success:
  387.             return self._value
  388.         raise self._value
  389.  
  390.     
  391.     def _set(self, i, obj):
  392.         (self._success, self._value) = obj
  393.         if self._callback and self._success:
  394.             self._callback(self._value)
  395.         
  396.         self._cond.acquire()
  397.         
  398.         try:
  399.             self._ready = True
  400.             self._cond.notify()
  401.         finally:
  402.             self._cond.release()
  403.  
  404.         del self._cache[self._job]
  405.  
  406.  
  407.  
  408. class MapResult(ApplyResult):
  409.     
  410.     def __init__(self, cache, chunksize, length, callback):
  411.         ApplyResult.__init__(self, cache, callback)
  412.         self._success = True
  413.         self._value = [
  414.             None] * length
  415.         self._chunksize = chunksize
  416.         if chunksize <= 0:
  417.             self._number_left = 0
  418.             self._ready = True
  419.         else:
  420.             self._number_left = length // chunksize + bool(length % chunksize)
  421.  
  422.     
  423.     def _set(self, i, success_result):
  424.         (success, result) = success_result
  425.         if success:
  426.             self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
  427.             self._number_left -= 1
  428.             if self._number_left == 0:
  429.                 if self._callback:
  430.                     self._callback(self._value)
  431.                 
  432.                 del self._cache[self._job]
  433.                 self._cond.acquire()
  434.                 
  435.                 try:
  436.                     self._ready = True
  437.                     self._cond.notify()
  438.                 finally:
  439.                     self._cond.release()
  440.  
  441.             
  442.         else:
  443.             self._success = False
  444.             self._value = result
  445.             del self._cache[self._job]
  446.             self._cond.acquire()
  447.             
  448.             try:
  449.                 self._ready = True
  450.                 self._cond.notify()
  451.             finally:
  452.                 self._cond.release()
  453.  
  454.  
  455.  
  456.  
  457. class IMapIterator(object):
  458.     
  459.     def __init__(self, cache):
  460.         self._cond = threading.Condition(threading.Lock())
  461.         self._job = job_counter.next()
  462.         self._cache = cache
  463.         self._items = collections.deque()
  464.         self._index = 0
  465.         self._length = None
  466.         self._unsorted = { }
  467.         cache[self._job] = self
  468.  
  469.     
  470.     def __iter__(self):
  471.         return self
  472.  
  473.     
  474.     def next(self, timeout = None):
  475.         self._cond.acquire()
  476.         
  477.         try:
  478.             item = self._items.popleft()
  479.         except IndexError:
  480.             if self._index == self._length:
  481.                 raise StopIteration
  482.             self._index == self._length
  483.             self._cond.wait(timeout)
  484.             
  485.             try:
  486.                 item = self._items.popleft()
  487.             except IndexError:
  488.                 if self._index == self._length:
  489.                     raise StopIteration
  490.                 self._index == self._length
  491.                 raise TimeoutError
  492.             except:
  493.                 None<EXCEPTION MATCH>IndexError
  494.             
  495.  
  496.             None<EXCEPTION MATCH>IndexError
  497.         finally:
  498.             self._cond.release()
  499.  
  500.         (success, value) = item
  501.         if success:
  502.             return value
  503.         raise value
  504.  
  505.     __next__ = next
  506.     
  507.     def _set(self, i, obj):
  508.         self._cond.acquire()
  509.         
  510.         try:
  511.             if self._index == i:
  512.                 self._items.append(obj)
  513.                 self._index += 1
  514.                 while self._index in self._unsorted:
  515.                     obj = self._unsorted.pop(self._index)
  516.                     self._items.append(obj)
  517.                     self._index += 1
  518.                     continue
  519.                     self
  520.                 self._cond.notify()
  521.             else:
  522.                 self._unsorted[i] = obj
  523.             if self._index == self._length:
  524.                 del self._cache[self._job]
  525.         finally:
  526.             self._cond.release()
  527.  
  528.  
  529.     
  530.     def _set_length(self, length):
  531.         self._cond.acquire()
  532.         
  533.         try:
  534.             self._length = length
  535.             if self._index == self._length:
  536.                 self._cond.notify()
  537.                 del self._cache[self._job]
  538.         finally:
  539.             self._cond.release()
  540.  
  541.  
  542.  
  543.  
  544. class IMapUnorderedIterator(IMapIterator):
  545.     
  546.     def _set(self, i, obj):
  547.         self._cond.acquire()
  548.         
  549.         try:
  550.             self._items.append(obj)
  551.             self._index += 1
  552.             self._cond.notify()
  553.             if self._index == self._length:
  554.                 del self._cache[self._job]
  555.         finally:
  556.             self._cond.release()
  557.  
  558.  
  559.  
  560.  
  561. class ThreadPool(Pool):
  562.     from dummy import Process
  563.     
  564.     def __init__(self, processes = None, initializer = None, initargs = ()):
  565.         Pool.__init__(self, processes, initializer, initargs)
  566.  
  567.     
  568.     def _setup_queues(self):
  569.         self._inqueue = Queue.Queue()
  570.         self._outqueue = Queue.Queue()
  571.         self._quick_put = self._inqueue.put
  572.         self._quick_get = self._outqueue.get
  573.  
  574.     
  575.     def _help_stuff_finish(inqueue, task_handler, size):
  576.         inqueue.not_empty.acquire()
  577.         
  578.         try:
  579.             inqueue.queue.clear()
  580.             inqueue.queue.extend([
  581.                 None] * size)
  582.             inqueue.not_empty.notify_all()
  583.         finally:
  584.             inqueue.not_empty.release()
  585.  
  586.  
  587.     _help_stuff_finish = staticmethod(_help_stuff_finish)
  588.  
  589.