home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
# Public names exported by a star-import of this module.
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread',
]

# Module metadata (values kept exactly as recovered from the bytecode).
__author__ = 'Christopher Arndt'
__version__ = '1.2.3'
__revision__ = '$Revision: 1.5 $'
__date__ = '$Date: 2006/06/23 12:32:25 $'
__license__ = 'Python license'
- import threading
- import Queue
-
class NoResultsPending(Exception):
    """Raised by ``ThreadPool.poll`` when no submitted request is outstanding."""
-
-
class NoWorkersAvailable(Exception):
    """Raised by a blocking ``ThreadPool.poll`` when the pool has no workers."""
-
-
class WorkerThread(threading.Thread):
    """Daemon thread that executes work requests pulled from a queue.

    The thread starts itself on construction. Until dismissed it takes
    requests from ``requestsQueue``, runs them, and posts a
    ``(request, outcome)`` tuple to ``resultsQueue``.
    """

    def __init__(self, requestsQueue, resultsQueue, **kwds):
        """Set up the worker and start it immediately.

        requestsQueue -- queue the worker reads work requests from.
        resultsQueue  -- queue the worker writes ``(request, outcome)`` to.
        kwds          -- passed through to ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Process requests until ``dismiss`` has been called.

        On success the outcome is the return value of
        ``request.callable(*request.args, **request.kwds)``. If the call
        raises, ``request.exception`` is set to True and the outcome is the
        formatted traceback string.
        """
        while not self._dismissed.is_set():
            # Blocks here while the request queue is empty.
            request = self.workRequestQueue.get()
            if self._dismissed.is_set():
                # Dismissed while waiting: hand the request back so another
                # worker can pick it up, then stop.
                self.workRequestQueue.put(request)
                break
            try:
                outcome = request.callable(*request.args, **request.kwds)
            except:
                # Deliberately catches everything: whatever the job raises
                # must reach the results queue, otherwise the request would
                # stay pending forever.
                request.exception = True
                import traceback
                outcome = traceback.format_exc()
            self.resultQueue.put((request, outcome))

    def dismiss(self):
        """Ask the thread to stop once it finishes its current request."""
        self._dismissed.set()
-
-
-
class WorkRequest:
    """A callable plus its arguments, queued for execution by a worker.

    Attributes set here: ``requestID``, ``exception`` (initially False),
    ``callback``, ``exc_callback``, ``callable``, ``args`` and ``kwds``.
    """

    def __init__(self, callable, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=None):
        """Create a work request.

        callable     -- object to call as ``callable(*args, **kwds)``.
        args         -- positional arguments; any false value becomes ``[]``.
        kwds         -- keyword arguments; any false value becomes ``{}``.
        requestID    -- hashable identifier; defaults to ``id(self)``.
        callback     -- stored as ``self.callback``.
        exc_callback -- stored as ``self.exc_callback``.

        Raises TypeError if ``requestID`` is given but is not hashable.
        """
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                hash(requestID)
            except TypeError:
                raise TypeError('requestID must be hashable.')
            self.requestID = requestID
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable
        # Keep the caller's arguments; fall back to empty containers only
        # when none were supplied.
        self.args = args or []
        self.kwds = kwds or {}
