# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

from __future__ import with_statement

__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

import sys
import os
import cPickle
import time
import tempfile
from math import ceil
from threading import Thread, RLock
from Queue import Queue, Empty
from multiprocessing.connection import Listener, arbitrary_address
from collections import deque
from binascii import hexlify

from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count
from calibre.constants import iswindows

_counter = 0

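# A ConnectedWorker wraps a single launched worker process together with the
# control connection that process opened back to the Server's Listener. It
# sends the job to the worker and, on a background thread, collects any
# progress notifications from the connection into a queue.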
class ConnectedWorker(Thread):

    def __init__(self, worker, conn, rfile):
        Thread.__init__(self)
        self.daemon = True
        self.conn = conn
        self.worker = worker
        self.notifications = Queue()
        self._returncode = 'dummy'
        self.killed = False
        self.log_path = worker.log_path
        self.rfile = rfile

    def start_job(self, job):
        # Only start the notification-reading thread for functions that
        # report progress notifications
        notification = PARALLEL_FUNCS[job.name][-1] is not None
        self.conn.send((job.name, job.args, job.kwargs, job.description))
        if notification:
            self.start()
        else:
            self.conn.close()
        self.job = job

    def run(self):
        # Read notifications from the worker until the connection is closed,
        # making them available via self.notifications
        while True:
            try:
                x = self.conn.recv()
                self.notifications.put(x)
            except BaseException:
                break
        try:
            self.conn.close()
        except BaseException:
            pass

    def kill(self):
        self.killed = True
        try:
            self.worker.kill()
        except BaseException:
            pass

    @property
    def is_alive(self):
        return not self.killed and self.worker.is_alive

    @property
    def returncode(self):
        if self._returncode != 'dummy':
            return self._returncode
        r = self.worker.returncode
        if self.killed and r is None:
            # Killed by us but not yet reaped; report a failing return code
            self._returncode = 1
            return 1
        return r


class CriticalError(Exception):
    pass

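# The Server runs a scheduling thread that launches worker processes on
# demand, keeps a small pool of idle workers, hands queued jobs to them and
# reports job state changes on changed_jobs_queue. It can be used as a
# context manager; close() shuts the scheduler down and kills any remaining
# workers.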
class Server(Thread):

    def __init__(self, notify_on_job_done=lambda x: x, pool_size=None,
            limit=sys.maxint, enforce_cpu_limit=True):
        global _counter
        Thread.__init__(self)
        self.daemon = True
        self.id = _counter + 1
        _counter += 1
        if enforce_cpu_limit:
            limit = min(limit, cpu_count())
        self.pool_size = limit if pool_size is None else pool_size
        self.notify_on_job_done = notify_on_job_done
        self.auth_key = os.urandom(32)
        self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
        if iswindows and self.address[1] == ':':
            self.address = self.address[2:]
        self.listener = Listener(address=self.address, authkey=self.auth_key,
                backlog=4)
        self.add_jobs_queue = Queue()
        self.changed_jobs_queue = Queue()
        self.kill_queue = Queue()
        self.waiting_jobs = deque()
        self.processing_jobs = deque()
        self.pool = deque()
        self.workers = deque()
        self.launched_worker_count = 0
        self._worker_launch_lock = RLock()
        self.start()

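    # A newly launched worker process finds its way back to this Server via
    # environment variables: CALIBRE_WORKER_ADDRESS (the pickled, hex-encoded
    # listener address), CALIBRE_WORKER_KEY (the hex-encoded auth key) and
    # CALIBRE_WORKER_RESULT (the path of the pickle file the worker writes
    # its result to).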
    def launch_worker(self, gui=False, redirect_output=None):
        with self._worker_launch_lock:
            self.launched_worker_count += 1
            id = self.launched_worker_count
        rfile = os.path.join(tempfile.gettempdir(),
                'calibre_ipc_result_%d_%d.pickle' % (self.id, id))
        env = {
            'CALIBRE_WORKER_ADDRESS': hexlify(cPickle.dumps(self.listener.address, -1)),
            'CALIBRE_WORKER_KEY': hexlify(self.auth_key),
            'CALIBRE_WORKER_RESULT': hexlify(rfile),
        }
        # Retry the launch once if the first attempt does not produce a
        # ConnectedWorker
        for i in range(2):
            cw = self.do_launch(env, gui, redirect_output, rfile)
            if isinstance(cw, ConnectedWorker):
                break
        if isinstance(cw, basestring):
            # do_launch returned a traceback string instead of a worker
            raise CriticalError('Failed to launch worker process:\n' + cw)
        return cw

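    # do_launch returns a ConnectedWorker on success; on any failure it kills
    # the half-launched process and returns the formatted traceback as a
    # string, which launch_worker converts into a CriticalError.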
    def do_launch(self, env, gui, redirect_output, rfile):
        w = Worker(env, gui=gui)
        try:
            w(redirect_output=redirect_output)
            conn = self.listener.accept()
            if conn is None:
                raise Exception('Failed to launch worker process')
        except BaseException:
            try:
                w.kill()
            except:
                pass
            import traceback
            return traceback.format_exc()
        return ConnectedWorker(w, conn, rfile)

    def add_job(self, job):
        job.done2 = self.notify_on_job_done
        self.add_jobs_queue.put(job)

    def run_job(self, job, gui=True, redirect_output=False):
        w = self.launch_worker(gui=gui, redirect_output=redirect_output)
        w.start_job(job)

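    # Scheduler loop: pick up newly added jobs, forward worker notifications,
    # retire workers whose process has exited, keep the idle pool topped up,
    # hand waiting jobs to pooled workers and service kill requests. The loop
    # exits when close() pushes None onto add_jobs_queue.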
    def run(self):
        while True:
            try:
                job = self.add_jobs_queue.get(True, 0.2)
                if job is None:
                    break
                self.waiting_jobs.append(job)
            except Empty:
                pass

            # Drain progress notifications from the workers into their jobs
            for worker in self.workers:
                while True:
                    try:
                        n = worker.notifications.get_nowait()
                        worker.job.notifications.put(n)
                        self.changed_jobs_queue.put(worker.job)
                    except Empty:
                        break

            # Retire workers whose process has exited and report their jobs
            for worker in [w for w in self.workers if not w.is_alive]:
                self.workers.remove(worker)
                job = worker.job
                if worker.returncode != 0:
                    job.failed = True
                    job.returncode = worker.returncode
                elif os.path.exists(worker.rfile):
                    try:
                        # The worker writes its result as a pickle file (see
                        # CALIBRE_WORKER_RESULT in launch_worker)
                        job.result = cPickle.load(open(worker.rfile, 'rb'))
                        os.remove(worker.rfile)
                    except BaseException:
                        pass
                self.changed_jobs_queue.put(job)

            # Keep a pool of idle workers ready
            if len(self.pool) + len(self.workers) < self.pool_size:
                try:
                    self.pool.append(self.launch_worker())
                except Exception:
                    pass

            # Hand a waiting job to an idle pooled worker
            if len(self.pool) > 0 and len(self.waiting_jobs) > 0:
                job = self.waiting_jobs.pop()
                worker = self.pool.pop()
                job.start_time = time.time()
                worker.start_job(job)
                self.workers.append(worker)
                self.changed_jobs_queue.put(job)

            # Service kill requests
            while True:
                try:
                    j = self.kill_queue.get_nowait()
                    self._kill_job(j)
                except Empty:
                    break

    def kill_job(self, job):
        self.kill_queue.put(job)

    def killall(self):
        for worker in self.workers:
            self.kill_queue.put(worker.job)

    def _kill_job(self, job):
        if job.start_time is None:
            # Not yet handed to a worker; flag it for killing on start
            job.kill_on_start = True
            return
        for worker in self.workers:
            if job is worker.job:
                worker.kill()
                job.killed = True
                break

    def split(self, tasks):
        # Split tasks into self.pool_size roughly equal chunks, preserving
        # each task's original index as (index, task)
        ans = []
        count = 0
        pos = 0
        delta = int(ceil(len(tasks) / float(self.pool_size)))
        while count < len(tasks):
            section = []
            for t in tasks[pos:pos + delta]:
                section.append((count, t))
                count += 1
            ans.append(section)
            pos += delta
        return ans

    def close(self):
        try:
            self.add_jobs_queue.put(None)
            self.listener.close()
        except:
            pass
        time.sleep(0.2)
        for worker in self.workers:
            try:
                worker.kill()
            except:
                pass
        for worker in self.pool:
            try:
                worker.kill()
            except:
                pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
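
# A minimal usage sketch, assuming a job object that provides the attributes
# the scheduler above uses (name, args, kwargs, description, notifications,
# start_time, ...). In calibre that role is played by ParallelJob from
# calibre.utils.ipc.job; its exact constructor signature is not shown here,
# so make_parallel_job below is a hypothetical helper.
#
#   server = Server(pool_size=2)
#   job = make_parallel_job(...)   # hypothetical helper producing such a job
#   server.add_job(job)            # queued; the run() thread launches a worker
#   ...                            # poll server.changed_jobs_queue for updates
#   server.close()                 # stop the scheduler and kill workers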