home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_multiprocessing.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  52.1 KB  |  1,826 lines

  1. #!/usr/bin/env python
  2.  
  3. #
  4. # Unit tests for the multiprocessing package
  5. #
  6.  
  7. import unittest
  8. import threading
  9. import Queue
  10. import time
  11. import sys
  12. import os
  13. import gc
  14. import signal
  15. import array
  16. import copy
  17. import socket
  18. import random
  19. import logging
  20.  
  21.  
  22. # Work around broken sem_open implementations
  23. try:
  24.     import multiprocessing.synchronize
  25. except ImportError, e:
  26.     from test.test_support import TestSkipped
  27.     raise TestSkipped(e)
  28.  
  29. import multiprocessing.dummy
  30. import multiprocessing.connection
  31. import multiprocessing.managers
  32. import multiprocessing.heap
  33. import multiprocessing.pool
  34. import _multiprocessing
  35.  
  36. from multiprocessing import util
  37.  
  38. #
  39. #
  40. #
  41.  
  42. latin = str
  43.  
  44. #
  45. # Constants
  46. #
  47.  
  48. LOG_LEVEL = util.SUBWARNING
  49. #LOG_LEVEL = logging.WARNING
  50.  
  51. DELTA = 0.1
  52. CHECK_TIMINGS = False     # making true makes tests take a lot longer
  53.                           # and can sometimes cause some non-serious
  54.                           # failures because some calls block a bit
  55.                           # longer than expected
  56. if CHECK_TIMINGS:
  57.     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
  58. else:
  59.     TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
  60.  
  61. HAVE_GETVALUE = not getattr(_multiprocessing,
  62.                             'HAVE_BROKEN_SEM_GETVALUE', False)
  63.  
  64. #
  65. # Creates a wrapper for a function which records the time it takes to finish
  66. #
  67.  
  68. class TimingWrapper(object):
  69.  
  70.     def __init__(self, func):
  71.         self.func = func
  72.         self.elapsed = None
  73.  
  74.     def __call__(self, *args, **kwds):
  75.         t = time.time()
  76.         try:
  77.             return self.func(*args, **kwds)
  78.         finally:
  79.             self.elapsed = time.time() - t
  80.  
  81. #
  82. # Base class for test cases
  83. #
  84.  
  85. class BaseTestCase(object):
  86.  
  87.     ALLOWED_TYPES = ('processes', 'manager', 'threads')
  88.  
  89.     def assertTimingAlmostEqual(self, a, b):
  90.         if CHECK_TIMINGS:
  91.             self.assertAlmostEqual(a, b, 1)
  92.  
  93.     def assertReturnsIfImplemented(self, value, func, *args):
  94.         try:
  95.             res = func(*args)
  96.         except NotImplementedError:
  97.             pass
  98.         else:
  99.             return self.assertEqual(value, res)
  100.  
  101. #
  102. # Return the value of a semaphore
  103. #
  104.  
  105. def get_value(self):
  106.     try:
  107.         return self.get_value()
  108.     except AttributeError:
  109.         try:
  110.             return self._Semaphore__value
  111.         except AttributeError:
  112.             try:
  113.                 return self._value
  114.             except AttributeError:
  115.                 raise NotImplementedError
  116.  
  117. #
  118. # Testcases
  119. #
  120.  
  121. class _TestProcess(BaseTestCase):
  122.  
  123.     ALLOWED_TYPES = ('processes', 'threads')
  124.  
  125.     def test_current(self):
  126.         if self.TYPE == 'threads':
  127.             return
  128.  
  129.         current = self.current_process()
  130.         authkey = current.authkey
  131.  
  132.         self.assertTrue(current.is_alive())
  133.         self.assertTrue(not current.daemon)
  134.         self.assertTrue(isinstance(authkey, bytes))
  135.         self.assertTrue(len(authkey) > 0)
  136.         self.assertEqual(current.ident, os.getpid())
  137.         self.assertEqual(current.exitcode, None)
  138.  
  139.     def _test(self, q, *args, **kwds):
  140.         current = self.current_process()
  141.         q.put(args)
  142.         q.put(kwds)
  143.         q.put(current.name)
  144.         if self.TYPE != 'threads':
  145.             q.put(bytes(current.authkey))
  146.             q.put(current.pid)
  147.  
  148.     def test_process(self):
  149.         q = self.Queue(1)
  150.         e = self.Event()
  151.         args = (q, 1, 2)
  152.         kwargs = {'hello':23, 'bye':2.54}
  153.         name = 'SomeProcess'
  154.         p = self.Process(
  155.             target=self._test, args=args, kwargs=kwargs, name=name
  156.             )
  157.         p.daemon = True
  158.         current = self.current_process()
  159.  
  160.         if self.TYPE != 'threads':
  161.             self.assertEquals(p.authkey, current.authkey)
  162.         self.assertEquals(p.is_alive(), False)
  163.         self.assertEquals(p.daemon, True)
  164.         self.assertTrue(p not in self.active_children())
  165.         self.assertTrue(type(self.active_children()) is list)
  166.         self.assertEqual(p.exitcode, None)
  167.  
  168.         p.start()
  169.  
  170.         self.assertEquals(p.exitcode, None)
  171.         self.assertEquals(p.is_alive(), True)
  172.         self.assertTrue(p in self.active_children())
  173.  
  174.         self.assertEquals(q.get(), args[1:])
  175.         self.assertEquals(q.get(), kwargs)
  176.         self.assertEquals(q.get(), p.name)
  177.         if self.TYPE != 'threads':
  178.             self.assertEquals(q.get(), current.authkey)
  179.             self.assertEquals(q.get(), p.pid)
  180.  
  181.         p.join()
  182.  
  183.         self.assertEquals(p.exitcode, 0)
  184.         self.assertEquals(p.is_alive(), False)
  185.         self.assertTrue(p not in self.active_children())
  186.  
  187.     def _test_terminate(self):
  188.         time.sleep(1000)
  189.  
  190.     def test_terminate(self):
  191.         if self.TYPE == 'threads':
  192.             return
  193.  
  194.         p = self.Process(target=self._test_terminate)
  195.         p.daemon = True
  196.         p.start()
  197.  
  198.         self.assertEqual(p.is_alive(), True)
  199.         self.assertTrue(p in self.active_children())
  200.         self.assertEqual(p.exitcode, None)
  201.  
  202.         p.terminate()
  203.  
  204.         join = TimingWrapper(p.join)
  205.         self.assertEqual(join(), None)
  206.         self.assertTimingAlmostEqual(join.elapsed, 0.0)
  207.  
  208.         self.assertEqual(p.is_alive(), False)
  209.         self.assertTrue(p not in self.active_children())
  210.  
  211.         p.join()
  212.  
  213.         # XXX sometimes get p.exitcode == 0 on Windows ...
  214.         #self.assertEqual(p.exitcode, -signal.SIGTERM)
  215.  
  216.     def test_cpu_count(self):
  217.         try:
  218.             cpus = multiprocessing.cpu_count()
  219.         except NotImplementedError:
  220.             cpus = 1
  221.         self.assertTrue(type(cpus) is int)
  222.         self.assertTrue(cpus >= 1)
  223.  
  224.     def test_active_children(self):
  225.         self.assertEqual(type(self.active_children()), list)
  226.  
  227.         p = self.Process(target=time.sleep, args=(DELTA,))
  228.         self.assertTrue(p not in self.active_children())
  229.  
  230.         p.start()
  231.         self.assertTrue(p in self.active_children())
  232.  
  233.         p.join()
  234.         self.assertTrue(p not in self.active_children())
  235.  
  236.     def _test_recursion(self, wconn, id):
  237.         from multiprocessing import forking
  238.         wconn.send(id)
  239.         if len(id) < 2:
  240.             for i in range(2):
  241.                 p = self.Process(
  242.                     target=self._test_recursion, args=(wconn, id+[i])
  243.                     )
  244.                 p.start()
  245.                 p.join()
  246.  
  247.     def test_recursion(self):
  248.         rconn, wconn = self.Pipe(duplex=False)
  249.         self._test_recursion(wconn, [])
  250.  
  251.         time.sleep(DELTA)
  252.         result = []
  253.         while rconn.poll():
  254.             result.append(rconn.recv())
  255.  
  256.         expected = [
  257.             [],
  258.               [0],
  259.                 [0, 0],
  260.                 [0, 1],
  261.               [1],
  262.                 [1, 0],
  263.                 [1, 1]
  264.             ]
  265.         self.assertEqual(result, expected)
  266.  
  267. #
  268. #
  269. #
  270.  
  271. class _UpperCaser(multiprocessing.Process):
  272.  
  273.     def __init__(self):
  274.         multiprocessing.Process.__init__(self)
  275.         self.child_conn, self.parent_conn = multiprocessing.Pipe()
  276.  
  277.     def run(self):
  278.         self.parent_conn.close()
  279.         for s in iter(self.child_conn.recv, None):
  280.             self.child_conn.send(s.upper())
  281.         self.child_conn.close()
  282.  
  283.     def submit(self, s):
  284.         assert type(s) is str
  285.         self.parent_conn.send(s)
  286.         return self.parent_conn.recv()
  287.  
  288.     def stop(self):
  289.         self.parent_conn.send(None)
  290.         self.parent_conn.close()
  291.         self.child_conn.close()
  292.  
  293. class _TestSubclassingProcess(BaseTestCase):
  294.  
  295.     ALLOWED_TYPES = ('processes',)
  296.  
  297.     def test_subclassing(self):
  298.         uppercaser = _UpperCaser()
  299.         uppercaser.start()
  300.         self.assertEqual(uppercaser.submit('hello'), 'HELLO')
  301.         self.assertEqual(uppercaser.submit('world'), 'WORLD')
  302.         uppercaser.stop()
  303.         uppercaser.join()
  304.  
  305. #
  306. #
  307. #
  308.  
  309. def queue_empty(q):
  310.     if hasattr(q, 'empty'):
  311.         return q.empty()
  312.     else:
  313.         return q.qsize() == 0
  314.  
  315. def queue_full(q, maxsize):
  316.     if hasattr(q, 'full'):
  317.         return q.full()
  318.     else:
  319.         return q.qsize() == maxsize
  320.  
  321.  
  322. class _TestQueue(BaseTestCase):
  323.  
  324.  
  325.     def _test_put(self, queue, child_can_start, parent_can_continue):
  326.         child_can_start.wait()
  327.         for i in range(6):
  328.             queue.get()
  329.         parent_can_continue.set()
  330.  
  331.     def test_put(self):
  332.         MAXSIZE = 6
  333.         queue = self.Queue(maxsize=MAXSIZE)
  334.         child_can_start = self.Event()
  335.         parent_can_continue = self.Event()
  336.  
  337.         proc = self.Process(
  338.             target=self._test_put,
  339.             args=(queue, child_can_start, parent_can_continue)
  340.             )
  341.         proc.daemon = True
  342.         proc.start()
  343.  
  344.         self.assertEqual(queue_empty(queue), True)
  345.         self.assertEqual(queue_full(queue, MAXSIZE), False)
  346.  
  347.         queue.put(1)
  348.         queue.put(2, True)
  349.         queue.put(3, True, None)
  350.         queue.put(4, False)
  351.         queue.put(5, False, None)
  352.         queue.put_nowait(6)
  353.  
  354.         # the values may be in buffer but not yet in pipe so sleep a bit
  355.         time.sleep(DELTA)
  356.  
  357.         self.assertEqual(queue_empty(queue), False)
  358.         self.assertEqual(queue_full(queue, MAXSIZE), True)
  359.  
  360.         put = TimingWrapper(queue.put)
  361.         put_nowait = TimingWrapper(queue.put_nowait)
  362.  
  363.         self.assertRaises(Queue.Full, put, 7, False)
  364.         self.assertTimingAlmostEqual(put.elapsed, 0)
  365.  
  366.         self.assertRaises(Queue.Full, put, 7, False, None)
  367.         self.assertTimingAlmostEqual(put.elapsed, 0)
  368.  
  369.         self.assertRaises(Queue.Full, put_nowait, 7)
  370.         self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
  371.  
  372.         self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
  373.         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
  374.  
  375.         self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
  376.         self.assertTimingAlmostEqual(put.elapsed, 0)
  377.  
  378.         self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
  379.         self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
  380.  
  381.         child_can_start.set()
  382.         parent_can_continue.wait()
  383.  
  384.         self.assertEqual(queue_empty(queue), True)
  385.         self.assertEqual(queue_full(queue, MAXSIZE), False)
  386.  
  387.         proc.join()
  388.  
  389.     def _test_get(self, queue, child_can_start, parent_can_continue):
  390.         child_can_start.wait()
  391.         #queue.put(1)
  392.         queue.put(2)
  393.         queue.put(3)
  394.         queue.put(4)
  395.         queue.put(5)
  396.         parent_can_continue.set()
  397.  
  398.     def test_get(self):
  399.         queue = self.Queue()
  400.         child_can_start = self.Event()
  401.         parent_can_continue = self.Event()
  402.  
  403.         proc = self.Process(
  404.             target=self._test_get,
  405.             args=(queue, child_can_start, parent_can_continue)
  406.             )
  407.         proc.daemon = True
  408.         proc.start()
  409.  
  410.         self.assertEqual(queue_empty(queue), True)
  411.  
  412.         child_can_start.set()
  413.         parent_can_continue.wait()
  414.  
  415.         time.sleep(DELTA)
  416.         self.assertEqual(queue_empty(queue), False)
  417.  
  418.         # Hangs unexpectedly, remove for now
  419.         #self.assertEqual(queue.get(), 1)
  420.         self.assertEqual(queue.get(True, None), 2)
  421.         self.assertEqual(queue.get(True), 3)
  422.         self.assertEqual(queue.get(timeout=1), 4)
  423.         self.assertEqual(queue.get_nowait(), 5)
  424.  
  425.         self.assertEqual(queue_empty(queue), True)
  426.  
  427.         get = TimingWrapper(queue.get)
  428.         get_nowait = TimingWrapper(queue.get_nowait)
  429.  
  430.         self.assertRaises(Queue.Empty, get, False)
  431.         self.assertTimingAlmostEqual(get.elapsed, 0)
  432.  
  433.         self.assertRaises(Queue.Empty, get, False, None)
  434.         self.assertTimingAlmostEqual(get.elapsed, 0)
  435.  
  436.         self.assertRaises(Queue.Empty, get_nowait)
  437.         self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
  438.  
  439.         self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
  440.         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
  441.  
  442.         self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
  443.         self.assertTimingAlmostEqual(get.elapsed, 0)
  444.  
  445.         self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
  446.         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
  447.  
  448.         proc.join()
  449.  
  450.     def _test_fork(self, queue):
  451.         for i in range(10, 20):
  452.             queue.put(i)
  453.         # note that at this point the items may only be buffered, so the
  454.         # process cannot shutdown until the feeder thread has finished
  455.         # pushing items onto the pipe.
  456.  
  457.     def test_fork(self):
  458.         # Old versions of Queue would fail to create a new feeder
  459.         # thread for a forked process if the original process had its
  460.         # own feeder thread.  This test checks that this no longer
  461.         # happens.
  462.  
  463.         queue = self.Queue()
  464.  
  465.         # put items on queue so that main process starts a feeder thread
  466.         for i in range(10):
  467.             queue.put(i)
  468.  
  469.         # wait to make sure thread starts before we fork a new process
  470.         time.sleep(DELTA)
  471.  
  472.         # fork process
  473.         p = self.Process(target=self._test_fork, args=(queue,))
  474.         p.start()
  475.  
  476.         # check that all expected items are in the queue
  477.         for i in range(20):
  478.             self.assertEqual(queue.get(), i)
  479.         self.assertRaises(Queue.Empty, queue.get, False)
  480.  
  481.         p.join()
  482.  
  483.     def test_qsize(self):
  484.         q = self.Queue()
  485.         try:
  486.             self.assertEqual(q.qsize(), 0)
  487.         except NotImplementedError:
  488.             return
  489.         q.put(1)
  490.         self.assertEqual(q.qsize(), 1)
  491.         q.put(5)
  492.         self.assertEqual(q.qsize(), 2)
  493.         q.get()
  494.         self.assertEqual(q.qsize(), 1)
  495.         q.get()
  496.         self.assertEqual(q.qsize(), 0)
  497.  
  498.     def _test_task_done(self, q):
  499.         for obj in iter(q.get, None):
  500.             time.sleep(DELTA)
  501.             q.task_done()
  502.  
  503.     def test_task_done(self):
  504.         queue = self.JoinableQueue()
  505.  
  506.         if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
  507.             return
  508.  
  509.         workers = [self.Process(target=self._test_task_done, args=(queue,))
  510.                    for i in xrange(4)]
  511.  
  512.         for p in workers:
  513.             p.start()
  514.  
  515.         for i in xrange(10):
  516.             queue.put(i)
  517.  
  518.         queue.join()
  519.  
  520.         for p in workers:
  521.             queue.put(None)
  522.  
  523.         for p in workers:
  524.             p.join()
  525.  
  526. #
  527. #
  528. #
  529.  
  530. class _TestLock(BaseTestCase):
  531.  
  532.     def test_lock(self):
  533.         lock = self.Lock()
  534.         self.assertEqual(lock.acquire(), True)
  535.         self.assertEqual(lock.acquire(False), False)
  536.         self.assertEqual(lock.release(), None)
  537.         self.assertRaises((ValueError, threading.ThreadError), lock.release)
  538.  
  539.     def test_rlock(self):
  540.         lock = self.RLock()
  541.         self.assertEqual(lock.acquire(), True)
  542.         self.assertEqual(lock.acquire(), True)
  543.         self.assertEqual(lock.acquire(), True)
  544.         self.assertEqual(lock.release(), None)
  545.         self.assertEqual(lock.release(), None)
  546.         self.assertEqual(lock.release(), None)
  547.         self.assertRaises((AssertionError, RuntimeError), lock.release)
  548.  
  549.  
  550. class _TestSemaphore(BaseTestCase):
  551.  
  552.     def _test_semaphore(self, sem):
  553.         self.assertReturnsIfImplemented(2, get_value, sem)
  554.         self.assertEqual(sem.acquire(), True)
  555.         self.assertReturnsIfImplemented(1, get_value, sem)
  556.         self.assertEqual(sem.acquire(), True)
  557.         self.assertReturnsIfImplemented(0, get_value, sem)
  558.         self.assertEqual(sem.acquire(False), False)
  559.         self.assertReturnsIfImplemented(0, get_value, sem)
  560.         self.assertEqual(sem.release(), None)
  561.         self.assertReturnsIfImplemented(1, get_value, sem)
  562.         self.assertEqual(sem.release(), None)
  563.         self.assertReturnsIfImplemented(2, get_value, sem)
  564.  
  565.     def test_semaphore(self):
  566.         sem = self.Semaphore(2)
  567.         self._test_semaphore(sem)
  568.         self.assertEqual(sem.release(), None)
  569.         self.assertReturnsIfImplemented(3, get_value, sem)
  570.         self.assertEqual(sem.release(), None)
  571.         self.assertReturnsIfImplemented(4, get_value, sem)
  572.  
  573.     def test_bounded_semaphore(self):
  574.         sem = self.BoundedSemaphore(2)
  575.         self._test_semaphore(sem)
  576.         # Currently fails on OS/X
  577.         #if HAVE_GETVALUE:
  578.         #    self.assertRaises(ValueError, sem.release)
  579.         #    self.assertReturnsIfImplemented(2, get_value, sem)
  580.  
  581.     def test_timeout(self):
  582.         if self.TYPE != 'processes':
  583.             return
  584.  
  585.         sem = self.Semaphore(0)
  586.         acquire = TimingWrapper(sem.acquire)
  587.  
  588.         self.assertEqual(acquire(False), False)
  589.         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
  590.  
  591.         self.assertEqual(acquire(False, None), False)
  592.         self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
  593.  
  594.         self.assertEqual(acquire(False, TIMEOUT1), False)
  595.         self.assertTimingAlmostEqual(acquire.elapsed, 0)
  596.  
  597.         self.assertEqual(acquire(True, TIMEOUT2), False)
  598.         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
  599.  
  600.         self.assertEqual(acquire(timeout=TIMEOUT3), False)
  601.         self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
  602.  
  603.  
  604. class _TestCondition(BaseTestCase):
  605.  
  606.     def f(self, cond, sleeping, woken, timeout=None):
  607.         cond.acquire()
  608.         sleeping.release()
  609.         cond.wait(timeout)
  610.         woken.release()
  611.         cond.release()
  612.  
  613.     def check_invariant(self, cond):
  614.         # this is only supposed to succeed when there are no sleepers
  615.         if self.TYPE == 'processes':
  616.             try:
  617.                 sleepers = (cond._sleeping_count.get_value() -
  618.                             cond._woken_count.get_value())
  619.                 self.assertEqual(sleepers, 0)
  620.                 self.assertEqual(cond._wait_semaphore.get_value(), 0)
  621.             except NotImplementedError:
  622.                 pass
  623.  
  624.     def test_notify(self):
  625.         cond = self.Condition()
  626.         sleeping = self.Semaphore(0)
  627.         woken = self.Semaphore(0)
  628.  
  629.         p = self.Process(target=self.f, args=(cond, sleeping, woken))
  630.         p.daemon = True
  631.         p.start()
  632.  
  633.         p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
  634.         p.daemon = True
  635.         p.start()
  636.  
  637.         # wait for both children to start sleeping
  638.         sleeping.acquire()
  639.         sleeping.acquire()
  640.  
  641.         # check no process/thread has woken up
  642.         time.sleep(DELTA)
  643.         self.assertReturnsIfImplemented(0, get_value, woken)
  644.  
  645.         # wake up one process/thread
  646.         cond.acquire()
  647.         cond.notify()
  648.         cond.release()
  649.  
  650.         # check one process/thread has woken up
  651.         time.sleep(DELTA)
  652.         self.assertReturnsIfImplemented(1, get_value, woken)
  653.  
  654.         # wake up another
  655.         cond.acquire()
  656.         cond.notify()
  657.         cond.release()
  658.  
  659.         # check other has woken up
  660.         time.sleep(DELTA)
  661.         self.assertReturnsIfImplemented(2, get_value, woken)
  662.  
  663.         # check state is not mucked up
  664.         self.check_invariant(cond)
  665.         p.join()
  666.  
  667.     def test_notify_all(self):
  668.         cond = self.Condition()
  669.         sleeping = self.Semaphore(0)
  670.         woken = self.Semaphore(0)
  671.  
  672.         # start some threads/processes which will timeout
  673.         for i in range(3):
  674.             p = self.Process(target=self.f,
  675.                              args=(cond, sleeping, woken, TIMEOUT1))
  676.             p.daemon = True
  677.             p.start()
  678.  
  679.             t = threading.Thread(target=self.f,
  680.                                  args=(cond, sleeping, woken, TIMEOUT1))
  681.             t.daemon = True
  682.             t.start()
  683.  
  684.         # wait for them all to sleep
  685.         for i in xrange(6):
  686.             sleeping.acquire()
  687.  
  688.         # check they have all timed out
  689.         for i in xrange(6):
  690.             woken.acquire()
  691.         self.assertReturnsIfImplemented(0, get_value, woken)
  692.  
  693.         # check state is not mucked up
  694.         self.check_invariant(cond)
  695.  
  696.         # start some more threads/processes
  697.         for i in range(3):
  698.             p = self.Process(target=self.f, args=(cond, sleeping, woken))
  699.             p.daemon = True
  700.             p.start()
  701.  
  702.             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
  703.             t.daemon = True
  704.             t.start()
  705.  
  706.         # wait for them to all sleep
  707.         for i in xrange(6):
  708.             sleeping.acquire()
  709.  
  710.         # check no process/thread has woken up
  711.         time.sleep(DELTA)
  712.         self.assertReturnsIfImplemented(0, get_value, woken)
  713.  
  714.         # wake them all up
  715.         cond.acquire()
  716.         cond.notify_all()
  717.         cond.release()
  718.  
  719.         # check they have all woken
  720.         time.sleep(DELTA)
  721.         self.assertReturnsIfImplemented(6, get_value, woken)
  722.  
  723.         # check state is not mucked up
  724.         self.check_invariant(cond)
  725.  
  726.     def test_timeout(self):
  727.         cond = self.Condition()
  728.         wait = TimingWrapper(cond.wait)
  729.         cond.acquire()
  730.         res = wait(TIMEOUT1)
  731.         cond.release()
  732.         self.assertEqual(res, None)
  733.         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
  734.  
  735.  
  736. class _TestEvent(BaseTestCase):
  737.  
  738.     def _test_event(self, event):
  739.         time.sleep(TIMEOUT2)
  740.         event.set()
  741.  
  742.     def test_event(self):
  743.         event = self.Event()
  744.         wait = TimingWrapper(event.wait)
  745.  
  746.         # Removed temporaily, due to API shear, this does not
  747.         # work with threading._Event objects. is_set == isSet
  748.         #self.assertEqual(event.is_set(), False)
  749.  
  750.         self.assertEqual(wait(0.0), None)
  751.         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  752.         self.assertEqual(wait(TIMEOUT1), None)
  753.         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
  754.  
  755.         event.set()
  756.  
  757.         # See note above on the API differences
  758.         # self.assertEqual(event.is_set(), True)
  759.         self.assertEqual(wait(), None)
  760.         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  761.         self.assertEqual(wait(TIMEOUT1), None)
  762.         self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  763.         # self.assertEqual(event.is_set(), True)
  764.  
  765.         event.clear()
  766.  
  767.         #self.assertEqual(event.is_set(), False)
  768.  
  769.         self.Process(target=self._test_event, args=(event,)).start()
  770.         self.assertEqual(wait(), None)
  771.  
  772. #
  773. #
  774. #
  775.  
  776. class _TestValue(BaseTestCase):
  777.  
  778.     codes_values = [
  779.         ('i', 4343, 24234),
  780.         ('d', 3.625, -4.25),
  781.         ('h', -232, 234),
  782.         ('c', latin('x'), latin('y'))
  783.         ]
  784.  
  785.     def _test(self, values):
  786.         for sv, cv in zip(values, self.codes_values):
  787.             sv.value = cv[2]
  788.  
  789.  
  790.     def test_value(self, raw=False):
  791.         if self.TYPE != 'processes':
  792.             return
  793.  
  794.         if raw:
  795.             values = [self.RawValue(code, value)
  796.                       for code, value, _ in self.codes_values]
  797.         else:
  798.             values = [self.Value(code, value)
  799.                       for code, value, _ in self.codes_values]
  800.  
  801.         for sv, cv in zip(values, self.codes_values):
  802.             self.assertEqual(sv.value, cv[1])
  803.  
  804.         proc = self.Process(target=self._test, args=(values,))
  805.         proc.start()
  806.         proc.join()
  807.  
  808.         for sv, cv in zip(values, self.codes_values):
  809.             self.assertEqual(sv.value, cv[2])
  810.  
  811.     def test_rawvalue(self):
  812.         self.test_value(raw=True)
  813.  
  814.     def test_getobj_getlock(self):
  815.         if self.TYPE != 'processes':
  816.             return
  817.  
  818.         val1 = self.Value('i', 5)
  819.         lock1 = val1.get_lock()
  820.         obj1 = val1.get_obj()
  821.  
  822.         val2 = self.Value('i', 5, lock=None)
  823.         lock2 = val2.get_lock()
  824.         obj2 = val2.get_obj()
  825.  
  826.         lock = self.Lock()
  827.         val3 = self.Value('i', 5, lock=lock)
  828.         lock3 = val3.get_lock()
  829.         obj3 = val3.get_obj()
  830.         self.assertEqual(lock, lock3)
  831.  
  832.         arr4 = self.RawValue('i', 5)
  833.         self.assertFalse(hasattr(arr4, 'get_lock'))
  834.         self.assertFalse(hasattr(arr4, 'get_obj'))
  835.  
  836.  
  837. class _TestArray(BaseTestCase):
  838.  
  839.     def f(self, seq):
  840.         for i in range(1, len(seq)):
  841.             seq[i] += seq[i-1]
  842.  
  843.     def test_array(self, raw=False):
  844.         if self.TYPE != 'processes':
  845.             return
  846.  
  847.         seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
  848.         if raw:
  849.             arr = self.RawArray('i', seq)
  850.         else:
  851.             arr = self.Array('i', seq)
  852.  
  853.         self.assertEqual(len(arr), len(seq))
  854.         self.assertEqual(arr[3], seq[3])
  855.         self.assertEqual(list(arr[2:7]), list(seq[2:7]))
  856.  
  857.         arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
  858.  
  859.         self.assertEqual(list(arr[:]), seq)
  860.  
  861.         self.f(seq)
  862.  
  863.         p = self.Process(target=self.f, args=(arr,))
  864.         p.start()
  865.         p.join()
  866.  
  867.         self.assertEqual(list(arr[:]), seq)
  868.  
  869.     def test_rawarray(self):
  870.         self.test_array(raw=True)
  871.  
  872.     def test_getobj_getlock_obj(self):
  873.         if self.TYPE != 'processes':
  874.             return
  875.  
  876.         arr1 = self.Array('i', range(10))
  877.         lock1 = arr1.get_lock()
  878.         obj1 = arr1.get_obj()
  879.  
  880.         arr2 = self.Array('i', range(10), lock=None)
  881.         lock2 = arr2.get_lock()
  882.         obj2 = arr2.get_obj()
  883.  
  884.         lock = self.Lock()
  885.         arr3 = self.Array('i', range(10), lock=lock)
  886.         lock3 = arr3.get_lock()
  887.         obj3 = arr3.get_obj()
  888.         self.assertEqual(lock, lock3)
  889.  
  890.         arr4 = self.RawArray('i', range(10))
  891.         self.assertFalse(hasattr(arr4, 'get_lock'))
  892.         self.assertFalse(hasattr(arr4, 'get_obj'))
  893.  
  894. #
  895. #
  896. #
  897.  
  898. class _TestContainers(BaseTestCase):
  899.  
  900.     ALLOWED_TYPES = ('manager',)
  901.  
  902.     def test_list(self):
  903.         a = self.list(range(10))
  904.         self.assertEqual(a[:], range(10))
  905.  
  906.         b = self.list()
  907.         self.assertEqual(b[:], [])
  908.  
  909.         b.extend(range(5))
  910.         self.assertEqual(b[:], range(5))
  911.  
  912.         self.assertEqual(b[2], 2)
  913.         self.assertEqual(b[2:10], [2,3,4])
  914.  
  915.         b *= 2
  916.         self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
  917.  
  918.         self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
  919.  
  920.         self.assertEqual(a[:], range(10))
  921.  
  922.         d = [a, b]
  923.         e = self.list(d)
  924.         self.assertEqual(
  925.             e[:],
  926.             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
  927.             )
  928.  
  929.         f = self.list([a])
  930.         a.append('hello')
  931.         self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
  932.  
  933.     def test_dict(self):
  934.         d = self.dict()
  935.         indices = range(65, 70)
  936.         for i in indices:
  937.             d[i] = chr(i)
  938.         self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
  939.         self.assertEqual(sorted(d.keys()), indices)
  940.         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
  941.         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
  942.  
  943.     def test_namespace(self):
  944.         n = self.Namespace()
  945.         n.name = 'Bob'
  946.         n.job = 'Builder'
  947.         n._hidden = 'hidden'
  948.         self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
  949.         del n.job
  950.         self.assertEqual(str(n), "Namespace(name='Bob')")
  951.         self.assertTrue(hasattr(n, 'name'))
  952.         self.assertTrue(not hasattr(n, 'job'))
  953.  
  954. #
  955. #
  956. #
  957.  
  958. def sqr(x, wait=0.0):
  959.     time.sleep(wait)
  960.     return x*x
  961. class _TestPool(BaseTestCase):
  962.  
  963.     def test_apply(self):
  964.         papply = self.pool.apply
  965.         self.assertEqual(papply(sqr, (5,)), sqr(5))
  966.         self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
  967.  
  968.     def test_map(self):
  969.         pmap = self.pool.map
  970.         self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
  971.         self.assertEqual(pmap(sqr, range(100), chunksize=20),
  972.                          map(sqr, range(100)))
  973.  
  974.     def test_async(self):
  975.         res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
  976.         get = TimingWrapper(res.get)
  977.         self.assertEqual(get(), 49)
  978.         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
  979.  
  980.     def test_async_timeout(self):
  981.         res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
  982.         get = TimingWrapper(res.get)
  983.         self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
  984.         self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
  985.  
  986.     def test_imap(self):
  987.         it = self.pool.imap(sqr, range(10))
  988.         self.assertEqual(list(it), map(sqr, range(10)))
  989.  
  990.         it = self.pool.imap(sqr, range(10))
  991.         for i in range(10):
  992.             self.assertEqual(it.next(), i*i)
  993.         self.assertRaises(StopIteration, it.next)
  994.  
  995.         it = self.pool.imap(sqr, range(1000), chunksize=100)
  996.         for i in range(1000):
  997.             self.assertEqual(it.next(), i*i)
  998.         self.assertRaises(StopIteration, it.next)
  999.  
  1000.     def test_imap_unordered(self):
  1001.         it = self.pool.imap_unordered(sqr, range(1000))
  1002.         self.assertEqual(sorted(it), map(sqr, range(1000)))
  1003.  
  1004.         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
  1005.         self.assertEqual(sorted(it), map(sqr, range(1000)))
  1006.  
  1007.     def test_make_pool(self):
  1008.         p = multiprocessing.Pool(3)
  1009.         self.assertEqual(3, len(p._pool))
  1010.         p.close()
  1011.         p.join()
  1012.  
  1013.     def test_terminate(self):
  1014.         if self.TYPE == 'manager':
  1015.             # On Unix a forked process increfs each shared object to
  1016.             # which its parent process held a reference.  If the
  1017.             # forked process gets terminated then there is likely to
  1018.             # be a reference leak.  So to prevent
  1019.             # _TestZZZNumberOfObjects from failing we skip this test
  1020.             # when using a manager.
  1021.             return
  1022.  
  1023.         result = self.pool.map_async(
  1024.             time.sleep, [0.1 for i in range(10000)], chunksize=1
  1025.             )
  1026.         self.pool.terminate()
  1027.         join = TimingWrapper(self.pool.join)
  1028.         join()
  1029.         self.assertTrue(join.elapsed < 0.2)
  1030. #
  1031. # Test that manager has expected number of shared objects left
  1032. #
  1033.  
  1034. class _TestZZZNumberOfObjects(BaseTestCase):
  1035.     # Because test cases are sorted alphabetically, this one will get
  1036.     # run after all the other tests for the manager.  It tests that
  1037.     # there have been no "reference leaks" for the manager's shared
  1038.     # objects.  Note the comment in _TestPool.test_terminate().
  1039.     ALLOWED_TYPES = ('manager',)
  1040.  
  1041.     def test_number_of_objects(self):
  1042.         EXPECTED_NUMBER = 1                # the pool object is still alive
  1043.         multiprocessing.active_children()  # discard dead process objs
  1044.         gc.collect()                       # do garbage collection
  1045.         refs = self.manager._number_of_objects()
  1046.         if refs != EXPECTED_NUMBER:
  1047.             print self.manager._debug_info()
  1048.  
  1049.         self.assertEqual(refs, EXPECTED_NUMBER)
  1050.  
  1051. #
  1052. # Test of creating a customized manager class
  1053. #
  1054.  
  1055. from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
  1056.  
  1057. class FooBar(object):
  1058.     def f(self):
  1059.         return 'f()'
  1060.     def g(self):
  1061.         raise ValueError
  1062.     def _h(self):
  1063.         return '_h()'
  1064.  
  1065. def baz():
  1066.     for i in xrange(10):
  1067.         yield i*i
  1068.  
  1069. class IteratorProxy(BaseProxy):
  1070.     _exposed_ = ('next', '__next__')
  1071.     def __iter__(self):
  1072.         return self
  1073.     def next(self):
  1074.         return self._callmethod('next')
  1075.     def __next__(self):
  1076.         return self._callmethod('__next__')
  1077.  
  1078. class MyManager(BaseManager):
  1079.     pass
  1080.  
  1081. MyManager.register('Foo', callable=FooBar)
  1082. MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
  1083. MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
  1084.  
  1085.  
  1086. class _TestMyManager(BaseTestCase):
  1087.  
  1088.     ALLOWED_TYPES = ('manager',)
  1089.  
  1090.     def test_mymanager(self):
  1091.         manager = MyManager()
  1092.         manager.start()
  1093.  
  1094.         foo = manager.Foo()
  1095.         bar = manager.Bar()
  1096.         baz = manager.baz()
  1097.  
  1098.         foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
  1099.         bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
  1100.  
  1101.         self.assertEqual(foo_methods, ['f', 'g'])
  1102.         self.assertEqual(bar_methods, ['f', '_h'])
  1103.  
  1104.         self.assertEqual(foo.f(), 'f()')
  1105.         self.assertRaises(ValueError, foo.g)
  1106.         self.assertEqual(foo._callmethod('f'), 'f()')
  1107.         self.assertRaises(RemoteError, foo._callmethod, '_h')
  1108.  
  1109.         self.assertEqual(bar.f(), 'f()')
  1110.         self.assertEqual(bar._h(), '_h()')
  1111.         self.assertEqual(bar._callmethod('f'), 'f()')
  1112.         self.assertEqual(bar._callmethod('_h'), '_h()')
  1113.  
  1114.         self.assertEqual(list(baz), [i*i for i in range(10)])
  1115.  
  1116.         manager.shutdown()
  1117.  
  1118. #
  1119. # Test of connecting to a remote server and using xmlrpclib for serialization
  1120. #
  1121.  
  1122. _queue = Queue.Queue()
  1123. def get_queue():
  1124.     return _queue
  1125.  
  1126. class QueueManager(BaseManager):
  1127.     '''manager class used by server process'''
  1128. QueueManager.register('get_queue', callable=get_queue)
  1129.  
  1130. class QueueManager2(BaseManager):
  1131.     '''manager class which specifies the same interface as QueueManager'''
  1132. QueueManager2.register('get_queue')
  1133.  
  1134.  
  1135. SERIALIZER = 'xmlrpclib'
  1136.  
  1137. class _TestRemoteManager(BaseTestCase):
  1138.  
  1139.     ALLOWED_TYPES = ('manager',)
  1140.  
  1141.     def _putter(self, address, authkey):
  1142.         manager = QueueManager2(
  1143.             address=address, authkey=authkey, serializer=SERIALIZER
  1144.             )
  1145.         manager.connect()
  1146.         queue = manager.get_queue()
  1147.         queue.put(('hello world', None, True, 2.25))
  1148.  
  1149.     def test_remote(self):
  1150.         authkey = os.urandom(32)
  1151.  
  1152.         manager = QueueManager(
  1153.             address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
  1154.             )
  1155.         manager.start()
  1156.  
  1157.         p = self.Process(target=self._putter, args=(manager.address, authkey))
  1158.         p.start()
  1159.  
  1160.         manager2 = QueueManager2(
  1161.             address=manager.address, authkey=authkey, serializer=SERIALIZER
  1162.             )
  1163.         manager2.connect()
  1164.         queue = manager2.get_queue()
  1165.  
  1166.         # Note that xmlrpclib will deserialize object as a list not a tuple
  1167.         self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
  1168.  
  1169.         # Because we are using xmlrpclib for serialization instead of
  1170.         # pickle this will cause a serialization error.
  1171.         self.assertRaises(Exception, queue.put, time.sleep)
  1172.  
  1173.         # Make queue finalizer run before the server is stopped
  1174.         del queue
  1175.         manager.shutdown()
  1176.  
  1177. #
  1178. #
  1179. #
  1180.  
  1181. SENTINEL = latin('')
  1182.  
  1183. class _TestConnection(BaseTestCase):
  1184.  
  1185.     ALLOWED_TYPES = ('processes', 'threads')
  1186.  
  1187.     def _echo(self, conn):
  1188.         for msg in iter(conn.recv_bytes, SENTINEL):
  1189.             conn.send_bytes(msg)
  1190.         conn.close()
  1191.  
  1192.     def test_connection(self):
  1193.         conn, child_conn = self.Pipe()
  1194.  
  1195.         p = self.Process(target=self._echo, args=(child_conn,))
  1196.         p.daemon = True
  1197.         p.start()
  1198.  
  1199.         seq = [1, 2.25, None]
  1200.         msg = latin('hello world')
  1201.         longmsg = msg * 10
  1202.         arr = array.array('i', range(4))
  1203.  
  1204.         if self.TYPE == 'processes':
  1205.             self.assertEqual(type(conn.fileno()), int)
  1206.  
  1207.         self.assertEqual(conn.send(seq), None)
  1208.         self.assertEqual(conn.recv(), seq)
  1209.  
  1210.         self.assertEqual(conn.send_bytes(msg), None)
  1211.         self.assertEqual(conn.recv_bytes(), msg)
  1212.  
  1213.         if self.TYPE == 'processes':
  1214.             buffer = array.array('i', [0]*10)
  1215.             expected = list(arr) + [0] * (10 - len(arr))
  1216.             self.assertEqual(conn.send_bytes(arr), None)
  1217.             self.assertEqual(conn.recv_bytes_into(buffer),
  1218.                              len(arr) * buffer.itemsize)
  1219.             self.assertEqual(list(buffer), expected)
  1220.  
  1221.             buffer = array.array('i', [0]*10)
  1222.             expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
  1223.             self.assertEqual(conn.send_bytes(arr), None)
  1224.             self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
  1225.                              len(arr) * buffer.itemsize)
  1226.             self.assertEqual(list(buffer), expected)
  1227.  
  1228.             buffer = bytearray(latin(' ' * 40))
  1229.             self.assertEqual(conn.send_bytes(longmsg), None)
  1230.             try:
  1231.                 res = conn.recv_bytes_into(buffer)
  1232.             except multiprocessing.BufferTooShort, e:
  1233.                 self.assertEqual(e.args, (longmsg,))
  1234.             else:
  1235.                 self.fail('expected BufferTooShort, got %s' % res)
  1236.  
  1237.         poll = TimingWrapper(conn.poll)
  1238.  
  1239.         self.assertEqual(poll(), False)
  1240.         self.assertTimingAlmostEqual(poll.elapsed, 0)
  1241.  
  1242.         self.assertEqual(poll(TIMEOUT1), False)
  1243.         self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
  1244.  
  1245.         conn.send(None)
  1246.  
  1247.         self.assertEqual(poll(TIMEOUT1), True)
  1248.         self.assertTimingAlmostEqual(poll.elapsed, 0)
  1249.  
  1250.         self.assertEqual(conn.recv(), None)
  1251.  
  1252.         really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
  1253.         conn.send_bytes(really_big_msg)
  1254.         self.assertEqual(conn.recv_bytes(), really_big_msg)
  1255.  
  1256.         conn.send_bytes(SENTINEL)                          # tell child to quit
  1257.         child_conn.close()
  1258.  
  1259.         if self.TYPE == 'processes':
  1260.             self.assertEqual(conn.readable, True)
  1261.             self.assertEqual(conn.writable, True)
  1262.             self.assertRaises(EOFError, conn.recv)
  1263.             self.assertRaises(EOFError, conn.recv_bytes)
  1264.  
  1265.         p.join()
  1266.  
  1267.     def test_duplex_false(self):
  1268.         reader, writer = self.Pipe(duplex=False)
  1269.         self.assertEqual(writer.send(1), None)
  1270.         self.assertEqual(reader.recv(), 1)
  1271.         if self.TYPE == 'processes':
  1272.             self.assertEqual(reader.readable, True)
  1273.             self.assertEqual(reader.writable, False)
  1274.             self.assertEqual(writer.readable, False)
  1275.             self.assertEqual(writer.writable, True)
  1276.             self.assertRaises(IOError, reader.send, 2)
  1277.             self.assertRaises(IOError, writer.recv)
  1278.             self.assertRaises(IOError, writer.poll)
  1279.  
  1280.     def test_spawn_close(self):
  1281.         # We test that a pipe connection can be closed by parent
  1282.         # process immediately after child is spawned.  On Windows this
  1283.         # would have sometimes failed on old versions because
  1284.         # child_conn would be closed before the child got a chance to
  1285.         # duplicate it.
  1286.         conn, child_conn = self.Pipe()
  1287.  
  1288.         p = self.Process(target=self._echo, args=(child_conn,))
  1289.         p.start()
  1290.         child_conn.close()    # this might complete before child initializes
  1291.  
  1292.         msg = latin('hello')
  1293.         conn.send_bytes(msg)
  1294.         self.assertEqual(conn.recv_bytes(), msg)
  1295.  
  1296.         conn.send_bytes(SENTINEL)
  1297.         conn.close()
  1298.         p.join()
  1299.  
  1300.     def test_sendbytes(self):
  1301.         if self.TYPE != 'processes':
  1302.             return
  1303.  
  1304.         msg = latin('abcdefghijklmnopqrstuvwxyz')
  1305.         a, b = self.Pipe()
  1306.  
  1307.         a.send_bytes(msg)
  1308.         self.assertEqual(b.recv_bytes(), msg)
  1309.  
  1310.         a.send_bytes(msg, 5)
  1311.         self.assertEqual(b.recv_bytes(), msg[5:])
  1312.  
  1313.         a.send_bytes(msg, 7, 8)
  1314.         self.assertEqual(b.recv_bytes(), msg[7:7+8])
  1315.  
  1316.         a.send_bytes(msg, 26)
  1317.         self.assertEqual(b.recv_bytes(), latin(''))
  1318.  
  1319.         a.send_bytes(msg, 26, 0)
  1320.         self.assertEqual(b.recv_bytes(), latin(''))
  1321.  
  1322.         self.assertRaises(ValueError, a.send_bytes, msg, 27)
  1323.  
  1324.         self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
  1325.  
  1326.         self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
  1327.  
  1328.         self.assertRaises(ValueError, a.send_bytes, msg, -1)
  1329.  
  1330.         self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
  1331.  
  1332. class _TestListenerClient(BaseTestCase):
  1333.  
  1334.     ALLOWED_TYPES = ('processes', 'threads')
  1335.  
  1336.     def _test(self, address):
  1337.         conn = self.connection.Client(address)
  1338.         conn.send('hello')
  1339.         conn.close()
  1340.  
  1341.     def test_listener_client(self):
  1342.         for family in self.connection.families:
  1343.             l = self.connection.Listener(family=family)
  1344.             p = self.Process(target=self._test, args=(l.address,))
  1345.             p.daemon = True
  1346.             p.start()
  1347.             conn = l.accept()
  1348.             self.assertEqual(conn.recv(), 'hello')
  1349.             p.join()
  1350.             l.close()
  1351. #
  1352. # Test of sending connection and socket objects between processes
  1353. #
  1354. """
  1355. class _TestPicklingConnections(BaseTestCase):
  1356.  
  1357.     ALLOWED_TYPES = ('processes',)
  1358.  
  1359.     def _listener(self, conn, families):
  1360.         for fam in families:
  1361.             l = self.connection.Listener(family=fam)
  1362.             conn.send(l.address)
  1363.             new_conn = l.accept()
  1364.             conn.send(new_conn)
  1365.  
  1366.         if self.TYPE == 'processes':
  1367.             l = socket.socket()
  1368.             l.bind(('localhost', 0))
  1369.             conn.send(l.getsockname())
  1370.             l.listen(1)
  1371.             new_conn, addr = l.accept()
  1372.             conn.send(new_conn)
  1373.  
  1374.         conn.recv()
  1375.  
  1376.     def _remote(self, conn):
  1377.         for (address, msg) in iter(conn.recv, None):
  1378.             client = self.connection.Client(address)
  1379.             client.send(msg.upper())
  1380.             client.close()
  1381.  
  1382.         if self.TYPE == 'processes':
  1383.             address, msg = conn.recv()
  1384.             client = socket.socket()
  1385.             client.connect(address)
  1386.             client.sendall(msg.upper())
  1387.             client.close()
  1388.  
  1389.         conn.close()
  1390.  
  1391.     def test_pickling(self):
  1392.         try:
  1393.             multiprocessing.allow_connection_pickling()
  1394.         except ImportError:
  1395.             return
  1396.  
  1397.         families = self.connection.families
  1398.  
  1399.         lconn, lconn0 = self.Pipe()
  1400.         lp = self.Process(target=self._listener, args=(lconn0, families))
  1401.         lp.start()
  1402.         lconn0.close()
  1403.  
  1404.         rconn, rconn0 = self.Pipe()
  1405.         rp = self.Process(target=self._remote, args=(rconn0,))
  1406.         rp.start()
  1407.         rconn0.close()
  1408.  
  1409.         for fam in families:
  1410.             msg = ('This connection uses family %s' % fam).encode('ascii')
  1411.             address = lconn.recv()
  1412.             rconn.send((address, msg))
  1413.             new_conn = lconn.recv()
  1414.             self.assertEqual(new_conn.recv(), msg.upper())
  1415.  
  1416.         rconn.send(None)
  1417.  
  1418.         if self.TYPE == 'processes':
  1419.             msg = latin('This connection uses a normal socket')
  1420.             address = lconn.recv()
  1421.             rconn.send((address, msg))
  1422.             if hasattr(socket, 'fromfd'):
  1423.                 new_conn = lconn.recv()
  1424.                 self.assertEqual(new_conn.recv(100), msg.upper())
  1425.             else:
  1426.                 # XXX On Windows with Py2.6 need to backport fromfd()
  1427.                 discard = lconn.recv_bytes()
  1428.  
  1429.         lconn.send(None)
  1430.  
  1431.         rconn.close()
  1432.         lconn.close()
  1433.  
  1434.         lp.join()
  1435.         rp.join()
  1436. """
  1437. #
  1438. #
  1439. #
  1440.  
  1441. class _TestHeap(BaseTestCase):
  1442.  
  1443.     ALLOWED_TYPES = ('processes',)
  1444.  
  1445.     def test_heap(self):
  1446.         iterations = 5000
  1447.         maxblocks = 50
  1448.         blocks = []
  1449.  
  1450.         # create and destroy lots of blocks of different sizes
  1451.         for i in xrange(iterations):
  1452.             size = int(random.lognormvariate(0, 1) * 1000)
  1453.             b = multiprocessing.heap.BufferWrapper(size)
  1454.             blocks.append(b)
  1455.             if len(blocks) > maxblocks:
  1456.                 i = random.randrange(maxblocks)
  1457.                 del blocks[i]
  1458.  
  1459.         # get the heap object
  1460.         heap = multiprocessing.heap.BufferWrapper._heap
  1461.  
  1462.         # verify the state of the heap
  1463.         all = []
  1464.         occupied = 0
  1465.         for L in heap._len_to_seq.values():
  1466.             for arena, start, stop in L:
  1467.                 all.append((heap._arenas.index(arena), start, stop,
  1468.                             stop-start, 'free'))
  1469.         for arena, start, stop in heap._allocated_blocks:
  1470.             all.append((heap._arenas.index(arena), start, stop,
  1471.                         stop-start, 'occupied'))
  1472.             occupied += (stop-start)
  1473.  
  1474.         all.sort()
  1475.  
  1476.         for i in range(len(all)-1):
  1477.             (arena, start, stop) = all[i][:3]
  1478.             (narena, nstart, nstop) = all[i+1][:3]
  1479.             self.assertTrue((arena != narena and nstart == 0) or
  1480.                             (stop == nstart))
  1481.  
  1482. #
  1483. #
  1484. #
  1485.  
  1486. try:
  1487.     from ctypes import Structure, Value, copy, c_int, c_double
  1488. except ImportError:
  1489.     Structure = object
  1490.     c_int = c_double = None
  1491.  
  1492. class _Foo(Structure):
  1493.     _fields_ = [
  1494.         ('x', c_int),
  1495.         ('y', c_double)
  1496.         ]
  1497.  
  1498. class _TestSharedCTypes(BaseTestCase):
  1499.  
  1500.     ALLOWED_TYPES = ('processes',)
  1501.  
  1502.     def _double(self, x, y, foo, arr, string):
  1503.         x.value *= 2
  1504.         y.value *= 2
  1505.         foo.x *= 2
  1506.         foo.y *= 2
  1507.         string.value *= 2
  1508.         for i in range(len(arr)):
  1509.             arr[i] *= 2
  1510.  
  1511.     def test_sharedctypes(self, lock=False):
  1512.         if c_int is None:
  1513.             return
  1514.  
  1515.         x = Value('i', 7, lock=lock)
  1516.         y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
  1517.         foo = Value(_Foo, 3, 2, lock=lock)
  1518.         arr = Array('d', range(10), lock=lock)
  1519.         string = Array('c', 20, lock=lock)
  1520.         string.value = 'hello'
  1521.  
  1522.         p = self.Process(target=self._double, args=(x, y, foo, arr, string))
  1523.         p.start()
  1524.         p.join()
  1525.  
  1526.         self.assertEqual(x.value, 14)
  1527.         self.assertAlmostEqual(y.value, 2.0/3.0)
  1528.         self.assertEqual(foo.x, 6)
  1529.         self.assertAlmostEqual(foo.y, 4.0)
  1530.         for i in range(10):
  1531.             self.assertAlmostEqual(arr[i], i*2)
  1532.         self.assertEqual(string.value, latin('hellohello'))
  1533.  
  1534.     def test_synchronize(self):
  1535.         self.test_sharedctypes(lock=True)
  1536.  
  1537.     def test_copy(self):
  1538.         if c_int is None:
  1539.             return
  1540.  
  1541.         foo = _Foo(2, 5.0)
  1542.         bar = copy(foo)
  1543.         foo.x = 0
  1544.         foo.y = 0
  1545.         self.assertEqual(bar.x, 2)
  1546.         self.assertAlmostEqual(bar.y, 5.0)
  1547.  
  1548. #
  1549. #
  1550. #
  1551.  
  1552. class _TestFinalize(BaseTestCase):
  1553.  
  1554.     ALLOWED_TYPES = ('processes',)
  1555.  
  1556.     def _test_finalize(self, conn):
  1557.         class Foo(object):
  1558.             pass
  1559.  
  1560.         a = Foo()
  1561.         util.Finalize(a, conn.send, args=('a',))
  1562.         del a           # triggers callback for a
  1563.  
  1564.         b = Foo()
  1565.         close_b = util.Finalize(b, conn.send, args=('b',))
  1566.         close_b()       # triggers callback for b
  1567.         close_b()       # does nothing because callback has already been called
  1568.         del b           # does nothing because callback has already been called
  1569.  
  1570.         c = Foo()
  1571.         util.Finalize(c, conn.send, args=('c',))
  1572.  
  1573.         d10 = Foo()
  1574.         util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
  1575.  
  1576.         d01 = Foo()
  1577.         util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
  1578.         d02 = Foo()
  1579.         util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
  1580.         d03 = Foo()
  1581.         util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
  1582.  
  1583.         util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
  1584.  
  1585.         util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
  1586.  
  1587.         # call mutliprocessing's cleanup function then exit process without
  1588.         # garbage collecting locals
  1589.         util._exit_function()
  1590.         conn.close()
  1591.         os._exit(0)
  1592.  
  1593.     def test_finalize(self):
  1594.         conn, child_conn = self.Pipe()
  1595.  
  1596.         p = self.Process(target=self._test_finalize, args=(child_conn,))
  1597.         p.start()
  1598.         p.join()
  1599.  
  1600.         result = [obj for obj in iter(conn.recv, 'STOP')]
  1601.         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
  1602.  
  1603. #
  1604. # Test that from ... import * works for each module
  1605. #
  1606.  
  1607. class _TestImportStar(BaseTestCase):
  1608.  
  1609.     ALLOWED_TYPES = ('processes',)
  1610.  
  1611.     def test_import(self):
  1612.         modules = (
  1613.             'multiprocessing', 'multiprocessing.connection',
  1614.             'multiprocessing.heap', 'multiprocessing.managers',
  1615.             'multiprocessing.pool', 'multiprocessing.process',
  1616.             'multiprocessing.reduction', 'multiprocessing.sharedctypes',
  1617.             'multiprocessing.synchronize', 'multiprocessing.util'
  1618.             )
  1619.  
  1620.         for name in modules:
  1621.             __import__(name)
  1622.             mod = sys.modules[name]
  1623.  
  1624.             for attr in getattr(mod, '__all__', ()):
  1625.                 self.assertTrue(
  1626.                     hasattr(mod, attr),
  1627.                     '%r does not have attribute %r' % (mod, attr)
  1628.                     )
  1629.  
  1630. #
  1631. # Quick test that logging works -- does not test logging output
  1632. #
  1633.  
  1634. class _TestLogging(BaseTestCase):
  1635.  
  1636.     ALLOWED_TYPES = ('processes',)
  1637.  
  1638.     def test_enable_logging(self):
  1639.         logger = multiprocessing.get_logger()
  1640.         logger.setLevel(util.SUBWARNING)
  1641.         self.assertTrue(logger is not None)
  1642.         logger.debug('this will not be printed')
  1643.         logger.info('nor will this')
  1644.         logger.setLevel(LOG_LEVEL)
  1645.  
  1646.     def _test_level(self, conn):
  1647.         logger = multiprocessing.get_logger()
  1648.         conn.send(logger.getEffectiveLevel())
  1649.  
  1650.     def test_level(self):
  1651.         LEVEL1 = 32
  1652.         LEVEL2 = 37
  1653.  
  1654.         logger = multiprocessing.get_logger()
  1655.         root_logger = logging.getLogger()
  1656.         root_level = root_logger.level
  1657.  
  1658.         reader, writer = multiprocessing.Pipe(duplex=False)
  1659.  
  1660.         logger.setLevel(LEVEL1)
  1661.         self.Process(target=self._test_level, args=(writer,)).start()
  1662.         self.assertEqual(LEVEL1, reader.recv())
  1663.  
  1664.         logger.setLevel(logging.NOTSET)
  1665.         root_logger.setLevel(LEVEL2)
  1666.         self.Process(target=self._test_level, args=(writer,)).start()
  1667.         self.assertEqual(LEVEL2, reader.recv())
  1668.  
  1669.         root_logger.setLevel(root_level)
  1670.         logger.setLevel(level=LOG_LEVEL)
  1671.  
  1672. #
  1673. # Functions used to create test cases from the base ones in this module
  1674. #
  1675.  
  1676. def get_attributes(Source, names):
  1677.     d = {}
  1678.     for name in names:
  1679.         obj = getattr(Source, name)
  1680.         if type(obj) == type(get_attributes):
  1681.             obj = staticmethod(obj)
  1682.         d[name] = obj
  1683.     return d
  1684.  
  1685. def create_test_cases(Mixin, type):
  1686.     result = {}
  1687.     glob = globals()
  1688.     Type = type[0].upper() + type[1:]
  1689.  
  1690.     for name in glob.keys():
  1691.         if name.startswith('_Test'):
  1692.             base = glob[name]
  1693.             if type in base.ALLOWED_TYPES:
  1694.                 newname = 'With' + Type + name[1:]
  1695.                 class Temp(base, unittest.TestCase, Mixin):
  1696.                     pass
  1697.                 result[newname] = Temp
  1698.                 Temp.__name__ = newname
  1699.                 Temp.__module__ = Mixin.__module__
  1700.     return result
  1701.  
  1702. #
  1703. # Create test cases
  1704. #
  1705.  
  1706. class ProcessesMixin(object):
  1707.     TYPE = 'processes'
  1708.     Process = multiprocessing.Process
  1709.     locals().update(get_attributes(multiprocessing, (
  1710.         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
  1711.         'Condition', 'Event', 'Value', 'Array', 'RawValue',
  1712.         'RawArray', 'current_process', 'active_children', 'Pipe',
  1713.         'connection', 'JoinableQueue'
  1714.         )))
  1715.  
  1716. testcases_processes = create_test_cases(ProcessesMixin, type='processes')
  1717. globals().update(testcases_processes)
  1718.  
  1719.  
  1720. class ManagerMixin(object):
  1721.     TYPE = 'manager'
  1722.     Process = multiprocessing.Process
  1723.     manager = object.__new__(multiprocessing.managers.SyncManager)
  1724.     locals().update(get_attributes(manager, (
  1725.         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
  1726.        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
  1727.         'Namespace', 'JoinableQueue'
  1728.         )))
  1729.  
  1730. testcases_manager = create_test_cases(ManagerMixin, type='manager')
  1731. globals().update(testcases_manager)
  1732.  
  1733.  
  1734. class ThreadsMixin(object):
  1735.     TYPE = 'threads'
  1736.     Process = multiprocessing.dummy.Process
  1737.     locals().update(get_attributes(multiprocessing.dummy, (
  1738.         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
  1739.         'Condition', 'Event', 'Value', 'Array', 'current_process',
  1740.         'active_children', 'Pipe', 'connection', 'dict', 'list',
  1741.         'Namespace', 'JoinableQueue'
  1742.         )))
  1743.  
  1744. testcases_threads = create_test_cases(ThreadsMixin, type='threads')
  1745. globals().update(testcases_threads)
  1746.  
  1747. class OtherTest(unittest.TestCase):
  1748.     # TODO: add more tests for deliver/answer challenge.
  1749.     def test_deliver_challenge_auth_failure(self):
  1750.         class _FakeConnection(object):
  1751.             def recv_bytes(self, size):
  1752.                 return b'something bogus'
  1753.             def send_bytes(self, data):
  1754.                 pass
  1755.         self.assertRaises(multiprocessing.AuthenticationError,
  1756.                           multiprocessing.connection.deliver_challenge,
  1757.                           _FakeConnection(), b'abc')
  1758.  
  1759.     def test_answer_challenge_auth_failure(self):
  1760.         class _FakeConnection(object):
  1761.             def __init__(self):
  1762.                 self.count = 0
  1763.             def recv_bytes(self, size):
  1764.                 self.count += 1
  1765.                 if self.count == 1:
  1766.                     return multiprocessing.connection.CHALLENGE
  1767.                 elif self.count == 2:
  1768.                     return b'something bogus'
  1769.                 return b''
  1770.             def send_bytes(self, data):
  1771.                 pass
  1772.         self.assertRaises(multiprocessing.AuthenticationError,
  1773.                           multiprocessing.connection.answer_challenge,
  1774.                           _FakeConnection(), b'abc')
  1775.  
  1776. testcases_other = [OtherTest]
  1777.  
  1778. #
  1779. #
  1780. #
  1781.  
  1782. def test_main(run=None):
  1783.     if sys.platform.startswith("linux"):
  1784.         try:
  1785.             lock = multiprocessing.RLock()
  1786.         except OSError:
  1787.             from test.test_support import TestSkipped
  1788.             raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
  1789.  
  1790.     if run is None:
  1791.         from test.test_support import run_unittest as run
  1792.  
  1793.     util.get_temp_dir()     # creates temp directory for use by all processes
  1794.  
  1795.     multiprocessing.get_logger().setLevel(LOG_LEVEL)
  1796.  
  1797.     ProcessesMixin.pool = multiprocessing.Pool(4)
  1798.     ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
  1799.     ManagerMixin.manager.__init__()
  1800.     ManagerMixin.manager.start()
  1801.     ManagerMixin.pool = ManagerMixin.manager.Pool(4)
  1802.  
  1803.     testcases = (
  1804.         sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
  1805.         sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
  1806.         sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
  1807.         testcases_other
  1808.         )
  1809.  
  1810.     loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
  1811.     suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
  1812.     run(suite)
  1813.  
  1814.     ThreadsMixin.pool.terminate()
  1815.     ProcessesMixin.pool.terminate()
  1816.     ManagerMixin.pool.terminate()
  1817.     ManagerMixin.manager.shutdown()
  1818.  
  1819.     del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
  1820.  
  1821. def main():
  1822.     test_main(unittest.TextTestRunner(verbosity=2).run)
  1823.  
  1824. if __name__ == '__main__':
  1825.     main()
  1826.