# (archive page navigation residue: home | CD-ROM | disk | FTP | other | search)
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
-
- import unittest
- import time
- import threading
- import win32pipe
- import win32file
- import win32event
- import pywintypes
- import winerror
- import win32con
-
class PipeTests(unittest.TestCase):
    """Round-trip tests for the client-side ``win32pipe`` named-pipe calls.

    Every test starts a one-shot server thread on ``pipename``, sends the
    request message through one client API (``CallNamedPipe`` or
    ``TransactNamedPipe``) and checks that the server's reply comes back.
    """

    pipename = '\\\\.\\pipe\\python_test_pipe'

    # Request sent by the client and reply written by the server.  Byte
    # literals are the same objects as plain ``str`` on Python 2.6+, and are
    # the payload type pywin32 expects on Python 3.
    _REQUEST = b'foo\x00bar'
    _REPLY = b'bar\x00foo'

    def _serverThread(self, pipe_handle, event, wait_time):
        """Serve exactly one client on ``pipe_handle``, then set ``event``.

        Waits for a client to connect, reads one message and checks that it
        is the request, sleeps ``wait_time`` seconds, writes the reply and
        closes the handle.  ``event`` is set only when every step succeeded,
        which is how the test that started this thread notices a failure
        that happened here.
        """
        try:
            hr = win32pipe.ConnectNamedPipe(pipe_handle)
            self.assertTrue(hr in (0, winerror.ERROR_PIPE_CONNECTED),
                            'Got error code 0x%x' % (hr,))
            hr, got = win32file.ReadFile(pipe_handle, 100)
            self.assertEqual(got, self._REQUEST)
            time.sleep(wait_time)
            win32file.WriteFile(pipe_handle, self._REPLY)
        finally:
            # Release the server end even when an assertion above failed.
            pipe_handle.Close()
        event.set()

    def startPipeServer(self, event, wait_time=0):
        """Create the test pipe and start a thread that serves one client.

        ``event`` is handed to the server thread, which sets it after it has
        replied and closed its end of the pipe.  ``wait_time`` is the number
        of seconds the server sleeps between reading the request and writing
        the reply.
        """
        openMode = win32pipe.PIPE_ACCESS_DUPLEX
        pipeMode = win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_WAIT
        sa = pywintypes.SECURITY_ATTRIBUTES()
        # DACL marked present but passed as None (a NULL DACL).
        sa.SetSecurityDescriptorDacl(1, None, 0)
        pipe_handle = win32pipe.CreateNamedPipe(
            self.pipename,
            openMode,
            pipeMode,
            win32pipe.PIPE_UNLIMITED_INSTANCES,
            0,      # output buffer size
            0,      # input buffer size
            2000,   # default time-out, in milliseconds
            sa)
        server = threading.Thread(target=self._serverThread,
                                  args=(pipe_handle, event, wait_time))
        server.start()

    def _openClientPipe(self, flags=0):
        """Open the client end of the test pipe and put it in message-read mode.

        ``flags`` is passed to ``CreateFile`` as the flags-and-attributes
        argument (for example ``win32con.FILE_FLAG_OVERLAPPED``).  The caller
        owns the returned handle and is responsible for closing it.
        """
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
        hpipe = win32file.CreateFile(self.pipename, open_mode, 0, None,
                                     win32con.OPEN_EXISTING, flags, None)
        try:
            win32pipe.SetNamedPipeHandleState(
                hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
        except Exception:
            # Do not leave the handle open when the mode switch fails.
            hpipe.Close()
            raise
        return hpipe

    def _assertServerFinished(self, event):
        """Wait up to five seconds for the server thread to signal ``event``."""
        event.wait(5)
        self.assertTrue(event.is_set(), "Pipe server thread didn't terminate")

    def testCallNamedPipe(self):
        """``CallNamedPipe`` sends the request and returns the reply."""
        event = threading.Event()
        self.startPipeServer(event)
        got = win32pipe.CallNamedPipe(self.pipename, self._REQUEST, 1024,
                                      win32pipe.NMPWAIT_WAIT_FOREVER)
        self.assertEqual(got, self._REPLY)
        self._assertServerFinished(event)

    def testTransactNamedPipeBlocking(self):
        """Blocking ``TransactNamedPipe`` with a read size of 1024."""
        event = threading.Event()
        self.startPipeServer(event)
        hpipe = self._openClientPipe()
        try:
            hr, got = win32pipe.TransactNamedPipe(hpipe, self._REQUEST, 1024, None)
            self.assertEqual(got, self._REPLY)
            self._assertServerFinished(event)
        finally:
            hpipe.Close()

    def testTransactNamedPipeBlockingBuffer(self):
        """Blocking ``TransactNamedPipe`` reading into a pre-allocated buffer."""
        event = threading.Event()
        self.startPipeServer(event)
        hpipe = self._openClientPipe()
        try:
            read_buffer = win32file.AllocateReadBuffer(1024)
            hr, got = win32pipe.TransactNamedPipe(hpipe, self._REQUEST,
                                                  read_buffer, None)
            self.assertEqual(got, self._REPLY)
            self._assertServerFinished(event)
        finally:
            hpipe.Close()

    def testTransactNamedPipeAsync(self):
        """Overlapped ``TransactNamedPipe`` completed via ``GetOverlappedResult``."""
        event = threading.Event()
        overlapped = pywintypes.OVERLAPPED()
        # Unnamed event created with manual-reset=0 and initial-state=0.
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        # The server sleeps 0.5 s before replying; presumably this is what
        # keeps the transaction pending when TransactNamedPipe returns.
        self.startPipeServer(event, 0.5)
        hpipe = self._openClientPipe(win32con.FILE_FLAG_OVERLAPPED)
        try:
            read_buffer = win32file.AllocateReadBuffer(1024)
            hr, got = win32pipe.TransactNamedPipe(hpipe, self._REQUEST,
                                                  read_buffer, overlapped)
            self.assertEqual(hr, winerror.ERROR_IO_PENDING)
            # Third argument True: block until the overlapped operation ends.
            nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
            got = read_buffer[:nbytes]
            self.assertEqual(got, self._REPLY)
            self._assertServerFinished(event)
        finally:
            hpipe.Close()
-
-
if __name__ == '__main__':
    # Run every test in this module when the file is executed as a script.
    unittest.main()
-
-