home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import unittest
- import win32trace
- import threading
- import time
- import os
- import sys
# Path of this script; TraceWriteProcess.start() puts it on the command line
# of the child interpreters.  The invocation path is used when the file is
# executed directly, the module's own path when it is imported.
this_file = sys.argv[0] if __name__ == '__main__' else __file__
-
def CheckNoOtherReaders():
    """Verify that text written to win32trace can be read straight back.

    Writes a marker string, pauses briefly and reads it back.  The callers in
    this file open both the read and the write side before calling.

    Raises:
        RuntimeError: the marker did not come back unchanged; the message
            attributes this to another running win32trace reader.
    """
    win32trace.write('Hi')
    # Short pause between writing the marker and reading it back.
    time.sleep(0.05)
    if win32trace.read() != 'Hi':
        # Shut both ends down before reporting the failure.
        win32trace.TermRead()
        win32trace.TermWrite()
        # Call form of raise: valid on Python 2.6 and on Python 3.
        raise RuntimeError('An existing win32trace reader appears to be running - please stop this process and try again')
-
-
class TestInitOps(unittest.TestCase):
    """Tests for the InitRead/TermRead and InitWrite/TermWrite lifecycle."""

    def setUp(self):
        """Open the reader, discard anything buffered, and close it again."""
        win32trace.InitRead()
        win32trace.read()
        win32trace.TermRead()

    def tearDown(self):
        """Best-effort shutdown of both ends.

        win32trace.error is ignored on purpose: a test may already have
        terminated the side being closed here.
        """
        try:
            win32trace.TermRead()
        except win32trace.error:
            pass

        try:
            win32trace.TermWrite()
        except win32trace.error:
            pass

    def testInitTermRead(self):
        """read() must fail unless the reader is open; double Init must fail."""
        # Reader not open yet.
        self.assertRaises(win32trace.error, win32trace.read)
        win32trace.InitRead()
        result = win32trace.read()
        self.assertEqual(result, '')
        win32trace.TermRead()
        # Reader closed again.
        self.assertRaises(win32trace.error, win32trace.read)
        # A second InitRead / InitWrite while already open must raise.
        win32trace.InitRead()
        self.assertRaises(win32trace.error, win32trace.InitRead)
        win32trace.InitWrite()
        self.assertRaises(win32trace.error, win32trace.InitWrite)
        win32trace.TermWrite()
        win32trace.TermRead()

    def testInitTermWrite(self):
        """write() must fail before InitWrite and after TermWrite."""
        self.assertRaises(win32trace.error, win32trace.write, 'Hei')
        win32trace.InitWrite()
        win32trace.write('Johan Galtung')
        win32trace.TermWrite()
        self.assertRaises(win32trace.error, win32trace.write, 'Hei')

    def testTermSematics(self):
        """Check what a reader sees depending on when the writer terminates."""
        # Writer opened and closed before any reader exists: the test accepts
        # either the written text or an empty string.
        win32trace.InitWrite()
        win32trace.write('Ta da')
        win32trace.TermWrite()
        win32trace.InitRead()
        self.assertTrue(win32trace.read() in ('Ta da', ''))
        win32trace.TermRead()
        # Reader opened while the writer is still open: the text must still be
        # readable after the writer terminates.
        win32trace.InitWrite()
        win32trace.write('Ta da')
        win32trace.InitRead()
        win32trace.TermWrite()
        self.assertEqual('Ta da', win32trace.read())
        win32trace.TermRead()
-
-
-
class BasicSetupTearDown(unittest.TestCase):
    """Fixture base class that keeps both trace ends open during a test."""

    def setUp(self):
        """Open the reader, discard buffered text, then open the writer."""
        win32trace.InitRead()
        win32trace.read()  # return value intentionally ignored
        win32trace.InitWrite()

    def tearDown(self):
        """Terminate the writer first, then the reader."""
        for terminate in (win32trace.TermWrite, win32trace.TermRead):
            terminate()
-
-
-
class TestModuleOps(BasicSetupTearDown):
    """Tests of the module-level write/read/blockingread/flush functions."""

    def testRoundTrip(self):
        """Text passed to write() comes back unchanged from read()."""
        win32trace.write('Syver Enstad')
        syverEnstad = win32trace.read()
        self.assertEqual('Syver Enstad', syverEnstad)

    def testBlockingRead(self):
        """Text passed to write() comes back unchanged from blockingread()."""
        win32trace.write('Syver Enstad')
        self.assertEqual('Syver Enstad', win32trace.blockingread())

    def testFlush(self):
        """flush() can be called without raising."""
        win32trace.flush()
