home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_thread.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  5.2 KB  |  169 lines

  1. import os
  2. import unittest
  3. import random
  4. from test import test_support
  5. import thread
  6. import time
  7.  
  8.  
  9. NUMTASKS = 10
  10. NUMTRIPS = 3
  11.  
  12.  
  13. _print_mutex = thread.allocate_lock()
  14.  
  15. def verbose_print(arg):
  16.     """Helper function for printing out debugging output."""
  17.     if test_support.verbose:
  18.         with _print_mutex:
  19.             print arg
  20.  
  21.  
  22. class BasicThreadTest(unittest.TestCase):
  23.  
  24.     def setUp(self):
  25.         self.done_mutex = thread.allocate_lock()
  26.         self.done_mutex.acquire()
  27.         self.running_mutex = thread.allocate_lock()
  28.         self.random_mutex = thread.allocate_lock()
  29.         self.running = 0
  30.         self.next_ident = 0
  31.  
  32.  
  33. class ThreadRunningTests(BasicThreadTest):
  34.  
  35.     def newtask(self):
  36.         with self.running_mutex:
  37.             self.next_ident += 1
  38.             verbose_print("creating task %s" % self.next_ident)
  39.             thread.start_new_thread(self.task, (self.next_ident,))
  40.             self.running += 1
  41.  
  42.     def task(self, ident):
  43.         with self.random_mutex:
  44.             delay = random.random() / 10000.0
  45.         verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
  46.         time.sleep(delay)
  47.         verbose_print("task %s done" % ident)
  48.         with self.running_mutex:
  49.             self.running -= 1
  50.             if self.running == 0:
  51.                 self.done_mutex.release()
  52.  
  53.     def test_starting_threads(self):
  54.         # Basic test for thread creation.
  55.         for i in range(NUMTASKS):
  56.             self.newtask()
  57.         verbose_print("waiting for tasks to complete...")
  58.         self.done_mutex.acquire()
  59.         verbose_print("all tasks done")
  60.  
  61.     def test_stack_size(self):
  62.         # Various stack size tests.
  63.         self.assertEquals(thread.stack_size(), 0, "intial stack size is not 0")
  64.  
  65.         thread.stack_size(0)
  66.         self.assertEquals(thread.stack_size(), 0, "stack_size not reset to default")
  67.  
  68.         if os.name not in ("nt", "os2", "posix"):
  69.             return
  70.  
  71.         tss_supported = True
  72.         try:
  73.             thread.stack_size(4096)
  74.         except ValueError:
  75.             verbose_print("caught expected ValueError setting "
  76.                             "stack_size(4096)")
  77.         except thread.error:
  78.             tss_supported = False
  79.             verbose_print("platform does not support changing thread stack "
  80.                             "size")
  81.  
  82.         if tss_supported:
  83.             fail_msg = "stack_size(%d) failed - should succeed"
  84.             for tss in (262144, 0x100000, 0):
  85.                 thread.stack_size(tss)
  86.                 self.assertEquals(thread.stack_size(), tss, fail_msg % tss)
  87.                 verbose_print("successfully set stack_size(%d)" % tss)
  88.  
  89.             for tss in (262144, 0x100000):
  90.                 verbose_print("trying stack_size = (%d)" % tss)
  91.                 self.next_ident = 0
  92.                 for i in range(NUMTASKS):
  93.                     self.newtask()
  94.  
  95.                 verbose_print("waiting for all tasks to complete")
  96.                 self.done_mutex.acquire()
  97.                 verbose_print("all tasks done")
  98.  
  99.             thread.stack_size(0)
  100.  
  101.  
  102. class Barrier:
  103.     def __init__(self, num_threads):
  104.         self.num_threads = num_threads
  105.         self.waiting = 0
  106.         self.checkin_mutex  = thread.allocate_lock()
  107.         self.checkout_mutex = thread.allocate_lock()
  108.         self.checkout_mutex.acquire()
  109.  
  110.     def enter(self):
  111.         self.checkin_mutex.acquire()
  112.         self.waiting = self.waiting + 1
  113.         if self.waiting == self.num_threads:
  114.             self.waiting = self.num_threads - 1
  115.             self.checkout_mutex.release()
  116.             return
  117.         self.checkin_mutex.release()
  118.  
  119.         self.checkout_mutex.acquire()
  120.         self.waiting = self.waiting - 1
  121.         if self.waiting == 0:
  122.             self.checkin_mutex.release()
  123.             return
  124.         self.checkout_mutex.release()
  125.  
  126.  
  127. class BarrierTest(BasicThreadTest):
  128.  
  129.     def test_barrier(self):
  130.         self.bar = Barrier(NUMTASKS)
  131.         self.running = NUMTASKS
  132.         for i in range(NUMTASKS):
  133.             thread.start_new_thread(self.task2, (i,))
  134.         verbose_print("waiting for tasks to end")
  135.         self.done_mutex.acquire()
  136.         verbose_print("tasks done")
  137.  
  138.     def task2(self, ident):
  139.         for i in range(NUMTRIPS):
  140.             if ident == 0:
  141.                 # give it a good chance to enter the next
  142.                 # barrier before the others are all out
  143.                 # of the current one
  144.                 delay = 0
  145.             else:
  146.                 with self.random_mutex:
  147.                     delay = random.random() / 10000.0
  148.             verbose_print("task %s will run for %sus" %
  149.                           (ident, round(delay * 1e6)))
  150.             time.sleep(delay)
  151.             verbose_print("task %s entering %s" % (ident, i))
  152.             self.bar.enter()
  153.             verbose_print("task %s leaving barrier" % ident)
  154.         with self.running_mutex:
  155.             self.running -= 1
  156.             # Must release mutex before releasing done, else the main thread can
  157.             # exit and set mutex to None as part of global teardown; then
  158.             # mutex.release() raises AttributeError.
  159.             finished = self.running == 0
  160.         if finished:
  161.             self.done_mutex.release()
  162.  
  163.  
  164. def test_main():
  165.     test_support.run_unittest(ThreadRunningTests, BarrierTest)
  166.  
  167. if __name__ == "__main__":
  168.     test_main()
  169.