home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_2592 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  4.4 KB  |  92 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. import unittest
  5. import time
  6. import threading
  7. import win32pipe
  8. import win32file
  9. import win32event
  10. import pywintypes
  11. import winerror
  12. import win32con
  13.  
  14. class PipeTests(unittest.TestCase):
  15.     pipename = '\\\\.\\pipe\\python_test_pipe'
  16.     
  17.     def _serverThread(self, pipe_handle, event, wait_time):
  18.         hr = win32pipe.ConnectNamedPipe(pipe_handle)
  19.         self.failUnless(hr in (0, winerror.ERROR_PIPE_CONNECTED), 'Got error code 0x%x' % (hr,))
  20.         (hr, got) = win32file.ReadFile(pipe_handle, 100)
  21.         self.failUnless(got == 'foo\x00bar')
  22.         time.sleep(wait_time)
  23.         win32file.WriteFile(pipe_handle, 'bar\x00foo')
  24.         pipe_handle.Close()
  25.         event.set()
  26.  
  27.     
  28.     def startPipeServer(self, event, wait_time = 0):
  29.         openMode = win32pipe.PIPE_ACCESS_DUPLEX
  30.         pipeMode = win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT
  31.         sa = pywintypes.SECURITY_ATTRIBUTES()
  32.         sa.SetSecurityDescriptorDacl(1, None, 0)
  33.         pipe_handle = win32pipe.CreateNamedPipe(self.pipename, openMode, pipeMode, win32pipe.PIPE_UNLIMITED_INSTANCES, 0, 0, 2000, sa)
  34.         threading.Thread(target = self._serverThread, args = (pipe_handle, event, wait_time)).start()
  35.  
  36.     
  37.     def testCallNamedPipe(self):
  38.         event = threading.Event()
  39.         self.startPipeServer(event)
  40.         got = win32pipe.CallNamedPipe(self.pipename, 'foo\x00bar', 1024, win32pipe.NMPWAIT_WAIT_FOREVER)
  41.         self.failUnlessEqual(got, 'bar\x00foo')
  42.         event.wait(5)
  43.         self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
  44.  
  45.     
  46.     def testTransactNamedPipeBlocking(self):
  47.         event = threading.Event()
  48.         self.startPipeServer(event)
  49.         open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
  50.         hpipe = win32file.CreateFile(self.pipename, open_mode, 0, None, win32con.OPEN_EXISTING, 0, None)
  51.         win32pipe.SetNamedPipeHandleState(hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
  52.         (hr, got) = win32pipe.TransactNamedPipe(hpipe, 'foo\x00bar', 1024, None)
  53.         self.failUnlessEqual(got, 'bar\x00foo')
  54.         event.wait(5)
  55.         self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
  56.  
  57.     
  58.     def testTransactNamedPipeBlockingBuffer(self):
  59.         event = threading.Event()
  60.         self.startPipeServer(event)
  61.         open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
  62.         hpipe = win32file.CreateFile(self.pipename, open_mode, 0, None, win32con.OPEN_EXISTING, 0, None)
  63.         win32pipe.SetNamedPipeHandleState(hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
  64.         buffer = win32file.AllocateReadBuffer(1024)
  65.         (hr, got) = win32pipe.TransactNamedPipe(hpipe, 'foo\x00bar', buffer, None)
  66.         self.failUnlessEqual(got, 'bar\x00foo')
  67.         event.wait(5)
  68.         self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
  69.  
  70.     
  71.     def testTransactNamedPipeAsync(self):
  72.         event = threading.Event()
  73.         overlapped = pywintypes.OVERLAPPED()
  74.         overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
  75.         self.startPipeServer(event, 0.5)
  76.         open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
  77.         hpipe = win32file.CreateFile(self.pipename, open_mode, 0, None, win32con.OPEN_EXISTING, win32con.FILE_FLAG_OVERLAPPED, None)
  78.         win32pipe.SetNamedPipeHandleState(hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
  79.         buffer = win32file.AllocateReadBuffer(1024)
  80.         (hr, got) = win32pipe.TransactNamedPipe(hpipe, 'foo\x00bar', buffer, overlapped)
  81.         self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
  82.         nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
  83.         got = buffer[:nbytes]
  84.         self.failUnlessEqual(got, 'bar\x00foo')
  85.         event.wait(5)
  86.         self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
  87.  
  88.  
  89. if __name__ == '__main__':
  90.     unittest.main()
  91.  
  92.