-
-
-
class TestTraceObjectOps(BasicSetupTearDown):
    """Tests of the object returned by win32trace.GetTracer()."""

    def testInit(self):
        """The tracer's read/write raise while the module is terminated."""
        # Undo the fixture's initialisation first.
        win32trace.TermRead()
        win32trace.TermWrite()
        traceObject = win32trace.GetTracer()
        self.assertRaises(win32trace.error, traceObject.read)
        self.assertRaises(win32trace.error, traceObject.write, '')
        # Re-open both ends; tearDown terminates them again.
        win32trace.InitRead()
        win32trace.InitWrite()
        self.assertEqual('', traceObject.read())
        traceObject.write('Syver')

    def testFlush(self):
        """The tracer's flush() can be called without raising."""
        traceObject = win32trace.GetTracer()
        traceObject.flush()

    def testIsatty(self):
        """Obtain a tracer object."""
        # NOTE(review): nothing is asserted here; the isatty() check this test
        # is named for appears to be missing - confirm the expected value
        # against the win32trace documentation before adding it back.
        win32trace.GetTracer()

    def testRoundTrip(self):
        """Text written through the tracer is read back through the tracer."""
        traceObject = win32trace.GetTracer()
        traceObject.write('Syver Enstad')
        self.assertEqual('Syver Enstad', traceObject.read())
-
-
-
class WriterThread(threading.Thread):
    """Thread that writes str(0) .. str(BucketCount - 1) to win32trace.

    ``BucketCount`` is not defined in this class; the code that creates the
    threads assigns it on the class beforehand.
    """

    def run(self):
        """Write each bucket index once and then record completion."""
        self.writeCount = 0
        for bucketIndex in range(self.BucketCount):
            win32trace.write(str(bucketIndex))
        # Reached only if none of the writes raised.
        self.writeCount = self.BucketCount

    def verifyWritten(self):
        """Return True when run() recorded that all writes were made."""
        allWritten = (self.writeCount == self.BucketCount)
        return allWritten
-
-
-
class TestMultipleThreadsWriting(unittest.TestCase):
    """Many in-process threads write digits while the test tallies them."""

    # Number of writer threads, and so the tally every bucket must reach.
    FullBucket = 50
    # Each writer thread writes str(0) .. str(BucketCount - 1).
    BucketCount = 9

    def setUp(self):
        """Open both trace ends, create the writers and zero the tallies."""
        WriterThread.BucketCount = self.BucketCount
        win32trace.InitRead()
        win32trace.read()
        win32trace.InitWrite()
        CheckNoOtherReaders()
        self.threads = [WriterThread() for each in range(self.FullBucket)]
        # A real list of counters: range() is not assignable on Python 3.
        self.buckets = [0] * self.BucketCount

    def tearDown(self):
        """Terminate the reader, then the writer."""
        win32trace.TermRead()
        win32trace.TermWrite()

    def areBucketsFull(self):
        """Return True when every bucket tally equals FullBucket."""
        return all(count == self.FullBucket for count in self.buckets)

    def read(self):
        """Tally digits from blockingread() until every bucket is full."""
        # The decompiled source had `while None:`, which never executes and
        # made this method a no-op.
        while True:
            readString = win32trace.blockingread()
            for ch in readString:
                integer = int(ch)
                self.buckets[integer] += 1
                # Only do the full scan when a bucket has just filled up.
                if self.buckets[integer] == self.FullBucket:
                    if self.areBucketsFull():
                        return

    def testThreads(self):
        """Run all writers, read their output and check each one finished."""
        for each in self.threads:
            each.start()

        self.read()
        for each in self.threads:
            each.join()

        # The decompiled loop body was empty; verifyWritten() is the check
        # each writer exposes for this purpose.
        for each in self.threads:
            self.assertTrue(each.verifyWritten())
-
-
-
-
class TestHugeChunks(unittest.TestCase):
    """Writes progressively larger strings to win32trace."""

    # Upper bound (inclusive) on the length of a single write.
    BiggestChunk = 65536

    def setUp(self):
        """Open the reader, discard buffered text, then open the writer."""
        win32trace.InitRead()
        win32trace.read()
        win32trace.InitWrite()

    def tearDown(self):
        """Terminate the reader, then the writer."""
        win32trace.TermRead()
        win32trace.TermWrite()

    def testHugeChunks(self):
        """Write a 1024-character line, doubling it up to BiggestChunk."""
        chunk = '*' * 1023 + '\n'
        while not len(chunk) > self.BiggestChunk:
            win32trace.write(chunk)
            chunk = chunk * 2
