home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import thread
- import traceback
- import win32com.client as win32com
- import win32event
- import win32api
- import pythoncom
-
def TestInterp(interp):
    """Sanity-check a Python.Interpreter style object.

    Two checks are made through ``interp.Eval``:

    * evaluating the string ``'1+1'`` must give ``2``;
    * evaluating a non-string argument (the integer ``2``) must fail with a
      ``pythoncom.com_error`` whose first argument (the HRESULT) is
      ``winerror.DISP_E_TYPEMISMATCH``.

    Raises:
        ValueError: if either check fails.
    """
    if interp.Eval('1+1') != 2:
        raise ValueError('The interpreter returned the wrong result.')

    try:
        interp.Eval(2)
    except pythoncom.com_error as details:
        # Bind the caught exception so its HRESULT can be inspected; the
        # first positional argument of the error is the HRESULT.
        import winerror
        if details.args[0] != winerror.DISP_E_TYPEMISMATCH:
            raise ValueError('The interpreter exception was not winerror.DISP_E_TYPEMISMATCH.')
    else:
        # The call succeeded although a type mismatch was expected.
        raise ValueError('The interpreter did not raise an exception')
-
-
-
def TestInterpInThread(stopEvent, cookie):
    """Thread entry point: run the interpreter test, then signal completion.

    ``cookie`` is handed straight to ``DoTestInterpInThread``.  ``stopEvent``
    is set with ``win32event.SetEvent`` once that call has finished.
    """
    try:
        DoTestInterpInThread(cookie)
    finally:
        # Signal unconditionally so a waiter on stopEvent is released even
        # when the test body raised.
        win32event.SetEvent(stopEvent)
-
-
-
def CreateGIT():
    """Create and return a new IGlobalInterfaceTable object.

    The object is obtained with ``pythoncom.CoCreateInstance`` using
    ``CLSID_StdGlobalInterfaceTable``, no outer unknown, the
    ``CLSCTX_INPROC`` context and ``IID_IGlobalInterfaceTable``.
    """
    table_clsid = pythoncom.CLSID_StdGlobalInterfaceTable
    table_iid = pythoncom.IID_IGlobalInterfaceTable
    return pythoncom.CoCreateInstance(
        table_clsid,
        None,
        pythoncom.CLSCTX_INPROC,
        table_iid,
    )
-
-
def DoTestInterpInThread(cookie):
    """Exercise the interpreter registered under ``cookie`` from this thread.

    COM is initialised for the calling thread, the interface stored in the
    global interface table under ``cookie`` is fetched as ``IDispatch`` and
    wrapped with ``win32com.client.Dispatch``, ``TestInterp`` is run against
    it, and the id of the calling thread is printed next to the thread id
    reported by the interpreter object.

    Any exception is reported with ``traceback.print_exc`` instead of being
    propagated to the caller.
    """
    try:
        pythoncom.CoInitialize()
        caller_tid = win32api.GetCurrentThreadId()
        table = CreateGIT()

        # Fetch the raw interface first, then rebind the same name to the
        # dispatch wrapper built around it.
        interp = table.GetInterfaceFromGlobal(cookie, pythoncom.IID_IDispatch)
        interp = win32com.client.Dispatch(interp)

        TestInterp(interp)

        interp.Exec('import win32api')
        interp_tid = interp.Eval('win32api.GetCurrentThreadId()')
        print("The test thread id is %d, Python.Interpreter's thread ID is %d" % (caller_tid, interp_tid))

        # Drop the wrapper before COM is uninitialised on this thread.
        interp = None
        pythoncom.CoUninitialize()
    except:
        # Catch-all: every failure is printed rather than raised.
        traceback.print_exc()
-
-
-
def BeginThreadsSimpleMarshal(numThreads, cookie):
    """Start ``numThreads`` worker threads and return their stop events.

    For each worker a fresh event is created with
    ``win32event.CreateEvent(None, 0, 0, None)`` and a thread running
    ``TestInterpInThread(event, cookie)`` is started with
    ``thread.start_new``.

    Returns:
        A list holding the event handle of every started thread, in start
        order.  The list is empty when ``numThreads`` is zero or negative.
    """
    stop_events = []
    for _ in range(numThreads):
        done = win32event.CreateEvent(None, 0, 0, None)
        thread.start_new(TestInterpInThread, (done, cookie))
        stop_events.append(done)
    return stop_events
-
-
def test(fn):
    """Run the global-interface-table threading test.

    A ``Python.Interpreter`` COM object is created on the calling thread and
    its ``_oleobj_`` is registered in a global interface table as
    ``IDispatch``.  ``fn`` is then called as ``fn(4, cookie)`` and must
    return a sequence of event handles, one per worker it started.  The
    function waits, pumping waiting messages in between, until as many
    events have been signalled as were returned, or until the wait is
    interrupted with Ctrl-C.

    Args:
        fn: callable taking ``(numThreads, cookie)`` and returning the list
            of event handles to wait on.
    """
    print('The main thread is %d' % win32api.GetCurrentThreadId())
    GIT = CreateGIT()
    interp = win32com.client.Dispatch('Python.Interpreter')
    cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch)
    events = fn(4, cookie)

    numFinished = 0
    # Loop until every returned event has been seen; an empty event list
    # therefore ends the wait immediately.
    while numFinished < len(events):
        try:
            rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
            if win32event.WAIT_OBJECT_0 <= rc < win32event.WAIT_OBJECT_0 + len(events):
                # One of the worker events was signalled.
                numFinished = numFinished + 1
            elif rc == win32event.WAIT_OBJECT_0 + len(events):
                # Return value one past the last handle: input is waiting in
                # the message queue, so dispatch it before waiting again.
                pythoncom.PumpWaitingMessages()
            else:
                # Any other result (presumably the 2000 ms timeout): report
                # progress and keep waiting.
                print('Waiting for thread to stop with interfaces=%d, gateways=%d' % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
        except KeyboardInterrupt:
            break

    # Release the COM references held by this function.
    del interp
    del GIT
-
if __name__ == '__main__':
    # Script entry point: run the test with the simple-marshal thread
    # starter, pause for 500 ms, uninitialise COM on this thread, then
    # report whether pythoncom still counts any interfaces or gateways.
    test(BeginThreadsSimpleMarshal)
    win32api.Sleep(500)
    pythoncom.CoUninitialize()
    leftovers = pythoncom._GetInterfaceCount() != 0 or pythoncom._GetGatewayCount() != 0
    if not leftovers:
        print('Done.')
    else:
        print('Done with interfaces=%d, gateways=%d' % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()))
-
-