home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_file.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  18.0 KB  |  547 lines

  1. import sys
  2. import os
  3. import unittest
  4. import itertools
  5. import time
  6. import threading
  7. from array import array
  8. from weakref import proxy
  9.  
  10. from test import test_support
  11. from test.test_support import TESTFN, findfile, run_unittest
  12. from UserList import UserList
  13.  
  14. class AutoFileTests(unittest.TestCase):
  15.     # file tests for which a test file is automatically set up
  16.  
  17.     def setUp(self):
  18.         self.f = open(TESTFN, 'wb')
  19.  
  20.     def tearDown(self):
  21.         if self.f:
  22.             self.f.close()
  23.         os.remove(TESTFN)
  24.  
  25.     def testWeakRefs(self):
  26.         # verify weak references
  27.         p = proxy(self.f)
  28.         p.write('teststring')
  29.         self.assertEquals(self.f.tell(), p.tell())
  30.         self.f.close()
  31.         self.f = None
  32.         self.assertRaises(ReferenceError, getattr, p, 'tell')
  33.  
  34.     def testAttributes(self):
  35.         # verify expected attributes exist
  36.         f = self.f
  37.         softspace = f.softspace
  38.         f.name     # merely shouldn't blow up
  39.         f.mode     # ditto
  40.         f.closed   # ditto
  41.  
  42.         # verify softspace is writable
  43.         f.softspace = softspace    # merely shouldn't blow up
  44.  
  45.         # verify the others aren't
  46.         for attr in 'name', 'mode', 'closed':
  47.             self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')
  48.  
  49.     def testReadinto(self):
  50.         # verify readinto
  51.         self.f.write('12')
  52.         self.f.close()
  53.         a = array('c', 'x'*10)
  54.         self.f = open(TESTFN, 'rb')
  55.         n = self.f.readinto(a)
  56.         self.assertEquals('12', a.tostring()[:n])
  57.  
  58.     def testWritelinesUserList(self):
  59.         # verify writelines with instance sequence
  60.         l = UserList(['1', '2'])
  61.         self.f.writelines(l)
  62.         self.f.close()
  63.         self.f = open(TESTFN, 'rb')
  64.         buf = self.f.read()
  65.         self.assertEquals(buf, '12')
  66.  
  67.     def testWritelinesIntegers(self):
  68.         # verify writelines with integers
  69.         self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
  70.  
  71.     def testWritelinesIntegersUserList(self):
  72.         # verify writelines with integers in UserList
  73.         l = UserList([1,2,3])
  74.         self.assertRaises(TypeError, self.f.writelines, l)
  75.  
  76.     def testWritelinesNonString(self):
  77.         # verify writelines with non-string object
  78.         class NonString:
  79.             pass
  80.  
  81.         self.assertRaises(TypeError, self.f.writelines,
  82.                           [NonString(), NonString()])
  83.  
  84.     def testRepr(self):
  85.         # verify repr works
  86.         self.assert_(repr(self.f).startswith("<open file '" + TESTFN))
  87.  
  88.     def testErrors(self):
  89.         f = self.f
  90.         self.assertEquals(f.name, TESTFN)
  91.         self.assert_(not f.isatty())
  92.         self.assert_(not f.closed)
  93.  
  94.         self.assertRaises(TypeError, f.readinto, "")
  95.         f.close()
  96.         self.assert_(f.closed)
  97.  
  98.     def testMethods(self):
  99.         methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
  100.                    'readline', 'readlines', 'seek', 'tell', 'truncate',
  101.                    'write', 'xreadlines', '__iter__']
  102.         if sys.platform.startswith('atheos'):
  103.             methods.remove('truncate')
  104.  
  105.         # __exit__ should close the file
  106.         self.f.__exit__(None, None, None)
  107.         self.assert_(self.f.closed)
  108.  
  109.         for methodname in methods:
  110.             method = getattr(self.f, methodname)
  111.             # should raise on closed file
  112.             self.assertRaises(ValueError, method)
  113.         self.assertRaises(ValueError, self.f.writelines, [])
  114.  
  115.         # file is closed, __exit__ shouldn't do anything
  116.         self.assertEquals(self.f.__exit__(None, None, None), None)
  117.         # it must also return None if an exception was given
  118.         try:
  119.             1/0
  120.         except:
  121.             self.assertEquals(self.f.__exit__(*sys.exc_info()), None)
  122.  
  123.  
  124. class OtherFileTests(unittest.TestCase):
  125.  
  126.     def testModeStrings(self):
  127.         # check invalid mode strings
  128.         for mode in ("", "aU", "wU+"):
  129.             try:
  130.                 f = open(TESTFN, mode)
  131.             except ValueError:
  132.                 pass
  133.             else:
  134.                 f.close()
  135.                 self.fail('%r is an invalid file mode' % mode)
  136.  
  137.         # Some invalid modes fail on Windows, but pass on Unix
  138.         # Issue3965: avoid a crash on Windows when filename is unicode
  139.         for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
  140.             try:
  141.                 f = open(name, "rr")
  142.             except IOError:
  143.                 pass
  144.             else:
  145.                 f.close()
  146.  
  147.     def testStdin(self):
  148.         # This causes the interpreter to exit on OSF1 v5.1.
  149.         if sys.platform != 'osf1V5':
  150.             self.assertRaises(IOError, sys.stdin.seek, -1)
  151.         else:
  152.             print >>sys.__stdout__, (
  153.                 '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
  154.                 ' Test manually.')
  155.         self.assertRaises(IOError, sys.stdin.truncate)
  156.  
  157.     def testUnicodeOpen(self):
  158.         # verify repr works for unicode too
  159.         f = open(unicode(TESTFN), "w")
  160.         self.assert_(repr(f).startswith("<open file u'" + TESTFN))
  161.         f.close()
  162.         os.unlink(TESTFN)
  163.  
  164.     def testBadModeArgument(self):
  165.         # verify that we get a sensible error message for bad mode argument
  166.         bad_mode = "qwerty"
  167.         try:
  168.             f = open(TESTFN, bad_mode)
  169.         except ValueError, msg:
  170.             if msg[0] != 0:
  171.                 s = str(msg)
  172.                 if s.find(TESTFN) != -1 or s.find(bad_mode) == -1:
  173.                     self.fail("bad error message for invalid mode: %s" % s)
  174.             # if msg[0] == 0, we're probably on Windows where there may be
  175.             # no obvious way to discover why open() failed.
  176.         else:
  177.             f.close()
  178.             self.fail("no error for invalid mode: %s" % bad_mode)
  179.  
  180.     def testSetBufferSize(self):
  181.         # make sure that explicitly setting the buffer size doesn't cause
  182.         # misbehaviour especially with repeated close() calls
  183.         for s in (-1, 0, 1, 512):
  184.             try:
  185.                 f = open(TESTFN, 'w', s)
  186.                 f.write(str(s))
  187.                 f.close()
  188.                 f.close()
  189.                 f = open(TESTFN, 'r', s)
  190.                 d = int(f.read())
  191.                 f.close()
  192.                 f.close()
  193.             except IOError, msg:
  194.                 self.fail('error setting buffer size %d: %s' % (s, str(msg)))
  195.             self.assertEquals(d, s)
  196.  
  197.     def testTruncateOnWindows(self):
  198.         os.unlink(TESTFN)
  199.  
  200.         def bug801631():
  201.             # SF bug <http://www.python.org/sf/801631>
  202.             # "file.truncate fault on windows"
  203.             f = open(TESTFN, 'wb')
  204.             f.write('12345678901')   # 11 bytes
  205.             f.close()
  206.  
  207.             f = open(TESTFN,'rb+')
  208.             data = f.read(5)
  209.             if data != '12345':
  210.                 self.fail("Read on file opened for update failed %r" % data)
  211.             if f.tell() != 5:
  212.                 self.fail("File pos after read wrong %d" % f.tell())
  213.  
  214.             f.truncate()
  215.             if f.tell() != 5:
  216.                 self.fail("File pos after ftruncate wrong %d" % f.tell())
  217.  
  218.             f.close()
  219.             size = os.path.getsize(TESTFN)
  220.             if size != 5:
  221.                 self.fail("File size after ftruncate wrong %d" % size)
  222.  
  223.         try:
  224.             bug801631()
  225.         finally:
  226.             os.unlink(TESTFN)
  227.  
  228.     def testIteration(self):
  229.         # Test the complex interaction when mixing file-iteration and the
  230.         # various read* methods. Ostensibly, the mixture could just be tested
  231.         # to work when it should work according to the Python language,
  232.         # instead of fail when it should fail according to the current CPython
  233.         # implementation.  People don't always program Python the way they
  234.         # should, though, and the implemenation might change in subtle ways,
  235.         # so we explicitly test for errors, too; the test will just have to
  236.         # be updated when the implementation changes.
  237.         dataoffset = 16384
  238.         filler = "ham\n"
  239.         assert not dataoffset % len(filler), \
  240.             "dataoffset must be multiple of len(filler)"
  241.         nchunks = dataoffset // len(filler)
  242.         testlines = [
  243.             "spam, spam and eggs\n",
  244.             "eggs, spam, ham and spam\n",
  245.             "saussages, spam, spam and eggs\n",
  246.             "spam, ham, spam and eggs\n",
  247.             "spam, spam, spam, spam, spam, ham, spam\n",
  248.             "wonderful spaaaaaam.\n"
  249.         ]
  250.         methods = [("readline", ()), ("read", ()), ("readlines", ()),
  251.                    ("readinto", (array("c", " "*100),))]
  252.  
  253.         try:
  254.             # Prepare the testfile
  255.             bag = open(TESTFN, "w")
  256.             bag.write(filler * nchunks)
  257.             bag.writelines(testlines)
  258.             bag.close()
  259.             # Test for appropriate errors mixing read* and iteration
  260.             for methodname, args in methods:
  261.                 f = open(TESTFN)
  262.                 if f.next() != filler:
  263.                     self.fail, "Broken testfile"
  264.                 meth = getattr(f, methodname)
  265.                 try:
  266.                     meth(*args)
  267.                 except ValueError:
  268.                     pass
  269.                 else:
  270.                     self.fail("%s%r after next() didn't raise ValueError" %
  271.                                      (methodname, args))
  272.                 f.close()
  273.  
  274.             # Test to see if harmless (by accident) mixing of read* and
  275.             # iteration still works. This depends on the size of the internal
  276.             # iteration buffer (currently 8192,) but we can test it in a
  277.             # flexible manner.  Each line in the bag o' ham is 4 bytes
  278.             # ("h", "a", "m", "\n"), so 4096 lines of that should get us
  279.             # exactly on the buffer boundary for any power-of-2 buffersize
  280.             # between 4 and 16384 (inclusive).
  281.             f = open(TESTFN)
  282.             for i in range(nchunks):
  283.                 f.next()
  284.             testline = testlines.pop(0)
  285.             try:
  286.                 line = f.readline()
  287.             except ValueError:
  288.                 self.fail("readline() after next() with supposedly empty "
  289.                           "iteration-buffer failed anyway")
  290.             if line != testline:
  291.                 self.fail("readline() after next() with empty buffer "
  292.                           "failed. Got %r, expected %r" % (line, testline))
  293.             testline = testlines.pop(0)
  294.             buf = array("c", "\x00" * len(testline))
  295.             try:
  296.                 f.readinto(buf)
  297.             except ValueError:
  298.                 self.fail("readinto() after next() with supposedly empty "
  299.                           "iteration-buffer failed anyway")
  300.             line = buf.tostring()
  301.             if line != testline:
  302.                 self.fail("readinto() after next() with empty buffer "
  303.                           "failed. Got %r, expected %r" % (line, testline))
  304.  
  305.             testline = testlines.pop(0)
  306.             try:
  307.                 line = f.read(len(testline))
  308.             except ValueError:
  309.                 self.fail("read() after next() with supposedly empty "
  310.                           "iteration-buffer failed anyway")
  311.             if line != testline:
  312.                 self.fail("read() after next() with empty buffer "
  313.                           "failed. Got %r, expected %r" % (line, testline))
  314.             try:
  315.                 lines = f.readlines()
  316.             except ValueError:
  317.                 self.fail("readlines() after next() with supposedly empty "
  318.                           "iteration-buffer failed anyway")
  319.             if lines != testlines:
  320.                 self.fail("readlines() after next() with empty buffer "
  321.                           "failed. Got %r, expected %r" % (line, testline))
  322.             # Reading after iteration hit EOF shouldn't hurt either
  323.             f = open(TESTFN)
  324.             try:
  325.                 for line in f:
  326.                     pass
  327.                 try:
  328.                     f.readline()
  329.                     f.readinto(buf)
  330.                     f.read()
  331.                     f.readlines()
  332.                 except ValueError:
  333.                     self.fail("read* failed after next() consumed file")
  334.             finally:
  335.                 f.close()
  336.         finally:
  337.             os.unlink(TESTFN)
  338.  
  339. class FileSubclassTests(unittest.TestCase):
  340.  
  341.     def testExit(self):
  342.         # test that exiting with context calls subclass' close
  343.         class C(file):
  344.             def __init__(self, *args):
  345.                 self.subclass_closed = False
  346.                 file.__init__(self, *args)
  347.             def close(self):
  348.                 self.subclass_closed = True
  349.                 file.close(self)
  350.  
  351.         with C(TESTFN, 'w') as f:
  352.             pass
  353.         self.failUnless(f.subclass_closed)
  354.  
  355.  
  356. class FileThreadingTests(unittest.TestCase):
  357.     # These tests check the ability to call various methods of file objects
  358.     # (including close()) concurrently without crashing the Python interpreter.
  359.     # See #815646, #595601
  360.  
  361.     def setUp(self):
  362.         self.f = None
  363.         self.filename = TESTFN
  364.         with open(self.filename, "w") as f:
  365.             f.write("\n".join("0123456789"))
  366.         self._count_lock = threading.Lock()
  367.         self.close_count = 0
  368.         self.close_success_count = 0
  369.  
  370.     def tearDown(self):
  371.         if self.f:
  372.             try:
  373.                 self.f.close()
  374.             except (EnvironmentError, ValueError):
  375.                 pass
  376.         try:
  377.             os.remove(self.filename)
  378.         except EnvironmentError:
  379.             pass
  380.  
  381.     def _create_file(self):
  382.         self.f = open(self.filename, "w+")
  383.  
  384.     def _close_file(self):
  385.         with self._count_lock:
  386.             self.close_count += 1
  387.         self.f.close()
  388.         with self._count_lock:
  389.             self.close_success_count += 1
  390.  
  391.     def _close_and_reopen_file(self):
  392.         self._close_file()
  393.         # if close raises an exception thats fine, self.f remains valid so
  394.         # we don't need to reopen.
  395.         self._create_file()
  396.  
  397.     def _run_workers(self, func, nb_workers, duration=0.2):
  398.         with self._count_lock:
  399.             self.close_count = 0
  400.             self.close_success_count = 0
  401.         self.do_continue = True
  402.         threads = []
  403.         try:
  404.             for i in range(nb_workers):
  405.                 t = threading.Thread(target=func)
  406.                 t.start()
  407.                 threads.append(t)
  408.             for _ in xrange(100):
  409.                 time.sleep(duration/100)
  410.                 with self._count_lock:
  411.                     if self.close_count-self.close_success_count > nb_workers+1:
  412.                         if test_support.verbose:
  413.                             print 'Q',
  414.                         break
  415.             time.sleep(duration)
  416.         finally:
  417.             self.do_continue = False
  418.             for t in threads:
  419.                 t.join()
  420.  
  421.     def _test_close_open_io(self, io_func, nb_workers=5):
  422.         def worker():
  423.             self._create_file()
  424.             funcs = itertools.cycle((
  425.                 lambda: io_func(),
  426.                 lambda: self._close_and_reopen_file(),
  427.             ))
  428.             for f in funcs:
  429.                 if not self.do_continue:
  430.                     break
  431.                 try:
  432.                     f()
  433.                 except (IOError, ValueError):
  434.                     pass
  435.         self._run_workers(worker, nb_workers)
  436.         if test_support.verbose:
  437.             # Useful verbose statistics when tuning this test to take
  438.             # less time to run but still ensuring that its still useful.
  439.             #
  440.             # the percent of close calls that raised an error
  441.             percent = 100. - 100.*self.close_success_count/self.close_count
  442.             print self.close_count, ('%.4f ' % percent),
  443.  
  444.     def test_close_open(self):
  445.         def io_func():
  446.             pass
  447.         self._test_close_open_io(io_func)
  448.  
  449.     def test_close_open_flush(self):
  450.         def io_func():
  451.             self.f.flush()
  452.         self._test_close_open_io(io_func)
  453.  
  454.     def test_close_open_iter(self):
  455.         def io_func():
  456.             list(iter(self.f))
  457.         self._test_close_open_io(io_func)
  458.  
  459.     def test_close_open_isatty(self):
  460.         def io_func():
  461.             self.f.isatty()
  462.         self._test_close_open_io(io_func)
  463.  
  464.     def test_close_open_print(self):
  465.         def io_func():
  466.             print >> self.f, ''
  467.         self._test_close_open_io(io_func)
  468.  
  469.     def test_close_open_read(self):
  470.         def io_func():
  471.             self.f.read(0)
  472.         self._test_close_open_io(io_func)
  473.  
  474.     def test_close_open_readinto(self):
  475.         def io_func():
  476.             a = array('c', 'xxxxx')
  477.             self.f.readinto(a)
  478.         self._test_close_open_io(io_func)
  479.  
  480.     def test_close_open_readline(self):
  481.         def io_func():
  482.             self.f.readline()
  483.         self._test_close_open_io(io_func)
  484.  
  485.     def test_close_open_readlines(self):
  486.         def io_func():
  487.             self.f.readlines()
  488.         self._test_close_open_io(io_func)
  489.  
  490.     def test_close_open_seek(self):
  491.         def io_func():
  492.             self.f.seek(0, 0)
  493.         self._test_close_open_io(io_func)
  494.  
  495.     def test_close_open_tell(self):
  496.         def io_func():
  497.             self.f.tell()
  498.         self._test_close_open_io(io_func)
  499.  
  500.     def test_close_open_truncate(self):
  501.         def io_func():
  502.             self.f.truncate()
  503.         self._test_close_open_io(io_func)
  504.  
  505.     def test_close_open_write(self):
  506.         def io_func():
  507.             self.f.write('')
  508.         self._test_close_open_io(io_func)
  509.  
  510.     def test_close_open_writelines(self):
  511.         def io_func():
  512.             self.f.writelines('')
  513.         self._test_close_open_io(io_func)
  514.  
  515.  
  516. class StdoutTests(unittest.TestCase):
  517.  
  518.     def test_move_stdout_on_write(self):
  519.         # Issue 3242: sys.stdout can be replaced (and freed) during a
  520.         # print statement; prevent a segfault in this case
  521.         save_stdout = sys.stdout
  522.  
  523.         class File:
  524.             def write(self, data):
  525.                 if '\n' in data:
  526.                     sys.stdout = save_stdout
  527.  
  528.         try:
  529.             sys.stdout = File()
  530.             print "some text"
  531.         finally:
  532.             sys.stdout = save_stdout
  533.  
  534.  
  535. def test_main():
  536.     # Historically, these tests have been sloppy about removing TESTFN.
  537.     # So get rid of it no matter what.
  538.     try:
  539.         run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests,
  540.             FileThreadingTests, StdoutTests)
  541.     finally:
  542.         if os.path.exists(TESTFN):
  543.             os.unlink(TESTFN)
  544.  
  545. if __name__ == '__main__':
  546.     test_main()
  547.