home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_mailbox.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  76.6 KB  |  1,927 lines

  1. import os
  2. import sys
  3. import time
  4. import stat
  5. import socket
  6. import email
  7. import email.message
  8. import rfc822
  9. import re
  10. import StringIO
  11. from test import test_support
  12. import unittest
  13. import mailbox
  14. import glob
  15. try:
  16.     import fcntl
  17. except ImportError:
  18.     pass
  19.  
  20.  
  21. class TestBase(unittest.TestCase):
  22.  
  23.     def _check_sample(self, msg):
  24.         # Inspect a mailbox.Message representation of the sample message
  25.         self.assert_(isinstance(msg, email.message.Message))
  26.         self.assert_(isinstance(msg, mailbox.Message))
  27.         for key, value in _sample_headers.iteritems():
  28.             self.assert_(value in msg.get_all(key))
  29.         self.assert_(msg.is_multipart())
  30.         self.assert_(len(msg.get_payload()) == len(_sample_payloads))
  31.         for i, payload in enumerate(_sample_payloads):
  32.             part = msg.get_payload(i)
  33.             self.assert_(isinstance(part, email.message.Message))
  34.             self.assert_(not isinstance(part, mailbox.Message))
  35.             self.assert_(part.get_payload() == payload)
  36.  
  37.     def _delete_recursively(self, target):
  38.         # Delete a file or delete a directory recursively
  39.         if os.path.isdir(target):
  40.             for path, dirs, files in os.walk(target, topdown=False):
  41.                 for name in files:
  42.                     os.remove(os.path.join(path, name))
  43.                 for name in dirs:
  44.                     os.rmdir(os.path.join(path, name))
  45.             os.rmdir(target)
  46.         elif os.path.exists(target):
  47.             os.remove(target)
  48.  
  49.  
  50. class TestMailbox(TestBase):
  51.  
  52.     _factory = None     # Overridden by subclasses to reuse tests
  53.     _template = 'From: foo\n\n%s'
  54.  
  55.     def setUp(self):
  56.         self._path = test_support.TESTFN
  57.         self._delete_recursively(self._path)
  58.         self._box = self._factory(self._path)
  59.  
  60.     def tearDown(self):
  61.         self._box.close()
  62.         self._delete_recursively(self._path)
  63.  
  64.     def test_add(self):
  65.         # Add copies of a sample message
  66.         keys = []
  67.         keys.append(self._box.add(self._template % 0))
  68.         self.assert_(len(self._box) == 1)
  69.         keys.append(self._box.add(mailbox.Message(_sample_message)))
  70.         self.assert_(len(self._box) == 2)
  71.         keys.append(self._box.add(email.message_from_string(_sample_message)))
  72.         self.assert_(len(self._box) == 3)
  73.         keys.append(self._box.add(StringIO.StringIO(_sample_message)))
  74.         self.assert_(len(self._box) == 4)
  75.         keys.append(self._box.add(_sample_message))
  76.         self.assert_(len(self._box) == 5)
  77.         self.assert_(self._box.get_string(keys[0]) == self._template % 0)
  78.         for i in (1, 2, 3, 4):
  79.             self._check_sample(self._box[keys[i]])
  80.  
  81.     def test_remove(self):
  82.         # Remove messages using remove()
  83.         self._test_remove_or_delitem(self._box.remove)
  84.  
  85.     def test_delitem(self):
  86.         # Remove messages using __delitem__()
  87.         self._test_remove_or_delitem(self._box.__delitem__)
  88.  
  89.     def _test_remove_or_delitem(self, method):
  90.         # (Used by test_remove() and test_delitem().)
  91.         key0 = self._box.add(self._template % 0)
  92.         key1 = self._box.add(self._template % 1)
  93.         self.assert_(len(self._box) == 2)
  94.         method(key0)
  95.         l = len(self._box)
  96.         self.assert_(l == 1, "actual l: %s" % l)
  97.         self.assertRaises(KeyError, lambda: self._box[key0])
  98.         self.assertRaises(KeyError, lambda: method(key0))
  99.         self.assert_(self._box.get_string(key1) == self._template % 1)
  100.         key2 = self._box.add(self._template % 2)
  101.         self.assert_(len(self._box) == 2)
  102.         method(key2)
  103.         l = len(self._box)
  104.         self.assert_(l == 1, "actual l: %s" % l)
  105.         self.assertRaises(KeyError, lambda: self._box[key2])
  106.         self.assertRaises(KeyError, lambda: method(key2))
  107.         self.assert_(self._box.get_string(key1) == self._template % 1)
  108.         method(key1)
  109.         self.assert_(len(self._box) == 0)
  110.         self.assertRaises(KeyError, lambda: self._box[key1])
  111.         self.assertRaises(KeyError, lambda: method(key1))
  112.  
  113.     def test_discard(self, repetitions=10):
  114.         # Discard messages
  115.         key0 = self._box.add(self._template % 0)
  116.         key1 = self._box.add(self._template % 1)
  117.         self.assert_(len(self._box) == 2)
  118.         self._box.discard(key0)
  119.         self.assert_(len(self._box) == 1)
  120.         self.assertRaises(KeyError, lambda: self._box[key0])
  121.         self._box.discard(key0)
  122.         self.assert_(len(self._box) == 1)
  123.         self.assertRaises(KeyError, lambda: self._box[key0])
  124.  
  125.     def test_get(self):
  126.         # Retrieve messages using get()
  127.         key0 = self._box.add(self._template % 0)
  128.         msg = self._box.get(key0)
  129.         self.assert_(msg['from'] == 'foo')
  130.         self.assert_(msg.get_payload() == '0')
  131.         self.assert_(self._box.get('foo') is None)
  132.         self.assert_(self._box.get('foo', False) is False)
  133.         self._box.close()
  134.         self._box = self._factory(self._path, factory=rfc822.Message)
  135.         key1 = self._box.add(self._template % 1)
  136.         msg = self._box.get(key1)
  137.         self.assert_(msg['from'] == 'foo')
  138.         self.assert_(msg.fp.read() == '1')
  139.  
  140.     def test_getitem(self):
  141.         # Retrieve message using __getitem__()
  142.         key0 = self._box.add(self._template % 0)
  143.         msg = self._box[key0]
  144.         self.assert_(msg['from'] == 'foo')
  145.         self.assert_(msg.get_payload() == '0')
  146.         self.assertRaises(KeyError, lambda: self._box['foo'])
  147.         self._box.discard(key0)
  148.         self.assertRaises(KeyError, lambda: self._box[key0])
  149.  
  150.     def test_get_message(self):
  151.         # Get Message representations of messages
  152.         key0 = self._box.add(self._template % 0)
  153.         key1 = self._box.add(_sample_message)
  154.         msg0 = self._box.get_message(key0)
  155.         self.assert_(isinstance(msg0, mailbox.Message))
  156.         self.assert_(msg0['from'] == 'foo')
  157.         self.assert_(msg0.get_payload() == '0')
  158.         self._check_sample(self._box.get_message(key1))
  159.  
  160.     def test_get_string(self):
  161.         # Get string representations of messages
  162.         key0 = self._box.add(self._template % 0)
  163.         key1 = self._box.add(_sample_message)
  164.         self.assert_(self._box.get_string(key0) == self._template % 0)
  165.         self.assert_(self._box.get_string(key1) == _sample_message)
  166.  
  167.     def test_get_file(self):
  168.         # Get file representations of messages
  169.         key0 = self._box.add(self._template % 0)
  170.         key1 = self._box.add(_sample_message)
  171.         self.assert_(self._box.get_file(key0).read().replace(os.linesep, '\n')
  172.                      == self._template % 0)
  173.         self.assert_(self._box.get_file(key1).read().replace(os.linesep, '\n')
  174.                      == _sample_message)
  175.  
  176.     def test_iterkeys(self):
  177.         # Get keys using iterkeys()
  178.         self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
  179.  
  180.     def test_keys(self):
  181.         # Get keys using keys()
  182.         self._check_iteration(self._box.keys, do_keys=True, do_values=False)
  183.  
  184.     def test_itervalues(self):
  185.         # Get values using itervalues()
  186.         self._check_iteration(self._box.itervalues, do_keys=False,
  187.                               do_values=True)
  188.  
  189.     def test_iter(self):
  190.         # Get values using __iter__()
  191.         self._check_iteration(self._box.__iter__, do_keys=False,
  192.                               do_values=True)
  193.  
  194.     def test_values(self):
  195.         # Get values using values()
  196.         self._check_iteration(self._box.values, do_keys=False, do_values=True)
  197.  
  198.     def test_iteritems(self):
  199.         # Get keys and values using iteritems()
  200.         self._check_iteration(self._box.iteritems, do_keys=True,
  201.                               do_values=True)
  202.  
  203.     def test_items(self):
  204.         # Get keys and values using items()
  205.         self._check_iteration(self._box.items, do_keys=True, do_values=True)
  206.  
  207.     def _check_iteration(self, method, do_keys, do_values, repetitions=10):
  208.         for value in method():
  209.             self.fail("Not empty")
  210.         keys, values = [], []
  211.         for i in xrange(repetitions):
  212.             keys.append(self._box.add(self._template % i))
  213.             values.append(self._template % i)
  214.         if do_keys and not do_values:
  215.             returned_keys = list(method())
  216.         elif do_values and not do_keys:
  217.             returned_values = list(method())
  218.         else:
  219.             returned_keys, returned_values = [], []
  220.             for key, value in method():
  221.                 returned_keys.append(key)
  222.                 returned_values.append(value)
  223.         if do_keys:
  224.             self.assert_(len(keys) == len(returned_keys))
  225.             self.assert_(set(keys) == set(returned_keys))
  226.         if do_values:
  227.             count = 0
  228.             for value in returned_values:
  229.                 self.assert_(value['from'] == 'foo')
  230.                 self.assert_(int(value.get_payload()) < repetitions)
  231.                 count += 1
  232.             self.assert_(len(values) == count)
  233.  
  234.     def test_has_key(self):
  235.         # Check existence of keys using has_key()
  236.         self._test_has_key_or_contains(self._box.has_key)
  237.  
  238.     def test_contains(self):
  239.         # Check existence of keys using __contains__()
  240.         self._test_has_key_or_contains(self._box.__contains__)
  241.  
  242.     def _test_has_key_or_contains(self, method):
  243.         # (Used by test_has_key() and test_contains().)
  244.         self.assert_(not method('foo'))
  245.         key0 = self._box.add(self._template % 0)
  246.         self.assert_(method(key0))
  247.         self.assert_(not method('foo'))
  248.         key1 = self._box.add(self._template % 1)
  249.         self.assert_(method(key1))
  250.         self.assert_(method(key0))
  251.         self.assert_(not method('foo'))
  252.         self._box.remove(key0)
  253.         self.assert_(not method(key0))
  254.         self.assert_(method(key1))
  255.         self.assert_(not method('foo'))
  256.         self._box.remove(key1)
  257.         self.assert_(not method(key1))
  258.         self.assert_(not method(key0))
  259.         self.assert_(not method('foo'))
  260.  
  261.     def test_len(self, repetitions=10):
  262.         # Get message count
  263.         keys = []
  264.         for i in xrange(repetitions):
  265.             self.assert_(len(self._box) == i)
  266.             keys.append(self._box.add(self._template % i))
  267.             self.assert_(len(self._box) == i + 1)
  268.         for i in xrange(repetitions):
  269.             self.assert_(len(self._box) == repetitions - i)
  270.             self._box.remove(keys[i])
  271.             self.assert_(len(self._box) == repetitions - i - 1)
  272.  
  273.     def test_set_item(self):
  274.         # Modify messages using __setitem__()
  275.         key0 = self._box.add(self._template % 'original 0')
  276.         self.assert_(self._box.get_string(key0) == \
  277.                      self._template % 'original 0')
  278.         key1 = self._box.add(self._template % 'original 1')
  279.         self.assert_(self._box.get_string(key1) == \
  280.                      self._template % 'original 1')
  281.         self._box[key0] = self._template % 'changed 0'
  282.         self.assert_(self._box.get_string(key0) == \
  283.                      self._template % 'changed 0')
  284.         self._box[key1] = self._template % 'changed 1'
  285.         self.assert_(self._box.get_string(key1) == \
  286.                      self._template % 'changed 1')
  287.         self._box[key0] = _sample_message
  288.         self._check_sample(self._box[key0])
  289.         self._box[key1] = self._box[key0]
  290.         self._check_sample(self._box[key1])
  291.         self._box[key0] = self._template % 'original 0'
  292.         self.assert_(self._box.get_string(key0) ==
  293.                      self._template % 'original 0')
  294.         self._check_sample(self._box[key1])
  295.         self.assertRaises(KeyError,
  296.                           lambda: self._box.__setitem__('foo', 'bar'))
  297.         self.assertRaises(KeyError, lambda: self._box['foo'])
  298.         self.assert_(len(self._box) == 2)
  299.  
  300.     def test_clear(self, iterations=10):
  301.         # Remove all messages using clear()
  302.         keys = []
  303.         for i in xrange(iterations):
  304.             self._box.add(self._template % i)
  305.         for i, key in enumerate(keys):
  306.             self.assert_(self._box.get_string(key) == self._template % i)
  307.         self._box.clear()
  308.         self.assert_(len(self._box) == 0)
  309.         for i, key in enumerate(keys):
  310.             self.assertRaises(KeyError, lambda: self._box.get_string(key))
  311.  
  312.     def test_pop(self):
  313.         # Get and remove a message using pop()
  314.         key0 = self._box.add(self._template % 0)
  315.         self.assert_(key0 in self._box)
  316.         key1 = self._box.add(self._template % 1)
  317.         self.assert_(key1 in self._box)
  318.         self.assert_(self._box.pop(key0).get_payload() == '0')
  319.         self.assert_(key0 not in self._box)
  320.         self.assert_(key1 in self._box)
  321.         key2 = self._box.add(self._template % 2)
  322.         self.assert_(key2 in self._box)
  323.         self.assert_(self._box.pop(key2).get_payload() == '2')
  324.         self.assert_(key2 not in self._box)
  325.         self.assert_(key1 in self._box)
  326.         self.assert_(self._box.pop(key1).get_payload() == '1')
  327.         self.assert_(key1 not in self._box)
  328.         self.assert_(len(self._box) == 0)
  329.  
  330.     def test_popitem(self, iterations=10):
  331.         # Get and remove an arbitrary (key, message) using popitem()
  332.         keys = []
  333.         for i in xrange(10):
  334.             keys.append(self._box.add(self._template % i))
  335.         seen = []
  336.         for i in xrange(10):
  337.             key, msg = self._box.popitem()
  338.             self.assert_(key in keys)
  339.             self.assert_(key not in seen)
  340.             seen.append(key)
  341.             self.assert_(int(msg.get_payload()) == keys.index(key))
  342.         self.assert_(len(self._box) == 0)
  343.         for key in keys:
  344.             self.assertRaises(KeyError, lambda: self._box[key])
  345.  
  346.     def test_update(self):
  347.         # Modify multiple messages using update()
  348.         key0 = self._box.add(self._template % 'original 0')
  349.         key1 = self._box.add(self._template % 'original 1')
  350.         key2 = self._box.add(self._template % 'original 2')
  351.         self._box.update({key0: self._template % 'changed 0',
  352.                           key2: _sample_message})
  353.         self.assert_(len(self._box) == 3)
  354.         self.assert_(self._box.get_string(key0) ==
  355.                      self._template % 'changed 0')
  356.         self.assert_(self._box.get_string(key1) ==
  357.                      self._template % 'original 1')
  358.         self._check_sample(self._box[key2])
  359.         self._box.update([(key2, self._template % 'changed 2'),
  360.                     (key1, self._template % 'changed 1'),
  361.                     (key0, self._template % 'original 0')])
  362.         self.assert_(len(self._box) == 3)
  363.         self.assert_(self._box.get_string(key0) ==
  364.                      self._template % 'original 0')
  365.         self.assert_(self._box.get_string(key1) ==
  366.                      self._template % 'changed 1')
  367.         self.assert_(self._box.get_string(key2) ==
  368.                      self._template % 'changed 2')
  369.         self.assertRaises(KeyError,
  370.                           lambda: self._box.update({'foo': 'bar',
  371.                                           key0: self._template % "changed 0"}))
  372.         self.assert_(len(self._box) == 3)
  373.         self.assert_(self._box.get_string(key0) ==
  374.                      self._template % "changed 0")
  375.         self.assert_(self._box.get_string(key1) ==
  376.                      self._template % "changed 1")
  377.         self.assert_(self._box.get_string(key2) ==
  378.                      self._template % "changed 2")
  379.  
  380.     def test_flush(self):
  381.         # Write changes to disk
  382.         self._test_flush_or_close(self._box.flush, True)
  383.  
  384.     def test_lock_unlock(self):
  385.         # Lock and unlock the mailbox
  386.         self.assert_(not os.path.exists(self._get_lock_path()))
  387.         self._box.lock()
  388.         self.assert_(os.path.exists(self._get_lock_path()))
  389.         self._box.unlock()
  390.         self.assert_(not os.path.exists(self._get_lock_path()))
  391.  
  392.     def test_close(self):
  393.         # Close mailbox and flush changes to disk
  394.         self._test_flush_or_close(self._box.close, False)
  395.  
  396.     def _test_flush_or_close(self, method, should_call_close):
  397.         contents = [self._template % i for i in xrange(3)]
  398.         self._box.add(contents[0])
  399.         self._box.add(contents[1])
  400.         self._box.add(contents[2])
  401.         method()
  402.         if should_call_close:
  403.             self._box.close()
  404.         self._box = self._factory(self._path)
  405.         keys = self._box.keys()
  406.         self.assert_(len(keys) == 3)
  407.         for key in keys:
  408.             self.assert_(self._box.get_string(key) in contents)
  409.  
  410.     def test_dump_message(self):
  411.         # Write message representations to disk
  412.         for input in (email.message_from_string(_sample_message),
  413.                       _sample_message, StringIO.StringIO(_sample_message)):
  414.             output = StringIO.StringIO()
  415.             self._box._dump_message(input, output)
  416.             self.assert_(output.getvalue() ==
  417.                          _sample_message.replace('\n', os.linesep))
  418.         output = StringIO.StringIO()
  419.         self.assertRaises(TypeError,
  420.                           lambda: self._box._dump_message(None, output))
  421.  
  422.     def _get_lock_path(self):
  423.         # Return the path of the dot lock file. May be overridden.
  424.         return self._path + '.lock'
  425.  
  426.  
  427. class TestMailboxSuperclass(TestBase):
  428.  
  429.     def test_notimplemented(self):
  430.         # Test that all Mailbox methods raise NotImplementedException.
  431.         box = mailbox.Mailbox('path')
  432.         self.assertRaises(NotImplementedError, lambda: box.add(''))
  433.         self.assertRaises(NotImplementedError, lambda: box.remove(''))
  434.         self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
  435.         self.assertRaises(NotImplementedError, lambda: box.discard(''))
  436.         self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
  437.         self.assertRaises(NotImplementedError, lambda: box.iterkeys())
  438.         self.assertRaises(NotImplementedError, lambda: box.keys())
  439.         self.assertRaises(NotImplementedError, lambda: box.itervalues().next())
  440.         self.assertRaises(NotImplementedError, lambda: box.__iter__().next())
  441.         self.assertRaises(NotImplementedError, lambda: box.values())
  442.         self.assertRaises(NotImplementedError, lambda: box.iteritems().next())
  443.         self.assertRaises(NotImplementedError, lambda: box.items())
  444.         self.assertRaises(NotImplementedError, lambda: box.get(''))
  445.         self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
  446.         self.assertRaises(NotImplementedError, lambda: box.get_message(''))
  447.         self.assertRaises(NotImplementedError, lambda: box.get_string(''))
  448.         self.assertRaises(NotImplementedError, lambda: box.get_file(''))
  449.         self.assertRaises(NotImplementedError, lambda: box.has_key(''))
  450.         self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
  451.         self.assertRaises(NotImplementedError, lambda: box.__len__())
  452.         self.assertRaises(NotImplementedError, lambda: box.clear())
  453.         self.assertRaises(NotImplementedError, lambda: box.pop(''))
  454.         self.assertRaises(NotImplementedError, lambda: box.popitem())
  455.         self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
  456.         self.assertRaises(NotImplementedError, lambda: box.flush())
  457.         self.assertRaises(NotImplementedError, lambda: box.lock())
  458.         self.assertRaises(NotImplementedError, lambda: box.unlock())
  459.         self.assertRaises(NotImplementedError, lambda: box.close())
  460.  
  461.  
  462. class TestMaildir(TestMailbox):
  463.  
  464.     _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
  465.  
  466.     def setUp(self):
  467.         TestMailbox.setUp(self)
  468.         if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
  469.             self._box.colon = '!'
  470.  
  471.     def test_add_MM(self):
  472.         # Add a MaildirMessage instance
  473.         msg = mailbox.MaildirMessage(self._template % 0)
  474.         msg.set_subdir('cur')
  475.         msg.set_info('foo')
  476.         key = self._box.add(msg)
  477.         self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
  478.                                                  (key, self._box.colon))))
  479.  
  480.     def test_get_MM(self):
  481.         # Get a MaildirMessage instance
  482.         msg = mailbox.MaildirMessage(self._template % 0)
  483.         msg.set_subdir('cur')
  484.         msg.set_flags('RF')
  485.         key = self._box.add(msg)
  486.         msg_returned = self._box.get_message(key)
  487.         self.assert_(isinstance(msg_returned, mailbox.MaildirMessage))
  488.         self.assert_(msg_returned.get_subdir() == 'cur')
  489.         self.assert_(msg_returned.get_flags() == 'FR')
  490.  
  491.     def test_set_MM(self):
  492.         # Set with a MaildirMessage instance
  493.         msg0 = mailbox.MaildirMessage(self._template % 0)
  494.         msg0.set_flags('TP')
  495.         key = self._box.add(msg0)
  496.         msg_returned = self._box.get_message(key)
  497.         self.assert_(msg_returned.get_subdir() == 'new')
  498.         self.assert_(msg_returned.get_flags() == 'PT')
  499.         msg1 = mailbox.MaildirMessage(self._template % 1)
  500.         self._box[key] = msg1
  501.         msg_returned = self._box.get_message(key)
  502.         self.assert_(msg_returned.get_subdir() == 'new')
  503.         self.assert_(msg_returned.get_flags() == '')
  504.         self.assert_(msg_returned.get_payload() == '1')
  505.         msg2 = mailbox.MaildirMessage(self._template % 2)
  506.         msg2.set_info('2,S')
  507.         self._box[key] = msg2
  508.         self._box[key] = self._template % 3
  509.         msg_returned = self._box.get_message(key)
  510.         self.assert_(msg_returned.get_subdir() == 'new')
  511.         self.assert_(msg_returned.get_flags() == 'S')
  512.         self.assert_(msg_returned.get_payload() == '3')
  513.  
  514.     def test_consistent_factory(self):
  515.         # Add a message.
  516.         msg = mailbox.MaildirMessage(self._template % 0)
  517.         msg.set_subdir('cur')
  518.         msg.set_flags('RF')
  519.         key = self._box.add(msg)
  520.  
  521.         # Create new mailbox with
  522.         class FakeMessage(mailbox.MaildirMessage):
  523.             pass
  524.         box = mailbox.Maildir(self._path, factory=FakeMessage)
  525.         box.colon = self._box.colon
  526.         msg2 = box.get_message(key)
  527.         self.assert_(isinstance(msg2, FakeMessage))
  528.  
  529.     def test_initialize_new(self):
  530.         # Initialize a non-existent mailbox
  531.         self.tearDown()
  532.         self._box = mailbox.Maildir(self._path)
  533.         self._check_basics(factory=rfc822.Message)
  534.         self._delete_recursively(self._path)
  535.         self._box = self._factory(self._path, factory=None)
  536.         self._check_basics()
  537.  
  538.     def test_initialize_existing(self):
  539.         # Initialize an existing mailbox
  540.         self.tearDown()
  541.         for subdir in '', 'tmp', 'new', 'cur':
  542.             os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
  543.         self._box = mailbox.Maildir(self._path)
  544.         self._check_basics(factory=rfc822.Message)
  545.         self._box = mailbox.Maildir(self._path, factory=None)
  546.         self._check_basics()
  547.  
  548.     def _check_basics(self, factory=None):
  549.         # (Used by test_open_new() and test_open_existing().)
  550.         self.assertEqual(self._box._path, os.path.abspath(self._path))
  551.         self.assertEqual(self._box._factory, factory)
  552.         for subdir in '', 'tmp', 'new', 'cur':
  553.             path = os.path.join(self._path, subdir)
  554.             mode = os.stat(path)[stat.ST_MODE]
  555.             self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
  556.  
  557.     def test_list_folders(self):
  558.         # List folders
  559.         self._box.add_folder('one')
  560.         self._box.add_folder('two')
  561.         self._box.add_folder('three')
  562.         self.assert_(len(self._box.list_folders()) == 3)
  563.         self.assert_(set(self._box.list_folders()) ==
  564.                      set(('one', 'two', 'three')))
  565.  
  566.     def test_get_folder(self):
  567.         # Open folders
  568.         self._box.add_folder('foo.bar')
  569.         folder0 = self._box.get_folder('foo.bar')
  570.         folder0.add(self._template % 'bar')
  571.         self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar')))
  572.         folder1 = self._box.get_folder('foo.bar')
  573.         self.assert_(folder1.get_string(folder1.keys()[0]) == \
  574.                      self._template % 'bar')
  575.  
  576.     def test_add_and_remove_folders(self):
  577.         # Delete folders
  578.         self._box.add_folder('one')
  579.         self._box.add_folder('two')
  580.         self.assert_(len(self._box.list_folders()) == 2)
  581.         self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
  582.         self._box.remove_folder('one')
  583.         self.assert_(len(self._box.list_folders()) == 1)
  584.         self.assert_(set(self._box.list_folders()) == set(('two',)))
  585.         self._box.add_folder('three')
  586.         self.assert_(len(self._box.list_folders()) == 2)
  587.         self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
  588.         self._box.remove_folder('three')
  589.         self.assert_(len(self._box.list_folders()) == 1)
  590.         self.assert_(set(self._box.list_folders()) == set(('two',)))
  591.         self._box.remove_folder('two')
  592.         self.assert_(len(self._box.list_folders()) == 0)
  593.         self.assert_(self._box.list_folders() == [])
  594.  
  595.     def test_clean(self):
  596.         # Remove old files from 'tmp'
  597.         foo_path = os.path.join(self._path, 'tmp', 'foo')
  598.         bar_path = os.path.join(self._path, 'tmp', 'bar')
  599.         f = open(foo_path, 'w')
  600.         f.write("@")
  601.         f.close()
  602.         f = open(bar_path, 'w')
  603.         f.write("@")
  604.         f.close()
  605.         self._box.clean()
  606.         self.assert_(os.path.exists(foo_path))
  607.         self.assert_(os.path.exists(bar_path))
  608.         foo_stat = os.stat(foo_path)
  609.         os.utime(foo_path, (time.time() - 129600 - 2,
  610.                             foo_stat.st_mtime))
  611.         self._box.clean()
  612.         self.assert_(not os.path.exists(foo_path))
  613.         self.assert_(os.path.exists(bar_path))
  614.  
  615.     def test_create_tmp(self, repetitions=10):
  616.         # Create files in tmp directory
  617.         hostname = socket.gethostname()
  618.         if '/' in hostname:
  619.             hostname = hostname.replace('/', r'\057')
  620.         if ':' in hostname:
  621.             hostname = hostname.replace(':', r'\072')
  622.         pid = os.getpid()
  623.         pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
  624.                              r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
  625.         previous_groups = None
  626.         for x in xrange(repetitions):
  627.             tmp_file = self._box._create_tmp()
  628.             head, tail = os.path.split(tmp_file.name)
  629.             self.assertEqual(head, os.path.abspath(os.path.join(self._path,
  630.                                                                 "tmp")),
  631.                              "File in wrong location: '%s'" % head)
  632.             match = pattern.match(tail)
  633.             self.assert_(match is not None, "Invalid file name: '%s'" % tail)
  634.             groups = match.groups()
  635.             if previous_groups is not None:
  636.                 self.assert_(int(groups[0] >= previous_groups[0]),
  637.                              "Non-monotonic seconds: '%s' before '%s'" %
  638.                              (previous_groups[0], groups[0]))
  639.                 self.assert_(int(groups[1] >= previous_groups[1]) or
  640.                              groups[0] != groups[1],
  641.                              "Non-monotonic milliseconds: '%s' before '%s'" %
  642.                              (previous_groups[1], groups[1]))
  643.                 self.assert_(int(groups[2]) == pid,
  644.                              "Process ID mismatch: '%s' should be '%s'" %
  645.                              (groups[2], pid))
  646.                 self.assert_(int(groups[3]) == int(previous_groups[3]) + 1,
  647.                              "Non-sequential counter: '%s' before '%s'" %
  648.                              (previous_groups[3], groups[3]))
  649.                 self.assert_(groups[4] == hostname,
  650.                              "Host name mismatch: '%s' should be '%s'" %
  651.                              (groups[4], hostname))
  652.             previous_groups = groups
  653.             tmp_file.write(_sample_message)
  654.             tmp_file.seek(0)
  655.             self.assert_(tmp_file.read() == _sample_message)
  656.             tmp_file.close()
  657.         file_count = len(os.listdir(os.path.join(self._path, "tmp")))
  658.         self.assert_(file_count == repetitions,
  659.                      "Wrong file count: '%s' should be '%s'" %
  660.                      (file_count, repetitions))
  661.  
  662.     def test_refresh(self):
  663.         # Update the table of contents
  664.         self.assert_(self._box._toc == {})
  665.         key0 = self._box.add(self._template % 0)
  666.         key1 = self._box.add(self._template % 1)
  667.         self.assert_(self._box._toc == {})
  668.         self._box._refresh()
  669.         self.assert_(self._box._toc == {key0: os.path.join('new', key0),
  670.                                         key1: os.path.join('new', key1)})
  671.         key2 = self._box.add(self._template % 2)
  672.         self.assert_(self._box._toc == {key0: os.path.join('new', key0),
  673.                                         key1: os.path.join('new', key1)})
  674.         self._box._refresh()
  675.         self.assert_(self._box._toc == {key0: os.path.join('new', key0),
  676.                                         key1: os.path.join('new', key1),
  677.                                         key2: os.path.join('new', key2)})
  678.  
  679.     def test_lookup(self):
  680.         # Look up message subpaths in the TOC
  681.         self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
  682.         key0 = self._box.add(self._template % 0)
  683.         self.assert_(self._box._lookup(key0) == os.path.join('new', key0))
  684.         os.remove(os.path.join(self._path, 'new', key0))
  685.         self.assert_(self._box._toc == {key0: os.path.join('new', key0)})
  686.         self.assertRaises(KeyError, lambda: self._box._lookup(key0))
  687.         self.assert_(self._box._toc == {})
  688.  
  689.     def test_lock_unlock(self):
  690.         # Lock and unlock the mailbox. For Maildir, this does nothing.
  691.         self._box.lock()
  692.         self._box.unlock()
  693.  
  694.     def test_folder (self):
  695.         # Test for bug #1569790: verify that folders returned by .get_folder()
  696.         # use the same factory function.
  697.         def dummy_factory (s):
  698.             return None
  699.         box = self._factory(self._path, factory=dummy_factory)
  700.         folder = box.add_folder('folder1')
  701.         self.assert_(folder._factory is dummy_factory)
  702.  
  703.         folder1_alias = box.get_folder('folder1')
  704.         self.assert_(folder1_alias._factory is dummy_factory)
  705.  
  706.     def test_directory_in_folder (self):
  707.         # Test that mailboxes still work if there's a stray extra directory
  708.         # in a folder.
  709.         for i in range(10):
  710.             self._box.add(mailbox.Message(_sample_message))
  711.  
  712.         # Create a stray directory
  713.         os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
  714.  
  715.         # Check that looping still works with the directory present.
  716.         for msg in self._box:
  717.             pass
  718.  
  719.     def test_file_permissions(self):
  720.         # Verify that message files are created without execute permissions
  721.         if not hasattr(os, "stat") or not hasattr(os, "umask"):
  722.             return
  723.         msg = mailbox.MaildirMessage(self._template % 0)
  724.         orig_umask = os.umask(0)
  725.         try:
  726.             key = self._box.add(msg)
  727.         finally:
  728.             os.umask(orig_umask)
  729.         path = os.path.join(self._path, self._box._lookup(key))
  730.         mode = os.stat(path).st_mode
  731.         self.assert_(mode & 0111 == 0)
  732.  
  733.     def test_folder_file_perms(self):
  734.         # From bug #3228, we want to verify that the file created inside a Maildir
  735.         # subfolder isn't marked as executable.
  736.         if not hasattr(os, "stat") or not hasattr(os, "umask"):
  737.             return
  738.  
  739.         orig_umask = os.umask(0)
  740.         try:
  741.             subfolder = self._box.add_folder('subfolder')
  742.         finally:
  743.             os.umask(orig_umask)
  744.  
  745.         path = os.path.join(subfolder._path, 'maildirfolder')
  746.         st = os.stat(path)
  747.         perms = st.st_mode
  748.         self.assertFalse((perms & 0111)) # Execute bits should all be off.
  749.  
  750.  
  751. class _TestMboxMMDF(TestMailbox):
  752.  
  753.     def tearDown(self):
  754.         self._box.close()
  755.         self._delete_recursively(self._path)
  756.         for lock_remnant in glob.glob(self._path + '.*'):
  757.             test_support.unlink(lock_remnant)
  758.  
  759.     def test_add_from_string(self):
  760.         # Add a string starting with 'From ' to the mailbox
  761.         key = self._box.add('From foo@bar blah\nFrom: foo\n\n0')
  762.         self.assert_(self._box[key].get_from() == 'foo@bar blah')
  763.         self.assert_(self._box[key].get_payload() == '0')
  764.  
  765.     def test_add_mbox_or_mmdf_message(self):
  766.         # Add an mboxMessage or MMDFMessage
  767.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  768.             msg = class_('From foo@bar blah\nFrom: foo\n\n0')
  769.             key = self._box.add(msg)
  770.  
  771.     def test_open_close_open(self):
  772.         # Open and inspect previously-created mailbox
  773.         values = [self._template % i for i in xrange(3)]
  774.         for value in values:
  775.             self._box.add(value)
  776.         self._box.close()
  777.         mtime = os.path.getmtime(self._path)
  778.         self._box = self._factory(self._path)
  779.         self.assert_(len(self._box) == 3)
  780.         for key in self._box.iterkeys():
  781.             self.assert_(self._box.get_string(key) in values)
  782.         self._box.close()
  783.         self.assert_(mtime == os.path.getmtime(self._path))
  784.  
  785.     def test_add_and_close(self):
  786.         # Verifying that closing a mailbox doesn't change added items
  787.         self._box.add(_sample_message)
  788.         for i in xrange(3):
  789.             self._box.add(self._template % i)
  790.         self._box.add(_sample_message)
  791.         self._box._file.flush()
  792.         self._box._file.seek(0)
  793.         contents = self._box._file.read()
  794.         self._box.close()
  795.         self.assert_(contents == open(self._path, 'rb').read())
  796.         self._box = self._factory(self._path)
  797.  
  798.     def test_lock_conflict(self):
  799.         # Fork off a subprocess that will lock the file for 2 seconds,
  800.         # unlock it, and then exit.
  801.         if not hasattr(os, 'fork'):
  802.             return
  803.         pid = os.fork()
  804.         if pid == 0:
  805.             # In the child, lock the mailbox.
  806.             self._box.lock()
  807.             time.sleep(2)
  808.             self._box.unlock()
  809.             os._exit(0)
  810.  
  811.         # In the parent, sleep a bit to give the child time to acquire
  812.         # the lock.
  813.         time.sleep(0.5)
  814.         try:
  815.             self.assertRaises(mailbox.ExternalClashError,
  816.                               self._box.lock)
  817.         finally:
  818.             # Wait for child to exit.  Locking should now succeed.
  819.             exited_pid, status = os.waitpid(pid, 0)
  820.  
  821.         self._box.lock()
  822.         self._box.unlock()
  823.  
  824.     def test_relock(self):
  825.         # Test case for bug #1575506: the mailbox class was locking the
  826.         # wrong file object in its flush() method.
  827.         msg = "Subject: sub\n\nbody\n"
  828.         key1 = self._box.add(msg)
  829.         self._box.flush()
  830.         self._box.close()
  831.  
  832.         self._box = self._factory(self._path)
  833.         self._box.lock()
  834.         key2 = self._box.add(msg)
  835.         self._box.flush()
  836.         self.assert_(self._box._locked)
  837.         self._box.close()
  838.  
  839.  
  840. class TestMbox(_TestMboxMMDF):
  841.  
  842.     _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
  843.  
  844.     def test_file_perms(self):
  845.         # From bug #3228, we want to verify that the mailbox file isn't executable,
  846.         # even if the umask is set to something that would leave executable bits set.
  847.         # We only run this test on platforms that support umask.
  848.         if hasattr(os, 'umask') and hasattr(os, 'stat'):
  849.             try:
  850.                 old_umask = os.umask(0077)
  851.                 self._box.close()
  852.                 os.unlink(self._path)
  853.                 self._box = mailbox.mbox(self._path, create=True)
  854.                 self._box.add('')
  855.                 self._box.close()
  856.             finally:
  857.                 os.umask(old_umask)
  858.  
  859.             st = os.stat(self._path)
  860.             perms = st.st_mode
  861.             self.assertFalse((perms & 0111)) # Execute bits should all be off.
  862.  
  863. class TestMMDF(_TestMboxMMDF):
  864.  
  865.     _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
  866.  
  867.  
  868. class TestMH(TestMailbox):
  869.  
  870.     _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
  871.  
  872.     def test_list_folders(self):
  873.         # List folders
  874.         self._box.add_folder('one')
  875.         self._box.add_folder('two')
  876.         self._box.add_folder('three')
  877.         self.assert_(len(self._box.list_folders()) == 3)
  878.         self.assert_(set(self._box.list_folders()) ==
  879.                      set(('one', 'two', 'three')))
  880.  
  881.     def test_get_folder(self):
  882.         # Open folders
  883.         def dummy_factory (s):
  884.             return None
  885.         self._box = self._factory(self._path, dummy_factory)
  886.  
  887.         new_folder = self._box.add_folder('foo.bar')
  888.         folder0 = self._box.get_folder('foo.bar')
  889.         folder0.add(self._template % 'bar')
  890.         self.assert_(os.path.isdir(os.path.join(self._path, 'foo.bar')))
  891.         folder1 = self._box.get_folder('foo.bar')
  892.         self.assert_(folder1.get_string(folder1.keys()[0]) == \
  893.                      self._template % 'bar')
  894.  
  895.         # Test for bug #1569790: verify that folders returned by .get_folder()
  896.         # use the same factory function.
  897.         self.assert_(new_folder._factory is self._box._factory)
  898.         self.assert_(folder0._factory is self._box._factory)
  899.  
  900.     def test_add_and_remove_folders(self):
  901.         # Delete folders
  902.         self._box.add_folder('one')
  903.         self._box.add_folder('two')
  904.         self.assert_(len(self._box.list_folders()) == 2)
  905.         self.assert_(set(self._box.list_folders()) == set(('one', 'two')))
  906.         self._box.remove_folder('one')
  907.         self.assert_(len(self._box.list_folders()) == 1)
  908.         self.assert_(set(self._box.list_folders()) == set(('two',)))
  909.         self._box.add_folder('three')
  910.         self.assert_(len(self._box.list_folders()) == 2)
  911.         self.assert_(set(self._box.list_folders()) == set(('two', 'three')))
  912.         self._box.remove_folder('three')
  913.         self.assert_(len(self._box.list_folders()) == 1)
  914.         self.assert_(set(self._box.list_folders()) == set(('two',)))
  915.         self._box.remove_folder('two')
  916.         self.assert_(len(self._box.list_folders()) == 0)
  917.         self.assert_(self._box.list_folders() == [])
  918.  
  919.     def test_sequences(self):
  920.         # Get and set sequences
  921.         self.assert_(self._box.get_sequences() == {})
  922.         msg0 = mailbox.MHMessage(self._template % 0)
  923.         msg0.add_sequence('foo')
  924.         key0 = self._box.add(msg0)
  925.         self.assert_(self._box.get_sequences() == {'foo':[key0]})
  926.         msg1 = mailbox.MHMessage(self._template % 1)
  927.         msg1.set_sequences(['bar', 'replied', 'foo'])
  928.         key1 = self._box.add(msg1)
  929.         self.assert_(self._box.get_sequences() ==
  930.                      {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
  931.         msg0.set_sequences(['flagged'])
  932.         self._box[key0] = msg0
  933.         self.assert_(self._box.get_sequences() ==
  934.                      {'foo':[key1], 'bar':[key1], 'replied':[key1],
  935.                       'flagged':[key0]})
  936.         self._box.remove(key1)
  937.         self.assert_(self._box.get_sequences() == {'flagged':[key0]})
  938.  
  939.     def test_pack(self):
  940.         # Pack the contents of the mailbox
  941.         msg0 = mailbox.MHMessage(self._template % 0)
  942.         msg1 = mailbox.MHMessage(self._template % 1)
  943.         msg2 = mailbox.MHMessage(self._template % 2)
  944.         msg3 = mailbox.MHMessage(self._template % 3)
  945.         msg0.set_sequences(['foo', 'unseen'])
  946.         msg1.set_sequences(['foo'])
  947.         msg2.set_sequences(['foo', 'flagged'])
  948.         msg3.set_sequences(['foo', 'bar', 'replied'])
  949.         key0 = self._box.add(msg0)
  950.         key1 = self._box.add(msg1)
  951.         key2 = self._box.add(msg2)
  952.         key3 = self._box.add(msg3)
  953.         self.assert_(self._box.get_sequences() ==
  954.                      {'foo':[key0,key1,key2,key3], 'unseen':[key0],
  955.                       'flagged':[key2], 'bar':[key3], 'replied':[key3]})
  956.         self._box.remove(key2)
  957.         self.assert_(self._box.get_sequences() ==
  958.                      {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
  959.                       'replied':[key3]})
  960.         self._box.pack()
  961.         self.assert_(self._box.keys() == [1, 2, 3])
  962.         key0 = key0
  963.         key1 = key0 + 1
  964.         key2 = key1 + 1
  965.         self.assert_(self._box.get_sequences() ==
  966.                      {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
  967.  
  968.         # Test case for packing while holding the mailbox locked.
  969.         key0 = self._box.add(msg1)
  970.         key1 = self._box.add(msg1)
  971.         key2 = self._box.add(msg1)
  972.         key3 = self._box.add(msg1)
  973.  
  974.         self._box.remove(key0)
  975.         self._box.remove(key2)
  976.         self._box.lock()
  977.         self._box.pack()
  978.         self._box.unlock()
  979.         self.assert_(self._box.get_sequences() ==
  980.                      {'foo':[1, 2, 3, 4, 5],
  981.                       'unseen':[1], 'bar':[3], 'replied':[3]})
  982.  
  983.     def _get_lock_path(self):
  984.         return os.path.join(self._path, '.mh_sequences.lock')
  985.  
  986.  
  987. class TestBabyl(TestMailbox):
  988.  
  989.     _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
  990.  
  991.     def tearDown(self):
  992.         self._box.close()
  993.         self._delete_recursively(self._path)
  994.         for lock_remnant in glob.glob(self._path + '.*'):
  995.             test_support.unlink(lock_remnant)
  996.  
  997.     def test_labels(self):
  998.         # Get labels from the mailbox
  999.         self.assert_(self._box.get_labels() == [])
  1000.         msg0 = mailbox.BabylMessage(self._template % 0)
  1001.         msg0.add_label('foo')
  1002.         key0 = self._box.add(msg0)
  1003.         self.assert_(self._box.get_labels() == ['foo'])
  1004.         msg1 = mailbox.BabylMessage(self._template % 1)
  1005.         msg1.set_labels(['bar', 'answered', 'foo'])
  1006.         key1 = self._box.add(msg1)
  1007.         self.assert_(set(self._box.get_labels()) == set(['foo', 'bar']))
  1008.         msg0.set_labels(['blah', 'filed'])
  1009.         self._box[key0] = msg0
  1010.         self.assert_(set(self._box.get_labels()) ==
  1011.                      set(['foo', 'bar', 'blah']))
  1012.         self._box.remove(key1)
  1013.         self.assert_(set(self._box.get_labels()) == set(['blah']))
  1014.  
  1015.  
  1016. class TestMessage(TestBase):
  1017.  
  1018.     _factory = mailbox.Message      # Overridden by subclasses to reuse tests
  1019.  
  1020.     def setUp(self):
  1021.         self._path = test_support.TESTFN
  1022.  
  1023.     def tearDown(self):
  1024.         self._delete_recursively(self._path)
  1025.  
  1026.     def test_initialize_with_eMM(self):
  1027.         # Initialize based on email.message.Message instance
  1028.         eMM = email.message_from_string(_sample_message)
  1029.         msg = self._factory(eMM)
  1030.         self._post_initialize_hook(msg)
  1031.         self._check_sample(msg)
  1032.  
  1033.     def test_initialize_with_string(self):
  1034.         # Initialize based on string
  1035.         msg = self._factory(_sample_message)
  1036.         self._post_initialize_hook(msg)
  1037.         self._check_sample(msg)
  1038.  
  1039.     def test_initialize_with_file(self):
  1040.         # Initialize based on contents of file
  1041.         f = open(self._path, 'w+')
  1042.         f.write(_sample_message)
  1043.         f.seek(0)
  1044.         msg = self._factory(f)
  1045.         self._post_initialize_hook(msg)
  1046.         self._check_sample(msg)
  1047.         f.close()
  1048.  
  1049.     def test_initialize_with_nothing(self):
  1050.         # Initialize without arguments
  1051.         msg = self._factory()
  1052.         self._post_initialize_hook(msg)
  1053.         self.assert_(isinstance(msg, email.message.Message))
  1054.         self.assert_(isinstance(msg, mailbox.Message))
  1055.         self.assert_(isinstance(msg, self._factory))
  1056.         self.assert_(msg.keys() == [])
  1057.         self.assert_(not msg.is_multipart())
  1058.         self.assert_(msg.get_payload() == None)
  1059.  
  1060.     def test_initialize_incorrectly(self):
  1061.         # Initialize with invalid argument
  1062.         self.assertRaises(TypeError, lambda: self._factory(object()))
  1063.  
  1064.     def test_become_message(self):
  1065.         # Take on the state of another message
  1066.         eMM = email.message_from_string(_sample_message)
  1067.         msg = self._factory()
  1068.         msg._become_message(eMM)
  1069.         self._check_sample(msg)
  1070.  
  1071.     def test_explain_to(self):
  1072.         # Copy self's format-specific data to other message formats.
  1073.         # This test is superficial; better ones are in TestMessageConversion.
  1074.         msg = self._factory()
  1075.         for class_ in (mailbox.Message, mailbox.MaildirMessage,
  1076.                        mailbox.mboxMessage, mailbox.MHMessage,
  1077.                        mailbox.BabylMessage, mailbox.MMDFMessage):
  1078.             other_msg = class_()
  1079.             msg._explain_to(other_msg)
  1080.         other_msg = email.message.Message()
  1081.         self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
  1082.  
  1083.     def _post_initialize_hook(self, msg):
  1084.         # Overridden by subclasses to check extra things after initialization
  1085.         pass
  1086.  
  1087.  
  1088. class TestMaildirMessage(TestMessage):
  1089.  
  1090.     _factory = mailbox.MaildirMessage
  1091.  
  1092.     def _post_initialize_hook(self, msg):
  1093.         self.assert_(msg._subdir == 'new')
  1094.         self.assert_(msg._info == '')
  1095.  
  1096.     def test_subdir(self):
  1097.         # Use get_subdir() and set_subdir()
  1098.         msg = mailbox.MaildirMessage(_sample_message)
  1099.         self.assert_(msg.get_subdir() == 'new')
  1100.         msg.set_subdir('cur')
  1101.         self.assert_(msg.get_subdir() == 'cur')
  1102.         msg.set_subdir('new')
  1103.         self.assert_(msg.get_subdir() == 'new')
  1104.         self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
  1105.         self.assert_(msg.get_subdir() == 'new')
  1106.         msg.set_subdir('new')
  1107.         self.assert_(msg.get_subdir() == 'new')
  1108.         self._check_sample(msg)
  1109.  
  1110.     def test_flags(self):
  1111.         # Use get_flags(), set_flags(), add_flag(), remove_flag()
  1112.         msg = mailbox.MaildirMessage(_sample_message)
  1113.         self.assert_(msg.get_flags() == '')
  1114.         self.assert_(msg.get_subdir() == 'new')
  1115.         msg.set_flags('F')
  1116.         self.assert_(msg.get_subdir() == 'new')
  1117.         self.assert_(msg.get_flags() == 'F')
  1118.         msg.set_flags('SDTP')
  1119.         self.assert_(msg.get_flags() == 'DPST')
  1120.         msg.add_flag('FT')
  1121.         self.assert_(msg.get_flags() == 'DFPST')
  1122.         msg.remove_flag('TDRP')
  1123.         self.assert_(msg.get_flags() == 'FS')
  1124.         self.assert_(msg.get_subdir() == 'new')
  1125.         self._check_sample(msg)
  1126.  
  1127.     def test_date(self):
  1128.         # Use get_date() and set_date()
  1129.         msg = mailbox.MaildirMessage(_sample_message)
  1130.         self.assert_(abs(msg.get_date() - time.time()) < 60)
  1131.         msg.set_date(0.0)
  1132.         self.assert_(msg.get_date() == 0.0)
  1133.  
  1134.     def test_info(self):
  1135.         # Use get_info() and set_info()
  1136.         msg = mailbox.MaildirMessage(_sample_message)
  1137.         self.assert_(msg.get_info() == '')
  1138.         msg.set_info('1,foo=bar')
  1139.         self.assert_(msg.get_info() == '1,foo=bar')
  1140.         self.assertRaises(TypeError, lambda: msg.set_info(None))
  1141.         self._check_sample(msg)
  1142.  
  1143.     def test_info_and_flags(self):
  1144.         # Test interaction of info and flag methods
  1145.         msg = mailbox.MaildirMessage(_sample_message)
  1146.         self.assert_(msg.get_info() == '')
  1147.         msg.set_flags('SF')
  1148.         self.assert_(msg.get_flags() == 'FS')
  1149.         self.assert_(msg.get_info() == '2,FS')
  1150.         msg.set_info('1,')
  1151.         self.assert_(msg.get_flags() == '')
  1152.         self.assert_(msg.get_info() == '1,')
  1153.         msg.remove_flag('RPT')
  1154.         self.assert_(msg.get_flags() == '')
  1155.         self.assert_(msg.get_info() == '1,')
  1156.         msg.add_flag('D')
  1157.         self.assert_(msg.get_flags() == 'D')
  1158.         self.assert_(msg.get_info() == '2,D')
  1159.         self._check_sample(msg)
  1160.  
  1161.  
  1162. class _TestMboxMMDFMessage(TestMessage):
  1163.  
  1164.     _factory = mailbox._mboxMMDFMessage
  1165.  
  1166.     def _post_initialize_hook(self, msg):
  1167.         self._check_from(msg)
  1168.  
  1169.     def test_initialize_with_unixfrom(self):
  1170.         # Initialize with a message that already has a _unixfrom attribute
  1171.         msg = mailbox.Message(_sample_message)
  1172.         msg.set_unixfrom('From foo@bar blah')
  1173.         msg = mailbox.mboxMessage(msg)
  1174.         self.assert_(msg.get_from() == 'foo@bar blah', msg.get_from())
  1175.  
  1176.     def test_from(self):
  1177.         # Get and set "From " line
  1178.         msg = mailbox.mboxMessage(_sample_message)
  1179.         self._check_from(msg)
  1180.         msg.set_from('foo bar')
  1181.         self.assert_(msg.get_from() == 'foo bar')
  1182.         msg.set_from('foo@bar', True)
  1183.         self._check_from(msg, 'foo@bar')
  1184.         msg.set_from('blah@temp', time.localtime())
  1185.         self._check_from(msg, 'blah@temp')
  1186.  
  1187.     def test_flags(self):
  1188.         # Use get_flags(), set_flags(), add_flag(), remove_flag()
  1189.         msg = mailbox.mboxMessage(_sample_message)
  1190.         self.assert_(msg.get_flags() == '')
  1191.         msg.set_flags('F')
  1192.         self.assert_(msg.get_flags() == 'F')
  1193.         msg.set_flags('XODR')
  1194.         self.assert_(msg.get_flags() == 'RODX')
  1195.         msg.add_flag('FA')
  1196.         self.assert_(msg.get_flags() == 'RODFAX')
  1197.         msg.remove_flag('FDXA')
  1198.         self.assert_(msg.get_flags() == 'RO')
  1199.         self._check_sample(msg)
  1200.  
  1201.     def _check_from(self, msg, sender=None):
  1202.         # Check contents of "From " line
  1203.         if sender is None:
  1204.             sender = "MAILER-DAEMON"
  1205.         self.assert_(re.match(sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:"
  1206.                               r"\d{2} \d{4}", msg.get_from()) is not None)
  1207.  
  1208.  
  1209. class TestMboxMessage(_TestMboxMMDFMessage):
  1210.  
  1211.     _factory = mailbox.mboxMessage
  1212.  
  1213.  
  1214. class TestMHMessage(TestMessage):
  1215.  
  1216.     _factory = mailbox.MHMessage
  1217.  
  1218.     def _post_initialize_hook(self, msg):
  1219.         self.assert_(msg._sequences == [])
  1220.  
  1221.     def test_sequences(self):
  1222.         # Get, set, join, and leave sequences
  1223.         msg = mailbox.MHMessage(_sample_message)
  1224.         self.assert_(msg.get_sequences() == [])
  1225.         msg.set_sequences(['foobar'])
  1226.         self.assert_(msg.get_sequences() == ['foobar'])
  1227.         msg.set_sequences([])
  1228.         self.assert_(msg.get_sequences() == [])
  1229.         msg.add_sequence('unseen')
  1230.         self.assert_(msg.get_sequences() == ['unseen'])
  1231.         msg.add_sequence('flagged')
  1232.         self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
  1233.         msg.add_sequence('flagged')
  1234.         self.assert_(msg.get_sequences() == ['unseen', 'flagged'])
  1235.         msg.remove_sequence('unseen')
  1236.         self.assert_(msg.get_sequences() == ['flagged'])
  1237.         msg.add_sequence('foobar')
  1238.         self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
  1239.         msg.remove_sequence('replied')
  1240.         self.assert_(msg.get_sequences() == ['flagged', 'foobar'])
  1241.         msg.set_sequences(['foobar', 'replied'])
  1242.         self.assert_(msg.get_sequences() == ['foobar', 'replied'])
  1243.  
  1244.  
  1245. class TestBabylMessage(TestMessage):
  1246.  
  1247.     _factory = mailbox.BabylMessage
  1248.  
  1249.     def _post_initialize_hook(self, msg):
  1250.         self.assert_(msg._labels == [])
  1251.  
  1252.     def test_labels(self):
  1253.         # Get, set, join, and leave labels
  1254.         msg = mailbox.BabylMessage(_sample_message)
  1255.         self.assert_(msg.get_labels() == [])
  1256.         msg.set_labels(['foobar'])
  1257.         self.assert_(msg.get_labels() == ['foobar'])
  1258.         msg.set_labels([])
  1259.         self.assert_(msg.get_labels() == [])
  1260.         msg.add_label('filed')
  1261.         self.assert_(msg.get_labels() == ['filed'])
  1262.         msg.add_label('resent')
  1263.         self.assert_(msg.get_labels() == ['filed', 'resent'])
  1264.         msg.add_label('resent')
  1265.         self.assert_(msg.get_labels() == ['filed', 'resent'])
  1266.         msg.remove_label('filed')
  1267.         self.assert_(msg.get_labels() == ['resent'])
  1268.         msg.add_label('foobar')
  1269.         self.assert_(msg.get_labels() == ['resent', 'foobar'])
  1270.         msg.remove_label('unseen')
  1271.         self.assert_(msg.get_labels() == ['resent', 'foobar'])
  1272.         msg.set_labels(['foobar', 'answered'])
  1273.         self.assert_(msg.get_labels() == ['foobar', 'answered'])
  1274.  
  1275.     def test_visible(self):
  1276.         # Get, set, and update visible headers
  1277.         msg = mailbox.BabylMessage(_sample_message)
  1278.         visible = msg.get_visible()
  1279.         self.assert_(visible.keys() == [])
  1280.         self.assert_(visible.get_payload() is None)
  1281.         visible['User-Agent'] = 'FooBar 1.0'
  1282.         visible['X-Whatever'] = 'Blah'
  1283.         self.assert_(msg.get_visible().keys() == [])
  1284.         msg.set_visible(visible)
  1285.         visible = msg.get_visible()
  1286.         self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
  1287.         self.assert_(visible['User-Agent'] == 'FooBar 1.0')
  1288.         self.assert_(visible['X-Whatever'] == 'Blah')
  1289.         self.assert_(visible.get_payload() is None)
  1290.         msg.update_visible()
  1291.         self.assert_(visible.keys() == ['User-Agent', 'X-Whatever'])
  1292.         self.assert_(visible.get_payload() is None)
  1293.         visible = msg.get_visible()
  1294.         self.assert_(visible.keys() == ['User-Agent', 'Date', 'From', 'To',
  1295.                                         'Subject'])
  1296.         for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
  1297.             self.assert_(visible[header] == msg[header])
  1298.  
  1299.  
  1300. class TestMMDFMessage(_TestMboxMMDFMessage):
  1301.  
  1302.     _factory = mailbox.MMDFMessage
  1303.  
  1304.  
  1305. class TestMessageConversion(TestBase):
  1306.  
  1307.     def test_plain_to_x(self):
  1308.         # Convert Message to all formats
  1309.         for class_ in (mailbox.Message, mailbox.MaildirMessage,
  1310.                        mailbox.mboxMessage, mailbox.MHMessage,
  1311.                        mailbox.BabylMessage, mailbox.MMDFMessage):
  1312.             msg_plain = mailbox.Message(_sample_message)
  1313.             msg = class_(msg_plain)
  1314.             self._check_sample(msg)
  1315.  
  1316.     def test_x_to_plain(self):
  1317.         # Convert all formats to Message
  1318.         for class_ in (mailbox.Message, mailbox.MaildirMessage,
  1319.                        mailbox.mboxMessage, mailbox.MHMessage,
  1320.                        mailbox.BabylMessage, mailbox.MMDFMessage):
  1321.             msg = class_(_sample_message)
  1322.             msg_plain = mailbox.Message(msg)
  1323.             self._check_sample(msg_plain)
  1324.  
  1325.     def test_x_to_invalid(self):
  1326.         # Convert all formats to an invalid format
  1327.         for class_ in (mailbox.Message, mailbox.MaildirMessage,
  1328.                        mailbox.mboxMessage, mailbox.MHMessage,
  1329.                        mailbox.BabylMessage, mailbox.MMDFMessage):
  1330.             self.assertRaises(TypeError, lambda: class_(False))
  1331.  
  1332.     def test_maildir_to_maildir(self):
  1333.         # Convert MaildirMessage to MaildirMessage
  1334.         msg_maildir = mailbox.MaildirMessage(_sample_message)
  1335.         msg_maildir.set_flags('DFPRST')
  1336.         msg_maildir.set_subdir('cur')
  1337.         date = msg_maildir.get_date()
  1338.         msg = mailbox.MaildirMessage(msg_maildir)
  1339.         self._check_sample(msg)
  1340.         self.assert_(msg.get_flags() == 'DFPRST')
  1341.         self.assert_(msg.get_subdir() == 'cur')
  1342.         self.assert_(msg.get_date() == date)
  1343.  
  1344.     def test_maildir_to_mboxmmdf(self):
  1345.         # Convert MaildirMessage to mboxmessage and MMDFMessage
  1346.         pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
  1347.                  ('T', 'D'), ('DFPRST', 'RDFA'))
  1348.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1349.             msg_maildir = mailbox.MaildirMessage(_sample_message)
  1350.             msg_maildir.set_date(0.0)
  1351.             for setting, result in pairs:
  1352.                 msg_maildir.set_flags(setting)
  1353.                 msg = class_(msg_maildir)
  1354.                 self.assert_(msg.get_flags() == result)
  1355.                 self.assert_(msg.get_from() == 'MAILER-DAEMON %s' %
  1356.                              time.asctime(time.gmtime(0.0)))
  1357.             msg_maildir.set_subdir('cur')
  1358.             self.assert_(class_(msg_maildir).get_flags() == 'RODFA')
  1359.  
  1360.     def test_maildir_to_mh(self):
  1361.         # Convert MaildirMessage to MHMessage
  1362.         msg_maildir = mailbox.MaildirMessage(_sample_message)
  1363.         pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
  1364.                  ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
  1365.                  ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
  1366.         for setting, result in pairs:
  1367.             msg_maildir.set_flags(setting)
  1368.             self.assert_(mailbox.MHMessage(msg_maildir).get_sequences() == \
  1369.                          result)
  1370.  
  1371.     def test_maildir_to_babyl(self):
  1372.         # Convert MaildirMessage to Babyl
  1373.         msg_maildir = mailbox.MaildirMessage(_sample_message)
  1374.         pairs = (('D', ['unseen']), ('F', ['unseen']),
  1375.                  ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
  1376.                  ('S', []), ('T', ['unseen', 'deleted']),
  1377.                  ('DFPRST', ['deleted', 'answered', 'forwarded']))
  1378.         for setting, result in pairs:
  1379.             msg_maildir.set_flags(setting)
  1380.             self.assert_(mailbox.BabylMessage(msg_maildir).get_labels() == \
  1381.                          result)
  1382.  
  1383.     def test_mboxmmdf_to_maildir(self):
  1384.         # Convert mboxMessage and MMDFMessage to MaildirMessage
  1385.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1386.             msg_mboxMMDF = class_(_sample_message)
  1387.             msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
  1388.             pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
  1389.                      ('RODFA', 'FRST'))
  1390.             for setting, result in pairs:
  1391.                 msg_mboxMMDF.set_flags(setting)
  1392.                 msg = mailbox.MaildirMessage(msg_mboxMMDF)
  1393.                 self.assert_(msg.get_flags() == result)
  1394.                 self.assert_(msg.get_date() == 0.0, msg.get_date())
  1395.             msg_mboxMMDF.set_flags('O')
  1396.             self.assert_(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir() == \
  1397.                          'cur')
  1398.  
  1399.     def test_mboxmmdf_to_mboxmmdf(self):
  1400.         # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
  1401.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1402.             msg_mboxMMDF = class_(_sample_message)
  1403.             msg_mboxMMDF.set_flags('RODFA')
  1404.             msg_mboxMMDF.set_from('foo@bar')
  1405.             for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1406.                 msg2 = class2_(msg_mboxMMDF)
  1407.                 self.assert_(msg2.get_flags() == 'RODFA')
  1408.                 self.assert_(msg2.get_from() == 'foo@bar')
  1409.  
  1410.     def test_mboxmmdf_to_mh(self):
  1411.         # Convert mboxMessage and MMDFMessage to MHMessage
  1412.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1413.             msg_mboxMMDF = class_(_sample_message)
  1414.             pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
  1415.                      ('F', ['unseen', 'flagged']),
  1416.                      ('A', ['unseen', 'replied']),
  1417.                      ('RODFA', ['replied', 'flagged']))
  1418.             for setting, result in pairs:
  1419.                 msg_mboxMMDF.set_flags(setting)
  1420.                 self.assert_(mailbox.MHMessage(msg_mboxMMDF).get_sequences() \
  1421.                              == result)
  1422.  
  1423.     def test_mboxmmdf_to_babyl(self):
  1424.         # Convert mboxMessage and MMDFMessage to BabylMessage
  1425.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1426.             msg = class_(_sample_message)
  1427.             pairs = (('R', []), ('O', ['unseen']),
  1428.                      ('D', ['unseen', 'deleted']), ('F', ['unseen']),
  1429.                      ('A', ['unseen', 'answered']),
  1430.                      ('RODFA', ['deleted', 'answered']))
  1431.             for setting, result in pairs:
  1432.                 msg.set_flags(setting)
  1433.                 self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
  1434.  
  1435.     def test_mh_to_maildir(self):
  1436.         # Convert MHMessage to MaildirMessage
  1437.         pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
  1438.         for setting, result in pairs:
  1439.             msg = mailbox.MHMessage(_sample_message)
  1440.             msg.add_sequence(setting)
  1441.             self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
  1442.             self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
  1443.         msg = mailbox.MHMessage(_sample_message)
  1444.         msg.add_sequence('unseen')
  1445.         msg.add_sequence('replied')
  1446.         msg.add_sequence('flagged')
  1447.         self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'FR')
  1448.         self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
  1449.  
  1450.     def test_mh_to_mboxmmdf(self):
  1451.         # Convert MHMessage to mboxMessage and MMDFMessage
  1452.         pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
  1453.         for setting, result in pairs:
  1454.             msg = mailbox.MHMessage(_sample_message)
  1455.             msg.add_sequence(setting)
  1456.             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1457.                 self.assert_(class_(msg).get_flags() == result)
  1458.         msg = mailbox.MHMessage(_sample_message)
  1459.         msg.add_sequence('unseen')
  1460.         msg.add_sequence('replied')
  1461.         msg.add_sequence('flagged')
  1462.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1463.             self.assert_(class_(msg).get_flags() == 'OFA')
  1464.  
  1465.     def test_mh_to_mh(self):
  1466.         # Convert MHMessage to MHMessage
  1467.         msg = mailbox.MHMessage(_sample_message)
  1468.         msg.add_sequence('unseen')
  1469.         msg.add_sequence('replied')
  1470.         msg.add_sequence('flagged')
  1471.         self.assert_(mailbox.MHMessage(msg).get_sequences() == \
  1472.                      ['unseen', 'replied', 'flagged'])
  1473.  
  1474.     def test_mh_to_babyl(self):
  1475.         # Convert MHMessage to BabylMessage
  1476.         pairs = (('unseen', ['unseen']), ('replied', ['answered']),
  1477.                  ('flagged', []))
  1478.         for setting, result in pairs:
  1479.             msg = mailbox.MHMessage(_sample_message)
  1480.             msg.add_sequence(setting)
  1481.             self.assert_(mailbox.BabylMessage(msg).get_labels() == result)
  1482.         msg = mailbox.MHMessage(_sample_message)
  1483.         msg.add_sequence('unseen')
  1484.         msg.add_sequence('replied')
  1485.         msg.add_sequence('flagged')
  1486.         self.assert_(mailbox.BabylMessage(msg).get_labels() == \
  1487.                      ['unseen', 'answered'])
  1488.  
  1489.     def test_babyl_to_maildir(self):
  1490.         # Convert BabylMessage to MaildirMessage
  1491.         pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
  1492.                  ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
  1493.                  ('resent', 'PS'))
  1494.         for setting, result in pairs:
  1495.             msg = mailbox.BabylMessage(_sample_message)
  1496.             msg.add_label(setting)
  1497.             self.assert_(mailbox.MaildirMessage(msg).get_flags() == result)
  1498.             self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
  1499.         msg = mailbox.BabylMessage(_sample_message)
  1500.         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1501.                       'edited', 'resent'):
  1502.             msg.add_label(label)
  1503.         self.assert_(mailbox.MaildirMessage(msg).get_flags() == 'PRT')
  1504.         self.assert_(mailbox.MaildirMessage(msg).get_subdir() == 'cur')
  1505.  
  1506.     def test_babyl_to_mboxmmdf(self):
  1507.         # Convert BabylMessage to mboxMessage and MMDFMessage
  1508.         pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
  1509.                  ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
  1510.                  ('resent', 'RO'))
  1511.         for setting, result in pairs:
  1512.             for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1513.                 msg = mailbox.BabylMessage(_sample_message)
  1514.                 msg.add_label(setting)
  1515.                 self.assert_(class_(msg).get_flags() == result)
  1516.         msg = mailbox.BabylMessage(_sample_message)
  1517.         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1518.                       'edited', 'resent'):
  1519.             msg.add_label(label)
  1520.         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
  1521.             self.assert_(class_(msg).get_flags() == 'ODA')
  1522.  
  1523.     def test_babyl_to_mh(self):
  1524.         # Convert BabylMessage to MHMessage
  1525.         pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
  1526.                  ('answered', ['replied']), ('forwarded', []), ('edited', []),
  1527.                  ('resent', []))
  1528.         for setting, result in pairs:
  1529.             msg = mailbox.BabylMessage(_sample_message)
  1530.             msg.add_label(setting)
  1531.             self.assert_(mailbox.MHMessage(msg).get_sequences() == result)
  1532.         msg = mailbox.BabylMessage(_sample_message)
  1533.         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1534.                       'edited', 'resent'):
  1535.             msg.add_label(label)
  1536.         self.assert_(mailbox.MHMessage(msg).get_sequences() == \
  1537.                      ['unseen', 'replied'])
  1538.  
  1539.     def test_babyl_to_babyl(self):
  1540.         # Convert BabylMessage to BabylMessage
  1541.         msg = mailbox.BabylMessage(_sample_message)
  1542.         msg.update_visible()
  1543.         for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
  1544.                       'edited', 'resent'):
  1545.             msg.add_label(label)
  1546.         msg2 = mailbox.BabylMessage(msg)
  1547.         self.assert_(msg2.get_labels() == ['unseen', 'deleted', 'filed',
  1548.                                            'answered', 'forwarded', 'edited',
  1549.                                            'resent'])
  1550.         self.assert_(msg.get_visible().keys() == msg2.get_visible().keys())
  1551.         for key in msg.get_visible().keys():
  1552.             self.assert_(msg.get_visible()[key] == msg2.get_visible()[key])
  1553.  
  1554.  
  1555. class TestProxyFileBase(TestBase):
  1556.  
  1557.     def _test_read(self, proxy):
  1558.         # Read by byte
  1559.         proxy.seek(0)
  1560.         self.assert_(proxy.read() == 'bar')
  1561.         proxy.seek(1)
  1562.         self.assert_(proxy.read() == 'ar')
  1563.         proxy.seek(0)
  1564.         self.assert_(proxy.read(2) == 'ba')
  1565.         proxy.seek(1)
  1566.         self.assert_(proxy.read(-1) == 'ar')
  1567.         proxy.seek(2)
  1568.         self.assert_(proxy.read(1000) == 'r')
  1569.  
  1570.     def _test_readline(self, proxy):
  1571.         # Read by line
  1572.         proxy.seek(0)
  1573.         self.assert_(proxy.readline() == 'foo' + os.linesep)
  1574.         self.assert_(proxy.readline() == 'bar' + os.linesep)
  1575.         self.assert_(proxy.readline() == 'fred' + os.linesep)
  1576.         self.assert_(proxy.readline() == 'bob')
  1577.         proxy.seek(2)
  1578.         self.assert_(proxy.readline() == 'o' + os.linesep)
  1579.         proxy.seek(6 + 2 * len(os.linesep))
  1580.         self.assert_(proxy.readline() == 'fred' + os.linesep)
  1581.         proxy.seek(6 + 2 * len(os.linesep))
  1582.         self.assert_(proxy.readline(2) == 'fr')
  1583.         self.assert_(proxy.readline(-10) == 'ed' + os.linesep)
  1584.  
  1585.     def _test_readlines(self, proxy):
  1586.         # Read multiple lines
  1587.         proxy.seek(0)
  1588.         self.assert_(proxy.readlines() == ['foo' + os.linesep,
  1589.                                            'bar' + os.linesep,
  1590.                                            'fred' + os.linesep, 'bob'])
  1591.         proxy.seek(0)
  1592.         self.assert_(proxy.readlines(2) == ['foo' + os.linesep])
  1593.         proxy.seek(3 + len(os.linesep))
  1594.         self.assert_(proxy.readlines(4 + len(os.linesep)) ==
  1595.                      ['bar' + os.linesep, 'fred' + os.linesep])
  1596.         proxy.seek(3)
  1597.         self.assert_(proxy.readlines(1000) == [os.linesep, 'bar' + os.linesep,
  1598.                                                'fred' + os.linesep, 'bob'])
  1599.  
  1600.     def _test_iteration(self, proxy):
  1601.         # Iterate by line
  1602.         proxy.seek(0)
  1603.         iterator = iter(proxy)
  1604.         self.assert_(iterator.next() == 'foo' + os.linesep)
  1605.         self.assert_(iterator.next() == 'bar' + os.linesep)
  1606.         self.assert_(iterator.next() == 'fred' + os.linesep)
  1607.         self.assert_(iterator.next() == 'bob')
  1608.         self.assertRaises(StopIteration, lambda: iterator.next())
  1609.  
  1610.     def _test_seek_and_tell(self, proxy):
  1611.         # Seek and use tell to check position
  1612.         proxy.seek(3)
  1613.         self.assert_(proxy.tell() == 3)
  1614.         self.assert_(proxy.read(len(os.linesep)) == os.linesep)
  1615.         proxy.seek(2, 1)
  1616.         self.assert_(proxy.read(1 + len(os.linesep)) == 'r' + os.linesep)
  1617.         proxy.seek(-3 - len(os.linesep), 2)
  1618.         self.assert_(proxy.read(3) == 'bar')
  1619.         proxy.seek(2, 0)
  1620.         self.assert_(proxy.read() == 'o' + os.linesep + 'bar' + os.linesep)
  1621.         proxy.seek(100)
  1622.         self.assert_(proxy.read() == '')
  1623.  
  1624.     def _test_close(self, proxy):
  1625.         # Close a file
  1626.         proxy.close()
  1627.         self.assertRaises(AttributeError, lambda: proxy.close())
  1628.  
  1629.  
  1630. class TestProxyFile(TestProxyFileBase):
  1631.  
  1632.     def setUp(self):
  1633.         self._path = test_support.TESTFN
  1634.         self._file = open(self._path, 'wb+')
  1635.  
  1636.     def tearDown(self):
  1637.         self._file.close()
  1638.         self._delete_recursively(self._path)
  1639.  
  1640.     def test_initialize(self):
  1641.         # Initialize and check position
  1642.         self._file.write('foo')
  1643.         pos = self._file.tell()
  1644.         proxy0 = mailbox._ProxyFile(self._file)
  1645.         self.assert_(proxy0.tell() == pos)
  1646.         self.assert_(self._file.tell() == pos)
  1647.         proxy1 = mailbox._ProxyFile(self._file, 0)
  1648.         self.assert_(proxy1.tell() == 0)
  1649.         self.assert_(self._file.tell() == pos)
  1650.  
  1651.     def test_read(self):
  1652.         self._file.write('bar')
  1653.         self._test_read(mailbox._ProxyFile(self._file))
  1654.  
  1655.     def test_readline(self):
  1656.         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1657.                                                   os.linesep))
  1658.         self._test_readline(mailbox._ProxyFile(self._file))
  1659.  
  1660.     def test_readlines(self):
  1661.         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1662.                                                   os.linesep))
  1663.         self._test_readlines(mailbox._ProxyFile(self._file))
  1664.  
  1665.     def test_iteration(self):
  1666.         self._file.write('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
  1667.                                                   os.linesep))
  1668.         self._test_iteration(mailbox._ProxyFile(self._file))
  1669.  
  1670.     def test_seek_and_tell(self):
  1671.         self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
  1672.         self._test_seek_and_tell(mailbox._ProxyFile(self._file))
  1673.  
  1674.     def test_close(self):
  1675.         self._file.write('foo%sbar%s' % (os.linesep, os.linesep))
  1676.         self._test_close(mailbox._ProxyFile(self._file))
  1677.  
  1678.  
  1679. class TestPartialFile(TestProxyFileBase):
  1680.  
  1681.     def setUp(self):
  1682.         self._path = test_support.TESTFN
  1683.         self._file = open(self._path, 'wb+')
  1684.  
  1685.     def tearDown(self):
  1686.         self._file.close()
  1687.         self._delete_recursively(self._path)
  1688.  
  1689.     def test_initialize(self):
  1690.         # Initialize and check position
  1691.         self._file.write('foo' + os.linesep + 'bar')
  1692.         pos = self._file.tell()
  1693.         proxy = mailbox._PartialFile(self._file, 2, 5)
  1694.         self.assert_(proxy.tell() == 0)
  1695.         self.assert_(self._file.tell() == pos)
  1696.  
  1697.     def test_read(self):
  1698.         self._file.write('***bar***')
  1699.         self._test_read(mailbox._PartialFile(self._file, 3, 6))
  1700.  
  1701.     def test_readline(self):
  1702.         self._file.write('!!!!!foo%sbar%sfred%sbob!!!!!' %
  1703.                          (os.linesep, os.linesep, os.linesep))
  1704.         self._test_readline(mailbox._PartialFile(self._file, 5,
  1705.                                                  18 + 3 * len(os.linesep)))
  1706.  
  1707.     def test_readlines(self):
  1708.         self._file.write('foo%sbar%sfred%sbob?????' %
  1709.                          (os.linesep, os.linesep, os.linesep))
  1710.         self._test_readlines(mailbox._PartialFile(self._file, 0,
  1711.                                                   13 + 3 * len(os.linesep)))
  1712.  
  1713.     def test_iteration(self):
  1714.         self._file.write('____foo%sbar%sfred%sbob####' %
  1715.                          (os.linesep, os.linesep, os.linesep))
  1716.         self._test_iteration(mailbox._PartialFile(self._file, 4,
  1717.                                                   17 + 3 * len(os.linesep)))
  1718.  
  1719.     def test_seek_and_tell(self):
  1720.         self._file.write('(((foo%sbar%s$$$' % (os.linesep, os.linesep))
  1721.         self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
  1722.                                                       9 + 2 * len(os.linesep)))
  1723.  
  1724.     def test_close(self):
  1725.         self._file.write('&foo%sbar%s^' % (os.linesep, os.linesep))
  1726.         self._test_close(mailbox._PartialFile(self._file, 1,
  1727.                                               6 + 3 * len(os.linesep)))
  1728.  
  1729.  
  1730. ## Start: tests from the original module (for backward compatibility).
  1731.  
  1732. FROM_ = "From some.body@dummy.domain  Sat Jul 24 13:43:35 2004\n"
  1733. DUMMY_MESSAGE = """\
  1734. From: some.body@dummy.domain
  1735. To: me@my.domain
  1736. Subject: Simple Test
  1737.  
  1738. This is a dummy message.
  1739. """
  1740.  
  1741. class MaildirTestCase(unittest.TestCase):
  1742.  
  1743.     def setUp(self):
  1744.         # create a new maildir mailbox to work with:
  1745.         self._dir = test_support.TESTFN
  1746.         os.mkdir(self._dir)
  1747.         os.mkdir(os.path.join(self._dir, "cur"))
  1748.         os.mkdir(os.path.join(self._dir, "tmp"))
  1749.         os.mkdir(os.path.join(self._dir, "new"))
  1750.         self._counter = 1
  1751.         self._msgfiles = []
  1752.  
  1753.     def tearDown(self):
  1754.         map(os.unlink, self._msgfiles)
  1755.         os.rmdir(os.path.join(self._dir, "cur"))
  1756.         os.rmdir(os.path.join(self._dir, "tmp"))
  1757.         os.rmdir(os.path.join(self._dir, "new"))
  1758.         os.rmdir(self._dir)
  1759.  
  1760.     def createMessage(self, dir, mbox=False):
  1761.         t = int(time.time() % 1000000)
  1762.         pid = self._counter
  1763.         self._counter += 1
  1764.         filename = os.extsep.join((str(t), str(pid), "myhostname", "mydomain"))
  1765.         tmpname = os.path.join(self._dir, "tmp", filename)
  1766.         newname = os.path.join(self._dir, dir, filename)
  1767.         fp = open(tmpname, "w")
  1768.         self._msgfiles.append(tmpname)
  1769.         if mbox:
  1770.             fp.write(FROM_)
  1771.         fp.write(DUMMY_MESSAGE)
  1772.         fp.close()
  1773.         if hasattr(os, "link"):
  1774.             os.link(tmpname, newname)
  1775.         else:
  1776.             fp = open(newname, "w")
  1777.             fp.write(DUMMY_MESSAGE)
  1778.             fp.close()
  1779.         self._msgfiles.append(newname)
  1780.         return tmpname
  1781.  
  1782.     def test_empty_maildir(self):
  1783.         """Test an empty maildir mailbox"""
  1784.         # Test for regression on bug #117490:
  1785.         # Make sure the boxes attribute actually gets set.
  1786.         self.mbox = mailbox.Maildir(test_support.TESTFN)
  1787.         #self.assert_(hasattr(self.mbox, "boxes"))
  1788.         #self.assert_(len(self.mbox.boxes) == 0)
  1789.         self.assert_(self.mbox.next() is None)
  1790.         self.assert_(self.mbox.next() is None)
  1791.  
  1792.     def test_nonempty_maildir_cur(self):
  1793.         self.createMessage("cur")
  1794.         self.mbox = mailbox.Maildir(test_support.TESTFN)
  1795.         #self.assert_(len(self.mbox.boxes) == 1)
  1796.         self.assert_(self.mbox.next() is not None)
  1797.         self.assert_(self.mbox.next() is None)
  1798.         self.assert_(self.mbox.next() is None)
  1799.  
  1800.     def test_nonempty_maildir_new(self):
  1801.         self.createMessage("new")
  1802.         self.mbox = mailbox.Maildir(test_support.TESTFN)
  1803.         #self.assert_(len(self.mbox.boxes) == 1)
  1804.         self.assert_(self.mbox.next() is not None)
  1805.         self.assert_(self.mbox.next() is None)
  1806.         self.assert_(self.mbox.next() is None)
  1807.  
  1808.     def test_nonempty_maildir_both(self):
  1809.         self.createMessage("cur")
  1810.         self.createMessage("new")
  1811.         self.mbox = mailbox.Maildir(test_support.TESTFN)
  1812.         #self.assert_(len(self.mbox.boxes) == 2)
  1813.         self.assert_(self.mbox.next() is not None)
  1814.         self.assert_(self.mbox.next() is not None)
  1815.         self.assert_(self.mbox.next() is None)
  1816.         self.assert_(self.mbox.next() is None)
  1817.  
  1818.     def test_unix_mbox(self):
  1819.         ### should be better!
  1820.         import email.parser
  1821.         fname = self.createMessage("cur", True)
  1822.         n = 0
  1823.         for msg in mailbox.PortableUnixMailbox(open(fname),
  1824.                                                email.parser.Parser().parse):
  1825.             n += 1
  1826.             self.assertEqual(msg["subject"], "Simple Test")
  1827.             self.assertEqual(len(str(msg)), len(FROM_)+len(DUMMY_MESSAGE))
  1828.         self.assertEqual(n, 1)
  1829.  
  1830. ## End: classes from the original module (for backward compatibility).
  1831.  
  1832.  
  1833. _sample_message = """\
  1834. Return-Path: <gkj@gregorykjohnson.com>
  1835. X-Original-To: gkj+person@localhost
  1836. Delivered-To: gkj+person@localhost
  1837. Received: from localhost (localhost [127.0.0.1])
  1838.         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
  1839.         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
  1840. Delivered-To: gkj@sundance.gregorykjohnson.com
  1841. Received: from localhost [127.0.0.1]
  1842.         by localhost with POP3 (fetchmail-6.2.5)
  1843.         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
  1844. Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
  1845.         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
  1846.         for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
  1847. Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
  1848.         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
  1849. Date: Wed, 13 Jul 2005 17:23:11 -0400
  1850. From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
  1851. To: gkj@gregorykjohnson.com
  1852. Subject: Sample message
  1853. Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
  1854. Mime-Version: 1.0
  1855. Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
  1856. Content-Disposition: inline
  1857. User-Agent: Mutt/1.5.9i
  1858.  
  1859.  
  1860. --NMuMz9nt05w80d4+
  1861. Content-Type: text/plain; charset=us-ascii
  1862. Content-Disposition: inline
  1863.  
  1864. This is a sample message.
  1865.  
  1866. --
  1867. Gregory K. Johnson
  1868.  
  1869. --NMuMz9nt05w80d4+
  1870. Content-Type: application/octet-stream
  1871. Content-Disposition: attachment; filename="text.gz"
  1872. Content-Transfer-Encoding: base64
  1873.  
  1874. H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
  1875. 3FYlAAAA
  1876.  
  1877. --NMuMz9nt05w80d4+--
  1878. """
  1879.  
  1880. _sample_headers = {
  1881.     "Return-Path":"<gkj@gregorykjohnson.com>",
  1882.     "X-Original-To":"gkj+person@localhost",
  1883.     "Delivered-To":"gkj+person@localhost",
  1884.     "Received":"""from localhost (localhost [127.0.0.1])
  1885.         by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
  1886.         for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
  1887.     "Delivered-To":"gkj@sundance.gregorykjohnson.com",
  1888.     "Received":"""from localhost [127.0.0.1]
  1889.         by localhost with POP3 (fetchmail-6.2.5)
  1890.         for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
  1891.     "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
  1892.         by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
  1893.         for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
  1894.     "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
  1895.         id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
  1896.     "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
  1897.     "From":""""Gregory K. Johnson" <gkj@gregorykjohnson.com>""",
  1898.     "To":"gkj@gregorykjohnson.com",
  1899.     "Subject":"Sample message",
  1900.     "Mime-Version":"1.0",
  1901.     "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
  1902.     "Content-Disposition":"inline",
  1903.     "User-Agent": "Mutt/1.5.9i" }
  1904.  
  1905. _sample_payloads = ("""This is a sample message.
  1906.  
  1907. --
  1908. Gregory K. Johnson
  1909. """,
  1910. """H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
  1911. 3FYlAAAA
  1912. """)
  1913.  
  1914.  
  1915. def test_main():
  1916.     tests = (TestMailboxSuperclass, TestMaildir, TestMbox, TestMMDF, TestMH,
  1917.              TestBabyl, TestMessage, TestMaildirMessage, TestMboxMessage,
  1918.              TestMHMessage, TestBabylMessage, TestMMDFMessage,
  1919.              TestMessageConversion, TestProxyFile, TestPartialFile,
  1920.              MaildirTestCase)
  1921.     test_support.run_unittest(*tests)
  1922.     test_support.reap_children()
  1923.  
  1924.  
  1925. if __name__ == '__main__':
  1926.     test_main()
  1927.