home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / pyos2bin.zip / Lib / popen2.py < prev    next >
Text File  |  1997-09-29  |  2KB  |  100 lines

  1. import os
  2. import sys
  3. import string
  4.  
  5. MAXFD = 256    # Max number of file descriptors (os.getdtablesize()???)
  6.  
  7. _active = []
  8.  
  9. def _cleanup():
  10.     for inst in _active[:]:
  11.     inst.poll()
  12.  
  13. class Popen3:
  14.     def __init__(self, cmd, capturestderr=0, bufsize=-1):
  15.     if type(cmd) == type(''):
  16.         cmd = ['/bin/sh', '-c', cmd]
  17.     p2cread, p2cwrite = os.pipe()
  18.     c2pread, c2pwrite = os.pipe()
  19.     if capturestderr:
  20.         errout, errin = os.pipe()
  21.     self.pid = os.fork()
  22.     if self.pid == 0:
  23.         # Child
  24.         os.close(0)
  25.         os.close(1)
  26.         if os.dup(p2cread) <> 0:
  27.         sys.stderr.write('popen2: bad read dup\n')
  28.         if os.dup(c2pwrite) <> 1:
  29.         sys.stderr.write('popen2: bad write dup\n')
  30.         if capturestderr:
  31.         os.close(2)
  32.         if os.dup(errin) <> 2: pass
  33.         for i in range(3, MAXFD):
  34.         try:
  35.             os.close(i)
  36.         except: pass
  37.         try:
  38.         os.execvp(cmd[0], cmd)
  39.         finally:
  40.         os._exit(1)
  41.         # Shouldn't come here, I guess
  42.         os._exit(1)
  43.     os.close(p2cread)
  44.     self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
  45.     os.close(c2pwrite)
  46.     self.fromchild = os.fdopen(c2pread, 'r', bufsize)
  47.     if capturestderr:
  48.         os.close(errin)
  49.         self.childerr = os.fdopen(errout, 'r', bufsize)
  50.     else:
  51.         self.childerr = None
  52.     self.sts = -1 # Child not completed yet
  53.     _active.append(self)
  54.     def poll(self):
  55.     if self.sts < 0:
  56.         try:
  57.         pid, sts = os.waitpid(self.pid, os.WNOHANG)
  58.         if pid == self.pid:
  59.             self.sts = sts
  60.             _active.remove(self)
  61.         except os.error:
  62.         pass
  63.     return self.sts
  64.     def wait(self):
  65.     pid, sts = os.waitpid(self.pid, 0)
  66.     if pid == self.pid:
  67.         self.sts = sts
  68.         _active.remove(self)
  69.     return self.sts
  70.  
  71. def popen2(cmd, bufsize=-1):
  72.     _cleanup()
  73.     inst = Popen3(cmd, 0, bufsize)
  74.     return inst.fromchild, inst.tochild
  75.  
  76. def popen3(cmd, bufsize=-1):
  77.     _cleanup()
  78.     inst = Popen3(cmd, 1, bufsize)
  79.     return inst.fromchild, inst.tochild, inst.childerr
  80.  
  81. def _test():
  82.     teststr = "abc\n"
  83.     print "testing popen2..."
  84.     r, w = popen2('cat')
  85.     w.write(teststr)
  86.     w.close()
  87.     assert r.read() == teststr
  88.     print "testing popen3..."
  89.     r, w, e = popen3(['cat'])
  90.     w.write(teststr)
  91.     w.close()
  92.     assert r.read() == teststr
  93.     assert e.read() == ""
  94.     _cleanup()
  95.     assert not _active
  96.     print "All OK"
  97.  
  98. if __name__ == '__main__':
  99.     _test()
  100.