home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
fnb101.zip
/
Lib
/
site-packages
/
Fnorb
/
orb
/
ThreadPoolQueue.py
< prev
next >
Wrap
Text File
|
1999-06-28
|
3KB
|
133 lines
#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the
# DSTC Pty Ltd. Use of this software is strictly in accordance
# with the license agreement in the accompanying LICENSE.DOC file.
# If your distribution of this software does not contain a
# LICENSE.DOC file then you have no rights to use this software
# in any manner and should contact DSTC at the address below
# to determine an appropriate licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, Gehrmann Labs
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of
# any kind. In no event shall DSTC Pty Ltd be liable for
# damage of any kind arising out of or in connection with
# the use or performance of this software.
#
# Project: Distributed Environment
# File: $Source: /units/arch/src/Fnorb/orb/RCS/ThreadPoolQueue.py,v $
#
#############################################################################
""" A queue serviced by a thread pool. """
# Standard/built-in modules.
import thread
# Fnorb modules.
import condvar
class ThreadPoolQueue:
""" A queue serviced by a thread pool. """
def __init__(self, size, function):
""" Constructor. """
self.__size = size
self.__function = function
self.__stopped = 0
self.__data = []
self.__cv = condvar.condvar()
return
def start(self):
""" Start servicing the queue. """
# Start the appropriate number of worker threads.
for i in range(self.__size):
thread.start_new_thread(self.worker_thread, (i,))
return
def stop(self):
""" Stop servicing the queue. """
self.__cv.acquire()
self.__stopped = 1
self.__cv.broadcast()
return
def wait(self):
""" Wait until all of the worker threads have finished. """
self.__cv.acquire()
while self.__size > 0:
self.__cv.wait()
self.__cv.release()
return
def add_item(self, item):
""" Add a single item to the queue. """
self.__cv.acquire()
self.__data.append(item)
self.__cv.signal()
return
def add_items(self, items):
""" Add a list of items to the queue. """
self.__cv.acquire()
self.__data[len(self.__data):] = items
self.__cv.broadcast()
return
def worker_thread(self, i):
""" The worker!"""
self.__cv.acquire()
while not self.__stopped:
# Is there an item on the queue for me to deal with?
if len(self.__data) > 0:
item = self.__data[0]
del self.__data[0]
self.__cv.release()
# Do the work!
apply(self.__function, item)
# Acquire the lock so that I can check to see if I am stopped
# or if there is some more work for me to do.
self.__cv.acquire()
# Otherwise, we are not stopped, and there is nothing for me to
# do, so I'll just wait around a while...
else:
self.__cv.wait()
# The thread pool has been stopped so let's get outta here.
self.__size = self.__size - 1
self.__cv.signal()
# Explicitly exit the thread!
thread.exit()
#############################################################################