home *** CD-ROM | disk | FTP | other *** search
/ PC Professionell 2004 December / PCpro_2004_12.ISO / files / webserver / xampp / xampp-python-addon-1.4.9-installer.exe / test_thread.py < prev    next >
Encoding:
Python Source  |  2003-07-21  |  15.6 KB  |  501 lines

  1. """TestCases for multi-threaded access to a DB.
  2. """
  3.  
  4. import os
  5. import sys
  6. import time
  7. import errno
  8. import shutil
  9. import tempfile
  10. from pprint import pprint
  11. from whrandom import random
  12.  
  13. try:
  14.     True, False
  15. except NameError:
  16.     True = 1
  17.     False = 0
  18.  
  19. DASH = '-'
  20.  
  21. try:
  22.     from threading import Thread, currentThread
  23.     have_threads = True
  24. except ImportError:
  25.     have_threads = False
  26.  
  27. import unittest
  28. from test_all import verbose
  29.  
  30. try:
  31.     # For Python 2.3
  32.     from bsddb import db, dbutils
  33. except ImportError:
  34.     # For earlier Pythons w/distutils pybsddb
  35.     from bsddb3 import db, dbutils
  36.  
  37.  
  38. #----------------------------------------------------------------------
  39.  
  40. class BaseThreadedTestCase(unittest.TestCase):
  41.     dbtype       = db.DB_UNKNOWN  # must be set in derived class
  42.     dbopenflags  = 0
  43.     dbsetflags   = 0
  44.     envflags     = 0
  45.  
  46.     def setUp(self):
  47.         if verbose:
  48.             dbutils._deadlock_VerboseFile = sys.stdout
  49.  
  50.         homeDir = os.path.join(os.path.dirname(sys.argv[0]), 'db_home')
  51.         self.homeDir = homeDir
  52.         try:
  53.             os.mkdir(homeDir)
  54.         except OSError, e:
  55.             if e.errno <> errno.EEXIST: raise
  56.         self.env = db.DBEnv()
  57.         self.setEnvOpts()
  58.         self.env.open(homeDir, self.envflags | db.DB_CREATE)
  59.  
  60.         self.filename = self.__class__.__name__ + '.db'
  61.         self.d = db.DB(self.env)
  62.         if self.dbsetflags:
  63.             self.d.set_flags(self.dbsetflags)
  64.         self.d.open(self.filename, self.dbtype, self.dbopenflags|db.DB_CREATE)
  65.  
  66.     def tearDown(self):
  67.         self.d.close()
  68.         self.env.close()
  69.         shutil.rmtree(self.homeDir)
  70.  
  71.     def setEnvOpts(self):
  72.         pass
  73.  
  74.     def makeData(self, key):
  75.         return DASH.join([key] * 5)
  76.  
  77.  
  78. #----------------------------------------------------------------------
  79.  
  80.  
  81. class ConcurrentDataStoreBase(BaseThreadedTestCase):
  82.     dbopenflags = db.DB_THREAD
  83.     envflags    = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
  84.     readers     = 0 # derived class should set
  85.     writers     = 0
  86.     records     = 1000
  87.  
  88.     def test01_1WriterMultiReaders(self):
  89.         if verbose:
  90.             print '\n', '-=' * 30
  91.             print "Running %s.test01_1WriterMultiReaders..." % \
  92.                   self.__class__.__name__
  93.  
  94.         threads = []
  95.         for x in range(self.writers):
  96.             wt = Thread(target = self.writerThread,
  97.                         args = (self.d, self.records, x),
  98.                         name = 'writer %d' % x,
  99.                         )#verbose = verbose)
  100.             threads.append(wt)
  101.  
  102.         for x in range(self.readers):
  103.             rt = Thread(target = self.readerThread,
  104.                         args = (self.d, x),
  105.                         name = 'reader %d' % x,
  106.                         )#verbose = verbose)
  107.             threads.append(rt)
  108.  
  109.         for t in threads:
  110.             t.start()
  111.         for t in threads:
  112.             t.join()
  113.  
  114.     def writerThread(self, d, howMany, writerNum):
  115.         #time.sleep(0.01 * writerNum + 0.01)
  116.         name = currentThread().getName()
  117.         start = howMany * writerNum
  118.         stop = howMany * (writerNum + 1) - 1
  119.         if verbose:
  120.             print "%s: creating records %d - %d" % (name, start, stop)
  121.  
  122.         for x in range(start, stop):
  123.             key = '%04d' % x
  124.             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
  125.                                  max_retries=12)
  126.             if verbose and x % 100 == 0:
  127.                 print "%s: records %d - %d finished" % (name, start, x)
  128.  
  129.         if verbose:
  130.             print "%s: finished creating records" % name
  131.  
  132. ##         # Each write-cursor will be exclusive, the only one that can update the DB...
  133. ##         if verbose: print "%s: deleting a few records" % name
  134. ##         c = d.cursor(flags = db.DB_WRITECURSOR)
  135. ##         for x in range(10):
  136. ##             key = int(random() * howMany) + start
  137. ##             key = '%04d' % key
  138. ##             if d.has_key(key):
  139. ##                 c.set(key)
  140. ##                 c.delete()
  141.  
  142. ##         c.close()
  143.         if verbose:
  144.             print "%s: thread finished" % name
  145.  
  146.     def readerThread(self, d, readerNum):
  147.         time.sleep(0.01 * readerNum)
  148.         name = currentThread().getName()
  149.  
  150.         for loop in range(5):
  151.             c = d.cursor()
  152.             count = 0
  153.             rec = c.first()
  154.             while rec:
  155.                 count += 1
  156.                 key, data = rec
  157.                 self.assertEqual(self.makeData(key), data)
  158.                 rec = c.next()
  159.             if verbose:
  160.                 print "%s: found %d records" % (name, count)
  161.             c.close()
  162.             time.sleep(0.05)
  163.  
  164.         if verbose:
  165.             print "%s: thread finished" % name
  166.  
  167.  
  168. class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
  169.     dbtype  = db.DB_BTREE
  170.     writers = 2
  171.     readers = 10
  172.     records = 1000
  173.  
  174.  
  175. class HashConcurrentDataStore(ConcurrentDataStoreBase):
  176.     dbtype  = db.DB_HASH
  177.     writers = 2
  178.     readers = 10
  179.     records = 1000
  180.  
  181.  
  182. #----------------------------------------------------------------------
  183.  
  184. class SimpleThreadedBase(BaseThreadedTestCase):
  185.     dbopenflags = db.DB_THREAD
  186.     envflags    = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
  187.     readers = 5
  188.     writers = 3
  189.     records = 1000
  190.  
  191.     def setEnvOpts(self):
  192.         self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
  193.  
  194.     def test02_SimpleLocks(self):
  195.         if verbose:
  196.             print '\n', '-=' * 30
  197.             print "Running %s.test02_SimpleLocks..." % self.__class__.__name__
  198.  
  199.         threads = []
  200.         for x in range(self.writers):
  201.             wt = Thread(target = self.writerThread,
  202.                         args = (self.d, self.records, x),
  203.                         name = 'writer %d' % x,
  204.                         )#verbose = verbose)
  205.             threads.append(wt)
  206.         for x in range(self.readers):
  207.             rt = Thread(target = self.readerThread,
  208.                         args = (self.d, x),
  209.                         name = 'reader %d' % x,
  210.                         )#verbose = verbose)
  211.             threads.append(rt)
  212.  
  213.         for t in threads:
  214.             t.start()
  215.         for t in threads:
  216.             t.join()
  217.  
  218.     def writerThread(self, d, howMany, writerNum):
  219.         name = currentThread().getName()
  220.         start = howMany * writerNum
  221.         stop = howMany * (writerNum + 1) - 1
  222.         if verbose:
  223.             print "%s: creating records %d - %d" % (name, start, stop)
  224.  
  225.         # create a bunch of records
  226.         for x in xrange(start, stop):
  227.             key = '%04d' % x
  228.             dbutils.DeadlockWrap(d.put, key, self.makeData(key),
  229.                                  max_retries=12)
  230.  
  231.             if verbose and x % 100 == 0:
  232.                 print "%s: records %d - %d finished" % (name, start, x)
  233.  
  234.             # do a bit or reading too
  235.             if random() <= 0.05:
  236.                 for y in xrange(start, x):
  237.                     key = '%04d' % x
  238.                     data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
  239.                     self.assertEqual(data, self.makeData(key))
  240.  
  241.         # flush them
  242.         try:
  243.             dbutils.DeadlockWrap(d.sync, max_retries=12)
  244.         except db.DBIncompleteError, val:
  245.             if verbose:
  246.                 print "could not complete sync()..."
  247.  
  248.         # read them back, deleting a few
  249.         for x in xrange(start, stop):
  250.             key = '%04d' % x
  251.             data = dbutils.DeadlockWrap(d.get, key, max_retries=12)
  252.             if verbose and x % 100 == 0:
  253.                 print "%s: fetched record (%s, %s)" % (name, key, data)
  254.             self.assertEqual(data, self.makeData(key))
  255.             if random() <= 0.10:
  256.                 dbutils.DeadlockWrap(d.delete, key, max_retries=12)
  257.                 if verbose:
  258.                     print "%s: deleted record %s" % (name, key)
  259.  
  260.         if verbose:
  261.             print "%s: thread finished" % name
  262.  
  263.     def readerThread(self, d, readerNum):
  264.         time.sleep(0.01 * readerNum)
  265.         name = currentThread().getName()
  266.  
  267.         for loop in range(5):
  268.             c = d.cursor()
  269.             count = 0
  270.             rec = dbutils.DeadlockWrap(c.first, max_retries=10)
  271.             while rec:
  272.                 count += 1
  273.                 key, data = rec
  274.                 self.assertEqual(self.makeData(key), data)
  275.                 rec = dbutils.DeadlockWrap(c.next, max_retries=10)
  276.             if verbose:
  277.                 print "%s: found %d records" % (name, count)
  278.             c.close()
  279.             time.sleep(0.05)
  280.  
  281.         if verbose:
  282.             print "%s: thread finished" % name
  283.  
  284.  
  285. class BTreeSimpleThreaded(SimpleThreadedBase):
  286.     dbtype = db.DB_BTREE
  287.  
  288.  
  289. class HashSimpleThreaded(SimpleThreadedBase):
  290.     dbtype = db.DB_HASH
  291.  
  292.  
  293. #----------------------------------------------------------------------
  294.  
  295.  
  296. class ThreadedTransactionsBase(BaseThreadedTestCase):
  297.     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
  298.     envflags    = (db.DB_THREAD |
  299.                    db.DB_INIT_MPOOL |
  300.                    db.DB_INIT_LOCK |
  301.                    db.DB_INIT_LOG |
  302.                    db.DB_INIT_TXN
  303.                    )
  304.     readers = 0
  305.     writers = 0
  306.     records = 2000
  307.     txnFlag = 0
  308.  
  309.     def setEnvOpts(self):
  310.         #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
  311.         pass
  312.  
  313.     def test03_ThreadedTransactions(self):
  314.         if verbose:
  315.             print '\n', '-=' * 30
  316.             print "Running %s.test03_ThreadedTransactions..." % \
  317.                   self.__class__.__name__
  318.  
  319.         threads = []
  320.         for x in range(self.writers):
  321.             wt = Thread(target = self.writerThread,
  322.                         args = (self.d, self.records, x),
  323.                         name = 'writer %d' % x,
  324.                         )#verbose = verbose)
  325.             threads.append(wt)
  326.  
  327.         for x in range(self.readers):
  328.             rt = Thread(target = self.readerThread,
  329.                         args = (self.d, x),
  330.                         name = 'reader %d' % x,
  331.                         )#verbose = verbose)
  332.             threads.append(rt)
  333.  
  334.         dt = Thread(target = self.deadlockThread)
  335.         dt.start()
  336.  
  337.         for t in threads:
  338.             t.start()
  339.         for t in threads:
  340.             t.join()
  341.  
  342.         self.doLockDetect = False
  343.         dt.join()
  344.  
  345.     def doWrite(self, d, name, start, stop):
  346.         finished = False
  347.         while not finished:
  348.             try:
  349.                 txn = self.env.txn_begin(None, self.txnFlag)
  350.                 for x in range(start, stop):
  351.                     key = '%04d' % x
  352.                     d.put(key, self.makeData(key), txn)
  353.                     if verbose and x % 100 == 0:
  354.                         print "%s: records %d - %d finished" % (name, start, x)
  355.                 txn.commit()
  356.                 finished = True
  357.             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
  358.                 if verbose:
  359.                     print "%s: Aborting transaction (%s)" % (name, val[1])
  360.                 txn.abort()
  361.                 time.sleep(0.05)
  362.  
  363.     def writerThread(self, d, howMany, writerNum):
  364.         name = currentThread().getName()
  365.         start = howMany * writerNum
  366.         stop = howMany * (writerNum + 1) - 1
  367.         if verbose:
  368.             print "%s: creating records %d - %d" % (name, start, stop)
  369.  
  370.         step = 100
  371.         for x in range(start, stop, step):
  372.             self.doWrite(d, name, x, min(stop, x+step))
  373.  
  374.         if verbose:
  375.             print "%s: finished creating records" % name
  376.         if verbose:
  377.             print "%s: deleting a few records" % name
  378.  
  379.         finished = False
  380.         while not finished:
  381.             try:
  382.                 recs = []
  383.                 txn = self.env.txn_begin(None, self.txnFlag)
  384.                 for x in range(10):
  385.                     key = int(random() * howMany) + start
  386.                     key = '%04d' % key
  387.                     data = d.get(key, None, txn, db.DB_RMW)
  388.                     if data is not None:
  389.                         d.delete(key, txn)
  390.                         recs.append(key)
  391.                 txn.commit()
  392.                 finished = True
  393.                 if verbose:
  394.                     print "%s: deleted records %s" % (name, recs)
  395.             except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
  396.                 if verbose:
  397.                     print "%s: Aborting transaction (%s)" % (name, val[1])
  398.                 txn.abort()
  399.                 time.sleep(0.05)
  400.  
  401.         if verbose:
  402.             print "%s: thread finished" % name
  403.  
  404.     def readerThread(self, d, readerNum):
  405.         time.sleep(0.01 * readerNum + 0.05)
  406.         name = currentThread().getName()
  407.  
  408.         for loop in range(5):
  409.             finished = False
  410.             while not finished:
  411.                 try:
  412.                     txn = self.env.txn_begin(None, self.txnFlag)
  413.                     c = d.cursor(txn)
  414.                     count = 0
  415.                     rec = c.first()
  416.                     while rec:
  417.                         count += 1
  418.                         key, data = rec
  419.                         self.assertEqual(self.makeData(key), data)
  420.                         rec = c.next()
  421.                     if verbose: print "%s: found %d records" % (name, count)
  422.                     c.close()
  423.                     txn.commit()
  424.                     finished = True
  425.                 except (db.DBLockDeadlockError, db.DBLockNotGrantedError), val:
  426.                     if verbose:
  427.                         print "%s: Aborting transaction (%s)" % (name, val[1])
  428.                     c.close()
  429.                     txn.abort()
  430.                     time.sleep(0.05)
  431.  
  432.             time.sleep(0.05)
  433.  
  434.         if verbose:
  435.             print "%s: thread finished" % name
  436.  
  437.     def deadlockThread(self):
  438.         self.doLockDetect = True
  439.         while self.doLockDetect:
  440.             time.sleep(0.5)
  441.             try:
  442.                 aborted = self.env.lock_detect(
  443.                     db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
  444.                 if verbose and aborted:
  445.                     print "deadlock: Aborted %d deadlocked transaction(s)" \
  446.                           % aborted
  447.             except db.DBError:
  448.                 pass
  449.  
  450.  
  451. class BTreeThreadedTransactions(ThreadedTransactionsBase):
  452.     dbtype = db.DB_BTREE
  453.     writers = 3
  454.     readers = 5
  455.     records = 2000
  456.  
  457. class HashThreadedTransactions(ThreadedTransactionsBase):
  458.     dbtype = db.DB_HASH
  459.     writers = 1
  460.     readers = 5
  461.     records = 2000
  462.  
  463. class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
  464.     dbtype = db.DB_BTREE
  465.     writers = 3
  466.     readers = 5
  467.     records = 2000
  468.     txnFlag = db.DB_TXN_NOWAIT
  469.  
  470. class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
  471.     dbtype = db.DB_HASH
  472.     writers = 1
  473.     readers = 5
  474.     records = 2000
  475.     txnFlag = db.DB_TXN_NOWAIT
  476.  
  477.  
  478. #----------------------------------------------------------------------
  479.  
  480. def test_suite():
  481.     suite = unittest.TestSuite()
  482.  
  483.     if have_threads:
  484.         suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
  485.         suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
  486.         suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
  487.         suite.addTest(unittest.makeSuite(HashSimpleThreaded))
  488.         suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
  489.         suite.addTest(unittest.makeSuite(HashThreadedTransactions))
  490.         suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
  491.         suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
  492.  
  493.     else:
  494.         print "Threads not available, skipping thread tests."
  495.  
  496.     return suite
  497.  
  498.  
  499. if __name__ == '__main__':
  500.     unittest.main(defaultTest='test_suite')
  501.