home *** CD-ROM | disk | FTP | other *** search
/ PC World 2002 May / PCWorld_2002-05_cd.bin / Software / TemaCD / activepython / ActivePython-2.1.1.msi / Python21_win32com_test_testuniv.py < prev    next >
Encoding:
Python Source  |  2001-07-26  |  7.1 KB  |  255 lines

  1. #
  2. # test the universal gateway stuff
  3. #
  4.  
  5. import struct
  6. import univgw
  7. import pythoncom
  8. from win32com.server import util
  9. import traceback
  10. import winerror
  11. import time
  12.  
  13. class Definition:
  14.   "Completely defines a COM interface."
  15.  
  16.   def iid(self):
  17.     "Return the IID that we are defining."
  18.     return self._iid
  19.  
  20.   def vtbl_argsizes(self):
  21.     "Return the size of the arguments to each interface method."
  22.     #def argsize(method, calcsize=struct.calcsize):
  23.     #  return calcsize(method[1])
  24.     def argsize(method):
  25.       return method[2]
  26.     return map(argsize, self._methods)
  27.  
  28.   def dispatch(self, ob, index, argPtr,
  29.                ReadMemory=univgw.ReadMemory, unpack=struct.unpack):
  30.     "Dispatch a call to an interface method."
  31.  
  32.     ### WARNING: the Alpha will probably need special work to deal with
  33.     ### the "argPtr" variable. most platforms, though, can simply read the
  34.     ### arguments off of the stack using the stack pointer.
  35.  
  36.     ### just print some crap for now
  37.  
  38.     name, fmt, argsize = self._methods[index]
  39.     s = ReadMemory(argPtr, argsize)
  40.     args = unpack(fmt, s)
  41.  
  42.     #print 'called:', ob, self, index, argPtr
  43.     #print '   iid:', self._iid
  44.     #print '   fmt:', fmt
  45.     #print '  size:', argsize
  46.     #print '  args:', args
  47.  
  48.     return getattr(self, name)(ob, args)
  49.  
  50.   if 1:
  51.     _dispatch = dispatch
  52.     def dispatch(self, ob, index, argPtr):
  53.       try:
  54.         return self._dispatch(ob, index, argPtr)
  55.       except:
  56.         traceback.print_exc()
  57.         raise
  58.  
  59. class IStream(Definition):
  60.   _iid = pythoncom.IID_IStream
  61.   _methods = [
  62.     ('Read', 'PLP', 12),
  63.     ('Write', 'PLP', 12),
  64.     ('Seek', '8sLP', 16),    # LARGE_INTEGER, DWORD, ULARGE_INTEGER*
  65.     ('SetSize', '8s', 8),
  66.     ('CopyTo', 'P8sPP', 20),
  67.     ('Commit', 'L', 4),
  68.     ('Revert', '', 0),
  69.     ('LockRegion', '8s8sL', 20),
  70.     ('UnlockRegion', '8s8sL', 20),
  71.     ('Stat', 'PL', 8),
  72.     ('Clone', 'P', 4),
  73.     ]
  74.  
  75.   def Read(self, ob, (pv, cb, pcbRead),
  76.            WriteMemory=univgw.WriteMemory, pack=struct.pack):
  77.     if not pv:
  78.       return winerror.E_POINTER
  79.     s = ob.Read(cb)
  80.     WriteMemory(pv, s)
  81.     if pcbRead:
  82.       ### get rid of this pack for more speed!
  83.       WriteMemory(pcbRead, pack('L', len(s)))
  84.  
  85.   def Write(self, ob, (pv, cb, pcbWritten),
  86.             WriteMemory=univgw.WriteMemory, ReadMemory=univgw.ReadMemory,
  87.             pack=struct.pack):
  88.     if not pv:
  89.       return winerror.E_POINTER
  90.     s = ReadMemory(pv, cb)
  91.     l = ob.Write(s)
  92.     if pcbWritten:
  93.       WriteMemory(pcbWritten, pack('L', l))
  94.  
  95.   def Seek(self, ob, (dlibMove, dwOrigin, plibNewPosition),
  96.            L64=univgw.L64, strUL64=univgw.strUL64, WriteMemory=univgw.WriteMemory):
  97.     pos = ob.Seek(L64(dlibMove), dwOrigin)
  98.     if plibNewPosition:
  99.       WriteMemory(plibNewPosition, strUL64(pos))
  100.  
  101.   def SetSize(self, ob, (libNewSize,), UL64=univgw.UL64):
  102.     ob.SetSize(UL64(libNewSize))
  103.  
  104.   def CopyTo(self, ob, (pstm, cb, pcbRead, pcbWritten),
  105.              WriteMemory=univgw.WriteMemory, interface=univgw.interface,
  106.              IID_IStream=pythoncom.IID_IStream,
  107.              UL64=univgw.UL64, strUL64=univgw.strUL64):
  108.     pstm = interface(pstm, IID_IStream)
  109.     r, w = ob.CopyTo(pstm, UL64(cb))
  110.     if pcbRead:
  111.       WriteMemory(pcbRead, strUL64(r))
  112.     if pcbWritten:
  113.       WriteMemory(pcbWritten, strUL64(w))
  114.  
  115.   def Commit(self, ob, (grfCommitFlags,)):
  116.     ob.Commit(grfCommitFlags)
  117.  
  118.   def Revert(self, ob, args):
  119.     ob.Revert()
  120.  
  121.   def LockRegion(self, ob, (libOffset, cb, dwLockType), UL64=univgw.UL64):
  122.     ob.LockRegion(UL64(libOffset), UL64(cb), dwLockType)
  123.  
  124.   def UnlockRegion(self, ob, (libOffset, cb, dwLockType), UL64=univgw.UL64):
  125.     ob.UnlockRegion(UL64(libOffset), UL64(cb), dwLockType)
  126.  
  127.   def Stat(self, ob, (pstatstg, grfStatFlag)):
  128.     print 'IStream::Stat:', ob, pstatstg, grfStatFlag
  129.  
  130.   def Clone(self, ob, (ppstm,)):
  131.     print 'IStream::Clone:', ob, ppstm
  132.  
  133. defn = IStream()
  134. #print defn
  135.  
  136. # create a vtable for the IStream interface
  137. vtbl = univgw.CreateVTable(defn)
  138. #print vtbl
  139.  
  140. # the Python object class that we're going to expose
  141. class MyStream:
  142.   ### only for the timing tests
  143.   _public_methods_ = ['Write','SetSize','Read', 'Seek', 'Commit', 'Revert', 'LockRegion','CopyTo']
  144.   _com_interfaces_ = [pythoncom.IID_IStream]
  145.  
  146.   def Read(self, cb):
  147.     return 'this is a very long string that we want to read'[:int(cb)]
  148.   def Write(self, s):
  149.     #print 'MyStream.Write:', `s`
  150.     return len(s)
  151.   def Seek(self, move, origin):
  152.     #print 'MyStream.Seek:', move, origin
  153.     return move
  154.   def SetSize(self, size):
  155.     #print 'MyStream.SetSize:', size
  156.     pass
  157.   def CopyTo(self, pstm, cb):
  158.     #print 'MyStream.CopyTo:', `pstm, cb`
  159.     return cb, cb
  160.   def Commit(self, flags):
  161.     #print 'MyStream.Commit:', flags
  162.     pass
  163.   def Revert(self):
  164.     #print 'MyStream.Revert'
  165.     pass
  166.   def LockRegion(self, offset, cb, type):
  167.     #print 'MyStream.LockRegion:', offset, cb, type
  168.     pass
  169.   def UnlockRegion(self, offset, cb, type):
  170.     print 'MyStream.UnlockRegion:', offset, cb, type
  171.   def Stat(self, *args):    ### WHATEVER
  172.     print 'MyStream.Stat:', args
  173.   def Clone(self, *args):    ### WHATEVER
  174.     print 'MyStream.Clone:', args
  175.  
  176. # create the stream, then wrap it with a pair of thunks. the COM2Py thunk
  177. # that we build here becomes the "identity" interface for the stream object.
  178. wrapped = pythoncom.WrapObject(MyStream())
  179. #print wrapped
  180.  
  181. # create a tear-off IStream gateway for the policy object
  182. ob = univgw.CreateTearOff(wrapped, vtbl, pythoncom.IID_IStream)
  183. #print ob
  184.  
  185. ob2 = util.wrap(MyStream())
  186. ob2 = ob2.QueryInterface(pythoncom.IID_IStream)
  187. #print ob2
  188.  
  189. # exercise it
  190. if 0:
  191.   print 'read:', `ob.Read(10)`
  192.   print 'read:', `ob.Read(5)`
  193.   cb = ob.Write('your mom was here')
  194.   print 'wrote:', cb
  195.   newpos = ob.Seek(10, 20)
  196.   print 'newpos:', newpos
  197.   ob.SetSize(5678)
  198.   cb = ob.CopyTo(ob2, 10)
  199.   print 'copied:', cb
  200.   ob.Commit(1234)
  201.   ob.Revert()
  202.   ob.LockRegion(1,2,3)
  203.   ob.UnlockRegion(1,2,3)
  204.   #ob.Stat(...
  205.   #ob.Clone(...
  206.  
  207. def f1():
  208.   x1, x2 = struct.unpack('LL','\0\0\0\0\0\0\0\0')
  209.   x = x1 | (x2 << 32L)
  210. def f2(T=univgw.L64):
  211.   x = T(struct.unpack('8s','\0\0\0\0\0\0\0\0')[0])
  212.  
  213. if 1:
  214.   def time_it(f, a):
  215.     t = time.time()
  216.     for i in xrange(50000):
  217.       apply(f, a)
  218.     return time.time() - t
  219.   def time_pair(name, *a):
  220.     t1 = time_it(getattr(ob, name), a)
  221.     t2 = time_it(getattr(ob2, name), a)
  222.     print '%-10s - univgw: %.3f' % (name, t1)
  223.     print '%-10s -  pycom: %.3f' % (name, t2)
  224.  
  225.   print 'Timing...'
  226.  
  227.   if 0:
  228.     print time_it(f1,())
  229.     print time_it(f2,())
  230.  
  231.   if 1:
  232.     #time_pair('Read', 10)
  233.     #time_pair('Write', 'your mom was here')
  234.     #time_pair('Seek', 10, 20)
  235.     #time_pair('SetSize', 1000)
  236.     time_pair('CopyTo', ob2, 1000)
  237.     #time_pair('Commit', 0)
  238.     #time_pair('Revert')
  239.     #time_pair('LockRegion', 10, 20, 30)
  240.  
  241. import win32api
  242. win32api.OutputDebugString("step 1\n")
  243. ob2 = ob
  244. del ob2
  245. win32api.OutputDebugString("step 2\n")
  246.  
  247. ob = None
  248.  
  249. win32api.OutputDebugString("step 3\n")
  250.  
  251. # in case we're inside Dev Studio, this will pause so you can see
  252. # the output of the script
  253. #print "Press return:",
  254. #raw_input()
  255.