home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 January / maximum-cd-2011-01.iso / DiscContents / calibre-0.7.26.msi / file_2892 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-10-31  |  3.9 KB  |  114 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import threading
  5. import traceback
  6. import win32com.client as win32com
  7. import win32event
  8. import win32api
  9. import pythoncom
  10. import unittest
  11. from testServers import InterpCase
  12. freeThreaded = 1
  13.  
  14. class ThreadInterpCase(InterpCase):
  15.     
  16.     def _testInterpInThread(self, stopEvent, interp):
  17.         
  18.         try:
  19.             self._doTestInThread(interp)
  20.         finally:
  21.             win32event.SetEvent(stopEvent)
  22.  
  23.  
  24.     
  25.     def _doTestInThread(self, interp):
  26.         pythoncom.CoInitialize()
  27.         myThread = win32api.GetCurrentThreadId()
  28.         if freeThreaded:
  29.             interp = pythoncom.CoGetInterfaceAndReleaseStream(interp, pythoncom.IID_IDispatch)
  30.             interp = win32com.client.Dispatch(interp)
  31.         
  32.         interp.Exec('import win32api')
  33.         pythoncom.CoUninitialize()
  34.  
  35.     
  36.     def BeginThreadsSimpleMarshal(self, numThreads):
  37.         interp = win32com.client.Dispatch('Python.Interpreter')
  38.         events = []
  39.         threads = []
  40.         for i in range(numThreads):
  41.             hEvent = win32event.CreateEvent(None, 0, 0, None)
  42.             events.append(hEvent)
  43.             interpStream = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
  44.             t = threading.Thread(target = self._testInterpInThread, args = (hEvent, interpStream))
  45.             t.setDaemon(1)
  46.             t.start()
  47.             threads.append(t)
  48.         
  49.         interp = None
  50.         return (threads, events)
  51.  
  52.     
  53.     def BeginThreadsFastMarshal(self, numThreads):
  54.         interp = win32com.client.Dispatch('Python.Interpreter')
  55.         if freeThreaded:
  56.             interp = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, interp._oleobj_)
  57.         
  58.         events = []
  59.         threads = []
  60.         for i in range(numThreads):
  61.             hEvent = win32event.CreateEvent(None, 0, 0, None)
  62.             t = threading.Thread(target = self._testInterpInThread, args = (hEvent, interp))
  63.             t.setDaemon(1)
  64.             t.start()
  65.             events.append(hEvent)
  66.             threads.append(t)
  67.         
  68.         return (threads, events)
  69.  
  70.     
  71.     def _DoTestMarshal(self, fn, bCoWait = 0):
  72.         (threads, events) = fn(2)
  73.         numFinished = 0
  74.         while None:
  75.             
  76.             try:
  77.                 if bCoWait:
  78.                     rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events)
  79.                 else:
  80.                     rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT)
  81.                 if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0 + len(events):
  82.                     numFinished = numFinished + 1
  83.                     if numFinished >= len(events):
  84.                         break
  85.                     
  86.                 elif rc == win32event.WAIT_OBJECT_0 + len(events):
  87.                     pythoncom.PumpWaitingMessages()
  88.                 else:
  89.                     print 'Waiting for thread to stop with interfaces=%d, gateways=%d' % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())
  90.             continue
  91.             except KeyboardInterrupt:
  92.                 break
  93.                 continue
  94.             
  95.  
  96.             for t in threads:
  97.                 t.join(2)
  98.                 self.failIf(t.isAlive(), 'thread failed to stop!?')
  99.             
  100.         threads = None
  101.  
  102.     
  103.     def testSimpleMarshal(self):
  104.         self._DoTestMarshal(self.BeginThreadsSimpleMarshal)
  105.  
  106.     
  107.     def testSimpleMarshalCoWait(self):
  108.         self._DoTestMarshal(self.BeginThreadsSimpleMarshal, 1)
  109.  
  110.  
  111. if __name__ == '__main__':
  112.     unittest.main('testMarshal')
  113.  
  114.