home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_asynchat.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  8.0 KB  |  256 lines

  1. # test asynchat -- requires threading
  2.  
  3. import thread # If this fails, we can't test this module
  4. import asyncore, asynchat, socket, threading, time
  5. import unittest
  6. import sys
  7. from test import test_support
  8.  
  9. HOST = test_support.HOST
  10. SERVER_QUIT = 'QUIT\n'
  11.  
  12. class echo_server(threading.Thread):
  13.     # parameter to determine the number of bytes passed back to the
  14.     # client each send
  15.     chunk_size = 1
  16.  
  17.     def __init__(self, event):
  18.         threading.Thread.__init__(self)
  19.         self.event = event
  20.         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  21.         self.port = test_support.bind_port(self.sock)
  22.  
  23.     def run(self):
  24.         self.sock.listen(1)
  25.         self.event.set()
  26.         conn, client = self.sock.accept()
  27.         self.buffer = ""
  28.         # collect data until quit message is seen
  29.         while SERVER_QUIT not in self.buffer:
  30.             data = conn.recv(1)
  31.             if not data:
  32.                 break
  33.             self.buffer = self.buffer + data
  34.  
  35.         # remove the SERVER_QUIT message
  36.         self.buffer = self.buffer.replace(SERVER_QUIT, '')
  37.  
  38.         # re-send entire set of collected data
  39.         try:
  40.             # this may fail on some tests, such as test_close_when_done, since
  41.             # the client closes the channel when it's done sending
  42.             while self.buffer:
  43.                 n = conn.send(self.buffer[:self.chunk_size])
  44.                 time.sleep(0.001)
  45.                 self.buffer = self.buffer[n:]
  46.         except:
  47.             pass
  48.  
  49.         conn.close()
  50.         self.sock.close()
  51.  
  52. class echo_client(asynchat.async_chat):
  53.  
  54.     def __init__(self, terminator, server_port):
  55.         asynchat.async_chat.__init__(self)
  56.         self.contents = []
  57.         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  58.         self.connect((HOST, server_port))
  59.         self.set_terminator(terminator)
  60.         self.buffer = ''
  61.  
  62.     def handle_connect(self):
  63.         pass
  64.  
  65.     if sys.platform == 'darwin':
  66.         # select.poll returns a select.POLLHUP at the end of the tests
  67.         # on darwin, so just ignore it
  68.         def handle_expt(self):
  69.             pass
  70.  
  71.     def collect_incoming_data(self, data):
  72.         self.buffer += data
  73.  
  74.     def found_terminator(self):
  75.         self.contents.append(self.buffer)
  76.         self.buffer = ""
  77.  
  78.  
  79. def start_echo_server():
  80.     event = threading.Event()
  81.     s = echo_server(event)
  82.     s.start()
  83.     event.wait()
  84.     event.clear()
  85.     time.sleep(0.01) # Give server time to start accepting.
  86.     return s, event
  87.  
  88.  
  89. class TestAsynchat(unittest.TestCase):
  90.     usepoll = False
  91.  
  92.     def setUp (self):
  93.         pass
  94.  
  95.     def tearDown (self):
  96.         pass
  97.  
  98.     def line_terminator_check(self, term, server_chunk):
  99.         event = threading.Event()
  100.         s = echo_server(event)
  101.         s.chunk_size = server_chunk
  102.         s.start()
  103.         event.wait()
  104.         event.clear()
  105.         time.sleep(0.01) # Give server time to start accepting.
  106.         c = echo_client(term, s.port)
  107.         c.push("hello ")
  108.         c.push("world%s" % term)
  109.         c.push("I'm not dead yet!%s" % term)
  110.         c.push(SERVER_QUIT)
  111.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  112.         s.join()
  113.  
  114.         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  115.  
  116.     # the line terminator tests below check receiving variously-sized
  117.     # chunks back from the server in order to exercise all branches of
  118.     # async_chat.handle_read
  119.  
  120.     def test_line_terminator1(self):
  121.         # test one-character terminator
  122.         for l in (1,2,3):
  123.             self.line_terminator_check('\n', l)
  124.  
  125.     def test_line_terminator2(self):
  126.         # test two-character terminator
  127.         for l in (1,2,3):
  128.             self.line_terminator_check('\r\n', l)
  129.  
  130.     def test_line_terminator3(self):
  131.         # test three-character terminator
  132.         for l in (1,2,3):
  133.             self.line_terminator_check('qqq', l)
  134.  
  135.     def numeric_terminator_check(self, termlen):
  136.         # Try reading a fixed number of bytes
  137.         s, event = start_echo_server()
  138.         c = echo_client(termlen, s.port)
  139.         data = "hello world, I'm not dead yet!\n"
  140.         c.push(data)
  141.         c.push(SERVER_QUIT)
  142.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  143.         s.join()
  144.  
  145.         self.assertEqual(c.contents, [data[:termlen]])
  146.  
  147.     def test_numeric_terminator1(self):
  148.         # check that ints & longs both work (since type is
  149.         # explicitly checked in async_chat.handle_read)
  150.         self.numeric_terminator_check(1)
  151.         self.numeric_terminator_check(1L)
  152.  
  153.     def test_numeric_terminator2(self):
  154.         self.numeric_terminator_check(6L)
  155.  
  156.     def test_none_terminator(self):
  157.         # Try reading a fixed number of bytes
  158.         s, event = start_echo_server()
  159.         c = echo_client(None, s.port)
  160.         data = "hello world, I'm not dead yet!\n"
  161.         c.push(data)
  162.         c.push(SERVER_QUIT)
  163.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  164.         s.join()
  165.  
  166.         self.assertEqual(c.contents, [])
  167.         self.assertEqual(c.buffer, data)
  168.  
  169.     def test_simple_producer(self):
  170.         s, event = start_echo_server()
  171.         c = echo_client('\n', s.port)
  172.         data = "hello world\nI'm not dead yet!\n"
  173.         p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
  174.         c.push_with_producer(p)
  175.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  176.         s.join()
  177.  
  178.         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  179.  
  180.     def test_string_producer(self):
  181.         s, event = start_echo_server()
  182.         c = echo_client('\n', s.port)
  183.         data = "hello world\nI'm not dead yet!\n"
  184.         c.push_with_producer(data+SERVER_QUIT)
  185.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  186.         s.join()
  187.  
  188.         self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  189.  
  190.     def test_empty_line(self):
  191.         # checks that empty lines are handled correctly
  192.         s, event = start_echo_server()
  193.         c = echo_client('\n', s.port)
  194.         c.push("hello world\n\nI'm not dead yet!\n")
  195.         c.push(SERVER_QUIT)
  196.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  197.         s.join()
  198.  
  199.         self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
  200.  
  201.     def test_close_when_done(self):
  202.         s, event = start_echo_server()
  203.         c = echo_client('\n', s.port)
  204.         c.push("hello world\nI'm not dead yet!\n")
  205.         c.push(SERVER_QUIT)
  206.         c.close_when_done()
  207.         asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  208.         s.join()
  209.  
  210.         self.assertEqual(c.contents, [])
  211.         # the server might have been able to send a byte or two back, but this
  212.         # at least checks that it received something and didn't just fail
  213.         # (which could still result in the client not having received anything)
  214.         self.assertTrue(len(s.buffer) > 0)
  215.  
  216.  
  217. class TestAsynchat_WithPoll(TestAsynchat):
  218.     usepoll = True
  219.  
  220. class TestHelperFunctions(unittest.TestCase):
  221.     def test_find_prefix_at_end(self):
  222.         self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
  223.         self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
  224.  
  225. class TestFifo(unittest.TestCase):
  226.     def test_basic(self):
  227.         f = asynchat.fifo()
  228.         f.push(7)
  229.         f.push('a')
  230.         self.assertEqual(len(f), 2)
  231.         self.assertEqual(f.first(), 7)
  232.         self.assertEqual(f.pop(), (1, 7))
  233.         self.assertEqual(len(f), 1)
  234.         self.assertEqual(f.first(), 'a')
  235.         self.assertEqual(f.is_empty(), False)
  236.         self.assertEqual(f.pop(), (1, 'a'))
  237.         self.assertEqual(len(f), 0)
  238.         self.assertEqual(f.is_empty(), True)
  239.         self.assertEqual(f.pop(), (0, None))
  240.  
  241.     def test_given_list(self):
  242.         f = asynchat.fifo(['x', 17, 3])
  243.         self.assertEqual(len(f), 3)
  244.         self.assertEqual(f.pop(), (1, 'x'))
  245.         self.assertEqual(f.pop(), (1, 17))
  246.         self.assertEqual(f.pop(), (1, 3))
  247.         self.assertEqual(f.pop(), (0, None))
  248.  
  249.  
  250. def test_main(verbose=None):
  251.     test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
  252.                               TestHelperFunctions, TestFifo)
  253.  
  254. if __name__ == "__main__":
  255.     test_main(verbose=True)
  256.