home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_2595 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  12.0 KB  |  376 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import unittest
  5. import win32trace
  6. import threading
  7. import time
  8. import os
  9. import sys
  10. if __name__ == '__main__':
  11.     this_file = sys.argv[0]
  12. else:
  13.     this_file = __file__
  14.  
  15. def CheckNoOtherReaders():
  16.     win32trace.write('Hi')
  17.     time.sleep(0.05)
  18.     if win32trace.read() != 'Hi':
  19.         win32trace.TermRead()
  20.         win32trace.TermWrite()
  21.         raise RuntimeError, 'An existing win32trace reader appears to be running - please stop this process and try again'
  22.     win32trace.read() != 'Hi'
  23.  
  24.  
  25. class TestInitOps(unittest.TestCase):
  26.     
  27.     def setUp(self):
  28.         win32trace.InitRead()
  29.         win32trace.read()
  30.         win32trace.TermRead()
  31.  
  32.     
  33.     def tearDown(self):
  34.         
  35.         try:
  36.             win32trace.TermRead()
  37.         except win32trace.error:
  38.             pass
  39.  
  40.         
  41.         try:
  42.             win32trace.TermWrite()
  43.         except win32trace.error:
  44.             pass
  45.  
  46.  
  47.     
  48.     def testInitTermRead(self):
  49.         self.assertRaises(win32trace.error, win32trace.read)
  50.         win32trace.InitRead()
  51.         result = win32trace.read()
  52.         self.assertEquals(result, '')
  53.         win32trace.TermRead()
  54.         self.assertRaises(win32trace.error, win32trace.read)
  55.         win32trace.InitRead()
  56.         self.assertRaises(win32trace.error, win32trace.InitRead)
  57.         win32trace.InitWrite()
  58.         self.assertRaises(win32trace.error, win32trace.InitWrite)
  59.         win32trace.TermWrite()
  60.         win32trace.TermRead()
  61.  
  62.     
  63.     def testInitTermWrite(self):
  64.         self.assertRaises(win32trace.error, win32trace.write, 'Hei')
  65.         win32trace.InitWrite()
  66.         win32trace.write('Johan Galtung')
  67.         win32trace.TermWrite()
  68.         self.assertRaises(win32trace.error, win32trace.write, 'Hei')
  69.  
  70.     
  71.     def testTermSematics(self):
  72.         win32trace.InitWrite()
  73.         win32trace.write('Ta da')
  74.         win32trace.TermWrite()
  75.         win32trace.InitRead()
  76.         self.failUnless(win32trace.read() in ('Ta da', ''))
  77.         win32trace.TermRead()
  78.         win32trace.InitWrite()
  79.         win32trace.write('Ta da')
  80.         win32trace.InitRead()
  81.         win32trace.TermWrite()
  82.         self.assertEquals('Ta da', win32trace.read())
  83.         win32trace.TermRead()
  84.  
  85.  
  86.  
  87. class BasicSetupTearDown(unittest.TestCase):
  88.     
  89.     def setUp(self):
  90.         win32trace.InitRead()
  91.         win32trace.read()
  92.         win32trace.InitWrite()
  93.  
  94.     
  95.     def tearDown(self):
  96.         win32trace.TermWrite()
  97.         win32trace.TermRead()
  98.  
  99.  
  100.  
  101. class TestModuleOps(BasicSetupTearDown):
  102.     
  103.     def testRoundTrip(self):
  104.         win32trace.write('Syver Enstad')
  105.         syverEnstad = win32trace.read()
  106.         self.assertEquals('Syver Enstad', syverEnstad)
  107.  
  108.     
  109.     def testBlockingRead(self):
  110.         win32trace.write('Syver Enstad')
  111.         self.assertEquals('Syver Enstad', win32trace.blockingread())
  112.  
  113.     
  114.     def testFlush(self):
  115.         win32trace.flush()
  116.  
  117.  
  118.  
  119. class TestTraceObjectOps(BasicSetupTearDown):
  120.     
  121.     def testInit(self):
  122.         win32trace.TermRead()
  123.         win32trace.TermWrite()
  124.         traceObject = win32trace.GetTracer()
  125.         self.assertRaises(win32trace.error, traceObject.read)
  126.         self.assertRaises(win32trace.error, traceObject.write, '')
  127.         win32trace.InitRead()
  128.         win32trace.InitWrite()
  129.         self.assertEquals('', traceObject.read())
  130.         traceObject.write('Syver')
  131.  
  132.     
  133.     def testFlush(self):
  134.         traceObject = win32trace.GetTracer()
  135.         traceObject.flush()
  136.  
  137.     
  138.     def testIsatty(self):
  139.         tracer = win32trace.GetTracer()
  140.  
  141.     
  142.     def testRoundTrip(self):
  143.         traceObject = win32trace.GetTracer()
  144.         traceObject.write('Syver Enstad')
  145.         self.assertEquals('Syver Enstad', traceObject.read())
  146.  
  147.  
  148.  
  149. class WriterThread(threading.Thread):
  150.     
  151.     def run(self):
  152.         self.writeCount = 0
  153.         for each in range(self.BucketCount):
  154.             win32trace.write(str(each))
  155.         
  156.         self.writeCount = self.BucketCount
  157.  
  158.     
  159.     def verifyWritten(self):
  160.         return self.writeCount == self.BucketCount
  161.  
  162.  
  163.  
  164. class TestMultipleThreadsWriting(unittest.TestCase):
  165.     FullBucket = 50
  166.     BucketCount = 9
  167.     
  168.     def setUp(self):
  169.         WriterThread.BucketCount = self.BucketCount
  170.         win32trace.InitRead()
  171.         win32trace.read()
  172.         win32trace.InitWrite()
  173.         CheckNoOtherReaders()
  174.         self.threads = [ WriterThread() for each in range(self.FullBucket) ]
  175.         self.buckets = range(self.BucketCount)
  176.         for each in self.buckets:
  177.             self.buckets[each] = 0
  178.         
  179.  
  180.     
  181.     def tearDown(self):
  182.         win32trace.TermRead()
  183.         win32trace.TermWrite()
  184.  
  185.     
  186.     def areBucketsFull(self):
  187.         bucketsAreFull = True
  188.         for each in self.buckets:
  189.             if each != self.FullBucket:
  190.                 bucketsAreFull = False
  191.                 break
  192.                 continue
  193.         
  194.         return bucketsAreFull
  195.  
  196.     
  197.     def read(self):
  198.         while None:
  199.             readString = win32trace.blockingread()
  200.             for ch in readString:
  201.                 integer = int(ch)
  202.                 count = self.buckets[integer]
  203.                 self.buckets[integer] = count + 1
  204.                 if self.buckets[integer] == self.FullBucket:
  205.                     if self.areBucketsFull():
  206.                         return None
  207.                     continue
  208.                 self.areBucketsFull()
  209.             
  210.             continue
  211.             return None
  212.  
  213.     
  214.     def testThreads(self):
  215.         for each in self.threads:
  216.             each.start()
  217.         
  218.         self.read()
  219.         for each in self.threads:
  220.             each.join()
  221.         
  222.         for each in self.threads:
  223.             pass
  224.         
  225.  
  226.  
  227.  
  228. class TestHugeChunks(unittest.TestCase):
  229.     BiggestChunk = 65536
  230.     
  231.     def setUp(self):
  232.         win32trace.InitRead()
  233.         win32trace.read()
  234.         win32trace.InitWrite()
  235.  
  236.     
  237.     def testHugeChunks(self):
  238.         data = '*' * 1023 + '\n'
  239.         while len(data) <= self.BiggestChunk:
  240.             win32trace.write(data)
  241.             data = data + data
  242.  
  243.     
  244.     def tearDown(self):
  245.         win32trace.TermRead()
  246.         win32trace.TermWrite()
  247.  
  248.  
  249. import win32event
  250. import win32process
  251.  
  252. class TraceWriteProcess:
  253.     
  254.     def __init__(self, threadCount):
  255.         self.exitCode = -1
  256.         self.threadCount = threadCount
  257.  
  258.     
  259.     def start(self):
  260.         (procHandle, threadHandle, procId, threadId) = win32process.CreateProcess(None, 'python.exe "%s" /run_test_process %s %s' % (this_file, self.BucketCount, self.threadCount), None, None, 0, win32process.NORMAL_PRIORITY_CLASS, None, None, win32process.STARTUPINFO())
  261.         self.processHandle = procHandle
  262.  
  263.     
  264.     def join(self):
  265.         win32event.WaitForSingleObject(self.processHandle, win32event.INFINITE)
  266.         self.exitCode = win32process.GetExitCodeProcess(self.processHandle)
  267.  
  268.     
  269.     def verifyWritten(self):
  270.         return self.exitCode == 0
  271.  
  272.  
  273.  
  274. class TestOutofProcess(unittest.TestCase):
  275.     BucketCount = 9
  276.     FullBucket = 50
  277.     
  278.     def setUp(self):
  279.         win32trace.InitRead()
  280.         TraceWriteProcess.BucketCount = self.BucketCount
  281.         self.setUpWriters()
  282.         self.buckets = range(self.BucketCount)
  283.         for each in self.buckets:
  284.             self.buckets[each] = 0
  285.         
  286.  
  287.     
  288.     def tearDown(self):
  289.         win32trace.TermRead()
  290.  
  291.     
  292.     def setUpWriters(self):
  293.         self.processes = []
  294.         (quot, remainder) = divmod(self.FullBucket, 5)
  295.         for each in range(5):
  296.             self.processes.append(TraceWriteProcess(quot))
  297.         
  298.         if remainder:
  299.             self.processes.append(TraceWriteProcess(remainder))
  300.         
  301.  
  302.     
  303.     def areBucketsFull(self):
  304.         bucketsAreFull = True
  305.         for each in self.buckets:
  306.             if each != self.FullBucket:
  307.                 bucketsAreFull = False
  308.                 break
  309.                 continue
  310.         
  311.         return bucketsAreFull
  312.  
  313.     
  314.     def read(self):
  315.         while None:
  316.             readString = win32trace.blockingread()
  317.             for ch in readString:
  318.                 integer = int(ch)
  319.                 count = self.buckets[integer]
  320.                 self.buckets[integer] = count + 1
  321.                 if self.buckets[integer] == self.FullBucket:
  322.                     if self.areBucketsFull():
  323.                         return None
  324.                     continue
  325.                 self.areBucketsFull()
  326.             
  327.             continue
  328.             return None
  329.  
  330.     
  331.     def testProcesses(self):
  332.         for each in self.processes:
  333.             each.start()
  334.         
  335.         self.read()
  336.         for each in self.processes:
  337.             each.join()
  338.         
  339.         for each in self.processes:
  340.             pass
  341.         
  342.  
  343.  
  344.  
  345. def _RunAsTestProcess():
  346.     WriterThread.BucketCount = int(sys.argv[2])
  347.     threadCount = int(sys.argv[3])
  348.     threads = [ WriterThread() for each in range(threadCount) ]
  349.     win32trace.InitWrite()
  350.     for thread in threads:
  351.         thread.start()
  352.     
  353.     for thread in threads:
  354.         thread.join()
  355.     
  356.     for thread in threads:
  357.         if not thread.verifyWritten():
  358.             sys.exit(-1)
  359.             continue
  360.         []
  361.     
  362.  
  363. if __name__ == '__main__':
  364.     if sys.argv[1:2] == [
  365.         '/run_test_process']:
  366.         _RunAsTestProcess()
  367.         sys.exit(0)
  368.     
  369.     win32trace.InitRead()
  370.     win32trace.InitWrite()
  371.     CheckNoOtherReaders()
  372.     win32trace.TermRead()
  373.     win32trace.TermWrite()
  374.     unittest.main()
  375.  
  376.