home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_poll.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  4.4 KB  |  158 lines

  1. # Test case for the select.poll() function
  2.  
  3. import os, select, random, unittest
  4. from test.test_support import TestSkipped, TESTFN, run_unittest
  5.  
  6. try:
  7.     select.poll
  8. except AttributeError:
  9.     raise TestSkipped, "select.poll not defined -- skipping test_poll"
  10.  
  11.  
  12. def find_ready_matching(ready, flag):
  13.     match = []
  14.     for fd, mode in ready:
  15.         if mode & flag:
  16.             match.append(fd)
  17.     return match
  18.  
  19. class PollTests(unittest.TestCase):
  20.  
  21.     def test_poll1(self):
  22.         # Basic functional test of poll object
  23.         # Create a bunch of pipe and test that poll works with them.
  24.  
  25.         p = select.poll()
  26.  
  27.         NUM_PIPES = 12
  28.         MSG = " This is a test."
  29.         MSG_LEN = len(MSG)
  30.         readers = []
  31.         writers = []
  32.         r2w = {}
  33.         w2r = {}
  34.  
  35.         for i in range(NUM_PIPES):
  36.             rd, wr = os.pipe()
  37.             p.register(rd)
  38.             p.modify(rd, select.POLLIN)
  39.             p.register(wr, select.POLLOUT)
  40.             readers.append(rd)
  41.             writers.append(wr)
  42.             r2w[rd] = wr
  43.             w2r[wr] = rd
  44.  
  45.         bufs = []
  46.  
  47.         while writers:
  48.             ready = p.poll()
  49.             ready_writers = find_ready_matching(ready, select.POLLOUT)
  50.             if not ready_writers:
  51.                 raise RuntimeError, "no pipes ready for writing"
  52.             wr = random.choice(ready_writers)
  53.             os.write(wr, MSG)
  54.  
  55.             ready = p.poll()
  56.             ready_readers = find_ready_matching(ready, select.POLLIN)
  57.             if not ready_readers:
  58.                 raise RuntimeError, "no pipes ready for reading"
  59.             rd = random.choice(ready_readers)
  60.             buf = os.read(rd, MSG_LEN)
  61.             self.assertEqual(len(buf), MSG_LEN)
  62.             bufs.append(buf)
  63.             os.close(r2w[rd]) ; os.close( rd )
  64.             p.unregister( r2w[rd] )
  65.             p.unregister( rd )
  66.             writers.remove(r2w[rd])
  67.  
  68.         self.assertEqual(bufs, [MSG] * NUM_PIPES)
  69.  
  70.     def poll_unit_tests(self):
  71.         # returns NVAL for invalid file descriptor
  72.         FD = 42
  73.         try:
  74.             os.close(FD)
  75.         except OSError:
  76.             pass
  77.         p = select.poll()
  78.         p.register(FD)
  79.         r = p.poll()
  80.         self.assertEqual(r[0], (FD, select.POLLNVAL))
  81.  
  82.         f = open(TESTFN, 'w')
  83.         fd = f.fileno()
  84.         p = select.poll()
  85.         p.register(f)
  86.         r = p.poll()
  87.         self.assertEqual(r[0][0], fd)
  88.         f.close()
  89.         r = p.poll()
  90.         self.assertEqual(r[0], (fd, select.POLLNVAL))
  91.         os.unlink(TESTFN)
  92.  
  93.         # type error for invalid arguments
  94.         p = select.poll()
  95.         self.assertRaises(TypeError, p.register, p)
  96.         self.assertRaises(TypeError, p.unregister, p)
  97.  
  98.         # can't unregister non-existent object
  99.         p = select.poll()
  100.         self.assertRaises(KeyError, p.unregister, 3)
  101.  
  102.         # Test error cases
  103.         pollster = select.poll()
  104.         class Nope:
  105.             pass
  106.  
  107.         class Almost:
  108.             def fileno(self):
  109.                 return 'fileno'
  110.  
  111.         self.assertRaises(TypeError, pollster.register, Nope(), 0)
  112.         self.assertRaises(TypeError, pollster.register, Almost(), 0)
  113.  
  114.     # Another test case for poll().  This is copied from the test case for
  115.     # select(), modified to use poll() instead.
  116.  
  117.     def test_poll2(self):
  118.         cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
  119.         p = os.popen(cmd, 'r')
  120.         pollster = select.poll()
  121.         pollster.register( p, select.POLLIN )
  122.         for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
  123.             fdlist = pollster.poll(tout)
  124.             if (fdlist == []):
  125.                 continue
  126.             fd, flags = fdlist[0]
  127.             if flags & select.POLLHUP:
  128.                 line = p.readline()
  129.                 if line != "":
  130.                     self.fail('error: pipe seems to be closed, but still returns data')
  131.                 continue
  132.  
  133.             elif flags & select.POLLIN:
  134.                 line = p.readline()
  135.                 if not line:
  136.                     break
  137.                 continue
  138.             else:
  139.                 self.fail('Unexpected return value from select.poll: %s' % fdlist)
  140.         p.close()
  141.  
  142.     def test_poll3(self):
  143.         # test int overflow
  144.         pollster = select.poll()
  145.         pollster.register(1)
  146.  
  147.         self.assertRaises(OverflowError, pollster.poll, 1L << 64)
  148.  
  149.         x = 2 + 3
  150.         if x != 5:
  151.             self.fail('Overflow must have occurred')
  152.  
  153. def test_main():
  154.     run_unittest(PollTests)
  155.  
  156. if __name__ == '__main__':
  157.     test_main()
  158.