home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import sys
- import os
- import string
- import time
- from pprint import pprint
- import unittest
- from test_all import db, dbshelve, test_support, verbose, have_threads, get_new_environment_path
- musicdata = {
- 1: ('Bad English', 'The Price Of Love', 'Rock'),
- 2: ('DNA featuring Suzanne Vega', "Tom's Diner", 'Rock'),
- 3: ('George Michael', 'Praying For Time', 'Rock'),
- 4: ('Gloria Estefan', 'Here We Are', 'Rock'),
- 5: ('Linda Ronstadt', "Don't Know Much", 'Rock'),
- 6: ('Michael Bolton', 'How Am I Supposed To Live Without You', 'Blues'),
- 7: ('Paul Young', 'Oh Girl', 'Rock'),
- 8: ('Paula Abdul', 'Opposites Attract', 'Rock'),
- 9: ('Richard Marx', "Should've Known Better", 'Rock'),
- 10: ('Rod Stewart', 'Forever Young', 'Rock'),
- 11: ('Roxette', 'Dangerous', 'Rock'),
- 12: ('Sheena Easton', 'The Lover In Me', 'Rock'),
- 13: ("Sinead O'Connor", 'Nothing Compares 2 U', 'Rock'),
- 14: ('Stevie B.', 'Because I Love You', 'Rock'),
- 15: ('Taylor Dayne', 'Love Will Lead You Back', 'Rock'),
- 16: ('The Bangles', 'Eternal Flame', 'Rock'),
- 17: ('Wilson Phillips', 'Release Me', 'Rock'),
- 18: ('Billy Joel', 'Blonde Over Blue', 'Rock'),
- 19: ('Billy Joel', 'Famous Last Words', 'Rock'),
- 20: ('Billy Joel', 'Lullabye (Goodnight, My Angel)', 'Rock'),
- 21: ('Billy Joel', 'The River Of Dreams', 'Rock'),
- 22: ('Billy Joel', 'Two Thousand Years', 'Rock'),
- 23: ('Janet Jackson', 'Alright', 'Rock'),
- 24: ('Janet Jackson', 'Black Cat', 'Rock'),
- 25: ('Janet Jackson', 'Come Back To Me', 'Rock'),
- 26: ('Janet Jackson', 'Escapade', 'Rock'),
- 27: ('Janet Jackson', 'Love Will Never Do (Without You)', 'Rock'),
- 28: ('Janet Jackson', 'Miss You Much', 'Rock'),
- 29: ('Janet Jackson', 'Rhythm Nation', 'Rock'),
- 30: ('Janet Jackson', 'State Of The World', 'Rock'),
- 31: ('Janet Jackson', 'The Knowledge', 'Rock'),
- 32: ('Spyro Gyra', 'End of Romanticism', 'Jazz'),
- 33: ('Spyro Gyra', 'Heliopolis', 'Jazz'),
- 34: ('Spyro Gyra', 'Jubilee', 'Jazz'),
- 35: ('Spyro Gyra', 'Little Linda', 'Jazz'),
- 36: ('Spyro Gyra', 'Morning Dance', 'Jazz'),
- 37: ('Spyro Gyra', 'Song for Lorraine', 'Jazz'),
- 38: ('Yes', 'Owner Of A Lonely Heart', 'Rock'),
- 39: ('Yes', 'Rhythm Of Love', 'Rock'),
- 40: ('Cusco', 'Dream Catcher', 'New Age'),
- 41: ('Cusco', 'Geronimos Laughter', 'New Age'),
- 42: ('Cusco', 'Ghost Dance', 'New Age'),
- 43: ('Blue Man Group', 'Drumbone', 'New Age'),
- 44: ('Blue Man Group', 'Endless Column', 'New Age'),
- 45: ('Blue Man Group', 'Klein Mandelbrot', 'New Age'),
- 46: ('Kenny G', 'Silhouette', 'Jazz'),
- 47: ('Sade', 'Smooth Operator', 'Jazz'),
- 48: ('David Arkenstone', 'Papillon (On The Wings Of The Butterfly)', 'New Age'),
- 49: ('David Arkenstone', 'Stepping Stars', 'New Age'),
- 50: ('David Arkenstone', 'Carnation Lily Lily Rose', 'New Age'),
- 51: ('David Lanz', 'Behind The Waterfall', 'New Age'),
- 52: ('David Lanz', "Cristofori's Dream", 'New Age'),
- 53: ('David Lanz', 'Heartsounds', 'New Age'),
- 54: ('David Lanz', 'Leaves on the Seine', 'New Age'),
- 99: ('unknown artist', 'Unnamed song', 'Unknown') }
-
- class AssociateErrorTestCase(unittest.TestCase):
-
- def setUp(self):
- self.filename = self.__class__.__name__ + '.db'
- self.homeDir = get_new_environment_path()
- self.env = db.DBEnv()
- self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL)
-
-
- def tearDown(self):
- self.env.close()
- self.env = None
- test_support.rmtree(self.homeDir)
-
-
- def test00_associateDBError(self):
- if verbose:
- print '\n', '-=' * 30
- print 'Running %s.test00_associateDBError...' % self.__class__.__name__
-
- dupDB = db.DB(self.env)
- dupDB.set_flags(db.DB_DUP)
- dupDB.open(self.filename, 'primary', db.DB_BTREE, db.DB_CREATE)
- secDB = db.DB(self.env)
- secDB.open(self.filename, 'secondary', db.DB_BTREE, db.DB_CREATE)
-
- try:
-
- def f(a, b):
- return a + b
-
- dupDB.associate(secDB, f)
- except db.DBError:
- secDB.close()
- dupDB.close()
-
- secDB.close()
- dupDB.close()
- self.fail('DBError exception was expected')
-
-
-
- class AssociateTestCase(unittest.TestCase):
- keytype = ''
- envFlags = 0
- dbFlags = 0
-
- def setUp(self):
- self.filename = self.__class__.__name__ + '.db'
- self.homeDir = get_new_environment_path()
- self.env = db.DBEnv()
- self.env.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_THREAD | self.envFlags)
-
-
- def tearDown(self):
- self.closeDB()
- self.env.close()
- self.env = None
- test_support.rmtree(self.homeDir)
-
-
- def addDataToDB(self, d, txn = None):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = '%02d' % key
-
- d.put(key, '|'.join(value), txn = txn)
-
-
-
- def createDB(self, txn = None):
- self.cur = None
- self.secDB = None
- self.primary = db.DB(self.env)
- self.primary.set_get_returns_none(2)
- if db.version() >= (4, 1):
- self.primary.open(self.filename, 'primary', self.dbtype, db.DB_CREATE | db.DB_THREAD | self.dbFlags, txn = txn)
- else:
- self.primary.open(self.filename, 'primary', self.dbtype, db.DB_CREATE | db.DB_THREAD | self.dbFlags)
-
-
- def closeDB(self):
- if self.cur:
- self.cur.close()
- self.cur = None
-
- if self.secDB:
- self.secDB.close()
- self.secDB = None
-
- self.primary.close()
- self.primary = None
-
-
- def getDB(self):
- return self.primary
-
-
- def test01_associateWithDB(self):
- if verbose:
- print '\n', '-=' * 30
- print 'Running %s.test01_associateWithDB...' % self.__class__.__name__
-
- self.createDB()
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.set_get_returns_none(2)
- self.secDB.open(self.filename, 'secondary', db.DB_BTREE, db.DB_CREATE | db.DB_THREAD | self.dbFlags)
- self.getDB().associate(self.secDB, self.getGenre)
- self.addDataToDB(self.getDB())
- self.finish_test(self.secDB)
-
-
- def test02_associateAfterDB(self):
- if verbose:
- print '\n', '-=' * 30
- print 'Running %s.test02_associateAfterDB...' % self.__class__.__name__
-
- self.createDB()
- self.addDataToDB(self.getDB())
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.open(self.filename, 'secondary', db.DB_BTREE, db.DB_CREATE | db.DB_THREAD | self.dbFlags)
- self.getDB().associate(self.secDB, self.getGenre, db.DB_CREATE)
- self.finish_test(self.secDB)
-
-
- def finish_test(self, secDB, txn = None):
- vals = secDB.pget('Blues', txn = txn)
- self.assertEqual(vals, None, vals)
- vals = secDB.pget('Unknown', txn = txn)
- if not vals[0] == 99:
- pass
- self.assert_(vals[0] == '99', vals)
- vals[1].index('Unknown')
- vals[1].index('Unnamed')
- vals[1].index('unknown')
- if verbose:
- print 'Primary key traversal:'
-
- self.cur = self.getDB().cursor(txn)
- count = 0
- rec = self.cur.first()
- while rec is not None:
- if type(self.keytype) == type(''):
- self.assert_(int(rec[0]))
- elif rec[0]:
- pass
- self.assert_(type(rec[0]) == type(0))
- count = count + 1
- if verbose:
- print rec
-
- rec = getattr(self.cur, 'next')()
- self.assertEqual(count, len(musicdata))
- if verbose:
- print 'Secondary key traversal:'
-
- self.cur = secDB.cursor(txn)
- count = 0
- vals = self.cur.pget('Unknown', flags = db.DB_LAST)
- if not vals[1] == 99:
- pass
- self.assert_(vals[1] == '99', vals)
- self.assertEqual(vals[0], 'Unknown')
- vals[2].index('Unknown')
- vals[2].index('Unnamed')
- vals[2].index('unknown')
- vals = self.cur.pget('Unknown', data = 'wrong value', flags = db.DB_GET_BOTH)
- self.assertEqual(vals, None, vals)
- rec = self.cur.first()
- self.assertEqual(rec[0], 'Jazz')
- while rec is not None:
- count = count + 1
- if verbose:
- print rec
-
- rec = getattr(self.cur, 'next')()
- self.assertEqual(count, len(musicdata) - 1)
- self.cur = None
-
-
- def getGenre(self, priKey, priData):
- self.assertEqual(type(priData), type(''))
- genre = priData.split('|')[2]
- if verbose:
- print 'getGenre key: %r data: %r' % (priKey, priData)
-
- if genre == 'Blues':
- return db.DB_DONOTINDEX
- return genre
-
-
-
- class AssociateHashTestCase(AssociateTestCase):
- dbtype = db.DB_HASH
-
-
- class AssociateBTreeTestCase(AssociateTestCase):
- dbtype = db.DB_BTREE
-
-
- class AssociateRecnoTestCase(AssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-
- class AssociateBTreeTxnTestCase(AssociateBTreeTestCase):
- envFlags = db.DB_INIT_TXN
- dbFlags = 0
-
- def txn_finish_test(self, sDB, txn):
-
- try:
- self.finish_test(sDB, txn = txn)
- finally:
- if self.cur:
- self.cur.close()
- self.cur = None
-
- if txn:
- txn.commit()
-
-
-
-
- def test13_associate_in_transaction(self):
- if verbose:
- print '\n', '-=' * 30
- print 'Running %s.test13_associateAutoCommit...' % self.__class__.__name__
-
- txn = self.env.txn_begin()
-
- try:
- self.createDB(txn = txn)
- self.secDB = db.DB(self.env)
- self.secDB.set_flags(db.DB_DUP)
- self.secDB.set_get_returns_none(2)
- self.secDB.open(self.filename, 'secondary', db.DB_BTREE, db.DB_CREATE | db.DB_THREAD, txn = txn)
- if db.version() >= (4, 1):
- self.getDB().associate(self.secDB, self.getGenre, txn = txn)
- else:
- self.getDB().associate(self.secDB, self.getGenre)
- self.addDataToDB(self.getDB(), txn = txn)
- except:
- txn.abort()
- raise
-
- self.txn_finish_test(self.secDB, txn = txn)
-
-
-
- class ShelveAssociateTestCase(AssociateTestCase):
-
- def createDB(self):
- self.primary = dbshelve.open(self.filename, dbname = 'primary', dbenv = self.env, filetype = self.dbtype)
-
-
- def addDataToDB(self, d):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = '%02d' % key
-
- d.put(key, value)
-
-
-
- def getGenre(self, priKey, priData):
- self.assertEqual(type(priData), type(()))
- if verbose:
- print 'getGenre key: %r data: %r' % (priKey, priData)
-
- genre = priData[2]
- if genre == 'Blues':
- return db.DB_DONOTINDEX
- return genre
-
-
-
- class ShelveAssociateHashTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_HASH
-
-
- class ShelveAssociateBTreeTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_BTREE
-
-
- class ShelveAssociateRecnoTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-
- class ThreadedAssociateTestCase(AssociateTestCase):
-
- def addDataToDB(self, d):
- t1 = Thread(target = self.writer1, args = (d,))
- t2 = Thread(target = self.writer2, args = (d,))
- t1.setDaemon(True)
- t2.setDaemon(True)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
-
-
- def writer1(self, d):
- for key, value in musicdata.items():
- if type(self.keytype) == type(''):
- key = '%02d' % key
-
- d.put(key, '|'.join(value))
-
-
-
- def writer2(self, d):
- for x in range(100, 600):
- key = 'z%2d' % x
- value = [
- key] * 4
- d.put(key, '|'.join(value))
-
-
-
-
- class ThreadedAssociateHashTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_HASH
-
-
- class ThreadedAssociateBTreeTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_BTREE
-
-
- class ThreadedAssociateRecnoTestCase(ShelveAssociateTestCase):
- dbtype = db.DB_RECNO
- keytype = 0
-
-
- def test_suite():
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(AssociateErrorTestCase))
- suite.addTest(unittest.makeSuite(AssociateHashTestCase))
- suite.addTest(unittest.makeSuite(AssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(AssociateRecnoTestCase))
- if db.version() >= (4, 1):
- suite.addTest(unittest.makeSuite(AssociateBTreeTxnTestCase))
-
- suite.addTest(unittest.makeSuite(ShelveAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ShelveAssociateRecnoTestCase))
- if have_threads:
- suite.addTest(unittest.makeSuite(ThreadedAssociateHashTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateBTreeTestCase))
- suite.addTest(unittest.makeSuite(ThreadedAssociateRecnoTestCase))
-
- return suite
-
- if __name__ == '__main__':
- unittest.main(defaultTest = 'test_suite')
-
-