home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_dummy_threading.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  1.8 KB  |  64 lines

  1. from test import test_support
  2. import unittest
  3. import dummy_threading as _threading
  4. import time
  5.  
  6. class DummyThreadingTestCase(unittest.TestCase):
  7.  
  8.     class TestThread(_threading.Thread):
  9.  
  10.         def run(self):
  11.             global running
  12.             global sema
  13.             global mutex
  14.             # Uncomment if testing another module, such as the real 'threading'
  15.             # module.
  16.             #delay = random.random() * 2
  17.             delay = 0
  18.             if test_support.verbose:
  19.                 print 'task', self.name, 'will run for', delay, 'sec'
  20.             sema.acquire()
  21.             mutex.acquire()
  22.             running += 1
  23.             if test_support.verbose:
  24.                 print running, 'tasks are running'
  25.             mutex.release()
  26.             time.sleep(delay)
  27.             if test_support.verbose:
  28.                 print 'task', self.name, 'done'
  29.             mutex.acquire()
  30.             running -= 1
  31.             if test_support.verbose:
  32.                 print self.name, 'is finished.', running, 'tasks are running'
  33.             mutex.release()
  34.             sema.release()
  35.  
  36.     def setUp(self):
  37.         self.numtasks = 10
  38.         global sema
  39.         sema = _threading.BoundedSemaphore(value=3)
  40.         global mutex
  41.         mutex = _threading.RLock()
  42.         global running
  43.         running = 0
  44.         self.threads = []
  45.  
  46.     def test_tasks(self):
  47.         for i in range(self.numtasks):
  48.             t = self.TestThread(name="<thread %d>"%i)
  49.             self.threads.append(t)
  50.             t.start()
  51.  
  52.         if test_support.verbose:
  53.             print 'waiting for all tasks to complete'
  54.         for t in self.threads:
  55.             t.join()
  56.         if test_support.verbose:
  57.             print 'all tasks done'
  58.  
  59. def test_main():
  60.     test_support.run_unittest(DummyThreadingTestCase)
  61.  
  62. if __name__ == '__main__':
  63.     test_main()
  64.