home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_303 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  14.8 KB  |  531 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import os
  5. import sys
  6. import time
  7. import errno
  8. from random import random
  9. DASH = '-'
  10.  
# Probe for the WindowsError name: where the interpreter does not define it,
# the bare lookup raises NameError and a placeholder exception class is
# created so the name always exists in this module.
try:
    WindowsError
except NameError:
    
    class WindowsError(Exception):
        pass
  17.  
  18.  
  19. import unittest
  20. from test_all import db, dbutils, test_support, verbose, have_threads, get_new_environment_path, get_new_database_path
  21. if have_threads:
  22.     from threading import Thread
  23.     import sys
  24.     if sys.version_info[0] < 3:
  25.         from threading import currentThread
  26.     else:
  27.         from threading import current_thread as currentThread
  28.  
  29.  
  30. class BaseThreadedTestCase(unittest.TestCase):
  31.     dbtype = db.DB_UNKNOWN
  32.     dbopenflags = 0
  33.     dbsetflags = 0
  34.     envflags = 0
  35.     import sys
  36.     if sys.version_info[:3] < (2, 4, 0):
  37.         
  38.         def assertTrue(self, expr, msg = None):
  39.             self.failUnless(expr, msg = msg)
  40.  
  41.     
  42.     
  43.     def setUp(self):
  44.         if verbose:
  45.             dbutils._deadlock_VerboseFile = sys.stdout
  46.         
  47.         self.homeDir = get_new_environment_path()
  48.         self.env = db.DBEnv()
  49.         self.setEnvOpts()
  50.         self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
  51.         self.filename = self.__class__.__name__ + '.db'
  52.         self.d = db.DB(self.env)
  53.         if self.dbsetflags:
  54.             self.d.set_flags(self.dbsetflags)
  55.         
  56.         self.d.open(self.filename, self.dbtype, self.dbopenflags | db.DB_CREATE)
  57.  
  58.     
  59.     def tearDown(self):
  60.         self.d.close()
  61.         self.env.close()
  62.         test_support.rmtree(self.homeDir)
  63.  
  64.     
  65.     def setEnvOpts(self):
  66.         pass
  67.  
  68.     
  69.     def makeData(self, key):
  70.         return DASH.join([
  71.             key] * 5)
  72.  
  73.  
  74.  
  75. class ConcurrentDataStoreBase(BaseThreadedTestCase):
  76.     dbopenflags = db.DB_THREAD
  77.     envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
  78.     readers = 0
  79.     writers = 0
  80.     records = 1000
  81.     
  82.     def test01_1WriterMultiReaders(self):
  83.         if verbose:
  84.             print '\n', '-=' * 30
  85.             print 'Running %s.test01_1WriterMultiReaders...' % self.__class__.__name__
  86.         
  87.         keys = range(self.records)
  88.         import random
  89.         random.shuffle(keys)
  90.         records_per_writer = self.records // self.writers
  91.         readers_per_writer = self.readers // self.writers
  92.         self.assertEqual(self.records, self.writers * records_per_writer)
  93.         self.assertEqual(self.readers, self.writers * readers_per_writer)
  94.         self.assertTrue(records_per_writer % readers_per_writer == 0)
  95.         readers = []
  96.         for x in xrange(self.readers):
  97.             rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x)
  98.             import sys
  99.             if sys.version_info[0] < 3:
  100.                 rt.setDaemon(True)
  101.             else:
  102.                 rt.daemon = True
  103.             readers.append(rt)
  104.         
  105.         writers = []
  106.         for x in xrange(self.writers):
  107.             a = keys[records_per_writer * x:records_per_writer * (x + 1)]
  108.             a.sort()
  109.             b = readers[readers_per_writer * x:readers_per_writer * (x + 1)]
  110.             wt = Thread(target = self.writerThread, args = (self.d, a, b), name = 'writer %d' % x)
  111.             writers.append(wt)
  112.         
  113.         for t in writers:
  114.             import sys
  115.             if sys.version_info[0] < 3:
  116.                 t.setDaemon(True)
  117.             else:
  118.                 t.daemon = True
  119.             t.start()
  120.         
  121.         for t in writers:
  122.             t.join()
  123.         
  124.         for t in readers:
  125.             t.join()
  126.         
  127.  
  128.     
  129.     def writerThread(self, d, keys, readers):
  130.         import sys
  131.         if sys.version_info[0] < 3:
  132.             name = currentThread().getName()
  133.         else:
  134.             name = currentThread().name
  135.         if verbose:
  136.             print '%s: creating records %d - %d' % (name, start, stop)
  137.         
  138.         count = len(keys) // len(readers)
  139.         count2 = count
  140.         for x in keys:
  141.             key = '%04d' % x
  142.             dbutils.DeadlockWrap(d.put, key, self.makeData(key), max_retries = 12)
  143.             if verbose and x % 100 == 0:
  144.                 print '%s: records %d - %d finished' % (name, start, x)
  145.             
  146.             count2 -= 1
  147.             if not count2:
  148.                 readers.pop().start()
  149.                 count2 = count
  150.                 continue
  151.         
  152.         if verbose:
  153.             print '%s: finished creating records' % name
  154.         
  155.         if verbose:
  156.             print '%s: thread finished' % name
  157.         
  158.  
  159.     
  160.     def readerThread(self, d, readerNum):
  161.         import sys
  162.         if sys.version_info[0] < 3:
  163.             name = currentThread().getName()
  164.         else:
  165.             name = currentThread().name
  166.         for i in xrange(5):
  167.             c = d.cursor()
  168.             count = 0
  169.             rec = c.first()
  170.             while rec:
  171.                 count += 1
  172.                 (key, data) = rec
  173.                 self.assertEqual(self.makeData(key), data)
  174.                 rec = c.next()
  175.             if verbose:
  176.                 print '%s: found %d records' % (name, count)
  177.             
  178.             c.close()
  179.         
  180.         if verbose:
  181.             print '%s: thread finished' % name
  182.         
  183.  
  184.  
  185.  
  186. class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
  187.     dbtype = db.DB_BTREE
  188.     writers = 2
  189.     readers = 10
  190.     records = 1000
  191.  
  192.  
  193. class HashConcurrentDataStore(ConcurrentDataStoreBase):
  194.     dbtype = db.DB_HASH
  195.     writers = 2
  196.     readers = 10
  197.     records = 1000
  198.  
  199.  
  200. class SimpleThreadedBase(BaseThreadedTestCase):
  201.     dbopenflags = db.DB_THREAD
  202.     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
  203.     readers = 10
  204.     writers = 2
  205.     records = 1000
  206.     
  207.     def setEnvOpts(self):
  208.         self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
  209.  
  210.     
  211.     def test02_SimpleLocks(self):
  212.         if verbose:
  213.             print '\n', '-=' * 30
  214.             print 'Running %s.test02_SimpleLocks...' % self.__class__.__name__
  215.         
  216.         keys = range(self.records)
  217.         import random
  218.         random.shuffle(keys)
  219.         records_per_writer = self.records // self.writers
  220.         readers_per_writer = self.readers // self.writers
  221.         self.assertEqual(self.records, self.writers * records_per_writer)
  222.         self.assertEqual(self.readers, self.writers * readers_per_writer)
  223.         self.assertTrue(records_per_writer % readers_per_writer == 0)
  224.         readers = []
  225.         for x in xrange(self.readers):
  226.             rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x)
  227.             import sys
  228.             if sys.version_info[0] < 3:
  229.                 rt.setDaemon(True)
  230.             else:
  231.                 rt.daemon = True
  232.             readers.append(rt)
  233.         
  234.         writers = []
  235.         for x in xrange(self.writers):
  236.             a = keys[records_per_writer * x:records_per_writer * (x + 1)]
  237.             a.sort()
  238.             b = readers[readers_per_writer * x:readers_per_writer * (x + 1)]
  239.             wt = Thread(target = self.writerThread, args = (self.d, a, b), name = 'writer %d' % x)
  240.             writers.append(wt)
  241.         
  242.         for t in writers:
  243.             import sys
  244.             if sys.version_info[0] < 3:
  245.                 t.setDaemon(True)
  246.             else:
  247.                 t.daemon = True
  248.             t.start()
  249.         
  250.         for t in writers:
  251.             t.join()
  252.         
  253.         for t in readers:
  254.             t.join()
  255.         
  256.  
  257.     
  258.     def writerThread(self, d, keys, readers):
  259.         import sys
  260.         if sys.version_info[0] < 3:
  261.             name = currentThread().getName()
  262.         else:
  263.             name = currentThread().name
  264.         if verbose:
  265.             print '%s: creating records %d - %d' % (name, start, stop)
  266.         
  267.         count = len(keys) // len(readers)
  268.         count2 = count
  269.         for x in keys:
  270.             key = '%04d' % x
  271.             dbutils.DeadlockWrap(d.put, key, self.makeData(key), max_retries = 12)
  272.             if verbose and x % 100 == 0:
  273.                 print '%s: records %d - %d finished' % (name, start, x)
  274.             
  275.             count2 -= 1
  276.             if not count2:
  277.                 readers.pop().start()
  278.                 count2 = count
  279.                 continue
  280.         
  281.         if verbose:
  282.             print '%s: thread finished' % name
  283.         
  284.  
  285.     
  286.     def readerThread(self, d, readerNum):
  287.         import sys
  288.         if sys.version_info[0] < 3:
  289.             name = currentThread().getName()
  290.         else:
  291.             name = currentThread().name
  292.         c = d.cursor()
  293.         count = 0
  294.         rec = dbutils.DeadlockWrap(c.first, max_retries = 10)
  295.         while rec:
  296.             count += 1
  297.             (key, data) = rec
  298.             self.assertEqual(self.makeData(key), data)
  299.             rec = dbutils.DeadlockWrap(c.next, max_retries = 10)
  300.         if verbose:
  301.             print '%s: found %d records' % (name, count)
  302.         
  303.         c.close()
  304.         if verbose:
  305.             print '%s: thread finished' % name
  306.         
  307.  
  308.  
  309.  
  310. class BTreeSimpleThreaded(SimpleThreadedBase):
  311.     dbtype = db.DB_BTREE
  312.  
  313.  
  314. class HashSimpleThreaded(SimpleThreadedBase):
  315.     dbtype = db.DB_HASH
  316.  
  317.  
  318. class ThreadedTransactionsBase(BaseThreadedTestCase):
  319.     dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
  320.     envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK | db.DB_INIT_LOG | db.DB_INIT_TXN
  321.     readers = 0
  322.     writers = 0
  323.     records = 2000
  324.     txnFlag = 0
  325.     
  326.     def setEnvOpts(self):
  327.         pass
  328.  
  329.     
  330.     def test03_ThreadedTransactions(self):
  331.         if verbose:
  332.             print '\n', '-=' * 30
  333.             print 'Running %s.test03_ThreadedTransactions...' % self.__class__.__name__
  334.         
  335.         keys = range(self.records)
  336.         import random
  337.         random.shuffle(keys)
  338.         records_per_writer = self.records // self.writers
  339.         readers_per_writer = self.readers // self.writers
  340.         self.assertEqual(self.records, self.writers * records_per_writer)
  341.         self.assertEqual(self.readers, self.writers * readers_per_writer)
  342.         self.assertTrue(records_per_writer % readers_per_writer == 0)
  343.         readers = []
  344.         for x in xrange(self.readers):
  345.             rt = Thread(target = self.readerThread, args = (self.d, x), name = 'reader %d' % x)
  346.             import sys
  347.             if sys.version_info[0] < 3:
  348.                 rt.setDaemon(True)
  349.             else:
  350.                 rt.daemon = True
  351.             readers.append(rt)
  352.         
  353.         writers = []
  354.         for x in xrange(self.writers):
  355.             a = keys[records_per_writer * x:records_per_writer * (x + 1)]
  356.             b = readers[readers_per_writer * x:readers_per_writer * (x + 1)]
  357.             wt = Thread(target = self.writerThread, args = (self.d, a, b), name = 'writer %d' % x)
  358.             writers.append(wt)
  359.         
  360.         dt = Thread(target = self.deadlockThread)
  361.         import sys
  362.         if sys.version_info[0] < 3:
  363.             dt.setDaemon(True)
  364.         else:
  365.             dt.daemon = True
  366.         dt.start()
  367.         for t in writers:
  368.             import sys
  369.             if sys.version_info[0] < 3:
  370.                 t.setDaemon(True)
  371.             else:
  372.                 t.daemon = True
  373.             t.start()
  374.         
  375.         for t in writers:
  376.             t.join()
  377.         
  378.         for t in readers:
  379.             t.join()
  380.         
  381.         self.doLockDetect = False
  382.         dt.join()
  383.  
  384.     
  385.     def writerThread(self, d, keys, readers):
  386.         import sys
  387.         if sys.version_info[0] < 3:
  388.             name = currentThread().getName()
  389.         else:
  390.             name = currentThread().name
  391.         count = len(keys) // len(readers)
  392.         while len(keys):
  393.             
  394.             try:
  395.                 txn = self.env.txn_begin(None, self.txnFlag)
  396.                 keys2 = keys[:count]
  397.                 for x in keys2:
  398.                     key = '%04d' % x
  399.                     d.put(key, self.makeData(key), txn)
  400.                     if verbose and x % 100 == 0:
  401.                         print '%s: records %d - %d finished' % (name, start, x)
  402.                         continue
  403.                 
  404.                 txn.commit()
  405.                 keys = keys[count:]
  406.                 readers.pop().start()
  407.             continue
  408.             except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
  409.                 val = None
  410.                 if verbose:
  411.                     print '%s: Aborting transaction (%s)' % (name, val[1])
  412.                 
  413.                 txn.abort()
  414.                 continue
  415.             
  416.  
  417.             None<EXCEPTION MATCH>(db.DBLockDeadlockError, db.DBLockNotGrantedError)
  418.         if verbose:
  419.             print '%s: thread finished' % name
  420.         
  421.  
  422.     
  423.     def readerThread(self, d, readerNum):
  424.         import sys
  425.         if sys.version_info[0] < 3:
  426.             name = currentThread().getName()
  427.         else:
  428.             name = currentThread().name
  429.         finished = False
  430.         while not finished:
  431.             
  432.             try:
  433.                 txn = self.env.txn_begin(None, self.txnFlag)
  434.                 c = d.cursor(txn)
  435.                 count = 0
  436.                 rec = c.first()
  437.                 while rec:
  438.                     count += 1
  439.                     (key, data) = rec
  440.                     self.assertEqual(self.makeData(key), data)
  441.                     rec = c.next()
  442.                 if verbose:
  443.                     print '%s: found %d records' % (name, count)
  444.                 
  445.                 c.close()
  446.                 txn.commit()
  447.                 finished = True
  448.             continue
  449.             except (db.DBLockDeadlockError, db.DBLockNotGrantedError):
  450.                 val = None
  451.                 if verbose:
  452.                     print '%s: Aborting transaction (%s)' % (name, val[1])
  453.                 
  454.                 c.close()
  455.                 txn.abort()
  456.                 continue
  457.             
  458.  
  459.             None<EXCEPTION MATCH>(db.DBLockDeadlockError, db.DBLockNotGrantedError)
  460.         if verbose:
  461.             print '%s: thread finished' % name
  462.         
  463.  
  464.     
  465.     def deadlockThread(self):
  466.         self.doLockDetect = True
  467.         while self.doLockDetect:
  468.             time.sleep(0.05)
  469.             
  470.             try:
  471.                 aborted = self.env.lock_detect(db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
  472.                 if verbose and aborted:
  473.                     print 'deadlock: Aborted %d deadlocked transaction(s)' % aborted
  474.             continue
  475.             except db.DBError:
  476.                 continue
  477.             
  478.  
  479.             None<EXCEPTION MATCH>db.DBError
  480.  
  481.  
  482.  
  483. class BTreeThreadedTransactions(ThreadedTransactionsBase):
  484.     dbtype = db.DB_BTREE
  485.     writers = 2
  486.     readers = 10
  487.     records = 1000
  488.  
  489.  
  490. class HashThreadedTransactions(ThreadedTransactionsBase):
  491.     dbtype = db.DB_HASH
  492.     writers = 2
  493.     readers = 10
  494.     records = 1000
  495.  
  496.  
  497. class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
  498.     dbtype = db.DB_BTREE
  499.     writers = 2
  500.     readers = 10
  501.     records = 1000
  502.     txnFlag = db.DB_TXN_NOWAIT
  503.  
  504.  
  505. class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
  506.     dbtype = db.DB_HASH
  507.     writers = 2
  508.     readers = 10
  509.     records = 1000
  510.     txnFlag = db.DB_TXN_NOWAIT
  511.  
  512.  
  513. def test_suite():
  514.     suite = unittest.TestSuite()
  515.     if have_threads:
  516.         suite.addTest(unittest.makeSuite(BTreeConcurrentDataStore))
  517.         suite.addTest(unittest.makeSuite(HashConcurrentDataStore))
  518.         suite.addTest(unittest.makeSuite(BTreeSimpleThreaded))
  519.         suite.addTest(unittest.makeSuite(HashSimpleThreaded))
  520.         suite.addTest(unittest.makeSuite(BTreeThreadedTransactions))
  521.         suite.addTest(unittest.makeSuite(HashThreadedTransactions))
  522.         suite.addTest(unittest.makeSuite(BTreeThreadedNoWaitTransactions))
  523.         suite.addTest(unittest.makeSuite(HashThreadedNoWaitTransactions))
  524.     else:
  525.         print 'Threads not available, skipping thread tests.'
  526.     return suite
  527.  
# When run as a script, hand control to unittest and let it build the run
# from test_suite() above.
if __name__ == '__main__':
    unittest.main(defaultTest = 'test_suite')
  530.  
  531.