home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_threading.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  15.7 KB  |  455 lines

  1. # Very rudimentary test of threading module
  2.  
  3. import test.test_support
  4. from test.test_support import verbose
  5. import random
  6. import re
  7. import sys
  8. import threading
  9. import thread
  10. import time
  11. import unittest
  12. import weakref
  13.  
  14. # A trivial mutable counter.
  15. class Counter(object):
  16.     def __init__(self):
  17.         self.value = 0
  18.     def inc(self):
  19.         self.value += 1
  20.     def dec(self):
  21.         self.value -= 1
  22.     def get(self):
  23.         return self.value
  24.  
  25. class TestThread(threading.Thread):
  26.     def __init__(self, name, testcase, sema, mutex, nrunning):
  27.         threading.Thread.__init__(self, name=name)
  28.         self.testcase = testcase
  29.         self.sema = sema
  30.         self.mutex = mutex
  31.         self.nrunning = nrunning
  32.  
  33.     def run(self):
  34.         delay = random.random() / 10000.0
  35.         if verbose:
  36.             print 'task %s will run for %.1f usec' % (
  37.                 self.name, delay * 1e6)
  38.  
  39.         with self.sema:
  40.             with self.mutex:
  41.                 self.nrunning.inc()
  42.                 if verbose:
  43.                     print self.nrunning.get(), 'tasks are running'
  44.                 self.testcase.assert_(self.nrunning.get() <= 3)
  45.  
  46.             time.sleep(delay)
  47.             if verbose:
  48.                 print 'task', self.name, 'done'
  49.  
  50.             with self.mutex:
  51.                 self.nrunning.dec()
  52.                 self.testcase.assert_(self.nrunning.get() >= 0)
  53.                 if verbose:
  54.                     print '%s is finished. %d tasks are running' % (
  55.                         self.name, self.nrunning.get())
  56.  
  57. class ThreadTests(unittest.TestCase):
  58.  
  59.     # Create a bunch of threads, let each do some work, wait until all are
  60.     # done.
  61.     def test_various_ops(self):
  62.         # This takes about n/3 seconds to run (about n/3 clumps of tasks,
  63.         # times about 1 second per clump).
  64.         NUMTASKS = 10
  65.  
  66.         # no more than 3 of the 10 can run at once
  67.         sema = threading.BoundedSemaphore(value=3)
  68.         mutex = threading.RLock()
  69.         numrunning = Counter()
  70.  
  71.         threads = []
  72.  
  73.         for i in range(NUMTASKS):
  74.             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
  75.             threads.append(t)
  76.             self.failUnlessEqual(t.ident, None)
  77.             self.assert_(re.match('<TestThread\(.*, initial\)>', repr(t)))
  78.             t.start()
  79.  
  80.         if verbose:
  81.             print 'waiting for all tasks to complete'
  82.         for t in threads:
  83.             t.join(NUMTASKS)
  84.             self.assert_(not t.is_alive())
  85.             self.failIfEqual(t.ident, 0)
  86.             self.assert_(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
  87.         if verbose:
  88.             print 'all tasks done'
  89.         self.assertEqual(numrunning.get(), 0)
  90.  
  91.     # run with a small(ish) thread stack size (256kB)
  92.     def test_various_ops_small_stack(self):
  93.         if verbose:
  94.             print 'with 256kB thread stack size...'
  95.         try:
  96.             threading.stack_size(262144)
  97.         except thread.error:
  98.             if verbose:
  99.                 print 'platform does not support changing thread stack size'
  100.             return
  101.         self.test_various_ops()
  102.         threading.stack_size(0)
  103.  
  104.     # run with a large thread stack size (1MB)
  105.     def test_various_ops_large_stack(self):
  106.         if verbose:
  107.             print 'with 1MB thread stack size...'
  108.         try:
  109.             threading.stack_size(0x100000)
  110.         except thread.error:
  111.             if verbose:
  112.                 print 'platform does not support changing thread stack size'
  113.             return
  114.         self.test_various_ops()
  115.         threading.stack_size(0)
  116.  
  117.     def test_foreign_thread(self):
  118.         # Check that a "foreign" thread can use the threading module.
  119.         def f(mutex):
  120.             # Acquiring an RLock forces an entry for the foreign
  121.             # thread to get made in the threading._active map.
  122.             r = threading.RLock()
  123.             r.acquire()
  124.             r.release()
  125.             mutex.release()
  126.  
  127.         mutex = threading.Lock()
  128.         mutex.acquire()
  129.         tid = thread.start_new_thread(f, (mutex,))
  130.         # Wait for the thread to finish.
  131.         mutex.acquire()
  132.         self.assert_(tid in threading._active)
  133.         self.assert_(isinstance(threading._active[tid],
  134.                                 threading._DummyThread))
  135.         del threading._active[tid]
  136.  
  137.     # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
  138.     # exposed at the Python level.  This test relies on ctypes to get at it.
  139.     def test_PyThreadState_SetAsyncExc(self):
  140.         try:
  141.             import ctypes
  142.         except ImportError:
  143.             if verbose:
  144.                 print "test_PyThreadState_SetAsyncExc can't import ctypes"
  145.             return  # can't do anything
  146.  
  147.         set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
  148.  
  149.         class AsyncExc(Exception):
  150.             pass
  151.  
  152.         exception = ctypes.py_object(AsyncExc)
  153.  
  154.         # `worker_started` is set by the thread when it's inside a try/except
  155.         # block waiting to catch the asynchronously set AsyncExc exception.
  156.         # `worker_saw_exception` is set by the thread upon catching that
  157.         # exception.
  158.         worker_started = threading.Event()
  159.         worker_saw_exception = threading.Event()
  160.  
  161.         class Worker(threading.Thread):
  162.             def run(self):
  163.                 self.id = thread.get_ident()
  164.                 self.finished = False
  165.  
  166.                 try:
  167.                     while True:
  168.                         worker_started.set()
  169.                         time.sleep(0.1)
  170.                 except AsyncExc:
  171.                     self.finished = True
  172.                     worker_saw_exception.set()
  173.  
  174.         t = Worker()
  175.         t.daemon = True # so if this fails, we don't hang Python at shutdown
  176.         t.start()
  177.         if verbose:
  178.             print "    started worker thread"
  179.  
  180.         # Try a thread id that doesn't make sense.
  181.         if verbose:
  182.             print "    trying nonsensical thread id"
  183.         result = set_async_exc(ctypes.c_long(-1), exception)
  184.         self.assertEqual(result, 0)  # no thread states modified
  185.  
  186.         # Now raise an exception in the worker thread.
  187.         if verbose:
  188.             print "    waiting for worker thread to get started"
  189.         worker_started.wait()
  190.         if verbose:
  191.             print "    verifying worker hasn't exited"
  192.         self.assert_(not t.finished)
  193.         if verbose:
  194.             print "    attempting to raise asynch exception in worker"
  195.         result = set_async_exc(ctypes.c_long(t.id), exception)
  196.         self.assertEqual(result, 1) # one thread state modified
  197.         if verbose:
  198.             print "    waiting for worker to say it caught the exception"
  199.         worker_saw_exception.wait(timeout=10)
  200.         self.assert_(t.finished)
  201.         if verbose:
  202.             print "    all OK -- joining worker"
  203.         if t.finished:
  204.             t.join()
  205.         # else the thread is still running, and we have no way to kill it
  206.  
  207.     def test_finalize_runnning_thread(self):
  208.         # Issue 1402: the PyGILState_Ensure / _Release functions may be called
  209.         # very late on python exit: on deallocation of a running thread for
  210.         # example.
  211.         try:
  212.             import ctypes
  213.         except ImportError:
  214.             if verbose:
  215.                 print("test_finalize_with_runnning_thread can't import ctypes")
  216.             return  # can't do anything
  217.  
  218.         import subprocess
  219.         rc = subprocess.call([sys.executable, "-c", """if 1:
  220.             import ctypes, sys, time, thread
  221.  
  222.             # This lock is used as a simple event variable.
  223.             ready = thread.allocate_lock()
  224.             ready.acquire()
  225.  
  226.             # Module globals are cleared before __del__ is run
  227.             # So we save the functions in class dict
  228.             class C:
  229.                 ensure = ctypes.pythonapi.PyGILState_Ensure
  230.                 release = ctypes.pythonapi.PyGILState_Release
  231.                 def __del__(self):
  232.                     state = self.ensure()
  233.                     self.release(state)
  234.  
  235.             def waitingThread():
  236.                 x = C()
  237.                 ready.release()
  238.                 time.sleep(100)
  239.  
  240.             thread.start_new_thread(waitingThread, ())
  241.             ready.acquire()  # Be sure the other thread is waiting.
  242.             sys.exit(42)
  243.             """])
  244.         self.assertEqual(rc, 42)
  245.  
  246.     def test_finalize_with_trace(self):
  247.         # Issue1733757
  248.         # Avoid a deadlock when sys.settrace steps into threading._shutdown
  249.         import subprocess
  250.         rc = subprocess.call([sys.executable, "-c", """if 1:
  251.             import sys, threading
  252.  
  253.             # A deadlock-killer, to prevent the
  254.             # testsuite to hang forever
  255.             def killer():
  256.                 import os, time
  257.                 time.sleep(2)
  258.                 print 'program blocked; aborting'
  259.                 os._exit(2)
  260.             t = threading.Thread(target=killer)
  261.             t.daemon = True
  262.             t.start()
  263.  
  264.             # This is the trace function
  265.             def func(frame, event, arg):
  266.                 threading.current_thread()
  267.                 return func
  268.  
  269.             sys.settrace(func)
  270.             """])
  271.         self.failIf(rc == 2, "interpreted was blocked")
  272.         self.failUnless(rc == 0, "Unexpected error")
  273.  
  274.  
  275.     def test_enumerate_after_join(self):
  276.         # Try hard to trigger #1703448: a thread is still returned in
  277.         # threading.enumerate() after it has been join()ed.
  278.         enum = threading.enumerate
  279.         old_interval = sys.getcheckinterval()
  280.         try:
  281.             for i in xrange(1, 100):
  282.                 # Try a couple times at each thread-switching interval
  283.                 # to get more interleavings.
  284.                 sys.setcheckinterval(i // 5)
  285.                 t = threading.Thread(target=lambda: None)
  286.                 t.start()
  287.                 t.join()
  288.                 l = enum()
  289.                 self.assertFalse(t in l,
  290.                     "#1703448 triggered after %d trials: %s" % (i, l))
  291.         finally:
  292.             sys.setcheckinterval(old_interval)
  293.  
  294.     def test_no_refcycle_through_target(self):
  295.         class RunSelfFunction(object):
  296.             def __init__(self, should_raise):
  297.                 # The links in this refcycle from Thread back to self
  298.                 # should be cleaned up when the thread completes.
  299.                 self.should_raise = should_raise
  300.                 self.thread = threading.Thread(target=self._run,
  301.                                                args=(self,),
  302.                                                kwargs={'yet_another':self})
  303.                 self.thread.start()
  304.  
  305.             def _run(self, other_ref, yet_another):
  306.                 if self.should_raise:
  307.                     raise SystemExit
  308.  
  309.         cyclic_object = RunSelfFunction(should_raise=False)
  310.         weak_cyclic_object = weakref.ref(cyclic_object)
  311.         cyclic_object.thread.join()
  312.         del cyclic_object
  313.         self.assertEquals(None, weak_cyclic_object(),
  314.                           msg=('%d references still around' %
  315.                                sys.getrefcount(weak_cyclic_object())))
  316.  
  317.         raising_cyclic_object = RunSelfFunction(should_raise=True)
  318.         weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
  319.         raising_cyclic_object.thread.join()
  320.         del raising_cyclic_object
  321.         self.assertEquals(None, weak_raising_cyclic_object(),
  322.                           msg=('%d references still around' %
  323.                                sys.getrefcount(weak_raising_cyclic_object())))
  324.  
  325.  
  326. class ThreadJoinOnShutdown(unittest.TestCase):
  327.  
  328.     def _run_and_join(self, script):
  329.         script = """if 1:
  330.             import sys, os, time, threading
  331.  
  332.             # a thread, which waits for the main program to terminate
  333.             def joiningfunc(mainthread):
  334.                 mainthread.join()
  335.                 print 'end of thread'
  336.         \n""" + script
  337.  
  338.         import subprocess
  339.         p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
  340.         rc = p.wait()
  341.         data = p.stdout.read().replace('\r', '')
  342.         self.assertEqual(data, "end of main\nend of thread\n")
  343.         self.failIf(rc == 2, "interpreter was blocked")
  344.         self.failUnless(rc == 0, "Unexpected error")
  345.  
  346.     def test_1_join_on_shutdown(self):
  347.         # The usual case: on exit, wait for a non-daemon thread
  348.         script = """if 1:
  349.             import os
  350.             t = threading.Thread(target=joiningfunc,
  351.                                  args=(threading.current_thread(),))
  352.             t.start()
  353.             time.sleep(0.1)
  354.             print 'end of main'
  355.             """
  356.         self._run_and_join(script)
  357.  
  358.  
  359.     def test_2_join_in_forked_process(self):
  360.         # Like the test above, but from a forked interpreter
  361.         import os
  362.         if not hasattr(os, 'fork'):
  363.             return
  364.         script = """if 1:
  365.             childpid = os.fork()
  366.             if childpid != 0:
  367.                 os.waitpid(childpid, 0)
  368.                 sys.exit(0)
  369.  
  370.             t = threading.Thread(target=joiningfunc,
  371.                                  args=(threading.current_thread(),))
  372.             t.start()
  373.             print 'end of main'
  374.             """
  375.         self._run_and_join(script)
  376.  
  377.     def test_3_join_in_forked_from_thread(self):
  378.         # Like the test above, but fork() was called from a worker thread
  379.         # In the forked process, the main Thread object must be marked as stopped.
  380.         import os
  381.         if not hasattr(os, 'fork'):
  382.             return
  383.         # Skip platforms with known problems forking from a worker thread.
  384.         # See http://bugs.python.org/issue3863.
  385.         if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'freebsd7', 'os2emx'):
  386.             print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread'
  387.                                  ' due to known OS bugs on'), sys.platform
  388.             return
  389.         script = """if 1:
  390.             main_thread = threading.current_thread()
  391.             def worker():
  392.                 childpid = os.fork()
  393.                 if childpid != 0:
  394.                     os.waitpid(childpid, 0)
  395.                     sys.exit(0)
  396.  
  397.                 t = threading.Thread(target=joiningfunc,
  398.                                      args=(main_thread,))
  399.                 print 'end of main'
  400.                 t.start()
  401.                 t.join() # Should not block: main_thread is already stopped
  402.  
  403.             w = threading.Thread(target=worker)
  404.             w.start()
  405.             """
  406.         self._run_and_join(script)
  407.  
  408.  
  409. class ThreadingExceptionTests(unittest.TestCase):
  410.     # A RuntimeError should be raised if Thread.start() is called
  411.     # multiple times.
  412.     def test_start_thread_again(self):
  413.         thread = threading.Thread()
  414.         thread.start()
  415.         self.assertRaises(RuntimeError, thread.start)
  416.  
  417.     def test_releasing_unacquired_rlock(self):
  418.         rlock = threading.RLock()
  419.         self.assertRaises(RuntimeError, rlock.release)
  420.  
  421.     def test_waiting_on_unacquired_condition(self):
  422.         cond = threading.Condition()
  423.         self.assertRaises(RuntimeError, cond.wait)
  424.  
  425.     def test_notify_on_unacquired_condition(self):
  426.         cond = threading.Condition()
  427.         self.assertRaises(RuntimeError, cond.notify)
  428.  
  429.     def test_semaphore_with_negative_value(self):
  430.         self.assertRaises(ValueError, threading.Semaphore, value = -1)
  431.         self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxint)
  432.  
  433.     def test_joining_current_thread(self):
  434.         current_thread = threading.current_thread()
  435.         self.assertRaises(RuntimeError, current_thread.join);
  436.  
  437.     def test_joining_inactive_thread(self):
  438.         thread = threading.Thread()
  439.         self.assertRaises(RuntimeError, thread.join)
  440.  
  441.     def test_daemonize_active_thread(self):
  442.         thread = threading.Thread()
  443.         thread.start()
  444.         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
  445.  
  446.  
  447. def test_main():
  448.     test.test_support.run_unittest(ThreadTests,
  449.                                    ThreadJoinOnShutdown,
  450.                                    ThreadingExceptionTests,
  451.                                    )
  452.  
  453. if __name__ == "__main__":
  454.     test_main()
  455.