home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / fnb101.zip / Lib / site-packages / Fnorb / orb / ThreadPoolQueue.py < prev    next >
Text File  |  1999-06-28  |  3KB  |  133 lines

  1. #!/usr/bin/env python
  2. #############################################################################
  3. # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
  4. # All Rights Reserved.
  5. #
  6. # The software contained on this media is the property of the
  7. # DSTC Pty Ltd.  Use of this software is strictly in accordance
  8. # with the license agreement in the accompanying LICENSE.DOC file.
  9. # If your distribution of this software does not contain a
  10. # LICENSE.DOC file then you have no rights to use this software
  11. # in any manner and should contact DSTC at the address below
  12. # to determine an appropriate licensing arrangement.
  13. #      DSTC Pty Ltd
  14. #      Level 7, Gehrmann Labs
  15. #      University of Queensland
  16. #      St Lucia, 4072
  17. #      Australia
  18. #      Tel: +61 7 3365 4310
  19. #      Fax: +61 7 3365 4311
  20. #      Email: enquiries@dstc.edu.au
  21. # This software is being provided "AS IS" without warranty of
  22. # any kind.  In no event shall DSTC Pty Ltd be liable for
  23. # damage of any kind arising out of or in connection with
  24. # the use or performance of this software.
  25. #
  26. # Project:      Distributed Environment
  27. # File:         $Source: /units/arch/src/Fnorb/orb/RCS/ThreadPoolQueue.py,v $
  28. #
  29. #############################################################################
  30. """ A queue serviced by a thread pool. """
  31.  
  32.  
  33. # Standard/built-in modules.
  34. import thread
  35.  
  36. # Fnorb modules.
  37. import condvar
  38.  
  39.  
  40. class ThreadPoolQueue:
  41.     """ A queue serviced by a thread pool. """
  42.  
  43.     def __init__(self, size, function):
  44.     """ Constructor. """
  45.  
  46.     self.__size = size
  47.     self.__function = function
  48.     self.__stopped = 0
  49.     self.__data = []
  50.     self.__cv = condvar.condvar()
  51.  
  52.     return
  53.  
  54.     def start(self):
  55.     """ Start servicing the queue. """
  56.  
  57.     # Start the appropriate number of worker threads.
  58.     for i in range(self.__size):
  59.         thread.start_new_thread(self.worker_thread, (i,))
  60.  
  61.     return
  62.  
  63.     def stop(self):
  64.     """ Stop servicing the queue. """
  65.  
  66.     self.__cv.acquire()
  67.     self.__stopped = 1
  68.     self.__cv.broadcast()
  69.  
  70.     return
  71.  
  72.     def wait(self):
  73.     """ Wait until all of the worker threads have finished. """
  74.  
  75.     self.__cv.acquire()
  76.     while self.__size > 0:
  77.         self.__cv.wait()
  78.     self.__cv.release()
  79.  
  80.     return
  81.     
  82.     def add_item(self, item):
  83.     """ Add a single item to the queue. """
  84.  
  85.     self.__cv.acquire()
  86.     self.__data.append(item)
  87.     self.__cv.signal()
  88.  
  89.     return
  90.  
  91.     def add_items(self, items):
  92.     """ Add a list of items to the queue. """
  93.  
  94.     self.__cv.acquire()
  95.     self.__data[len(self.__data):] = items
  96.     self.__cv.broadcast()
  97.  
  98.     return
  99.         
  100.     def worker_thread(self, i):
  101.     """ The worker!"""
  102.  
  103.     self.__cv.acquire()
  104.     while not self.__stopped:
  105.         # Is there an item on the queue for me to deal with?
  106.         if len(self.__data) > 0:
  107.         item = self.__data[0]
  108.         del self.__data[0]
  109.         self.__cv.release()
  110.  
  111.         # Do the work!
  112.         apply(self.__function, item)
  113.  
  114.         # Acquire the lock so that I can check to see if I am stopped
  115.         # or if there is some more work for me to do.
  116.         self.__cv.acquire()
  117.  
  118.         # Otherwise, we are not stopped, and there is nothing for me to
  119.         # do, so I'll just wait around a while...
  120.         else:
  121.         self.__cv.wait()
  122.  
  123.     # The thread pool has been stopped so let's get outta here.
  124.     self.__size = self.__size - 1
  125.     self.__cv.signal()
  126.  
  127.     # Explicitly exit the thread!
  128.     thread.exit()
  129.  
  130. #############################################################################
  131.