-
-
-
class ThreadPool:
    """Pool of worker threads fed from a shared request queue.

    Attributes:
    requestsQueue -- queue that workers take requests from.
    resultsQueue  -- queue that workers post ``(request, result)`` to.
    workers       -- list of the pool's worker threads.
    workRequests  -- dict mapping requestID to each outstanding request.
    """

    def __init__(self, num_workers, q_size=0):
        """Create the queues and start ``num_workers`` worker threads.

        ``q_size`` is the maximum size passed to the request queue.
        """
        self.requestsQueue = Queue.Queue(q_size)
        self.resultsQueue = Queue.Queue()
        self.workers = []
        self.workRequests = {}
        self.createWorkers(num_workers)

    def createWorkers(self, num_workers):
        """Add ``num_workers`` new worker threads to the pool."""
        for _ in range(num_workers):
            self.workers.append(
                WorkerThread(self.requestsQueue, self.resultsQueue))

    def dismissWorkers(self, num_workers):
        """Dismiss up to ``num_workers`` workers, most recently added first."""
        for _ in range(min(num_workers, len(self.workers))):
            self.workers.pop().dismiss()

    def putRequest(self, request, block=True, timeout=0):
        """Queue ``request`` and record it under ``request.requestID``.

        ``block`` and ``timeout`` are forwarded to the request queue's
        ``put``. The default ``timeout`` is 0, so on a bounded queue that is
        already full ``put`` is expected to raise ``Queue.Full`` at once
        rather than wait. In that case the request is not recorded.
        """
        self.requestsQueue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Handle available results and invoke the requests' callbacks.

        For each result, ``exc_callback(request, result)`` is called when the
        request failed and has an ``exc_callback``. Otherwise
        ``callback(request, result)`` is called if a callback is set. The
        request is then removed from ``workRequests``.

        With ``block`` false, returns once the results queue is empty.

        Raises NoResultsPending when no request is outstanding, and
        NoWorkersAvailable when ``block`` is true and the pool has no
        workers.
        """
        while True:
            if not self.workRequests:
                raise NoResultsPending
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                request, result = self.resultsQueue.get(block=block)
            except Queue.Empty:
                break
            failure_handled = request.exception and request.exc_callback
            if failure_handled:
                request.exc_callback(request, result)
            if request.callback and not failure_handled:
                request.callback(request, result)
            del self.workRequests[request.requestID]

    def wait(self, sleep=0):
        """Block until every outstanding request has been handled.

        Returns None once ``poll`` reports NoResultsPending. Other
        exceptions from ``poll``, including NoWorkersAvailable, propagate.

        As ``poll`` is written here, ``poll(True)`` only leaves by raising,
        so the ``sleep`` pause runs only if ``poll`` is overridden to return.
        """
        # ``time`` is not imported at module level in this file.
        import time
        while True:
            try:
                self.poll(True)
                time.sleep(sleep)
            except NoResultsPending:
                break
        return None
-
-
-
def makeRequests(callable, args_list, callback=None, exc_callback=None):
    """Build one WorkRequest per entry of ``args_list``.

    A tuple entry is read as ``(args, kwds)``: element 0 becomes the
    request's positional arguments and element 1 its keyword arguments, so a
    tuple shorter than two elements raises IndexError. Any other entry is
    passed to ``callable`` as its single positional argument.

    ``callback`` and ``exc_callback`` are attached to every request.
    Returns the list of requests in the order of ``args_list``.
    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            positional, keyword = item[0], item[1]
        else:
            positional, keyword = [item], None
        requests.append(
            WorkRequest(callable, positional, keyword,
                        callback=callback, exc_callback=exc_callback))
    return requests
-
if __name__ == '__main__':
    # Demo: push 40 randomly failing jobs through a 3-worker pool while the
    # main thread polls for results.
    import random
    import time

    def do_something(data):
        """Sleep 1-5 seconds, then return a random value or raise."""
        time.sleep(random.randint(1, 5))
        result = round(random.random() * data, 5)
        if result > 3:
            raise RuntimeError('Something extraordinary happened!')
        return result

    def print_result(request, result):
        """Success callback: report the value a request produced."""
        print('**Result: %s from request #%s' % (result, request.requestID))

    def handle_exception(request, exc_info):
        """Failure callback: report the error raised by a request."""
        # Workers deliver the formatted traceback as one string, so print
        # it whole; indexing it would yield a single character.
        print('Exception occured in request #%s: %s'
              % (request.requestID, exc_info))

    # First batch: bare values, each passed as the single positional argument.
    data = [random.randint(1, 10) for _ in range(20)]
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # Second batch: explicit (args, kwds) tuples.
    data = [((random.randint(1, 10),), {}) for _ in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception))

    main = ThreadPool(3)
    for req in requests:
        main.putRequest(req)
        print('Work request #%s added.' % req.requestID)

    i = 0
    while True:
        try:
            main.poll()
            print('Main thread working...')
            time.sleep(0.5)
            if i == 10:
                print('Adding 3 more worker threads...')
                main.createWorkers(3)
            i += 1
        except KeyboardInterrupt:
            print('Interrupted!')
            break
        except NoResultsPending:
            print('All results collected.')
            break
-
-