home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_kqueue.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  5.8 KB  |  167 lines

  1. """
  2. Tests for kqueue wrapper.
  3. """
  4. import socket
  5. import errno
  6. import time
  7. import select
  8. import sys
  9. import unittest
  10.  
  11. from test import test_support
  12. if not hasattr(select, "kqueue"):
  13.     raise test_support.TestSkipped("test works only on BSD")
  14.  
  15. class TestKQueue(unittest.TestCase):
  16.     def test_create_queue(self):
  17.         kq = select.kqueue()
  18.         self.assert_(kq.fileno() > 0, kq.fileno())
  19.         self.assert_(not kq.closed)
  20.         kq.close()
  21.         self.assert_(kq.closed)
  22.         self.assertRaises(ValueError, kq.fileno)
  23.  
  24.     def test_create_event(self):
  25.         fd = sys.stderr.fileno()
  26.         ev = select.kevent(fd)
  27.         other = select.kevent(1000)
  28.         self.assertEqual(ev.ident, fd)
  29.         self.assertEqual(ev.filter, select.KQ_FILTER_READ)
  30.         self.assertEqual(ev.flags, select.KQ_EV_ADD)
  31.         self.assertEqual(ev.fflags, 0)
  32.         self.assertEqual(ev.data, 0)
  33.         self.assertEqual(ev.udata, 0)
  34.         self.assertEqual(ev, ev)
  35.         self.assertNotEqual(ev, other)
  36.         self.assertEqual(cmp(ev, other), -1)
  37.         self.assert_(ev < other)
  38.         self.assert_(other >= ev)
  39.         self.assertRaises(TypeError, cmp, ev, None)
  40.         self.assertRaises(TypeError, cmp, ev, 1)
  41.         self.assertRaises(TypeError, cmp, ev, "ev")
  42.  
  43.         ev = select.kevent(fd, select.KQ_FILTER_WRITE)
  44.         self.assertEqual(ev.ident, fd)
  45.         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  46.         self.assertEqual(ev.flags, select.KQ_EV_ADD)
  47.         self.assertEqual(ev.fflags, 0)
  48.         self.assertEqual(ev.data, 0)
  49.         self.assertEqual(ev.udata, 0)
  50.         self.assertEqual(ev, ev)
  51.         self.assertNotEqual(ev, other)
  52.  
  53.         ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
  54.         self.assertEqual(ev.ident, fd)
  55.         self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  56.         self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
  57.         self.assertEqual(ev.fflags, 0)
  58.         self.assertEqual(ev.data, 0)
  59.         self.assertEqual(ev.udata, 0)
  60.         self.assertEqual(ev, ev)
  61.         self.assertNotEqual(ev, other)
  62.  
  63.         ev = select.kevent(1, 2, 3, 4, 5, 6)
  64.         self.assertEqual(ev.ident, 1)
  65.         self.assertEqual(ev.filter, 2)
  66.         self.assertEqual(ev.flags, 3)
  67.         self.assertEqual(ev.fflags, 4)
  68.         self.assertEqual(ev.data, 5)
  69.         self.assertEqual(ev.udata, 6)
  70.         self.assertEqual(ev, ev)
  71.         self.assertNotEqual(ev, other)
  72.  
  73.     def test_queue_event(self):
  74.         serverSocket = socket.socket()
  75.         serverSocket.bind(('127.0.0.1', 0))
  76.         serverSocket.listen(1)
  77.         client = socket.socket()
  78.         client.setblocking(False)
  79.         try:
  80.             client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
  81.         except socket.error, e:
  82.             self.assertEquals(e.args[0], errno.EINPROGRESS)
  83.         else:
  84.             #raise AssertionError("Connect should have raised EINPROGRESS")
  85.             pass # FreeBSD doesn't raise an exception here
  86.         server, addr = serverSocket.accept()
  87.  
  88.         if sys.platform.startswith("darwin"):
  89.             flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
  90.         else:
  91.             flags = 0
  92.  
  93.         kq = select.kqueue()
  94.         kq2 = select.kqueue.fromfd(kq.fileno())
  95.  
  96.         ev = select.kevent(server.fileno(),
  97.                            select.KQ_FILTER_WRITE,
  98.                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  99.         kq.control([ev], 0)
  100.         ev = select.kevent(server.fileno(),
  101.                            select.KQ_FILTER_READ,
  102.                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  103.         kq.control([ev], 0)
  104.         ev = select.kevent(client.fileno(),
  105.                            select.KQ_FILTER_WRITE,
  106.                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  107.         kq2.control([ev], 0)
  108.         ev = select.kevent(client.fileno(),
  109.                            select.KQ_FILTER_READ,
  110.                            select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  111.         kq2.control([ev], 0)
  112.  
  113.         events = kq.control(None, 4, 1)
  114.         events = [(e.ident, e.filter, e.flags) for e in events]
  115.         events.sort()
  116.         self.assertEquals(events, [
  117.             (client.fileno(), select.KQ_FILTER_WRITE, flags),
  118.             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
  119.  
  120.         client.send("Hello!")
  121.         server.send("world!!!")
  122.  
  123.         events = kq.control(None, 4, 1)
  124.         # We may need to call it several times
  125.         for i in range(5):
  126.             if len(events) == 4:
  127.                 break
  128.             events = kq.control(None, 4, 1)
  129.         events = [(e.ident, e.filter, e.flags) for e in events]
  130.         events.sort()
  131.  
  132.         self.assertEquals(events, [
  133.             (client.fileno(), select.KQ_FILTER_WRITE, flags),
  134.             (client.fileno(), select.KQ_FILTER_READ, flags),
  135.             (server.fileno(), select.KQ_FILTER_WRITE, flags),
  136.             (server.fileno(), select.KQ_FILTER_READ, flags)])
  137.  
  138.         # Remove completely client, and server read part
  139.         ev = select.kevent(client.fileno(),
  140.                            select.KQ_FILTER_WRITE,
  141.                            select.KQ_EV_DELETE)
  142.         kq.control([ev], 0)
  143.         ev = select.kevent(client.fileno(),
  144.                            select.KQ_FILTER_READ,
  145.                            select.KQ_EV_DELETE)
  146.         kq.control([ev], 0)
  147.         ev = select.kevent(server.fileno(),
  148.                            select.KQ_FILTER_READ,
  149.                            select.KQ_EV_DELETE)
  150.         kq.control([ev], 0, 0)
  151.  
  152.         events = kq.control([], 4, 0.99)
  153.         events = [(e.ident, e.filter, e.flags) for e in events]
  154.         events.sort()
  155.         self.assertEquals(events, [
  156.             (server.fileno(), select.KQ_FILTER_WRITE, flags)])
  157.  
  158.         client.close()
  159.         server.close()
  160.         serverSocket.close()
  161.  
  162. def test_main():
  163.     test_support.run_unittest(TestKQueue)
  164.  
  165. if __name__ == "__main__":
  166.     test_main()
  167.