# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

from __future__ import with_statement

__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

import sys
import os
import cPickle
import time
import tempfile
from math import ceil
from threading import Thread, RLock
from Queue import Queue, Empty
from multiprocessing.connection import Listener, arbitrary_address
from collections import deque
from binascii import hexlify

from calibre.utils.ipc.launch import Worker
from calibre.utils.ipc.worker import PARALLEL_FUNCS
from calibre import detect_ncpus as cpu_count
from calibre.constants import iswindows

_counter = 0

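# A ConnectedWorker pairs a launched worker process with the control
# connection the server accepted from it. The thread body only runs for
# jobs whose PARALLEL_FUNCS entry declares a notification callback; it
# drains progress notifications sent by the worker over the connection.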
class ConnectedWorker(Thread):

    def __init__(self, worker, conn, rfile):
        Thread.__init__(self)
        self.daemon = True
        self.conn = conn
        self.worker = worker
        self.notifications = Queue()
        self._returncode = 'dummy'
        self.killed = False
        self.log_path = worker.log_path
        self.rfile = rfile

    def start_job(self, job):
        # The job needs this thread only if its PARALLEL_FUNCS entry
        # defines a notification callback.
        notification = PARALLEL_FUNCS[job.name][-1] is not None
        self.conn.send((job.name, job.args, job.kwargs, job.description))
        if notification:
            self.start()
        else:
            self.conn.close()
        self.job = job

    def run(self):
        # Drain notifications from the worker until the connection dies.
        while True:
            try:
                x = self.conn.recv()
                self.notifications.put(x)
            except BaseException:
                break
        try:
            self.conn.close()
        except BaseException:
            pass

    def kill(self):
        self.killed = True
        try:
            self.worker.kill()
        except BaseException:
            pass

    @property
    def is_alive(self):
        # A worker that has been explicitly killed no longer counts as
        # alive, even if the process has not exited yet.
        return not self.killed and self.worker.is_alive

    @property
    def returncode(self):
        if self._returncode != 'dummy':
            return self._returncode
        r = self.worker.returncode
        if self.killed and r is None:
            # A killed worker may never report a return code; treat it
            # as a failure.
            self._returncode = 1
            return 1
        return r


class CriticalError(Exception):
    pass


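# The Server thread owns the worker pool. Clients talk to it through
# thread-safe queues: add_jobs_queue to submit work, kill_queue to stop
# it, and changed_jobs_queue to observe state changes. Worker processes
# connect back over the Listener created in __init__ and authenticate
# with the random auth_key.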
class Server(Thread):

    def __init__(self, notify_on_job_done=lambda x: x, pool_size=None,
            limit=sys.maxint, enforce_cpu_limit=True):
        global _counter
        Thread.__init__(self)
        self.daemon = True
        self.id = _counter + 1
        _counter += 1
        if enforce_cpu_limit:
            limit = min(limit, cpu_count())
        # Assumption: with no explicit pool_size, fall back to the
        # (possibly CPU-clamped) limit; the decompiled value was lost.
        self.pool_size = limit if pool_size is None else pool_size
        self.notify_on_job_done = notify_on_job_done
        self.auth_key = os.urandom(32)
        # Assumption: 'AF_PIPE' on Windows, where AF_UNIX is unavailable;
        # the Windows branch of this expression was lost.
        self.address = arbitrary_address('AF_PIPE' if iswindows else 'AF_UNIX')
        if iswindows and self.address[1] == ':':
            self.address = self.address[2:]
        self.listener = Listener(address=self.address,
                authkey=self.auth_key, backlog=4)
        self.add_jobs_queue = Queue()
        self.changed_jobs_queue = Queue()
        self.kill_queue = Queue()
        self.waiting_jobs = deque()
        self.processing_jobs = deque()
        self.pool = deque()
        self.workers = deque()
        self.launched_worker_count = 0
        self._worker_launch_lock = RLock()
        self.start()
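
    # Worker handshake: launch_worker() passes the listener address, the
    # auth key and a result-file path to the child through hex-encoded
    # environment variables; the child connects back over the listener,
    # receives the job tuple from start_job(), and is expected to write
    # its pickled result to the rfile path (CALIBRE_WORKER_RESULT).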

    def launch_worker(self, gui=False, redirect_output=None):
        with self._worker_launch_lock:
            self.launched_worker_count += 1
            id = self.launched_worker_count
        rfile = os.path.join(tempfile.gettempdir(),
                'calibre_ipc_result_%d_%d.pickle' % (self.id, id))
        env = {
            'CALIBRE_WORKER_ADDRESS': hexlify(cPickle.dumps(self.listener.address, -1)),
            'CALIBRE_WORKER_KEY': hexlify(self.auth_key),
            'CALIBRE_WORKER_RESULT': hexlify(rfile),
        }
        # Launches occasionally fail; try twice before giving up.
        for i in range(2):
            cw = self.do_launch(env, gui, redirect_output, rfile)
            if isinstance(cw, ConnectedWorker):
                break
        if isinstance(cw, basestring):
            # do_launch returns a traceback string on failure.
            raise CriticalError('Failed to launch worker process:\n' + cw)
        return cw

    def do_launch(self, env, gui, redirect_output, rfile):
        w = Worker(env, gui=gui)
        try:
            w(redirect_output=redirect_output)
            conn = self.listener.accept()
            if conn is None:
                raise Exception('Failed to launch worker process')
        except BaseException:
            try:
                w.kill()
            except:
                pass
            import traceback
            # Return the traceback as a string; the caller treats any
            # string result as a launch failure.
            return traceback.format_exc()
        return ConnectedWorker(w, conn, rfile)

    def add_job(self, job):
        job.done2 = self.notify_on_job_done
        self.add_jobs_queue.put(job)

    def run_job(self, job, gui=True, redirect_output=False):
        w = self.launch_worker(gui=gui, redirect_output=redirect_output)
        w.start_job(job)

    def run(self):
        while True:
            try:
                job = self.add_jobs_queue.get(True, 0.2)
                if job is None:
                    break
                self.waiting_jobs.append(job)
            except Empty:
                pass

            # Relay progress notifications from each worker to its job.
            for worker in self.workers:
                while True:
                    try:
                        n = worker.notifications.get_nowait()
                        worker.job.notifications.put(n)
                        self.changed_jobs_queue.put(worker.job)
                    except Empty:
                        break

            # Retire dead workers. (Reconstruction: the original loop body
            # was lost; collecting the pickled result from worker.rfile and
            # the exit status from worker.returncode is what the remaining
            # state implies.)
            for worker in [w for w in self.workers if not w.is_alive]:
                self.workers.remove(worker)
                job = worker.job
                if worker.returncode != 0:
                    job.failed = True
                    job.returncode = worker.returncode
                elif os.path.exists(worker.rfile):
                    try:
                        job.result = cPickle.load(open(worker.rfile, 'rb'))
                        os.remove(worker.rfile)
                    except:
                        pass
                self.changed_jobs_queue.put(job)

            # Keep a spare worker ready, up to pool_size.
            if len(self.pool) + len(self.workers) < self.pool_size:
                try:
                    self.pool.append(self.launch_worker())
                except Exception:
                    pass

            # Hand a waiting job to a pooled worker. (The pop/start_job
            # calls are reconstructed; only the surrounding bookkeeping
            # survived.)
            if len(self.pool) > 0 and len(self.waiting_jobs) > 0:
                job = self.waiting_jobs.pop()
                job.start_time = time.time()
                if getattr(job, 'kill_on_start', False):
                    # Killed by _kill_job() before it ever started.
                    job.killed = True
                else:
                    worker = self.pool.pop()
                    worker.start_job(job)
                    self.workers.append(worker)
                self.changed_jobs_queue.put(job)

            # Process kill requests.
            while True:
                try:
                    j = self.kill_queue.get_nowait()
                    self._kill_job(j)
                except Empty:
                    break

    def kill_job(self, job):
        self.kill_queue.put(job)

    def killall(self):
        for worker in self.workers:
            self.kill_queue.put(worker.job)

    def _kill_job(self, job):
        if job.start_time is None:
            # Not started yet; flag it so run() can discard it on start.
            job.kill_on_start = True
            return
        for worker in self.workers:
            if job is worker.job:
                worker.kill()
                job.killed = True
                break

    def split(self, tasks):
        # Split tasks into at most pool_size sublists of (index, task)
        # tuples, preserving the original ordering.
        ans = []
        count = 0
        pos = 0
        delta = int(ceil(len(tasks) / float(self.pool_size)))
        while count < len(tasks):
            section = []
            for t in tasks[pos:pos + delta]:
                section.append((count, t))
                count += 1
            ans.append(section)
            pos += delta
        return ans

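    # A worked example of split (a sketch, assuming pool_size == 2):
    #
    #   server.split(['a', 'b', 'c', 'd', 'e'])
    #   => [[(0, 'a'), (1, 'b'), (2, 'c')], [(3, 'd'), (4, 'e')]]
    #
    # delta = ceil(5 / 2.0) == 3, so the first sublist takes three tasks
    # and the second takes the remaining two.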

    def close(self):
        try:
            self.add_jobs_queue.put(None)
            self.listener.close()
        except:
            pass
        # Give the run() loop a chance to see the None sentinel and exit.
        time.sleep(0.2)
        for worker in self.workers:
            try:
                worker.kill()
            except:
                pass
        for worker in self.pool:
            try:
                worker.kill()
            except:
                pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
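
# A minimal usage sketch (an illustration, not part of the original
# module). It assumes a job object shaped the way this server expects:
# name, args, kwargs and description for start_job(), plus notifications
# and start_time attributes; in calibre that object is ParallelJob from
# calibre.utils.ipc.job. Kept as a comment so importing this module has
# no side effects.
#
#   with Server(pool_size=2) as server:    # __exit__ calls close()
#       server.add_job(job)                # queued; run() starts it
#       changed = server.changed_jobs_queue.get()  # observe state changes
#       n = job.notifications.get()        # progress relayed from the
#                                          # worker by ConnectedWorker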