home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_queue.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  11.5 KB  |  322 lines

  1. # Some simple queue module tests, plus some failure conditions
  2. # to ensure the Queue locks remain stable.
  3. import Queue
  4. import sys
  5. import threading
  6. import time
  7. import unittest
  8. from test import test_support
  9.  
  10. QUEUE_SIZE = 5
  11.  
  12. # A thread to run a function that unclogs a blocked Queue.
  13. class _TriggerThread(threading.Thread):
  14.     def __init__(self, fn, args):
  15.         self.fn = fn
  16.         self.args = args
  17.         self.startedEvent = threading.Event()
  18.         threading.Thread.__init__(self)
  19.  
  20.     def run(self):
  21.         # The sleep isn't necessary, but is intended to give the blocking
  22.         # function in the main thread a chance at actually blocking before
  23.         # we unclog it.  But if the sleep is longer than the timeout-based
  24.         # tests wait in their blocking functions, those tests will fail.
  25.         # So we give them much longer timeout values compared to the
  26.         # sleep here (I aimed at 10 seconds for blocking functions --
  27.         # they should never actually wait that long - they should make
  28.         # progress as soon as we call self.fn()).
  29.         time.sleep(0.1)
  30.         self.startedEvent.set()
  31.         self.fn(*self.args)
  32.  
  33.  
  34. # Execute a function that blocks, and in a separate thread, a function that
  35. # triggers the release.  Returns the result of the blocking function.  Caution:
  36. # block_func must guarantee to block until trigger_func is called, and
  37. # trigger_func must guarantee to change queue state so that block_func can make
  38. # enough progress to return.  In particular, a block_func that just raises an
  39. # exception regardless of whether trigger_func is called will lead to
  40. # timing-dependent sporadic failures, and one of those went rarely seen but
  41. # undiagnosed for years.  Now block_func must be unexceptional.  If block_func
  42. # is supposed to raise an exception, call do_exceptional_blocking_test()
  43. # instead.
  44.  
  45. class BlockingTestMixin:
  46.  
  47.     def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
  48.         self.t = _TriggerThread(trigger_func, trigger_args)
  49.         self.t.start()
  50.         self.result = block_func(*block_args)
  51.         # If block_func returned before our thread made the call, we failed!
  52.         if not self.t.startedEvent.is_set():
  53.             self.fail("blocking function '%r' appeared not to block" %
  54.                       block_func)
  55.         self.t.join(10) # make sure the thread terminates
  56.         if self.t.is_alive():
  57.             self.fail("trigger function '%r' appeared to not return" %
  58.                       trigger_func)
  59.         return self.result
  60.  
  61.     # Call this instead if block_func is supposed to raise an exception.
  62.     def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
  63.                                    trigger_args, expected_exception_class):
  64.         self.t = _TriggerThread(trigger_func, trigger_args)
  65.         self.t.start()
  66.         try:
  67.             try:
  68.                 block_func(*block_args)
  69.             except expected_exception_class:
  70.                 raise
  71.             else:
  72.                 self.fail("expected exception of kind %r" %
  73.                                  expected_exception_class)
  74.         finally:
  75.             self.t.join(10) # make sure the thread terminates
  76.             if self.t.is_alive():
  77.                 self.fail("trigger function '%r' appeared to not return" %
  78.                                  trigger_func)
  79.             if not self.t.startedEvent.is_set():
  80.                 self.fail("trigger thread ended but event never set")
  81.  
  82.  
  83. class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
  84.     def setUp(self):
  85.         self.cum = 0
  86.         self.cumlock = threading.Lock()
  87.  
  88.     def simple_queue_test(self, q):
  89.         if not q.empty():
  90.             raise RuntimeError, "Call this function with an empty queue"
  91.         # I guess we better check things actually queue correctly a little :)
  92.         q.put(111)
  93.         q.put(333)
  94.         q.put(222)
  95.         target_order = dict(Queue = [111, 333, 222],
  96.                             LifoQueue = [222, 333, 111],
  97.                             PriorityQueue = [111, 222, 333])
  98.         actual_order = [q.get(), q.get(), q.get()]
  99.         self.assertEquals(actual_order, target_order[q.__class__.__name__],
  100.                           "Didn't seem to queue the correct data!")
  101.         for i in range(QUEUE_SIZE-1):
  102.             q.put(i)
  103.             self.assert_(not q.empty(), "Queue should not be empty")
  104.         self.assert_(not q.full(), "Queue should not be full")
  105.         q.put("last")
  106.         self.assert_(q.full(), "Queue should be full")
  107.         try:
  108.             q.put("full", block=0)
  109.             self.fail("Didn't appear to block with a full queue")
  110.         except Queue.Full:
  111.             pass
  112.         try:
  113.             q.put("full", timeout=0.01)
  114.             self.fail("Didn't appear to time-out with a full queue")
  115.         except Queue.Full:
  116.             pass
  117.         # Test a blocking put
  118.         self.do_blocking_test(q.put, ("full",), q.get, ())
  119.         self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
  120.         # Empty it
  121.         for i in range(QUEUE_SIZE):
  122.             q.get()
  123.         self.assert_(q.empty(), "Queue should be empty")
  124.         try:
  125.             q.get(block=0)
  126.             self.fail("Didn't appear to block with an empty queue")
  127.         except Queue.Empty:
  128.             pass
  129.         try:
  130.             q.get(timeout=0.01)
  131.             self.fail("Didn't appear to time-out with an empty queue")
  132.         except Queue.Empty:
  133.             pass
  134.         # Test a blocking get
  135.         self.do_blocking_test(q.get, (), q.put, ('empty',))
  136.         self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
  137.  
  138.  
  139.     def worker(self, q):
  140.         while True:
  141.             x = q.get()
  142.             if x is None:
  143.                 q.task_done()
  144.                 return
  145.             with self.cumlock:
  146.                 self.cum += x
  147.             q.task_done()
  148.  
  149.     def queue_join_test(self, q):
  150.         self.cum = 0
  151.         for i in (0,1):
  152.             threading.Thread(target=self.worker, args=(q,)).start()
  153.         for i in xrange(100):
  154.             q.put(i)
  155.         q.join()
  156.         self.assertEquals(self.cum, sum(range(100)),
  157.                           "q.join() did not block until all tasks were done")
  158.         for i in (0,1):
  159.             q.put(None)         # instruct the threads to close
  160.         q.join()                # verify that you can join twice
  161.  
  162.     def test_queue_task_done(self):
  163.         # Test to make sure a queue task completed successfully.
  164.         q = self.type2test()
  165.         try:
  166.             q.task_done()
  167.         except ValueError:
  168.             pass
  169.         else:
  170.             self.fail("Did not detect task count going negative")
  171.  
  172.     def test_queue_join(self):
  173.         # Test that a queue join()s successfully, and before anything else
  174.         # (done twice for insurance).
  175.         q = self.type2test()
  176.         self.queue_join_test(q)
  177.         self.queue_join_test(q)
  178.         try:
  179.             q.task_done()
  180.         except ValueError:
  181.             pass
  182.         else:
  183.             self.fail("Did not detect task count going negative")
  184.  
  185.     def test_simple_queue(self):
  186.         # Do it a couple of times on the same queue.
  187.         # Done twice to make sure works with same instance reused.
  188.         q = self.type2test(QUEUE_SIZE)
  189.         self.simple_queue_test(q)
  190.         self.simple_queue_test(q)
  191.  
  192.  
  193. class QueueTest(BaseQueueTest):
  194.     type2test = Queue.Queue
  195.  
  196. class LifoQueueTest(BaseQueueTest):
  197.     type2test = Queue.LifoQueue
  198.  
  199. class PriorityQueueTest(BaseQueueTest):
  200.     type2test = Queue.PriorityQueue
  201.  
  202.  
  203.  
  204. # A Queue subclass that can provoke failure at a moment's notice :)
  205. class FailingQueueException(Exception):
  206.     pass
  207.  
  208. class FailingQueue(Queue.Queue):
  209.     def __init__(self, *args):
  210.         self.fail_next_put = False
  211.         self.fail_next_get = False
  212.         Queue.Queue.__init__(self, *args)
  213.     def _put(self, item):
  214.         if self.fail_next_put:
  215.             self.fail_next_put = False
  216.             raise FailingQueueException, "You Lose"
  217.         return Queue.Queue._put(self, item)
  218.     def _get(self):
  219.         if self.fail_next_get:
  220.             self.fail_next_get = False
  221.             raise FailingQueueException, "You Lose"
  222.         return Queue.Queue._get(self)
  223.  
  224. class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
  225.  
  226.     def failing_queue_test(self, q):
  227.         if not q.empty():
  228.             raise RuntimeError, "Call this function with an empty queue"
  229.         for i in range(QUEUE_SIZE-1):
  230.             q.put(i)
  231.         # Test a failing non-blocking put.
  232.         q.fail_next_put = True
  233.         try:
  234.             q.put("oops", block=0)
  235.             self.fail("The queue didn't fail when it should have")
  236.         except FailingQueueException:
  237.             pass
  238.         q.fail_next_put = True
  239.         try:
  240.             q.put("oops", timeout=0.1)
  241.             self.fail("The queue didn't fail when it should have")
  242.         except FailingQueueException:
  243.             pass
  244.         q.put("last")
  245.         self.assert_(q.full(), "Queue should be full")
  246.         # Test a failing blocking put
  247.         q.fail_next_put = True
  248.         try:
  249.             self.do_blocking_test(q.put, ("full",), q.get, ())
  250.             self.fail("The queue didn't fail when it should have")
  251.         except FailingQueueException:
  252.             pass
  253.         # Check the Queue isn't damaged.
  254.         # put failed, but get succeeded - re-add
  255.         q.put("last")
  256.         # Test a failing timeout put
  257.         q.fail_next_put = True
  258.         try:
  259.             self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
  260.                                               FailingQueueException)
  261.             self.fail("The queue didn't fail when it should have")
  262.         except FailingQueueException:
  263.             pass
  264.         # Check the Queue isn't damaged.
  265.         # put failed, but get succeeded - re-add
  266.         q.put("last")
  267.         self.assert_(q.full(), "Queue should be full")
  268.         q.get()
  269.         self.assert_(not q.full(), "Queue should not be full")
  270.         q.put("last")
  271.         self.assert_(q.full(), "Queue should be full")
  272.         # Test a blocking put
  273.         self.do_blocking_test(q.put, ("full",), q.get, ())
  274.         # Empty it
  275.         for i in range(QUEUE_SIZE):
  276.             q.get()
  277.         self.assert_(q.empty(), "Queue should be empty")
  278.         q.put("first")
  279.         q.fail_next_get = True
  280.         try:
  281.             q.get()
  282.             self.fail("The queue didn't fail when it should have")
  283.         except FailingQueueException:
  284.             pass
  285.         self.assert_(not q.empty(), "Queue should not be empty")
  286.         q.fail_next_get = True
  287.         try:
  288.             q.get(timeout=0.1)
  289.             self.fail("The queue didn't fail when it should have")
  290.         except FailingQueueException:
  291.             pass
  292.         self.assert_(not q.empty(), "Queue should not be empty")
  293.         q.get()
  294.         self.assert_(q.empty(), "Queue should be empty")
  295.         q.fail_next_get = True
  296.         try:
  297.             self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
  298.                                               FailingQueueException)
  299.             self.fail("The queue didn't fail when it should have")
  300.         except FailingQueueException:
  301.             pass
  302.         # put succeeded, but get failed.
  303.         self.assert_(not q.empty(), "Queue should not be empty")
  304.         q.get()
  305.         self.assert_(q.empty(), "Queue should be empty")
  306.  
  307.     def test_failing_queue(self):
  308.         # Test to make sure a queue is functioning correctly.
  309.         # Done twice to the same instance.
  310.         q = FailingQueue(QUEUE_SIZE)
  311.         self.failing_queue_test(q)
  312.         self.failing_queue_test(q)
  313.  
  314.  
  315. def test_main():
  316.     test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
  317.                               FailingQueueTest)
  318.  
  319.  
  320. if __name__ == "__main__":
  321.     test_main()
  322.