home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_pty.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  7.3 KB  |  197 lines

  1. import errno
  2. import fcntl
  3. import pty
  4. import os
  5. import sys
  6. import signal
  7. from test.test_support import verbose, TestSkipped, run_unittest
  8. import unittest
  9.  
  10. TEST_STRING_1 = "I wish to buy a fish license.\n"
  11. TEST_STRING_2 = "For my pet fish, Eric.\n"
  12.  
  13. if verbose:
  14.     def debug(msg):
  15.         print msg
  16. else:
  17.     def debug(msg):
  18.         pass
  19.  
  20.  
  21. def normalize_output(data):
  22.     # Some operating systems do conversions on newline.  We could possibly
  23.     # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
  24.     # figure out the right combo on Tru64 and I don't have an IRIX box.
  25.     # So just normalize the output and doc the problem O/Ses by allowing
  26.     # certain combinations for some platforms, but avoid allowing other
  27.     # differences (like extra whitespace, trailing garbage, etc.)
  28.  
  29.     # This is about the best we can do without getting some feedback
  30.     # from someone more knowledgable.
  31.  
  32.     # OSF/1 (Tru64) apparently turns \n into \r\r\n.
  33.     if data.endswith('\r\r\n'):
  34.         return data.replace('\r\r\n', '\n')
  35.  
  36.     # IRIX apparently turns \n into \r\n.
  37.     if data.endswith('\r\n'):
  38.         return data.replace('\r\n', '\n')
  39.  
  40.     return data
  41.  
  42.  
  43. # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
  44. # because pty code is not too portable.
  45. # XXX(nnorwitz):  these tests leak fds when there is an error.
  46. class PtyTest(unittest.TestCase):
  47.     def setUp(self):
  48.         # isatty() and close() can hang on some platforms.  Set an alarm
  49.         # before running the test to make sure we don't hang forever.
  50.         self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
  51.         signal.alarm(10)
  52.  
  53.     def tearDown(self):
  54.         # remove alarm, restore old alarm handler
  55.         signal.alarm(0)
  56.         signal.signal(signal.SIGALRM, self.old_alarm)
  57.  
  58.     def handle_sig(self, sig, frame):
  59.         self.fail("isatty hung")
  60.  
  61.     def test_basic(self):
  62.         try:
  63.             debug("Calling master_open()")
  64.             master_fd, slave_name = pty.master_open()
  65.             debug("Got master_fd '%d', slave_name '%s'" %
  66.                   (master_fd, slave_name))
  67.             debug("Calling slave_open(%r)" % (slave_name,))
  68.             slave_fd = pty.slave_open(slave_name)
  69.             debug("Got slave_fd '%d'" % slave_fd)
  70.         except OSError:
  71.             # " An optional feature could not be imported " ... ?
  72.             raise TestSkipped, "Pseudo-terminals (seemingly) not functional."
  73.  
  74.         self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
  75.  
  76.         # Solaris requires reading the fd before anything is returned.
  77.         # My guess is that since we open and close the slave fd
  78.         # in master_open(), we need to read the EOF.
  79.  
  80.         # Ensure the fd is non-blocking in case there's nothing to read.
  81.         orig_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
  82.         fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags | os.O_NONBLOCK)
  83.         try:
  84.             s1 = os.read(master_fd, 1024)
  85.             self.assertEquals('', s1)
  86.         except OSError, e:
  87.             if e.errno != errno.EAGAIN:
  88.                 raise
  89.         # Restore the original flags.
  90.         fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags)
  91.  
  92.         debug("Writing to slave_fd")
  93.         os.write(slave_fd, TEST_STRING_1)
  94.         s1 = os.read(master_fd, 1024)
  95.         self.assertEquals('I wish to buy a fish license.\n',
  96.                           normalize_output(s1))
  97.  
  98.         debug("Writing chunked output")
  99.         os.write(slave_fd, TEST_STRING_2[:5])
  100.         os.write(slave_fd, TEST_STRING_2[5:])
  101.         s2 = os.read(master_fd, 1024)
  102.         self.assertEquals('For my pet fish, Eric.\n', normalize_output(s2))
  103.  
  104.         os.close(slave_fd)
  105.         os.close(master_fd)
  106.  
  107.  
  108.     def test_fork(self):
  109.         debug("calling pty.fork()")
  110.         pid, master_fd = pty.fork()
  111.         if pid == pty.CHILD:
  112.             # stdout should be connected to a tty.
  113.             if not os.isatty(1):
  114.                 debug("Child's fd 1 is not a tty?!")
  115.                 os._exit(3)
  116.  
  117.             # After pty.fork(), the child should already be a session leader.
  118.             # (on those systems that have that concept.)
  119.             debug("In child, calling os.setsid()")
  120.             try:
  121.                 os.setsid()
  122.             except OSError:
  123.                 # Good, we already were session leader
  124.                 debug("Good: OSError was raised.")
  125.                 pass
  126.             except AttributeError:
  127.                 # Have pty, but not setsid()?
  128.                 debug("No setsid() available?")
  129.                 pass
  130.             except:
  131.                 # We don't want this error to propagate, escaping the call to
  132.                 # os._exit() and causing very peculiar behavior in the calling
  133.                 # regrtest.py !
  134.                 # Note: could add traceback printing here.
  135.                 debug("An unexpected error was raised.")
  136.                 os._exit(1)
  137.             else:
  138.                 debug("os.setsid() succeeded! (bad!)")
  139.                 os._exit(2)
  140.             os._exit(4)
  141.         else:
  142.             debug("Waiting for child (%d) to finish." % pid)
  143.             # In verbose mode, we have to consume the debug output from the
  144.             # child or the child will block, causing this test to hang in the
  145.             # parent's waitpid() call.  The child blocks after a
  146.             # platform-dependent amount of data is written to its fd.  On
  147.             # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
  148.             # X even the small writes in the child above will block it.  Also
  149.             # on Linux, the read() will throw an OSError (input/output error)
  150.             # when it tries to read past the end of the buffer but the child's
  151.             # already exited, so catch and discard those exceptions.  It's not
  152.             # worth checking for EIO.
  153.             while True:
  154.                 try:
  155.                     data = os.read(master_fd, 80)
  156.                 except OSError:
  157.                     break
  158.                 if not data:
  159.                     break
  160.                 sys.stdout.write(data.replace('\r\n', '\n'))
  161.  
  162.             ##line = os.read(master_fd, 80)
  163.             ##lines = line.replace('\r\n', '\n').split('\n')
  164.             ##if False and lines != ['In child, calling os.setsid()',
  165.             ##             'Good: OSError was raised.', '']:
  166.             ##    raise TestFailed("Unexpected output from child: %r" % line)
  167.  
  168.             (pid, status) = os.waitpid(pid, 0)
  169.             res = status >> 8
  170.             debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
  171.             if res == 1:
  172.                 self.fail("Child raised an unexpected exception in os.setsid()")
  173.             elif res == 2:
  174.                 self.fail("pty.fork() failed to make child a session leader.")
  175.             elif res == 3:
  176.                 self.fail("Child spawned by pty.fork() did not have a tty as stdout")
  177.             elif res != 4:
  178.                 self.fail("pty.fork() failed for unknown reasons.")
  179.  
  180.             ##debug("Reading from master_fd now that the child has exited")
  181.             ##try:
  182.             ##    s1 = os.read(master_fd, 1024)
  183.             ##except os.error:
  184.             ##    pass
  185.             ##else:
  186.             ##    raise TestFailed("Read from master_fd did not raise exception")
  187.  
  188.         os.close(master_fd)
  189.  
  190.         # pty.fork() passed.
  191.  
  192. def test_main(verbose=None):
  193.     run_unittest(PtyTest)
  194.  
  195. if __name__ == "__main__":
  196.     test_main()
  197.