home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_subprocess.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  28.7 KB  |  726 lines

  1. import unittest
  2. from test import test_support
  3. import subprocess
  4. import sys
  5. import signal
  6. import os
  7. import tempfile
  8. import time
  9. import re
  10.  
  11. mswindows = (sys.platform == "win32")
  12.  
  13. #
  14. # Depends on the following external programs: Python
  15. #
  16.  
  17. if mswindows:
  18.     SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
  19.                                                 'os.O_BINARY);')
  20. else:
  21.     SETBINARY = ''
  22.  
  23. # In a debug build, stuff like "[6580 refs]" is printed to stderr at
  24. # shutdown time.  That frustrates tests trying to check stderr produced
  25. # from a spawned Python process.
  26. def remove_stderr_debug_decorations(stderr):
  27.     return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
  28.  
  29. class ProcessTestCase(unittest.TestCase):
  30.     def setUp(self):
  31.         # Try to minimize the number of children we have so this test
  32.         # doesn't crash on some buildbots (Alphas in particular).
  33.         if hasattr(test_support, "reap_children"):
  34.             test_support.reap_children()
  35.  
  36.     def tearDown(self):
  37.         # Try to minimize the number of children we have so this test
  38.         # doesn't crash on some buildbots (Alphas in particular).
  39.         if hasattr(test_support, "reap_children"):
  40.             test_support.reap_children()
  41.  
  42.     def mkstemp(self):
  43.         """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
  44.         if hasattr(tempfile, "mkstemp"):
  45.             return tempfile.mkstemp()
  46.         else:
  47.             fname = tempfile.mktemp()
  48.             return os.open(fname, os.O_RDWR|os.O_CREAT), fname
  49.  
  50.     #
  51.     # Generic tests
  52.     #
  53.     def test_call_seq(self):
  54.         # call() function with sequence argument
  55.         rc = subprocess.call([sys.executable, "-c",
  56.                               "import sys; sys.exit(47)"])
  57.         self.assertEqual(rc, 47)
  58.  
  59.     def test_check_call_zero(self):
  60.         # check_call() function with zero return code
  61.         rc = subprocess.check_call([sys.executable, "-c",
  62.                                     "import sys; sys.exit(0)"])
  63.         self.assertEqual(rc, 0)
  64.  
  65.     def test_check_call_nonzero(self):
  66.         # check_call() function with non-zero return code
  67.         try:
  68.             subprocess.check_call([sys.executable, "-c",
  69.                                    "import sys; sys.exit(47)"])
  70.         except subprocess.CalledProcessError, e:
  71.             self.assertEqual(e.returncode, 47)
  72.         else:
  73.             self.fail("Expected CalledProcessError")
  74.  
  75.     def test_call_kwargs(self):
  76.         # call() function with keyword args
  77.         newenv = os.environ.copy()
  78.         newenv["FRUIT"] = "banana"
  79.         rc = subprocess.call([sys.executable, "-c",
  80.                           'import sys, os;' \
  81.                           'sys.exit(os.getenv("FRUIT")=="banana")'],
  82.                         env=newenv)
  83.         self.assertEqual(rc, 1)
  84.  
  85.     def test_stdin_none(self):
  86.         # .stdin is None when not redirected
  87.         p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
  88.                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  89.         p.wait()
  90.         self.assertEqual(p.stdin, None)
  91.  
  92.     def test_stdout_none(self):
  93.         # .stdout is None when not redirected
  94.         p = subprocess.Popen([sys.executable, "-c",
  95.                              'print "    this bit of output is from a '
  96.                              'test of stdout in a different '
  97.                              'process ..."'],
  98.                              stdin=subprocess.PIPE, stderr=subprocess.PIPE)
  99.         p.wait()
  100.         self.assertEqual(p.stdout, None)
  101.  
  102.     def test_stderr_none(self):
  103.         # .stderr is None when not redirected
  104.         p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
  105.                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  106.         p.wait()
  107.         self.assertEqual(p.stderr, None)
  108.  
  109.     def test_executable(self):
  110.         p = subprocess.Popen(["somethingyoudonthave",
  111.                               "-c", "import sys; sys.exit(47)"],
  112.                              executable=sys.executable)
  113.         p.wait()
  114.         self.assertEqual(p.returncode, 47)
  115.  
  116.     def test_stdin_pipe(self):
  117.         # stdin redirection
  118.         p = subprocess.Popen([sys.executable, "-c",
  119.                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
  120.                         stdin=subprocess.PIPE)
  121.         p.stdin.write("pear")
  122.         p.stdin.close()
  123.         p.wait()
  124.         self.assertEqual(p.returncode, 1)
  125.  
  126.     def test_stdin_filedes(self):
  127.         # stdin is set to open file descriptor
  128.         tf = tempfile.TemporaryFile()
  129.         d = tf.fileno()
  130.         os.write(d, "pear")
  131.         os.lseek(d, 0, 0)
  132.         p = subprocess.Popen([sys.executable, "-c",
  133.                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
  134.                          stdin=d)
  135.         p.wait()
  136.         self.assertEqual(p.returncode, 1)
  137.  
  138.     def test_stdin_fileobj(self):
  139.         # stdin is set to open file object
  140.         tf = tempfile.TemporaryFile()
  141.         tf.write("pear")
  142.         tf.seek(0)
  143.         p = subprocess.Popen([sys.executable, "-c",
  144.                          'import sys; sys.exit(sys.stdin.read() == "pear")'],
  145.                          stdin=tf)
  146.         p.wait()
  147.         self.assertEqual(p.returncode, 1)
  148.  
  149.     def test_stdout_pipe(self):
  150.         # stdout redirection
  151.         p = subprocess.Popen([sys.executable, "-c",
  152.                           'import sys; sys.stdout.write("orange")'],
  153.                          stdout=subprocess.PIPE)
  154.         self.assertEqual(p.stdout.read(), "orange")
  155.  
  156.     def test_stdout_filedes(self):
  157.         # stdout is set to open file descriptor
  158.         tf = tempfile.TemporaryFile()
  159.         d = tf.fileno()
  160.         p = subprocess.Popen([sys.executable, "-c",
  161.                           'import sys; sys.stdout.write("orange")'],
  162.                          stdout=d)
  163.         p.wait()
  164.         os.lseek(d, 0, 0)
  165.         self.assertEqual(os.read(d, 1024), "orange")
  166.  
  167.     def test_stdout_fileobj(self):
  168.         # stdout is set to open file object
  169.         tf = tempfile.TemporaryFile()
  170.         p = subprocess.Popen([sys.executable, "-c",
  171.                           'import sys; sys.stdout.write("orange")'],
  172.                          stdout=tf)
  173.         p.wait()
  174.         tf.seek(0)
  175.         self.assertEqual(tf.read(), "orange")
  176.  
  177.     def test_stderr_pipe(self):
  178.         # stderr redirection
  179.         p = subprocess.Popen([sys.executable, "-c",
  180.                           'import sys; sys.stderr.write("strawberry")'],
  181.                          stderr=subprocess.PIPE)
  182.         self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
  183.                          "strawberry")
  184.  
  185.     def test_stderr_filedes(self):
  186.         # stderr is set to open file descriptor
  187.         tf = tempfile.TemporaryFile()
  188.         d = tf.fileno()
  189.         p = subprocess.Popen([sys.executable, "-c",
  190.                           'import sys; sys.stderr.write("strawberry")'],
  191.                          stderr=d)
  192.         p.wait()
  193.         os.lseek(d, 0, 0)
  194.         self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
  195.                          "strawberry")
  196.  
  197.     def test_stderr_fileobj(self):
  198.         # stderr is set to open file object
  199.         tf = tempfile.TemporaryFile()
  200.         p = subprocess.Popen([sys.executable, "-c",
  201.                           'import sys; sys.stderr.write("strawberry")'],
  202.                          stderr=tf)
  203.         p.wait()
  204.         tf.seek(0)
  205.         self.assertEqual(remove_stderr_debug_decorations(tf.read()),
  206.                          "strawberry")
  207.  
  208.     def test_stdout_stderr_pipe(self):
  209.         # capture stdout and stderr to the same pipe
  210.         p = subprocess.Popen([sys.executable, "-c",
  211.                           'import sys;' \
  212.                           'sys.stdout.write("apple");' \
  213.                           'sys.stdout.flush();' \
  214.                           'sys.stderr.write("orange")'],
  215.                          stdout=subprocess.PIPE,
  216.                          stderr=subprocess.STDOUT)
  217.         output = p.stdout.read()
  218.         stripped = remove_stderr_debug_decorations(output)
  219.         self.assertEqual(stripped, "appleorange")
  220.  
  221.     def test_stdout_stderr_file(self):
  222.         # capture stdout and stderr to the same open file
  223.         tf = tempfile.TemporaryFile()
  224.         p = subprocess.Popen([sys.executable, "-c",
  225.                           'import sys;' \
  226.                           'sys.stdout.write("apple");' \
  227.                           'sys.stdout.flush();' \
  228.                           'sys.stderr.write("orange")'],
  229.                          stdout=tf,
  230.                          stderr=tf)
  231.         p.wait()
  232.         tf.seek(0)
  233.         output = tf.read()
  234.         stripped = remove_stderr_debug_decorations(output)
  235.         self.assertEqual(stripped, "appleorange")
  236.  
  237.     def test_stdout_filedes_of_stdout(self):
  238.         # stdout is set to 1 (#1531862).
  239.         cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
  240.         rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
  241.         self.assertEquals(rc, 2)
  242.  
  243.     def test_cwd(self):
  244.         tmpdir = tempfile.gettempdir()
  245.         # We cannot use os.path.realpath to canonicalize the path,
  246.         # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
  247.         cwd = os.getcwd()
  248.         os.chdir(tmpdir)
  249.         tmpdir = os.getcwd()
  250.         os.chdir(cwd)
  251.         p = subprocess.Popen([sys.executable, "-c",
  252.                           'import sys,os;' \
  253.                           'sys.stdout.write(os.getcwd())'],
  254.                          stdout=subprocess.PIPE,
  255.                          cwd=tmpdir)
  256.         normcase = os.path.normcase
  257.         self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
  258.  
  259.     def test_env(self):
  260.         newenv = os.environ.copy()
  261.         newenv["FRUIT"] = "orange"
  262.         p = subprocess.Popen([sys.executable, "-c",
  263.                           'import sys,os;' \
  264.                           'sys.stdout.write(os.getenv("FRUIT"))'],
  265.                          stdout=subprocess.PIPE,
  266.                          env=newenv)
  267.         self.assertEqual(p.stdout.read(), "orange")
  268.  
  269.     def test_communicate_stdin(self):
  270.         p = subprocess.Popen([sys.executable, "-c",
  271.                               'import sys; sys.exit(sys.stdin.read() == "pear")'],
  272.                              stdin=subprocess.PIPE)
  273.         p.communicate("pear")
  274.         self.assertEqual(p.returncode, 1)
  275.  
  276.     def test_communicate_stdout(self):
  277.         p = subprocess.Popen([sys.executable, "-c",
  278.                               'import sys; sys.stdout.write("pineapple")'],
  279.                              stdout=subprocess.PIPE)
  280.         (stdout, stderr) = p.communicate()
  281.         self.assertEqual(stdout, "pineapple")
  282.         self.assertEqual(stderr, None)
  283.  
  284.     def test_communicate_stderr(self):
  285.         p = subprocess.Popen([sys.executable, "-c",
  286.                               'import sys; sys.stderr.write("pineapple")'],
  287.                              stderr=subprocess.PIPE)
  288.         (stdout, stderr) = p.communicate()
  289.         self.assertEqual(stdout, None)
  290.         self.assertEqual(remove_stderr_debug_decorations(stderr), "pineapple")
  291.  
  292.     def test_communicate(self):
  293.         p = subprocess.Popen([sys.executable, "-c",
  294.                           'import sys,os;'
  295.                           'sys.stderr.write("pineapple");'
  296.                           'sys.stdout.write(sys.stdin.read())'],
  297.                          stdin=subprocess.PIPE,
  298.                          stdout=subprocess.PIPE,
  299.                          stderr=subprocess.PIPE)
  300.         (stdout, stderr) = p.communicate("banana")
  301.         self.assertEqual(stdout, "banana")
  302.         self.assertEqual(remove_stderr_debug_decorations(stderr),
  303.                          "pineapple")
  304.  
  305.     # This test is Linux specific for simplicity to at least have
  306.     # some coverage.  It is not a platform specific bug.
  307.     if os.path.isdir('/proc/%d/fd' % os.getpid()):
  308.         # Test for the fd leak reported in http://bugs.python.org/issue2791.
  309.         def test_communicate_pipe_fd_leak(self):
  310.             fd_directory = '/proc/%d/fd' % os.getpid()
  311.             num_fds_before_popen = len(os.listdir(fd_directory))
  312.             p = subprocess.Popen([sys.executable, '-c', 'print()'],
  313.                                  stdout=subprocess.PIPE)
  314.             p.communicate()
  315.             num_fds_after_communicate = len(os.listdir(fd_directory))
  316.             del p
  317.             num_fds_after_destruction = len(os.listdir(fd_directory))
  318.             self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
  319.             self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
  320.  
  321.     def test_communicate_returns(self):
  322.         # communicate() should return None if no redirection is active
  323.         p = subprocess.Popen([sys.executable, "-c",
  324.                               "import sys; sys.exit(47)"])
  325.         (stdout, stderr) = p.communicate()
  326.         self.assertEqual(stdout, None)
  327.         self.assertEqual(stderr, None)
  328.  
  329.     def test_communicate_pipe_buf(self):
  330.         # communicate() with writes larger than pipe_buf
  331.         # This test will probably deadlock rather than fail, if
  332.         # communicate() does not work properly.
  333.         x, y = os.pipe()
  334.         if mswindows:
  335.             pipe_buf = 512
  336.         else:
  337.             pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
  338.         os.close(x)
  339.         os.close(y)
  340.         p = subprocess.Popen([sys.executable, "-c",
  341.                           'import sys,os;'
  342.                           'sys.stdout.write(sys.stdin.read(47));' \
  343.                           'sys.stderr.write("xyz"*%d);' \
  344.                           'sys.stdout.write(sys.stdin.read())' % pipe_buf],
  345.                          stdin=subprocess.PIPE,
  346.                          stdout=subprocess.PIPE,
  347.                          stderr=subprocess.PIPE)
  348.         string_to_write = "abc"*pipe_buf
  349.         (stdout, stderr) = p.communicate(string_to_write)
  350.         self.assertEqual(stdout, string_to_write)
  351.  
  352.     def test_writes_before_communicate(self):
  353.         # stdin.write before communicate()
  354.         p = subprocess.Popen([sys.executable, "-c",
  355.                           'import sys,os;' \
  356.                           'sys.stdout.write(sys.stdin.read())'],
  357.                          stdin=subprocess.PIPE,
  358.                          stdout=subprocess.PIPE,
  359.                          stderr=subprocess.PIPE)
  360.         p.stdin.write("banana")
  361.         (stdout, stderr) = p.communicate("split")
  362.         self.assertEqual(stdout, "bananasplit")
  363.         self.assertEqual(remove_stderr_debug_decorations(stderr), "")
  364.  
  365.     def test_universal_newlines(self):
  366.         p = subprocess.Popen([sys.executable, "-c",
  367.                           'import sys,os;' + SETBINARY +
  368.                           'sys.stdout.write("line1\\n");'
  369.                           'sys.stdout.flush();'
  370.                           'sys.stdout.write("line2\\r");'
  371.                           'sys.stdout.flush();'
  372.                           'sys.stdout.write("line3\\r\\n");'
  373.                           'sys.stdout.flush();'
  374.                           'sys.stdout.write("line4\\r");'
  375.                           'sys.stdout.flush();'
  376.                           'sys.stdout.write("\\nline5");'
  377.                           'sys.stdout.flush();'
  378.                           'sys.stdout.write("\\nline6");'],
  379.                          stdout=subprocess.PIPE,
  380.                          universal_newlines=1)
  381.         stdout = p.stdout.read()
  382.         if hasattr(file, 'newlines'):
  383.             # Interpreter with universal newline support
  384.             self.assertEqual(stdout,
  385.                              "line1\nline2\nline3\nline4\nline5\nline6")
  386.         else:
  387.             # Interpreter without universal newline support
  388.             self.assertEqual(stdout,
  389.                              "line1\nline2\rline3\r\nline4\r\nline5\nline6")
  390.  
  391.     def test_universal_newlines_communicate(self):
  392.         # universal newlines through communicate()
  393.         p = subprocess.Popen([sys.executable, "-c",
  394.                           'import sys,os;' + SETBINARY +
  395.                           'sys.stdout.write("line1\\n");'
  396.                           'sys.stdout.flush();'
  397.                           'sys.stdout.write("line2\\r");'
  398.                           'sys.stdout.flush();'
  399.                           'sys.stdout.write("line3\\r\\n");'
  400.                           'sys.stdout.flush();'
  401.                           'sys.stdout.write("line4\\r");'
  402.                           'sys.stdout.flush();'
  403.                           'sys.stdout.write("\\nline5");'
  404.                           'sys.stdout.flush();'
  405.                           'sys.stdout.write("\\nline6");'],
  406.                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  407.                          universal_newlines=1)
  408.         (stdout, stderr) = p.communicate()
  409.         if hasattr(file, 'newlines'):
  410.             # Interpreter with universal newline support
  411.             self.assertEqual(stdout,
  412.                              "line1\nline2\nline3\nline4\nline5\nline6")
  413.         else:
  414.             # Interpreter without universal newline support
  415.             self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
  416.  
  417.     def test_no_leaking(self):
  418.         # Make sure we leak no resources
  419.         if not hasattr(test_support, "is_resource_enabled") \
  420.                or test_support.is_resource_enabled("subprocess") and not mswindows:
  421.             max_handles = 1026 # too much for most UNIX systems
  422.         else:
  423.             max_handles = 65
  424.         for i in range(max_handles):
  425.             p = subprocess.Popen([sys.executable, "-c",
  426.                     "import sys;sys.stdout.write(sys.stdin.read())"],
  427.                     stdin=subprocess.PIPE,
  428.                     stdout=subprocess.PIPE,
  429.                     stderr=subprocess.PIPE)
  430.             data = p.communicate("lime")[0]
  431.             self.assertEqual(data, "lime")
  432.  
  433.  
  434.     def test_list2cmdline(self):
  435.         self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
  436.                          '"a b c" d e')
  437.         self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
  438.                          'ab\\"c \\ d')
  439.         self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
  440.                          'ab\\"c " \\\\" d')
  441.         self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
  442.                          'a\\\\\\b "de fg" h')
  443.         self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
  444.                          'a\\\\\\"b c d')
  445.         self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
  446.                          '"a\\\\b c" d e')
  447.         self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
  448.                          '"a\\\\b\\ c" d e')
  449.         self.assertEqual(subprocess.list2cmdline(['ab', '']),
  450.                          'ab ""')
  451.         self.assertEqual(subprocess.list2cmdline(['echo', 'foo|bar']),
  452.                          'echo "foo|bar"')
  453.  
  454.  
  455.     def test_poll(self):
  456.         p = subprocess.Popen([sys.executable,
  457.                           "-c", "import time; time.sleep(1)"])
  458.         count = 0
  459.         while p.poll() is None:
  460.             time.sleep(0.1)
  461.             count += 1
  462.         # We expect that the poll loop probably went around about 10 times,
  463.         # but, based on system scheduling we can't control, it's possible
  464.         # poll() never returned None.  It "should be" very rare that it
  465.         # didn't go around at least twice.
  466.         self.assert_(count >= 2)
  467.         # Subsequent invocations should just return the returncode
  468.         self.assertEqual(p.poll(), 0)
  469.  
  470.  
  471.     def test_wait(self):
  472.         p = subprocess.Popen([sys.executable,
  473.                           "-c", "import time; time.sleep(2)"])
  474.         self.assertEqual(p.wait(), 0)
  475.         # Subsequent invocations should just return the returncode
  476.         self.assertEqual(p.wait(), 0)
  477.  
  478.  
  479.     def test_invalid_bufsize(self):
  480.         # an invalid type of the bufsize argument should raise
  481.         # TypeError.
  482.         try:
  483.             subprocess.Popen([sys.executable, "-c", "pass"], "orange")
  484.         except TypeError:
  485.             pass
  486.         else:
  487.             self.fail("Expected TypeError")
  488.  
  489.     #
  490.     # POSIX tests
  491.     #
  492.     if not mswindows:
  493.         def test_exceptions(self):
  494.             # catched & re-raised exceptions
  495.             try:
  496.                 p = subprocess.Popen([sys.executable, "-c", ""],
  497.                                  cwd="/this/path/does/not/exist")
  498.             except OSError, e:
  499.                 # The attribute child_traceback should contain "os.chdir"
  500.                 # somewhere.
  501.                 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
  502.             else:
  503.                 self.fail("Expected OSError")
  504.  
  505.         def _suppress_core_files(self):
  506.             """Try to prevent core files from being created.
  507.             Returns previous ulimit if successful, else None.
  508.             """
  509.             try:
  510.                 import resource
  511.                 old_limit = resource.getrlimit(resource.RLIMIT_CORE)
  512.                 resource.setrlimit(resource.RLIMIT_CORE, (0,0))
  513.                 return old_limit
  514.             except (ImportError, ValueError, resource.error):
  515.                 return None
  516.  
  517.         def _unsuppress_core_files(self, old_limit):
  518.             """Return core file behavior to default."""
  519.             if old_limit is None:
  520.                 return
  521.             try:
  522.                 import resource
  523.                 resource.setrlimit(resource.RLIMIT_CORE, old_limit)
  524.             except (ImportError, ValueError, resource.error):
  525.                 return
  526.  
  527.         def test_run_abort(self):
  528.             # returncode handles signal termination
  529.             old_limit = self._suppress_core_files()
  530.             try:
  531.                 p = subprocess.Popen([sys.executable,
  532.                                       "-c", "import os; os.abort()"])
  533.             finally:
  534.                 self._unsuppress_core_files(old_limit)
  535.             p.wait()
  536.             self.assertEqual(-p.returncode, signal.SIGABRT)
  537.  
  538.         def test_preexec(self):
  539.             # preexec function
  540.             p = subprocess.Popen([sys.executable, "-c",
  541.                               'import sys,os;' \
  542.                               'sys.stdout.write(os.getenv("FRUIT"))'],
  543.                              stdout=subprocess.PIPE,
  544.                              preexec_fn=lambda: os.putenv("FRUIT", "apple"))
  545.             self.assertEqual(p.stdout.read(), "apple")
  546.  
  547.         def test_args_string(self):
  548.             # args is a string
  549.             f, fname = self.mkstemp()
  550.             os.write(f, "#!/bin/sh\n")
  551.             os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
  552.                         sys.executable)
  553.             os.close(f)
  554.             os.chmod(fname, 0700)
  555.             p = subprocess.Popen(fname)
  556.             p.wait()
  557.             os.remove(fname)
  558.             self.assertEqual(p.returncode, 47)
  559.  
  560.         def test_invalid_args(self):
  561.             # invalid arguments should raise ValueError
  562.             self.assertRaises(ValueError, subprocess.call,
  563.                               [sys.executable,
  564.                                "-c", "import sys; sys.exit(47)"],
  565.                               startupinfo=47)
  566.             self.assertRaises(ValueError, subprocess.call,
  567.                               [sys.executable,
  568.                                "-c", "import sys; sys.exit(47)"],
  569.                               creationflags=47)
  570.  
  571.         def test_shell_sequence(self):
  572.             # Run command through the shell (sequence)
  573.             newenv = os.environ.copy()
  574.             newenv["FRUIT"] = "apple"
  575.             p = subprocess.Popen(["echo $FRUIT"], shell=1,
  576.                                  stdout=subprocess.PIPE,
  577.                                  env=newenv)
  578.             self.assertEqual(p.stdout.read().strip(), "apple")
  579.  
  580.         def test_shell_string(self):
  581.             # Run command through the shell (string)
  582.             newenv = os.environ.copy()
  583.             newenv["FRUIT"] = "apple"
  584.             p = subprocess.Popen("echo $FRUIT", shell=1,
  585.                                  stdout=subprocess.PIPE,
  586.                                  env=newenv)
  587.             self.assertEqual(p.stdout.read().strip(), "apple")
  588.  
  589.         def test_call_string(self):
  590.             # call() function with string argument on UNIX
  591.             f, fname = self.mkstemp()
  592.             os.write(f, "#!/bin/sh\n")
  593.             os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
  594.                         sys.executable)
  595.             os.close(f)
  596.             os.chmod(fname, 0700)
  597.             rc = subprocess.call(fname)
  598.             os.remove(fname)
  599.             self.assertEqual(rc, 47)
  600.  
  601.         def DISABLED_test_send_signal(self):
  602.             p = subprocess.Popen([sys.executable,
  603.                               "-c", "input()"])
  604.  
  605.             self.assert_(p.poll() is None, p.poll())
  606.             p.send_signal(signal.SIGINT)
  607.             self.assertNotEqual(p.wait(), 0)
  608.  
  609.         def DISABLED_test_kill(self):
  610.             p = subprocess.Popen([sys.executable,
  611.                             "-c", "input()"])
  612.  
  613.             self.assert_(p.poll() is None, p.poll())
  614.             p.kill()
  615.             self.assertEqual(p.wait(), -signal.SIGKILL)
  616.  
  617.         def DISABLED_test_terminate(self):
  618.             p = subprocess.Popen([sys.executable,
  619.                             "-c", "input()"])
  620.  
  621.             self.assert_(p.poll() is None, p.poll())
  622.             p.terminate()
  623.             self.assertEqual(p.wait(), -signal.SIGTERM)
  624.  
  625.     #
  626.     # Windows tests
  627.     #
  628.     if mswindows:
  629.         def test_startupinfo(self):
  630.             # startupinfo argument
  631.             # We uses hardcoded constants, because we do not want to
  632.             # depend on win32all.
  633.             STARTF_USESHOWWINDOW = 1
  634.             SW_MAXIMIZE = 3
  635.             startupinfo = subprocess.STARTUPINFO()
  636.             startupinfo.dwFlags = STARTF_USESHOWWINDOW
  637.             startupinfo.wShowWindow = SW_MAXIMIZE
  638.             # Since Python is a console process, it won't be affected
  639.             # by wShowWindow, but the argument should be silently
  640.             # ignored
  641.             subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
  642.                         startupinfo=startupinfo)
  643.  
  644.         def test_creationflags(self):
  645.             # creationflags argument
  646.             CREATE_NEW_CONSOLE = 16
  647.             sys.stderr.write("    a DOS box should flash briefly ...\n")
  648.             subprocess.call(sys.executable +
  649.                                 ' -c "import time; time.sleep(0.25)"',
  650.                             creationflags=CREATE_NEW_CONSOLE)
  651.  
  652.         def test_invalid_args(self):
  653.             # invalid arguments should raise ValueError
  654.             self.assertRaises(ValueError, subprocess.call,
  655.                               [sys.executable,
  656.                                "-c", "import sys; sys.exit(47)"],
  657.                               preexec_fn=lambda: 1)
  658.             self.assertRaises(ValueError, subprocess.call,
  659.                               [sys.executable,
  660.                                "-c", "import sys; sys.exit(47)"],
  661.                               stdout=subprocess.PIPE,
  662.                               close_fds=True)
  663.  
  664.         def test_close_fds(self):
  665.             # close file descriptors
  666.             rc = subprocess.call([sys.executable, "-c",
  667.                                   "import sys; sys.exit(47)"],
  668.                                   close_fds=True)
  669.             self.assertEqual(rc, 47)
  670.  
  671.         def test_shell_sequence(self):
  672.             # Run command through the shell (sequence)
  673.             newenv = os.environ.copy()
  674.             newenv["FRUIT"] = "physalis"
  675.             p = subprocess.Popen(["set"], shell=1,
  676.                                  stdout=subprocess.PIPE,
  677.                                  env=newenv)
  678.             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
  679.  
  680.         def test_shell_string(self):
  681.             # Run command through the shell (string)
  682.             newenv = os.environ.copy()
  683.             newenv["FRUIT"] = "physalis"
  684.             p = subprocess.Popen("set", shell=1,
  685.                                  stdout=subprocess.PIPE,
  686.                                  env=newenv)
  687.             self.assertNotEqual(p.stdout.read().find("physalis"), -1)
  688.  
  689.         def test_call_string(self):
  690.             # call() function with string argument on Windows
  691.             rc = subprocess.call(sys.executable +
  692.                                  ' -c "import sys; sys.exit(47)"')
  693.             self.assertEqual(rc, 47)
  694.  
  695.         def DISABLED_test_send_signal(self):
  696.             p = subprocess.Popen([sys.executable,
  697.                               "-c", "input()"])
  698.  
  699.             self.assert_(p.poll() is None, p.poll())
  700.             p.send_signal(signal.SIGTERM)
  701.             self.assertNotEqual(p.wait(), 0)
  702.  
  703.         def DISABLED_test_kill(self):
  704.             p = subprocess.Popen([sys.executable,
  705.                             "-c", "input()"])
  706.  
  707.             self.assert_(p.poll() is None, p.poll())
  708.             p.kill()
  709.             self.assertNotEqual(p.wait(), 0)
  710.  
  711.         def DISABLED_test_terminate(self):
  712.             p = subprocess.Popen([sys.executable,
  713.                             "-c", "input()"])
  714.  
  715.             self.assert_(p.poll() is None, p.poll())
  716.             p.terminate()
  717.             self.assertNotEqual(p.wait(), 0)
  718.  
  719. def test_main():
  720.     test_support.run_unittest(ProcessTestCase)
  721.     if hasattr(test_support, "reap_children"):
  722.         test_support.reap_children()
  723.  
  724. if __name__ == "__main__":
  725.     test_main()
  726.