-
-
- import win32event
- import win32process
-
class TraceWriteProcess:
    """A child interpreter that runs this file in writer mode.

    ``BucketCount`` is not defined in this class; ``TestOutofProcess.setUp``
    assigns it on the class before any process is started.
    """

    def __init__(self, threadCount):
        """Remember how many writer threads the child should run."""
        # Stays at -1 until join() stores the real exit code.
        self.exitCode = -1
        self.threadCount = threadCount

    def start(self):
        """Launch the child process and keep its process handle."""
        # Use the running interpreter rather than whatever `python.exe`
        # happens to be first on PATH; both paths are quoted in case they
        # contain spaces.
        commandLine = '"%s" "%s" /run_test_process %s %s' % (
            sys.executable, this_file, self.BucketCount, self.threadCount)
        processInfo = win32process.CreateProcess(
            None,                                # application name
            commandLine,
            None,                                # process attributes
            None,                                # thread attributes
            0,                                   # do not inherit handles
            win32process.NORMAL_PRIORITY_CLASS,
            None,                                # environment
            None,                                # current directory
            win32process.STARTUPINFO())
        # First element of the result is the process handle; the thread
        # handle and the two ids are not needed.
        self.processHandle = processInfo[0]

    def join(self):
        """Block until the child exits, then record its exit code."""
        win32event.WaitForSingleObject(self.processHandle, win32event.INFINITE)
        self.exitCode = win32process.GetExitCodeProcess(self.processHandle)

    def verifyWritten(self):
        """Return True when the recorded exit code is 0."""
        return self.exitCode == 0
-
-
-
class TestOutofProcess(unittest.TestCase):
    """Child processes write digits while this process tallies them."""

    # Each writer thread writes str(0) .. str(BucketCount - 1).
    BucketCount = 9
    # Total writer threads across all child processes, and so the tally every
    # bucket must reach.
    FullBucket = 50

    def setUp(self):
        """Open the reader, create the child processes, zero the tallies."""
        win32trace.InitRead()
        TraceWriteProcess.BucketCount = self.BucketCount
        self.setUpWriters()
        # A real list of counters: range() is not assignable on Python 3.
        self.buckets = [0] * self.BucketCount

    def tearDown(self):
        """Terminate the reader."""
        win32trace.TermRead()

    def setUpWriters(self):
        """Split FullBucket threads over five processes plus any remainder."""
        self.processes = []
        (quot, remainder) = divmod(self.FullBucket, 5)
        for each in range(5):
            self.processes.append(TraceWriteProcess(quot))

        # One extra process takes the threads that do not divide evenly.
        if remainder:
            self.processes.append(TraceWriteProcess(remainder))

    def areBucketsFull(self):
        """Return True when every bucket tally equals FullBucket."""
        return all(count == self.FullBucket for count in self.buckets)

    def read(self):
        """Tally digits from blockingread() until every bucket is full."""
        # The decompiled source had `while None:`, which never executes and
        # made this method a no-op.
        while True:
            readString = win32trace.blockingread()
            for ch in readString:
                integer = int(ch)
                self.buckets[integer] += 1
                # Only do the full scan when a bucket has just filled up.
                if self.buckets[integer] == self.FullBucket:
                    if self.areBucketsFull():
                        return

    def testProcesses(self):
        """Run all children, read their output and check each exit code."""
        for each in self.processes:
            each.start()

        self.read()
        for each in self.processes:
            each.join()

        # The decompiled loop body was empty; verifyWritten() is the check
        # each process wrapper exposes for this purpose.
        for each in self.processes:
            self.assertTrue(each.verifyWritten())
-
-
-
-
def _RunAsTestProcess():
    """Writer-mode entry point, selected by the /run_test_process argument.

    Takes the bucket count from ``sys.argv[2]`` and the thread count from
    ``sys.argv[3]``, runs that many ``WriterThread`` instances to completion
    and exits with status -1 at the first thread whose ``verifyWritten()`` is
    false.  Returns normally otherwise.
    """
    WriterThread.BucketCount = int(sys.argv[2])
    workerCount = int(sys.argv[3])
    workers = [WriterThread() for unused in range(workerCount)]
    win32trace.InitWrite()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # all() stops at the first failing worker, like the explicit loop did.
    if not all(worker.verifyWritten() for worker in workers):
        sys.exit(-1)
-
-
if __name__ == '__main__':
    # Child-process mode: act as a writer and exit without running the suite.
    runAsWriter = (sys.argv[1:2] == ['/run_test_process'])
    if runAsWriter:
        _RunAsTestProcess()
        sys.exit(0)

    # Before running the suite, check that text written to the trace can be
    # read back, then leave both ends terminated for the tests.
    win32trace.InitRead()
    win32trace.InitWrite()
    CheckNoOtherReaders()
    win32trace.TermRead()
    win32trace.TermWrite()
    unittest.main()
-
-