home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_bsddb.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  11.2 KB  |  353 lines

  1. #! /usr/bin/env python
  2. """Test script for the bsddb C module by Roger E. Masse
  3.    Adapted to unittest format and expanded scope by Raymond Hettinger
  4. """
  5. import os, sys
  6. import copy
  7. import bsddb
  8. import dbhash # Just so we know it's imported
  9. import unittest
  10. from test import test_support
  11.  
  12. class TestBSDDB(unittest.TestCase):
  13.     openflag = 'c'
  14.  
  15.     def setUp(self):
  16.         self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
  17.         self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
  18.         for k, v in self.d.iteritems():
  19.             self.f[k] = v
  20.  
  21.     def tearDown(self):
  22.         self.f.sync()
  23.         self.f.close()
  24.         if self.fname is None:
  25.             return
  26.         try:
  27.             os.remove(self.fname)
  28.         except os.error:
  29.             pass
  30.  
  31.     def test_getitem(self):
  32.         for k, v in self.d.iteritems():
  33.             self.assertEqual(self.f[k], v)
  34.  
  35.     def test_len(self):
  36.         self.assertEqual(len(self.f), len(self.d))
  37.  
  38.     def test_change(self):
  39.         self.f['r'] = 'discovered'
  40.         self.assertEqual(self.f['r'], 'discovered')
  41.         self.assert_('r' in self.f.keys())
  42.         self.assert_('discovered' in self.f.values())
  43.  
  44.     def test_close_and_reopen(self):
  45.         if self.fname is None:
  46.             # if we're using an in-memory only db, we can't reopen it
  47.             # so finish here.
  48.             return
  49.         self.f.close()
  50.         self.f = self.openmethod[0](self.fname, 'w')
  51.         for k, v in self.d.iteritems():
  52.             self.assertEqual(self.f[k], v)
  53.  
  54.     def assertSetEquals(self, seqn1, seqn2):
  55.         self.assertEqual(set(seqn1), set(seqn2))
  56.  
  57.     def test_mapping_iteration_methods(self):
  58.         f = self.f
  59.         d = self.d
  60.         self.assertSetEquals(d, f)
  61.         self.assertSetEquals(d.keys(), f.keys())
  62.         self.assertSetEquals(d.values(), f.values())
  63.         self.assertSetEquals(d.items(), f.items())
  64.         self.assertSetEquals(d.iterkeys(), f.iterkeys())
  65.         self.assertSetEquals(d.itervalues(), f.itervalues())
  66.         self.assertSetEquals(d.iteritems(), f.iteritems())
  67.  
  68.     def test_iter_while_modifying_values(self):
  69.         di = iter(self.d)
  70.         while 1:
  71.             try:
  72.                 key = di.next()
  73.                 self.d[key] = 'modified '+key
  74.             except StopIteration:
  75.                 break
  76.  
  77.         # it should behave the same as a dict.  modifying values
  78.         # of existing keys should not break iteration.  (adding
  79.         # or removing keys should)
  80.         loops_left = len(self.f)
  81.         fi = iter(self.f)
  82.         while 1:
  83.             try:
  84.                 key = fi.next()
  85.                 self.f[key] = 'modified '+key
  86.                 loops_left -= 1
  87.             except StopIteration:
  88.                 break
  89.         self.assertEqual(loops_left, 0)
  90.  
  91.         self.test_mapping_iteration_methods()
  92.  
  93.     def test_iter_abort_on_changed_size(self):
  94.         def DictIterAbort():
  95.             di = iter(self.d)
  96.             while 1:
  97.                 try:
  98.                     di.next()
  99.                     self.d['newkey'] = 'SPAM'
  100.                 except StopIteration:
  101.                     break
  102.         self.assertRaises(RuntimeError, DictIterAbort)
  103.  
  104.         def DbIterAbort():
  105.             fi = iter(self.f)
  106.             while 1:
  107.                 try:
  108.                     fi.next()
  109.                     self.f['newkey'] = 'SPAM'
  110.                 except StopIteration:
  111.                     break
  112.         self.assertRaises(RuntimeError, DbIterAbort)
  113.  
  114.     def test_iteritems_abort_on_changed_size(self):
  115.         def DictIteritemsAbort():
  116.             di = self.d.iteritems()
  117.             while 1:
  118.                 try:
  119.                     di.next()
  120.                     self.d['newkey'] = 'SPAM'
  121.                 except StopIteration:
  122.                     break
  123.         self.assertRaises(RuntimeError, DictIteritemsAbort)
  124.  
  125.         def DbIteritemsAbort():
  126.             fi = self.f.iteritems()
  127.             while 1:
  128.                 try:
  129.                     key, value = fi.next()
  130.                     del self.f[key]
  131.                 except StopIteration:
  132.                     break
  133.         self.assertRaises(RuntimeError, DbIteritemsAbort)
  134.  
  135.     def test_iteritems_while_modifying_values(self):
  136.         di = self.d.iteritems()
  137.         while 1:
  138.             try:
  139.                 k, v = di.next()
  140.                 self.d[k] = 'modified '+v
  141.             except StopIteration:
  142.                 break
  143.  
  144.         # it should behave the same as a dict.  modifying values
  145.         # of existing keys should not break iteration.  (adding
  146.         # or removing keys should)
  147.         loops_left = len(self.f)
  148.         fi = self.f.iteritems()
  149.         while 1:
  150.             try:
  151.                 k, v = fi.next()
  152.                 self.f[k] = 'modified '+v
  153.                 loops_left -= 1
  154.             except StopIteration:
  155.                 break
  156.         self.assertEqual(loops_left, 0)
  157.  
  158.         self.test_mapping_iteration_methods()
  159.  
  160.     def test_first_next_looping(self):
  161.         items = [self.f.first()]
  162.         for i in xrange(1, len(self.f)):
  163.             items.append(self.f.next())
  164.         self.assertSetEquals(items, self.d.items())
  165.  
  166.     def test_previous_last_looping(self):
  167.         items = [self.f.last()]
  168.         for i in xrange(1, len(self.f)):
  169.             items.append(self.f.previous())
  170.         self.assertSetEquals(items, self.d.items())
  171.  
  172.     def test_first_while_deleting(self):
  173.         # Test for bug 1725856
  174.         self.assert_(len(self.d) >= 2, "test requires >=2 items")
  175.         for _ in self.d:
  176.             key = self.f.first()[0]
  177.             del self.f[key]
  178.         self.assertEqual([], self.f.items(), "expected empty db after test")
  179.  
  180.     def test_last_while_deleting(self):
  181.         # Test for bug 1725856's evil twin
  182.         self.assert_(len(self.d) >= 2, "test requires >=2 items")
  183.         for _ in self.d:
  184.             key = self.f.last()[0]
  185.             del self.f[key]
  186.         self.assertEqual([], self.f.items(), "expected empty db after test")
  187.  
  188.     def test_set_location(self):
  189.         self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
  190.  
  191.     def test_contains(self):
  192.         for k in self.d:
  193.             self.assert_(k in self.f)
  194.         self.assert_('not here' not in self.f)
  195.  
  196.     def test_has_key(self):
  197.         for k in self.d:
  198.             self.assert_(self.f.has_key(k))
  199.         self.assert_(not self.f.has_key('not here'))
  200.  
  201.     def test_clear(self):
  202.         self.f.clear()
  203.         self.assertEqual(len(self.f), 0)
  204.  
  205.     def test__no_deadlock_first(self, debug=0):
  206.         # do this so that testers can see what function we're in in
  207.         # verbose mode when we deadlock.
  208.         sys.stdout.flush()
  209.  
  210.         # in pybsddb's _DBWithCursor this causes an internal DBCursor
  211.         # object is created.  Other test_ methods in this class could
  212.         # inadvertently cause the deadlock but an explicit test is needed.
  213.         if debug: print "A"
  214.         k,v = self.f.first()
  215.         if debug: print "B", k
  216.         self.f[k] = "deadlock.  do not pass go.  do not collect $200."
  217.         if debug: print "C"
  218.         # if the bsddb implementation leaves the DBCursor open during
  219.         # the database write and locking+threading support is enabled
  220.         # the cursor's read lock will deadlock the write lock request..
  221.  
  222.         # test the iterator interface
  223.         if True:
  224.             if debug: print "D"
  225.             i = self.f.iteritems()
  226.             k,v = i.next()
  227.             if debug: print "E"
  228.             self.f[k] = "please don't deadlock"
  229.             if debug: print "F"
  230.             while 1:
  231.                 try:
  232.                     k,v = i.next()
  233.                 except StopIteration:
  234.                     break
  235.             if debug: print "F2"
  236.  
  237.             i = iter(self.f)
  238.             if debug: print "G"
  239.             while i:
  240.                 try:
  241.                     if debug: print "H"
  242.                     k = i.next()
  243.                     if debug: print "I"
  244.                     self.f[k] = "deadlocks-r-us"
  245.                     if debug: print "J"
  246.                 except StopIteration:
  247.                     i = None
  248.             if debug: print "K"
  249.  
  250.         # test the legacy cursor interface mixed with writes
  251.         self.assert_(self.f.first()[0] in self.d)
  252.         k = self.f.next()[0]
  253.         self.assert_(k in self.d)
  254.         self.f[k] = "be gone with ye deadlocks"
  255.         self.assert_(self.f[k], "be gone with ye deadlocks")
  256.  
  257.     def test_for_cursor_memleak(self):
  258.         # do the bsddb._DBWithCursor iterator internals leak cursors?
  259.         nc1 = len(self.f._cursor_refs)
  260.         # create iterator
  261.         i = self.f.iteritems()
  262.         nc2 = len(self.f._cursor_refs)
  263.         # use the iterator (should run to the first yield, creating the cursor)
  264.         k, v = i.next()
  265.         nc3 = len(self.f._cursor_refs)
  266.         # destroy the iterator; this should cause the weakref callback
  267.         # to remove the cursor object from self.f._cursor_refs
  268.         del i
  269.         nc4 = len(self.f._cursor_refs)
  270.  
  271.         self.assertEqual(nc1, nc2)
  272.         self.assertEqual(nc1, nc4)
  273.         self.assert_(nc3 == nc1+1)
  274.  
  275.     def test_popitem(self):
  276.         k, v = self.f.popitem()
  277.         self.assert_(k in self.d)
  278.         self.assert_(v in self.d.values())
  279.         self.assert_(k not in self.f)
  280.         self.assertEqual(len(self.d)-1, len(self.f))
  281.  
  282.     def test_pop(self):
  283.         k = 'w'
  284.         v = self.f.pop(k)
  285.         self.assertEqual(v, self.d[k])
  286.         self.assert_(k not in self.f)
  287.         self.assert_(v not in self.f.values())
  288.         self.assertEqual(len(self.d)-1, len(self.f))
  289.  
  290.     def test_get(self):
  291.         self.assertEqual(self.f.get('NotHere'), None)
  292.         self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
  293.         self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
  294.  
  295.     def test_setdefault(self):
  296.         self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
  297.         self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
  298.  
  299.     def test_update(self):
  300.         new = dict(y='life', u='of', i='brian')
  301.         self.f.update(new)
  302.         self.d.update(new)
  303.         for k, v in self.d.iteritems():
  304.             self.assertEqual(self.f[k], v)
  305.  
  306.     def test_keyordering(self):
  307.         if self.openmethod[0] is not bsddb.btopen:
  308.             return
  309.         keys = self.d.keys()
  310.         keys.sort()
  311.         self.assertEqual(self.f.first()[0], keys[0])
  312.         self.assertEqual(self.f.next()[0], keys[1])
  313.         self.assertEqual(self.f.last()[0], keys[-1])
  314.         self.assertEqual(self.f.previous()[0], keys[-2])
  315.         self.assertEqual(list(self.f), keys)
  316.  
  317. class TestBTree(TestBSDDB):
  318.     fname = test_support.TESTFN
  319.     openmethod = [bsddb.btopen]
  320.  
  321. class TestBTree_InMemory(TestBSDDB):
  322.     fname = None
  323.     openmethod = [bsddb.btopen]
  324.  
  325. class TestBTree_InMemory_Truncate(TestBSDDB):
  326.     fname = None
  327.     openflag = 'n'
  328.     openmethod = [bsddb.btopen]
  329.  
  330. class TestHashTable(TestBSDDB):
  331.     fname = test_support.TESTFN
  332.     openmethod = [bsddb.hashopen]
  333.  
  334. class TestHashTable_InMemory(TestBSDDB):
  335.     fname = None
  336.     openmethod = [bsddb.hashopen]
  337.  
  338. ##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
  339. ##         #                                   appears broken... at least on
  340. ##         #                                   Solaris Intel - rmasse 1/97
  341.  
  342. def test_main(verbose=None):
  343.     test_support.run_unittest(
  344.         TestBTree,
  345.         TestHashTable,
  346.         TestBTree_InMemory,
  347.         TestHashTable_InMemory,
  348.         TestBTree_InMemory_Truncate,
  349.     )
  350.  
  351. if __name__ == "__main__":
  352.     test_main(verbose=True)
  353.