home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_io.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  43.8 KB  |  1,249 lines

  1. """Unit tests for io.py."""
  2. from __future__ import print_function
  3. from __future__ import unicode_literals
  4.  
  5. import os
  6. import sys
  7. import time
  8. import array
  9. import threading
  10. import random
  11. import unittest
  12. from itertools import chain, cycle
  13. from test import test_support
  14.  
  15. import codecs
  16. import io  # The module under test
  17.  
  18.  
  19. class MockRawIO(io.RawIOBase):
  20.  
  21.     def __init__(self, read_stack=()):
  22.         self._read_stack = list(read_stack)
  23.         self._write_stack = []
  24.  
  25.     def read(self, n=None):
  26.         try:
  27.             return self._read_stack.pop(0)
  28.         except:
  29.             return b""
  30.  
  31.     def write(self, b):
  32.         self._write_stack.append(b[:])
  33.         return len(b)
  34.  
  35.     def writable(self):
  36.         return True
  37.  
  38.     def fileno(self):
  39.         return 42
  40.  
  41.     def readable(self):
  42.         return True
  43.  
  44.     def seekable(self):
  45.         return True
  46.  
  47.     def seek(self, pos, whence):
  48.         pass
  49.  
  50.     def tell(self):
  51.         return 42
  52.  
  53.  
  54. class MockFileIO(io.BytesIO):
  55.  
  56.     def __init__(self, data):
  57.         self.read_history = []
  58.         io.BytesIO.__init__(self, data)
  59.  
  60.     def read(self, n=None):
  61.         res = io.BytesIO.read(self, n)
  62.         self.read_history.append(None if res is None else len(res))
  63.         return res
  64.  
  65.  
  66. class MockNonBlockWriterIO(io.RawIOBase):
  67.  
  68.     def __init__(self, blocking_script):
  69.         self._blocking_script = list(blocking_script)
  70.         self._write_stack = []
  71.  
  72.     def write(self, b):
  73.         self._write_stack.append(b[:])
  74.         n = self._blocking_script.pop(0)
  75.         if (n < 0):
  76.             raise io.BlockingIOError(0, "test blocking", -n)
  77.         else:
  78.             return n
  79.  
  80.     def writable(self):
  81.         return True
  82.  
  83.  
  84. class IOTest(unittest.TestCase):
  85.  
  86.     def tearDown(self):
  87.         test_support.unlink(test_support.TESTFN)
  88.  
  89.     def write_ops(self, f):
  90.         self.assertEqual(f.write(b"blah."), 5)
  91.         self.assertEqual(f.seek(0), 0)
  92.         self.assertEqual(f.write(b"Hello."), 6)
  93.         self.assertEqual(f.tell(), 6)
  94.         self.assertEqual(f.seek(-1, 1), 5)
  95.         self.assertEqual(f.tell(), 5)
  96.         self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
  97.         self.assertEqual(f.seek(0), 0)
  98.         self.assertEqual(f.write(b"h"), 1)
  99.         self.assertEqual(f.seek(-1, 2), 13)
  100.         self.assertEqual(f.tell(), 13)
  101.         self.assertEqual(f.truncate(12), 12)
  102.         self.assertEqual(f.tell(), 12)
  103.         self.assertRaises(TypeError, f.seek, 0.0)
  104.  
  105.     def read_ops(self, f, buffered=False):
  106.         data = f.read(5)
  107.         self.assertEqual(data, b"hello")
  108.         data = bytearray(data)
  109.         self.assertEqual(f.readinto(data), 5)
  110.         self.assertEqual(data, b" worl")
  111.         self.assertEqual(f.readinto(data), 2)
  112.         self.assertEqual(len(data), 5)
  113.         self.assertEqual(data[:2], b"d\n")
  114.         self.assertEqual(f.seek(0), 0)
  115.         self.assertEqual(f.read(20), b"hello world\n")
  116.         self.assertEqual(f.read(1), b"")
  117.         self.assertEqual(f.readinto(bytearray(b"x")), 0)
  118.         self.assertEqual(f.seek(-6, 2), 6)
  119.         self.assertEqual(f.read(5), b"world")
  120.         self.assertEqual(f.read(0), b"")
  121.         self.assertEqual(f.readinto(bytearray()), 0)
  122.         self.assertEqual(f.seek(-6, 1), 5)
  123.         self.assertEqual(f.read(5), b" worl")
  124.         self.assertEqual(f.tell(), 10)
  125.         self.assertRaises(TypeError, f.seek, 0.0)
  126.         if buffered:
  127.             f.seek(0)
  128.             self.assertEqual(f.read(), b"hello world\n")
  129.             f.seek(6)
  130.             self.assertEqual(f.read(), b"world\n")
  131.             self.assertEqual(f.read(), b"")
  132.  
  133.     LARGE = 2**31
  134.  
  135.     def large_file_ops(self, f):
  136.         assert f.readable()
  137.         assert f.writable()
  138.         self.assertEqual(f.seek(self.LARGE), self.LARGE)
  139.         self.assertEqual(f.tell(), self.LARGE)
  140.         self.assertEqual(f.write(b"xxx"), 3)
  141.         self.assertEqual(f.tell(), self.LARGE + 3)
  142.         self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
  143.         self.assertEqual(f.truncate(), self.LARGE + 2)
  144.         self.assertEqual(f.tell(), self.LARGE + 2)
  145.         self.assertEqual(f.seek(0, 2), self.LARGE + 2)
  146.         self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
  147.         self.assertEqual(f.tell(), self.LARGE + 1)
  148.         self.assertEqual(f.seek(0, 2), self.LARGE + 1)
  149.         self.assertEqual(f.seek(-1, 2), self.LARGE)
  150.         self.assertEqual(f.read(2), b"x")
  151.  
  152.     def test_raw_file_io(self):
  153.         f = io.open(test_support.TESTFN, "wb", buffering=0)
  154.         self.assertEqual(f.readable(), False)
  155.         self.assertEqual(f.writable(), True)
  156.         self.assertEqual(f.seekable(), True)
  157.         self.write_ops(f)
  158.         f.close()
  159.         f = io.open(test_support.TESTFN, "rb", buffering=0)
  160.         self.assertEqual(f.readable(), True)
  161.         self.assertEqual(f.writable(), False)
  162.         self.assertEqual(f.seekable(), True)
  163.         self.read_ops(f)
  164.         f.close()
  165.  
  166.     def test_buffered_file_io(self):
  167.         f = io.open(test_support.TESTFN, "wb")
  168.         self.assertEqual(f.readable(), False)
  169.         self.assertEqual(f.writable(), True)
  170.         self.assertEqual(f.seekable(), True)
  171.         self.write_ops(f)
  172.         f.close()
  173.         f = io.open(test_support.TESTFN, "rb")
  174.         self.assertEqual(f.readable(), True)
  175.         self.assertEqual(f.writable(), False)
  176.         self.assertEqual(f.seekable(), True)
  177.         self.read_ops(f, True)
  178.         f.close()
  179.  
  180.     def test_readline(self):
  181.         f = io.open(test_support.TESTFN, "wb")
  182.         f.write(b"abc\ndef\nxyzzy\nfoo")
  183.         f.close()
  184.         f = io.open(test_support.TESTFN, "rb")
  185.         self.assertEqual(f.readline(), b"abc\n")
  186.         self.assertEqual(f.readline(10), b"def\n")
  187.         self.assertEqual(f.readline(2), b"xy")
  188.         self.assertEqual(f.readline(4), b"zzy\n")
  189.         self.assertEqual(f.readline(), b"foo")
  190.         f.close()
  191.  
  192.     def test_raw_bytes_io(self):
  193.         f = io.BytesIO()
  194.         self.write_ops(f)
  195.         data = f.getvalue()
  196.         self.assertEqual(data, b"hello world\n")
  197.         f = io.BytesIO(data)
  198.         self.read_ops(f, True)
  199.  
  200.     def test_large_file_ops(self):
  201.         # On Windows and Mac OSX this test comsumes large resources; It takes
  202.         # a long time to build the >2GB file and takes >2GB of disk space
  203.         # therefore the resource must be enabled to run this test.
  204.         if sys.platform[:3] in ('win', 'os2') or sys.platform == 'darwin':
  205.             if not test_support.is_resource_enabled("largefile"):
  206.                 print("\nTesting large file ops skipped on %s." % sys.platform,
  207.                       file=sys.stderr)
  208.                 print("It requires %d bytes and a long time." % self.LARGE,
  209.                       file=sys.stderr)
  210.                 print("Use 'regrtest.py -u largefile test_io' to run it.",
  211.                       file=sys.stderr)
  212.                 return
  213.         f = io.open(test_support.TESTFN, "w+b", 0)
  214.         self.large_file_ops(f)
  215.         f.close()
  216.         f = io.open(test_support.TESTFN, "w+b")
  217.         self.large_file_ops(f)
  218.         f.close()
  219.  
  220.     def test_with_open(self):
  221.         for bufsize in (0, 1, 100):
  222.             f = None
  223.             with open(test_support.TESTFN, "wb", bufsize) as f:
  224.                 f.write(b"xxx")
  225.             self.assertEqual(f.closed, True)
  226.             f = None
  227.             try:
  228.                 with open(test_support.TESTFN, "wb", bufsize) as f:
  229.                     1/0
  230.             except ZeroDivisionError:
  231.                 self.assertEqual(f.closed, True)
  232.             else:
  233.                 self.fail("1/0 didn't raise an exception")
  234.  
  235.     def test_destructor(self):
  236.         record = []
  237.         class MyFileIO(io.FileIO):
  238.             def __del__(self):
  239.                 record.append(1)
  240.                 io.FileIO.__del__(self)
  241.             def close(self):
  242.                 record.append(2)
  243.                 io.FileIO.close(self)
  244.             def flush(self):
  245.                 record.append(3)
  246.                 io.FileIO.flush(self)
  247.         f = MyFileIO(test_support.TESTFN, "w")
  248.         f.write("xxx")
  249.         del f
  250.         self.assertEqual(record, [1, 2, 3])
  251.  
  252.     def test_close_flushes(self):
  253.         f = io.open(test_support.TESTFN, "wb")
  254.         f.write(b"xxx")
  255.         f.close()
  256.         f = io.open(test_support.TESTFN, "rb")
  257.         self.assertEqual(f.read(), b"xxx")
  258.         f.close()
  259.  
  260.     def XXXtest_array_writes(self):
  261.         # XXX memory view not available yet
  262.         a = array.array('i', range(10))
  263.         n = len(memoryview(a))
  264.         f = io.open(test_support.TESTFN, "wb", 0)
  265.         self.assertEqual(f.write(a), n)
  266.         f.close()
  267.         f = io.open(test_support.TESTFN, "wb")
  268.         self.assertEqual(f.write(a), n)
  269.         f.close()
  270.  
  271.     def test_closefd(self):
  272.         self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
  273.                           closefd=False)
  274.  
  275. class MemorySeekTestMixin:
  276.  
  277.     def testInit(self):
  278.         buf = self.buftype("1234567890")
  279.         bytesIo = self.ioclass(buf)
  280.  
  281.     def testRead(self):
  282.         buf = self.buftype("1234567890")
  283.         bytesIo = self.ioclass(buf)
  284.  
  285.         self.assertEquals(buf[:1], bytesIo.read(1))
  286.         self.assertEquals(buf[1:5], bytesIo.read(4))
  287.         self.assertEquals(buf[5:], bytesIo.read(900))
  288.         self.assertEquals(self.EOF, bytesIo.read())
  289.  
  290.     def testReadNoArgs(self):
  291.         buf = self.buftype("1234567890")
  292.         bytesIo = self.ioclass(buf)
  293.  
  294.         self.assertEquals(buf, bytesIo.read())
  295.         self.assertEquals(self.EOF, bytesIo.read())
  296.  
  297.     def testSeek(self):
  298.         buf = self.buftype("1234567890")
  299.         bytesIo = self.ioclass(buf)
  300.  
  301.         bytesIo.read(5)
  302.         bytesIo.seek(0)
  303.         self.assertEquals(buf, bytesIo.read())
  304.  
  305.         bytesIo.seek(3)
  306.         self.assertEquals(buf[3:], bytesIo.read())
  307.         self.assertRaises(TypeError, bytesIo.seek, 0.0)
  308.  
  309.     def testTell(self):
  310.         buf = self.buftype("1234567890")
  311.         bytesIo = self.ioclass(buf)
  312.  
  313.         self.assertEquals(0, bytesIo.tell())
  314.         bytesIo.seek(5)
  315.         self.assertEquals(5, bytesIo.tell())
  316.         bytesIo.seek(10000)
  317.         self.assertEquals(10000, bytesIo.tell())
  318.  
  319.  
  320. class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
  321.     @staticmethod
  322.     def buftype(s):
  323.         return s.encode("utf-8")
  324.     ioclass = io.BytesIO
  325.     EOF = b""
  326.  
  327.  
  328. class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
  329.     buftype = str
  330.     ioclass = io.StringIO
  331.     EOF = ""
  332.  
  333.  
  334. class BufferedReaderTest(unittest.TestCase):
  335.  
  336.     def testRead(self):
  337.         rawio = MockRawIO((b"abc", b"d", b"efg"))
  338.         bufio = io.BufferedReader(rawio)
  339.  
  340.         self.assertEquals(b"abcdef", bufio.read(6))
  341.  
  342.     def testBuffering(self):
  343.         data = b"abcdefghi"
  344.         dlen = len(data)
  345.  
  346.         tests = [
  347.             [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
  348.             [ 100, [ 3, 3, 3],     [ dlen ]    ],
  349.             [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
  350.         ]
  351.  
  352.         for bufsize, buf_read_sizes, raw_read_sizes in tests:
  353.             rawio = MockFileIO(data)
  354.             bufio = io.BufferedReader(rawio, buffer_size=bufsize)
  355.             pos = 0
  356.             for nbytes in buf_read_sizes:
  357.                 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
  358.                 pos += nbytes
  359.             self.assertEquals(rawio.read_history, raw_read_sizes)
  360.  
  361.     def testReadNonBlocking(self):
  362.         # Inject some None's in there to simulate EWOULDBLOCK
  363.         rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
  364.         bufio = io.BufferedReader(rawio)
  365.  
  366.         self.assertEquals(b"abcd", bufio.read(6))
  367.         self.assertEquals(b"e", bufio.read(1))
  368.         self.assertEquals(b"fg", bufio.read())
  369.         self.assert_(None is bufio.read())
  370.         self.assertEquals(b"", bufio.read())
  371.  
  372.     def testReadToEof(self):
  373.         rawio = MockRawIO((b"abc", b"d", b"efg"))
  374.         bufio = io.BufferedReader(rawio)
  375.  
  376.         self.assertEquals(b"abcdefg", bufio.read(9000))
  377.  
  378.     def testReadNoArgs(self):
  379.         rawio = MockRawIO((b"abc", b"d", b"efg"))
  380.         bufio = io.BufferedReader(rawio)
  381.  
  382.         self.assertEquals(b"abcdefg", bufio.read())
  383.  
  384.     def testFileno(self):
  385.         rawio = MockRawIO((b"abc", b"d", b"efg"))
  386.         bufio = io.BufferedReader(rawio)
  387.  
  388.         self.assertEquals(42, bufio.fileno())
  389.  
  390.     def testFilenoNoFileno(self):
  391.         # XXX will we always have fileno() function? If so, kill
  392.         # this test. Else, write it.
  393.         pass
  394.  
  395.     def testThreads(self):
  396.         try:
  397.             # Write out many bytes with exactly the same number of 0's,
  398.             # 1's... 255's. This will help us check that concurrent reading
  399.             # doesn't duplicate or forget contents.
  400.             N = 1000
  401.             l = range(256) * N
  402.             random.shuffle(l)
  403.             s = bytes(bytearray(l))
  404.             with io.open(test_support.TESTFN, "wb") as f:
  405.                 f.write(s)
  406.             with io.open(test_support.TESTFN, "rb", buffering=0) as raw:
  407.                 bufio = io.BufferedReader(raw, 8)
  408.                 errors = []
  409.                 results = []
  410.                 def f():
  411.                     try:
  412.                         # Intra-buffer read then buffer-flushing read
  413.                         for n in cycle([1, 19]):
  414.                             s = bufio.read(n)
  415.                             if not s:
  416.                                 break
  417.                             # list.append() is atomic
  418.                             results.append(s)
  419.                     except Exception as e:
  420.                         errors.append(e)
  421.                         raise
  422.                 threads = [threading.Thread(target=f) for x in range(20)]
  423.                 for t in threads:
  424.                     t.start()
  425.                 time.sleep(0.02) # yield
  426.                 for t in threads:
  427.                     t.join()
  428.                 self.assertFalse(errors,
  429.                     "the following exceptions were caught: %r" % errors)
  430.                 s = b''.join(results)
  431.                 for i in range(256):
  432.                     c = bytes(bytearray([i]))
  433.                     self.assertEqual(s.count(c), N)
  434.         finally:
  435.             test_support.unlink(test_support.TESTFN)
  436.  
  437.  
  438.  
  439. class BufferedWriterTest(unittest.TestCase):
  440.  
  441.     def testWrite(self):
  442.         # Write to the buffered IO but don't overflow the buffer.
  443.         writer = MockRawIO()
  444.         bufio = io.BufferedWriter(writer, 8)
  445.  
  446.         bufio.write(b"abc")
  447.  
  448.         self.assertFalse(writer._write_stack)
  449.  
  450.     def testWriteOverflow(self):
  451.         writer = MockRawIO()
  452.         bufio = io.BufferedWriter(writer, 8)
  453.  
  454.         bufio.write(b"abc")
  455.         bufio.write(b"defghijkl")
  456.  
  457.         self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
  458.  
  459.     def testWriteNonBlocking(self):
  460.         raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
  461.         bufio = io.BufferedWriter(raw, 8, 16)
  462.  
  463.         bufio.write(b"asdf")
  464.         bufio.write(b"asdfa")
  465.         self.assertEquals(b"asdfasdfa", raw._write_stack[0])
  466.  
  467.         bufio.write(b"asdfasdfasdf")
  468.         self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
  469.         bufio.write(b"asdfasdfasdf")
  470.         self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
  471.         self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
  472.  
  473.         bufio.write(b"asdfasdfasdf")
  474.  
  475.         # XXX I don't like this test. It relies too heavily on how the
  476.         # algorithm actually works, which we might change. Refactor
  477.         # later.
  478.  
  479.     def testFileno(self):
  480.         rawio = MockRawIO((b"abc", b"d", b"efg"))
  481.         bufio = io.BufferedWriter(rawio)
  482.  
  483.         self.assertEquals(42, bufio.fileno())
  484.  
  485.     def testFlush(self):
  486.         writer = MockRawIO()
  487.         bufio = io.BufferedWriter(writer, 8)
  488.  
  489.         bufio.write(b"abc")
  490.         bufio.flush()
  491.  
  492.         self.assertEquals(b"abc", writer._write_stack[0])
  493.  
  494.     def testThreads(self):
  495.         # BufferedWriter should not raise exceptions or crash
  496.         # when called from multiple threads.
  497.         try:
  498.             # We use a real file object because it allows us to
  499.             # exercise situations where the GIL is released before
  500.             # writing the buffer to the raw streams. This is in addition
  501.             # to concurrency issues due to switching threads in the middle
  502.             # of Python code.
  503.             with io.open(test_support.TESTFN, "wb", buffering=0) as raw:
  504.                 bufio = io.BufferedWriter(raw, 8)
  505.                 errors = []
  506.                 def f():
  507.                     try:
  508.                         # Write enough bytes to flush the buffer
  509.                         s = b"a" * 19
  510.                         for i in range(50):
  511.                             bufio.write(s)
  512.                     except Exception as e:
  513.                         errors.append(e)
  514.                         raise
  515.                 threads = [threading.Thread(target=f) for x in range(20)]
  516.                 for t in threads:
  517.                     t.start()
  518.                 time.sleep(0.02) # yield
  519.                 for t in threads:
  520.                     t.join()
  521.                 self.assertFalse(errors,
  522.                     "the following exceptions were caught: %r" % errors)
  523.         finally:
  524.             test_support.unlink(test_support.TESTFN)
  525.  
  526.  
  527. class BufferedRWPairTest(unittest.TestCase):
  528.  
  529.     def testRWPair(self):
  530.         r = MockRawIO(())
  531.         w = MockRawIO()
  532.         pair = io.BufferedRWPair(r, w)
  533.  
  534.         # XXX need implementation
  535.  
  536.  
  537. class BufferedRandomTest(unittest.TestCase):
  538.  
  539.     def testReadAndWrite(self):
  540.         raw = MockRawIO((b"asdf", b"ghjk"))
  541.         rw = io.BufferedRandom(raw, 8, 12)
  542.  
  543.         self.assertEqual(b"as", rw.read(2))
  544.         rw.write(b"ddd")
  545.         rw.write(b"eee")
  546.         self.assertFalse(raw._write_stack) # Buffer writes
  547.         self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
  548.         self.assertEquals(b"dddeee", raw._write_stack[0])
  549.  
  550.     def testSeekAndTell(self):
  551.         raw = io.BytesIO(b"asdfghjkl")
  552.         rw = io.BufferedRandom(raw)
  553.  
  554.         self.assertEquals(b"as", rw.read(2))
  555.         self.assertEquals(2, rw.tell())
  556.         rw.seek(0, 0)
  557.         self.assertEquals(b"asdf", rw.read(4))
  558.  
  559.         rw.write(b"asdf")
  560.         rw.seek(0, 0)
  561.         self.assertEquals(b"asdfasdfl", rw.read())
  562.         self.assertEquals(9, rw.tell())
  563.         rw.seek(-4, 2)
  564.         self.assertEquals(5, rw.tell())
  565.         rw.seek(2, 1)
  566.         self.assertEquals(7, rw.tell())
  567.         self.assertEquals(b"fl", rw.read(11))
  568.         self.assertRaises(TypeError, rw.seek, 0.0)
  569.  
  570. # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
  571. # properties:
  572. #   - A single output character can correspond to many bytes of input.
  573. #   - The number of input bytes to complete the character can be
  574. #     undetermined until the last input byte is received.
  575. #   - The number of input bytes can vary depending on previous input.
  576. #   - A single input byte can correspond to many characters of output.
  577. #   - The number of output characters can be undetermined until the
  578. #     last input byte is received.
  579. #   - The number of output characters can vary depending on previous input.
  580.  
  581. class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
  582.     """
  583.     For testing seek/tell behavior with a stateful, buffering decoder.
  584.  
  585.     Input is a sequence of words.  Words may be fixed-length (length set
  586.     by input) or variable-length (period-terminated).  In variable-length
  587.     mode, extra periods are ignored.  Possible words are:
  588.       - 'i' followed by a number sets the input length, I (maximum 99).
  589.         When I is set to 0, words are space-terminated.
  590.       - 'o' followed by a number sets the output length, O (maximum 99).
  591.       - Any other word is converted into a word followed by a period on
  592.         the output.  The output word consists of the input word truncated
  593.         or padded out with hyphens to make its length equal to O.  If O
  594.         is 0, the word is output verbatim without truncating or padding.
  595.     I and O are initially set to 1.  When I changes, any buffered input is
  596.     re-scanned according to the new I.  EOF also terminates the last word.
  597.     """
  598.  
  599.     def __init__(self, errors='strict'):
  600.         codecs.IncrementalDecoder.__init__(self, errors)
  601.         self.reset()
  602.  
  603.     def __repr__(self):
  604.         return '<SID %x>' % id(self)
  605.  
  606.     def reset(self):
  607.         self.i = 1
  608.         self.o = 1
  609.         self.buffer = bytearray()
  610.  
  611.     def getstate(self):
  612.         i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
  613.         return bytes(self.buffer), i*100 + o
  614.  
  615.     def setstate(self, state):
  616.         buffer, io = state
  617.         self.buffer = bytearray(buffer)
  618.         i, o = divmod(io, 100)
  619.         self.i, self.o = i ^ 1, o ^ 1
  620.  
  621.     def decode(self, input, final=False):
  622.         output = ''
  623.         for b in input:
  624.             if self.i == 0: # variable-length, terminated with period
  625.                 if b == '.':
  626.                     if self.buffer:
  627.                         output += self.process_word()
  628.                 else:
  629.                     self.buffer.append(b)
  630.             else: # fixed-length, terminate after self.i bytes
  631.                 self.buffer.append(b)
  632.                 if len(self.buffer) == self.i:
  633.                     output += self.process_word()
  634.         if final and self.buffer: # EOF terminates the last word
  635.             output += self.process_word()
  636.         return output
  637.  
  638.     def process_word(self):
  639.         output = ''
  640.         if self.buffer[0] == ord('i'):
  641.             self.i = min(99, int(self.buffer[1:] or 0)) # set input length
  642.         elif self.buffer[0] == ord('o'):
  643.             self.o = min(99, int(self.buffer[1:] or 0)) # set output length
  644.         else:
  645.             output = self.buffer.decode('ascii')
  646.             if len(output) < self.o:
  647.                 output += '-'*self.o # pad out with hyphens
  648.             if self.o:
  649.                 output = output[:self.o] # truncate to output length
  650.             output += '.'
  651.         self.buffer = bytearray()
  652.         return output
  653.  
  654.     codecEnabled = False
  655.  
  656.     @classmethod
  657.     def lookupTestDecoder(cls, name):
  658.         if cls.codecEnabled and name == 'test_decoder':
  659.             return codecs.CodecInfo(
  660.                 name='test_decoder', encode=None, decode=None,
  661.                 incrementalencoder=None,
  662.                 streamreader=None, streamwriter=None,
  663.                 incrementaldecoder=cls)
  664.  
# Register the lookup function of the decoder defined above with the codec
# registry.  The lookup only answers for the name 'test_decoder' while
# StatefulIncrementalDecoder.codecEnabled is true; that flag is False by
# default, so tests are expected to switch it on when they need the codec.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
  668.  
  669.  
  670. class StatefulIncrementalDecoderTest(unittest.TestCase):
  671.     """
  672.     Make sure the StatefulIncrementalDecoder actually works.
  673.     """
  674.  
  675.     test_cases = [
  676.         # I=1, O=1 (fixed-length input == fixed-length output)
  677.         (b'abcd', False, 'a.b.c.d.'),
  678.         # I=0, O=0 (variable-length input, variable-length output)
  679.         (b'oiabcd', True, 'abcd.'),
  680.         # I=0, O=0 (should ignore extra periods)
  681.         (b'oi...abcd...', True, 'abcd.'),
  682.         # I=0, O=6 (variable-length input, fixed-length output)
  683.         (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
  684.         # I=2, O=6 (fixed-length input < fixed-length output)
  685.         (b'i.i2.o6xyz', True, 'xy----.z-----.'),
  686.         # I=6, O=3 (fixed-length input > fixed-length output)
  687.         (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
  688.         # I=0, then 3; O=29, then 15 (with longer output)
  689.         (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
  690.          'a----------------------------.' +
  691.          'b----------------------------.' +
  692.          'cde--------------------------.' +
  693.          'abcdefghijabcde.' +
  694.          'a.b------------.' +
  695.          '.c.------------.' +
  696.          'd.e------------.' +
  697.          'k--------------.' +
  698.          'l--------------.' +
  699.          'm--------------.')
  700.     ]
  701.  
  702.     def testDecoder(self):
  703.         # Try a few one-shot test cases.
  704.         for input, eof, output in self.test_cases:
  705.             d = StatefulIncrementalDecoder()
  706.             self.assertEquals(d.decode(input, eof), output)
  707.  
  708.         # Also test an unfinished decode, followed by forcing EOF.
  709.         d = StatefulIncrementalDecoder()
  710.         self.assertEquals(d.decode(b'oiabcd'), '')
  711.         self.assertEquals(d.decode(b'', 1), 'abcd.')
  712.  
  713. class TextIOWrapperTest(unittest.TestCase):
  714.  
  715.     def setUp(self):
  716.         self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
  717.         self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
  718.  
    def tearDown(self):
        # Remove the scratch file named by test_support.TESTFN.
        test_support.unlink(test_support.TESTFN)
  721.  
  722.     def testLineBuffering(self):
  723.         r = io.BytesIO()
  724.         b = io.BufferedWriter(r, 1000)
  725.         t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
  726.         t.write(u"X")
  727.         self.assertEquals(r.getvalue(), b"")  # No flush happened
  728.         t.write(u"Y\nZ")
  729.         self.assertEquals(r.getvalue(), b"XY\nZ")  # All got flushed
  730.         t.write(u"A\rB")
  731.         self.assertEquals(r.getvalue(), b"XY\nZA\rB")
  732.  
  733.     def testEncodingErrorsReading(self):
  734.         # (1) default
  735.         b = io.BytesIO(b"abc\n\xff\n")
  736.         t = io.TextIOWrapper(b, encoding="ascii")
  737.         self.assertRaises(UnicodeError, t.read)
  738.         # (2) explicit strict
  739.         b = io.BytesIO(b"abc\n\xff\n")
  740.         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
  741.         self.assertRaises(UnicodeError, t.read)
  742.         # (3) ignore
  743.         b = io.BytesIO(b"abc\n\xff\n")
  744.         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
  745.         self.assertEquals(t.read(), "abc\n\n")
  746.         # (4) replace
  747.         b = io.BytesIO(b"abc\n\xff\n")
  748.         t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
  749.         self.assertEquals(t.read(), u"abc\n\ufffd\n")
  750.  
  751.     def testEncodingErrorsWriting(self):
  752.         # (1) default
  753.         b = io.BytesIO()
  754.         t = io.TextIOWrapper(b, encoding="ascii")
  755.         self.assertRaises(UnicodeError, t.write, u"\xff")
  756.         # (2) explicit strict
  757.         b = io.BytesIO()
  758.         t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
  759.         self.assertRaises(UnicodeError, t.write, u"\xff")
  760.         # (3) ignore
  761.         b = io.BytesIO()
  762.         t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
  763.                              newline="\n")
  764.         t.write(u"abc\xffdef\n")
  765.         t.flush()
  766.         self.assertEquals(b.getvalue(), b"abcdef\n")
  767.         # (4) replace
  768.         b = io.BytesIO()
  769.         t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
  770.                              newline="\n")
  771.         t.write(u"abc\xffdef\n")
  772.         t.flush()
  773.         self.assertEquals(b.getvalue(), b"abc?def\n")
  774.  
  775.     def testNewlinesInput(self):
  776.         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
  777.         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
  778.         for newline, expected in [
  779.             (None, normalized.decode("ascii").splitlines(True)),
  780.             ("", testdata.decode("ascii").splitlines(True)),
  781.             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  782.             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  783.             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
  784.             ]:
  785.             buf = io.BytesIO(testdata)
  786.             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
  787.             self.assertEquals(txt.readlines(), expected)
  788.             txt.seek(0)
  789.             self.assertEquals(txt.read(), "".join(expected))
  790.  
  791.     def testNewlinesOutput(self):
  792.         testdict = {
  793.             "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  794.             "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  795.             "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
  796.             "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
  797.             }
  798.         tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
  799.         for newline, expected in tests:
  800.             buf = io.BytesIO()
  801.             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
  802.             txt.write("AAA\nB")
  803.             txt.write("BB\nCCC\n")
  804.             txt.write("X\rY\r\nZ")
  805.             txt.flush()
  806.             self.assertEquals(buf.closed, False)
  807.             self.assertEquals(buf.getvalue(), expected)
  808.  
  809.     def testNewlines(self):
  810.         input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
  811.  
  812.         tests = [
  813.             [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
  814.             [ '', input_lines ],
  815.             [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
  816.             [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
  817.             [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
  818.         ]
  819.  
  820.         encodings = ('utf-8', 'latin-1')
  821.  
  822.         # Try a range of buffer sizes to test the case where \r is the last
  823.         # character in TextIOWrapper._pending_line.
  824.         for encoding in encodings:
  825.             # XXX: str.encode() should return bytes
  826.             data = bytes(''.join(input_lines).encode(encoding))
  827.             for do_reads in (False, True):
  828.                 for bufsize in range(1, 10):
  829.                     for newline, exp_lines in tests:
  830.                         bufio = io.BufferedReader(io.BytesIO(data), bufsize)
  831.                         textio = io.TextIOWrapper(bufio, newline=newline,
  832.                                                   encoding=encoding)
  833.                         if do_reads:
  834.                             got_lines = []
  835.                             while True:
  836.                                 c2 = textio.read(2)
  837.                                 if c2 == '':
  838.                                     break
  839.                                 self.assertEquals(len(c2), 2)
  840.                                 got_lines.append(c2 + textio.readline())
  841.                         else:
  842.                             got_lines = list(textio)
  843.  
  844.                         for got_line, exp_line in zip(got_lines, exp_lines):
  845.                             self.assertEquals(got_line, exp_line)
  846.                         self.assertEquals(len(got_lines), len(exp_lines))
  847.  
  848.     def testNewlinesInput(self):
  849.         testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
  850.         normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
  851.         for newline, expected in [
  852.             (None, normalized.decode("ascii").splitlines(True)),
  853.             ("", testdata.decode("ascii").splitlines(True)),
  854.             ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  855.             ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  856.             ("\r",  ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
  857.             ]:
  858.             buf = io.BytesIO(testdata)
  859.             txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
  860.             self.assertEquals(txt.readlines(), expected)
  861.             txt.seek(0)
  862.             self.assertEquals(txt.read(), "".join(expected))
  863.  
  864.     def testNewlinesOutput(self):
  865.         data = u"AAA\nBBB\rCCC\n"
  866.         data_lf = b"AAA\nBBB\rCCC\n"
  867.         data_cr = b"AAA\rBBB\rCCC\r"
  868.         data_crlf = b"AAA\r\nBBB\rCCC\r\n"
  869.         save_linesep = os.linesep
  870.         try:
  871.             for os.linesep, newline, expected in [
  872.                 ("\n", None, data_lf),
  873.                 ("\r\n", None, data_crlf),
  874.                 ("\n", "", data_lf),
  875.                 ("\r\n", "", data_lf),
  876.                 ("\n", "\n", data_lf),
  877.                 ("\r\n", "\n", data_lf),
  878.                 ("\n", "\r", data_cr),
  879.                 ("\r\n", "\r", data_cr),
  880.                 ("\n", "\r\n", data_crlf),
  881.                 ("\r\n", "\r\n", data_crlf),
  882.                 ]:
  883.                 buf = io.BytesIO()
  884.                 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
  885.                 txt.write(data)
  886.                 txt.close()
  887.                 self.assertEquals(buf.closed, True)
  888.                 self.assertRaises(ValueError, buf.getvalue)
  889.         finally:
  890.             os.linesep = save_linesep
  891.  
  892.     # Systematic tests of the text I/O API
  893.  
  894.     def testBasicIO(self):
  895.         for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
  896.             for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
  897.                 f = io.open(test_support.TESTFN, "w+", encoding=enc)
  898.                 f._CHUNK_SIZE = chunksize
  899.                 self.assertEquals(f.write(u"abc"), 3)
  900.                 f.close()
  901.                 f = io.open(test_support.TESTFN, "r+", encoding=enc)
  902.                 f._CHUNK_SIZE = chunksize
  903.                 self.assertEquals(f.tell(), 0)
  904.                 self.assertEquals(f.read(), u"abc")
  905.                 cookie = f.tell()
  906.                 self.assertEquals(f.seek(0), 0)
  907.                 self.assertEquals(f.read(2), u"ab")
  908.                 self.assertEquals(f.read(1), u"c")
  909.                 self.assertEquals(f.read(1), u"")
  910.                 self.assertEquals(f.read(), u"")
  911.                 self.assertEquals(f.tell(), cookie)
  912.                 self.assertEquals(f.seek(0), 0)
  913.                 self.assertEquals(f.seek(0, 2), cookie)
  914.                 self.assertEquals(f.write(u"def"), 3)
  915.                 self.assertEquals(f.seek(cookie), cookie)
  916.                 self.assertEquals(f.read(), u"def")
  917.                 if enc.startswith("utf"):
  918.                     self.multi_line_test(f, enc)
  919.                 f.close()
  920.  
  921.     def multi_line_test(self, f, enc):
  922.         f.seek(0)
  923.         f.truncate()
  924.         sample = u"s\xff\u0fff\uffff"
  925.         wlines = []
  926.         for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
  927.             chars = []
  928.             for i in range(size):
  929.                 chars.append(sample[i % len(sample)])
  930.             line = u"".join(chars) + u"\n"
  931.             wlines.append((f.tell(), line))
  932.             f.write(line)
  933.         f.seek(0)
  934.         rlines = []
  935.         while True:
  936.             pos = f.tell()
  937.             line = f.readline()
  938.             if not line:
  939.                 break
  940.             rlines.append((pos, line))
  941.         self.assertEquals(rlines, wlines)
  942.  
  943.     def testTelling(self):
  944.         f = io.open(test_support.TESTFN, "w+", encoding="utf8")
  945.         p0 = f.tell()
  946.         f.write(u"\xff\n")
  947.         p1 = f.tell()
  948.         f.write(u"\xff\n")
  949.         p2 = f.tell()
  950.         f.seek(0)
  951.         self.assertEquals(f.tell(), p0)
  952.         self.assertEquals(f.readline(), u"\xff\n")
  953.         self.assertEquals(f.tell(), p1)
  954.         self.assertEquals(f.readline(), u"\xff\n")
  955.         self.assertEquals(f.tell(), p2)
  956.         f.seek(0)
  957.         for line in f:
  958.             self.assertEquals(line, u"\xff\n")
  959.             self.assertRaises(IOError, f.tell)
  960.         self.assertEquals(f.tell(), p2)
  961.         f.close()
  962.  
  963.     def testSeeking(self):
  964.         chunk_size = io.TextIOWrapper._CHUNK_SIZE
  965.         prefix_size = chunk_size - 2
  966.         u_prefix = "a" * prefix_size
  967.         prefix = bytes(u_prefix.encode("utf-8"))
  968.         self.assertEquals(len(u_prefix), len(prefix))
  969.         u_suffix = "\u8888\n"
  970.         suffix = bytes(u_suffix.encode("utf-8"))
  971.         line = prefix + suffix
  972.         f = io.open(test_support.TESTFN, "wb")
  973.         f.write(line*2)
  974.         f.close()
  975.         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
  976.         s = f.read(prefix_size)
  977.         self.assertEquals(s, unicode(prefix, "ascii"))
  978.         self.assertEquals(f.tell(), prefix_size)
  979.         self.assertEquals(f.readline(), u_suffix)
  980.  
  981.     def testSeekingToo(self):
  982.         # Regression test for a specific bug
  983.         data = b'\xe0\xbf\xbf\n'
  984.         f = io.open(test_support.TESTFN, "wb")
  985.         f.write(data)
  986.         f.close()
  987.         f = io.open(test_support.TESTFN, "r", encoding="utf-8")
  988.         f._CHUNK_SIZE  # Just test that it exists
  989.         f._CHUNK_SIZE = 2
  990.         f.readline()
  991.         f.tell()
  992.  
  993.     def testSeekAndTell(self):
  994.         """Test seek/tell using the StatefulIncrementalDecoder."""
  995.  
  996.         def testSeekAndTellWithData(data, min_pos=0):
  997.             """Tell/seek to various points within a data stream and ensure
  998.             that the decoded data returned by read() is consistent."""
  999.             f = io.open(test_support.TESTFN, 'wb')
  1000.             f.write(data)
  1001.             f.close()
  1002.             f = io.open(test_support.TESTFN, encoding='test_decoder')
  1003.             decoded = f.read()
  1004.             f.close()
  1005.  
  1006.             for i in range(min_pos, len(decoded) + 1): # seek positions
  1007.                 for j in [1, 5, len(decoded) - i]: # read lengths
  1008.                     f = io.open(test_support.TESTFN, encoding='test_decoder')
  1009.                     self.assertEquals(f.read(i), decoded[:i])
  1010.                     cookie = f.tell()
  1011.                     self.assertEquals(f.read(j), decoded[i:i + j])
  1012.                     f.seek(cookie)
  1013.                     self.assertEquals(f.read(), decoded[i:])
  1014.                     f.close()
  1015.  
  1016.         # Enable the test decoder.
  1017.         StatefulIncrementalDecoder.codecEnabled = 1
  1018.  
  1019.         # Run the tests.
  1020.         try:
  1021.             # Try each test case.
  1022.             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  1023.                 testSeekAndTellWithData(input)
  1024.  
  1025.             # Position each test case so that it crosses a chunk boundary.
  1026.             CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
  1027.             for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  1028.                 offset = CHUNK_SIZE - len(input)//2
  1029.                 prefix = b'.'*offset
  1030.                 # Don't bother seeking into the prefix (takes too long).
  1031.                 min_pos = offset*2
  1032.                 testSeekAndTellWithData(prefix + input, min_pos)
  1033.  
  1034.         # Ensure our test decoder won't interfere with subsequent tests.
  1035.         finally:
  1036.             StatefulIncrementalDecoder.codecEnabled = 0
  1037.  
  1038.     def testEncodedWrites(self):
  1039.         data = u"1234567890"
  1040.         tests = ("utf-16",
  1041.                  "utf-16-le",
  1042.                  "utf-16-be",
  1043.                  "utf-32",
  1044.                  "utf-32-le",
  1045.                  "utf-32-be")
  1046.         for encoding in tests:
  1047.             buf = io.BytesIO()
  1048.             f = io.TextIOWrapper(buf, encoding=encoding)
  1049.             # Check if the BOM is written only once (see issue1753).
  1050.             f.write(data)
  1051.             f.write(data)
  1052.             f.seek(0)
  1053.             self.assertEquals(f.read(), data * 2)
  1054.             self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
  1055.  
  1056.     def timingTest(self):
  1057.         timer = time.time
  1058.         enc = "utf8"
  1059.         line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
  1060.         nlines = 10000
  1061.         nchars = len(line)
  1062.         nbytes = len(line.encode(enc))
  1063.         for chunk_size in (32, 64, 128, 256):
  1064.             f = io.open(test_support.TESTFN, "w+", encoding=enc)
  1065.             f._CHUNK_SIZE = chunk_size
  1066.             t0 = timer()
  1067.             for i in range(nlines):
  1068.                 f.write(line)
  1069.             f.flush()
  1070.             t1 = timer()
  1071.             f.seek(0)
  1072.             for line in f:
  1073.                 pass
  1074.             t2 = timer()
  1075.             f.seek(0)
  1076.             while f.readline():
  1077.                 pass
  1078.             t3 = timer()
  1079.             f.seek(0)
  1080.             while f.readline():
  1081.                 f.tell()
  1082.             t4 = timer()
  1083.             f.close()
  1084.             if test_support.verbose:
  1085.                 print("\nTiming test: %d lines of %d characters (%d bytes)" %
  1086.                       (nlines, nchars, nbytes))
  1087.                 print("File chunk size:          %6s" % f._CHUNK_SIZE)
  1088.                 print("Writing:                  %6.3f seconds" % (t1-t0))
  1089.                 print("Reading using iteration:  %6.3f seconds" % (t2-t1))
  1090.                 print("Reading using readline(): %6.3f seconds" % (t3-t2))
  1091.                 print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
  1092.  
  1093.     def testReadOneByOne(self):
  1094.         txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
  1095.         reads = ""
  1096.         while True:
  1097.             c = txt.read(1)
  1098.             if not c:
  1099.                 break
  1100.             reads += c
  1101.         self.assertEquals(reads, "AA\nBB")
  1102.  
  1103.     # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
  1104.     def testReadByChunk(self):
  1105.         # make sure "\r\n" straddles 128 char boundary.
  1106.         txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
  1107.         reads = ""
  1108.         while True:
  1109.             c = txt.read(128)
  1110.             if not c:
  1111.                 break
  1112.             reads += c
  1113.         self.assertEquals(reads, "A"*127+"\nB")
  1114.  
  1115.     def test_issue1395_1(self):
  1116.         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
  1117.  
  1118.         # read one char at a time
  1119.         reads = ""
  1120.         while True:
  1121.             c = txt.read(1)
  1122.             if not c:
  1123.                 break
  1124.             reads += c
  1125.         self.assertEquals(reads, self.normalized)
  1126.  
  1127.     def test_issue1395_2(self):
  1128.         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
  1129.         txt._CHUNK_SIZE = 4
  1130.  
  1131.         reads = ""
  1132.         while True:
  1133.             c = txt.read(4)
  1134.             if not c:
  1135.                 break
  1136.             reads += c
  1137.         self.assertEquals(reads, self.normalized)
  1138.  
  1139.     def test_issue1395_3(self):
  1140.         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
  1141.         txt._CHUNK_SIZE = 4
  1142.  
  1143.         reads = txt.read(4)
  1144.         reads += txt.read(4)
  1145.         reads += txt.readline()
  1146.         reads += txt.readline()
  1147.         reads += txt.readline()
  1148.         self.assertEquals(reads, self.normalized)
  1149.  
  1150.     def test_issue1395_4(self):
  1151.         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
  1152.         txt._CHUNK_SIZE = 4
  1153.  
  1154.         reads = txt.read(4)
  1155.         reads += txt.read()
  1156.         self.assertEquals(reads, self.normalized)
  1157.  
  1158.     def test_issue1395_5(self):
  1159.         txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
  1160.         txt._CHUNK_SIZE = 4
  1161.  
  1162.         reads = txt.read(4)
  1163.         pos = txt.tell()
  1164.         txt.seek(0)
  1165.         txt.seek(pos)
  1166.         self.assertEquals(txt.read(4), "BBB\n")
  1167.  
  1168.     def test_issue2282(self):
  1169.         buffer = io.BytesIO(self.testdata)
  1170.         txt = io.TextIOWrapper(buffer, encoding="ascii")
  1171.  
  1172.         self.assertEqual(buffer.seekable(), txt.seekable())
  1173.  
  1174.     def test_newline_decoder(self):
  1175.         import codecs
  1176.         decoder = codecs.getincrementaldecoder("utf-8")()
  1177.         decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
  1178.  
  1179.         self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
  1180.  
  1181.         self.assertEquals(decoder.decode(b'\xe8'), u"")
  1182.         self.assertEquals(decoder.decode(b'\xa2'), u"")
  1183.         self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
  1184.  
  1185.         self.assertEquals(decoder.decode(b'\xe8'), u"")
  1186.         self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
  1187.  
  1188.         decoder.setstate((b'', 0))
  1189.         self.assertEquals(decoder.decode(b'\n'), u"\n")
  1190.         self.assertEquals(decoder.decode(b'\r'), u"")
  1191.         self.assertEquals(decoder.decode(b'', final=True), u"\n")
  1192.         self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
  1193.  
  1194.         self.assertEquals(decoder.decode(b'\r'), u"")
  1195.         self.assertEquals(decoder.decode(b'a'), u"\na")
  1196.  
  1197.         self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
  1198.         self.assertEquals(decoder.decode(b'\r'), u"")
  1199.         self.assertEquals(decoder.decode(b'\r'), u"\n")
  1200.         self.assertEquals(decoder.decode(b'\na'), u"\na")
  1201.  
  1202.         self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
  1203.         self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
  1204.         self.assertEquals(decoder.decode(b'\n'), u"\n")
  1205.         self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
  1206.         self.assertEquals(decoder.decode(b'\n'), u"\n")
  1207.  
  1208.         decoder = codecs.getincrementaldecoder("utf-8")()
  1209.         decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
  1210.         self.assertEquals(decoder.newlines, None)
  1211.         decoder.decode(b"abc\n\r")
  1212.         self.assertEquals(decoder.newlines, u'\n')
  1213.         decoder.decode(b"\nabc")
  1214.         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
  1215.         decoder.decode(b"abc\r")
  1216.         self.assertEquals(decoder.newlines, ('\n', '\r\n'))
  1217.         decoder.decode(b"abc")
  1218.         self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
  1219.         decoder.decode(b"abc\r")
  1220.         decoder.reset()
  1221.         self.assertEquals(decoder.decode(b"abc"), "abc")
  1222.         self.assertEquals(decoder.newlines, None)
  1223.  
  1224. # XXX Tests for open()
  1225.  
  1226. class MiscIOTest(unittest.TestCase):
  1227.  
  1228.     def testImport__all__(self):
  1229.         for name in io.__all__:
  1230.             obj = getattr(io, name, None)
  1231.             self.assert_(obj is not None, name)
  1232.             if name == "open":
  1233.                 continue
  1234.             elif "error" in name.lower():
  1235.                 self.assert_(issubclass(obj, Exception), name)
  1236.             else:
  1237.                 self.assert_(issubclass(obj, io.IOBase))
  1238.  
  1239.  
  1240. def test_main():
  1241.     test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
  1242.                               BufferedReaderTest, BufferedWriterTest,
  1243.                               BufferedRWPairTest, BufferedRandomTest,
  1244.                               StatefulIncrementalDecoderTest,
  1245.                               TextIOWrapperTest, MiscIOTest)
  1246.  
# When run as a script, let unittest collect and run the tests itself;
# note that this path does not go through test_main() above.
if __name__ == "__main__":
    unittest.main()
  1249.