home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import threading
- import traceback
- import win32com.client as win32com
- import win32event
- import win32api
- import pythoncom
- import unittest
- from testServers import InterpCase
- freeThreaded = 1  # when truthy, _doTestInThread unmarshals its argument from a stream and BeginThreadsFastMarshal marshals the interpreter into one
-
class ThreadInterpCase(InterpCase):
    """Drive a 'Python.Interpreter' COM object from several worker threads.

    The Begin* methods create the interpreter, start daemon worker threads
    and return ``(threads, events)``; each worker sets its own event when it
    has finished.  ``_DoTestMarshal`` waits on those events and then checks
    that every worker has stopped.
    """

    def _testInterpInThread(self, stopEvent, interp):
        """Thread entry point: run the per-thread test, then set stopEvent.

        stopEvent -- win32 event handle set once the work is over.
        interp    -- passed through unchanged to ``_doTestInThread``.
        """
        try:
            self._doTestInThread(interp)
        finally:
            # Signal even when the test raised, so the waiting loop in
            # _DoTestMarshal still sees this worker's event.
            win32event.SetEvent(stopEvent)

    def _doTestInThread(self, interp):
        """Initialise COM on this thread and run one ``Exec`` on the interpreter.

        interp -- when ``freeThreaded`` is truthy, a stream produced by
                  CoMarshalInterThreadInterfaceInStream; it is unmarshalled
                  to IDispatch and wrapped with Dispatch before use.
                  Otherwise it is used as given.
        """
        # The module-level import binds the name ``win32com`` to the
        # ``win32com.client`` submodule; importing the package locally makes
        # ``win32com.client.Dispatch`` resolve.
        import win32com.client

        pythoncom.CoInitialize()
        try:
            if freeThreaded:
                interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
                interp = win32com.client.Dispatch(interp)
            interp.Exec('import win32api')
        finally:
            # Keep CoInitialize/CoUninitialize balanced even if the calls
            # above raise.
            pythoncom.CoUninitialize()

    def BeginThreadsSimpleMarshal(self, numThreads):
        """Start numThreads workers, marshalling the interpreter once per thread.

        Returns a ``(threads, events)`` tuple of two lists in matching order.
        """
        import win32com.client  # see the note in _doTestInThread

        interp = win32com.client.Dispatch('Python.Interpreter')
        events = []
        threads = []
        for _ in range(numThreads):
            hEvent = win32event.CreateEvent(None, 0, 0, None)
            events.append(hEvent)
            # A separate stream is created for every worker.
            interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
            t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interpStream))
            t.daemon = True
            t.start()
            threads.append(t)

        interp = None
        return (threads, events)

    def BeginThreadsFastMarshal(self, numThreads):
        """Start numThreads workers that all receive the same interpreter object.

        When ``freeThreaded`` is truthy the interpreter is marshalled into a
        stream once, before any thread is started, and that one stream is
        passed to every worker.

        NOTE(review): ``_doTestInThread`` calls CoGetInterfaceAndReleaseStream
        on whatever it receives; confirm that sharing a single stream is valid
        when numThreads > 1.

        Returns a ``(threads, events)`` tuple of two lists in matching order.
        """
        import win32com.client  # see the note in _doTestInThread

        interp = win32com.client.Dispatch('Python.Interpreter')
        if freeThreaded:
            interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)

        events = []
        threads = []
        for _ in range(numThreads):
            hEvent = win32event.CreateEvent(None, 0, 0, None)
            t = threading.Thread(target=self._testInterpInThread, args=(hEvent, interp))
            t.daemon = True
            t.start()
            events.append(hEvent)
            threads.append(t)

        return (threads, events)

    def _DoTestMarshal(self, fn, bCoWait=0):
        """Start two workers via ``fn`` and wait until both have signalled.

        fn      -- callable taking a thread count and returning
                   ``(threads, events)`` (one of the Begin* methods).
        bCoWait -- truthy: wait with pythoncom.CoWaitForMultipleHandles;
                   falsy: wait with win32event.MsgWaitForMultipleObjects.

        Fails the test if a worker is still alive after being joined.
        """
        (threads, events) = fn(2)
        numFinished = 0
        # Loop until every event has been seen (or the user interrupts).
        while True:
            try:
                if bCoWait:
                    rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
                else:
                    rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)

                if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                    # One of the worker events was signalled.
                    numFinished = numFinished + 1
                    if numFinished >= len(events):
                        break
                elif rc == win32event.WAIT_OBJECT_0 + len(events):
                    # The wait ended for something other than our events:
                    # service the message queue and wait again.
                    pythoncom.PumpWaitingMessages()
                else:
                    # Any other result (e.g. the 2000 ms wait expiring):
                    # report progress and keep waiting.
                    print('Waiting for thread to stop with interfaces=%d, gateways=%d' % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
            except KeyboardInterrupt:
                break

        for t in threads:
            t.join(2)
            self.failIf(t.is_alive(), 'thread failed to stop!?')

        # Drop the thread objects (and with them the args they were given).
        threads = None

    # Comments rather than docstrings below, so unittest's verbose test
    # descriptions stay as they were.

    def testSimpleMarshal(self):
        # Per-thread marshalling, waiting with MsgWaitForMultipleObjects.
        self._DoTestMarshal(self.BeginThreadsSimpleMarshal)

    def testSimpleMarshalCoWait(self):
        # Per-thread marshalling, waiting with CoWaitForMultipleHandles.
        self._DoTestMarshal(self.BeginThreadsSimpleMarshal, 1)
-
-
if __name__ == '__main__':
    # Executed as a script: let unittest load and run the tests from the
    # module named 'testMarshal'.
    unittest.main(module='testMarshal')
-
-