home *** CD-ROM | disk | FTP | other *** search
/ PC World 2002 May / PCWorld_2002-05_cd.bin / Software / TemaCD / activepython / ActivePython-2.1.1.msi / Python21_Lib_xdrlib.py < prev    next >
Encoding:
Python Source  |  2001-07-26  |  7.4 KB  |  282 lines

  1. """Implements (a subset of) Sun XDR -- eXternal Data Representation.
  2.  
  3. See: RFC 1014
  4.  
  5. """
  6.  
  7. import struct
  8.  
  9. __all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
  10.  
  11. # exceptions
  12. class Error:
  13.     """Exception class for this module. Use:
  14.  
  15.     except xdrlib.Error, var:
  16.         # var has the Error instance for the exception
  17.  
  18.     Public ivars:
  19.         msg -- contains the message
  20.  
  21.     """
  22.     def __init__(self, msg):
  23.         self.msg = msg
  24.     def __repr__(self):
  25.         return repr(self.msg)
  26.     def __str__(self):
  27.         return str(self.msg)
  28.  
  29.  
  30. class ConversionError(Error):
  31.     pass
  32.  
  33.  
  34.  
  35. class Packer:
  36.     """Pack various data representations into a buffer."""
  37.  
  38.     def __init__(self):
  39.         self.reset()
  40.  
  41.     def reset(self):
  42.         self.__buf = ''
  43.  
  44.     def get_buffer(self):
  45.         return self.__buf
  46.     # backwards compatibility
  47.     get_buf = get_buffer
  48.  
  49.     def pack_uint(self, x):
  50.         self.__buf = self.__buf + struct.pack('>L', x)
  51.  
  52.     pack_int = pack_uint
  53.     pack_enum = pack_int
  54.  
  55.     def pack_bool(self, x):
  56.         if x: self.__buf = self.__buf + '\0\0\0\1'
  57.         else: self.__buf = self.__buf + '\0\0\0\0'
  58.  
  59.     def pack_uhyper(self, x):
  60.         self.pack_uint(x>>32 & 0xffffffffL)
  61.         self.pack_uint(x & 0xffffffffL)
  62.  
  63.     pack_hyper = pack_uhyper
  64.  
  65.     def pack_float(self, x):
  66.         try: self.__buf = self.__buf + struct.pack('>f', x)
  67.         except struct.error, msg:
  68.             raise ConversionError, msg
  69.  
  70.     def pack_double(self, x):
  71.         try: self.__buf = self.__buf + struct.pack('>d', x)
  72.         except struct.error, msg:
  73.             raise ConversionError, msg
  74.  
  75.     def pack_fstring(self, n, s):
  76.         if n < 0:
  77.             raise ValueError, 'fstring size must be nonnegative'
  78.         n = ((n+3)/4)*4
  79.         data = s[:n]
  80.         data = data + (n - len(data)) * '\0'
  81.         self.__buf = self.__buf + data
  82.  
  83.     pack_fopaque = pack_fstring
  84.  
  85.     def pack_string(self, s):
  86.         n = len(s)
  87.         self.pack_uint(n)
  88.         self.pack_fstring(n, s)
  89.  
  90.     pack_opaque = pack_string
  91.     pack_bytes = pack_string
  92.  
  93.     def pack_list(self, list, pack_item):
  94.         for item in list:
  95.             self.pack_uint(1)
  96.             pack_item(item)
  97.         self.pack_uint(0)
  98.  
  99.     def pack_farray(self, n, list, pack_item):
  100.         if len(list) != n:
  101.             raise ValueError, 'wrong array size'
  102.         for item in list:
  103.             pack_item(item)
  104.  
  105.     def pack_array(self, list, pack_item):
  106.         n = len(list)
  107.         self.pack_uint(n)
  108.         self.pack_farray(n, list, pack_item)
  109.  
  110.  
  111.  
  112. class Unpacker:
  113.     """Unpacks various data representations from the given buffer."""
  114.  
  115.     def __init__(self, data):
  116.         self.reset(data)
  117.  
  118.     def reset(self, data):
  119.         self.__buf = data
  120.         self.__pos = 0
  121.  
  122.     def get_position(self):
  123.         return self.__pos
  124.  
  125.     def set_position(self, position):
  126.         self.__pos = position
  127.  
  128.     def get_buffer(self):
  129.         return self.__buf
  130.  
  131.     def done(self):
  132.         if self.__pos < len(self.__buf):
  133.             raise Error('unextracted data remains')
  134.  
  135.     def unpack_uint(self):
  136.         i = self.__pos
  137.         self.__pos = j = i+4
  138.         data = self.__buf[i:j]
  139.         if len(data) < 4:
  140.             raise EOFError
  141.         x = struct.unpack('>L', data)[0]
  142.         try:
  143.             return int(x)
  144.         except OverflowError:
  145.             return x
  146.  
  147.     def unpack_int(self):
  148.         i = self.__pos
  149.         self.__pos = j = i+4
  150.         data = self.__buf[i:j]
  151.         if len(data) < 4:
  152.             raise EOFError
  153.         return struct.unpack('>l', data)[0]
  154.  
  155.     unpack_enum = unpack_int
  156.     unpack_bool = unpack_int
  157.  
  158.     def unpack_uhyper(self):
  159.         hi = self.unpack_uint()
  160.         lo = self.unpack_uint()
  161.         return long(hi)<<32 | lo
  162.  
  163.     def unpack_hyper(self):
  164.         x = self.unpack_uhyper()
  165.         if x >= 0x8000000000000000L:
  166.             x = x - 0x10000000000000000L
  167.         return x
  168.  
  169.     def unpack_float(self):
  170.         i = self.__pos
  171.         self.__pos = j = i+4
  172.         data = self.__buf[i:j]
  173.         if len(data) < 4:
  174.             raise EOFError
  175.         return struct.unpack('>f', data)[0]
  176.  
  177.     def unpack_double(self):
  178.         i = self.__pos
  179.         self.__pos = j = i+8
  180.         data = self.__buf[i:j]
  181.         if len(data) < 8:
  182.             raise EOFError
  183.         return struct.unpack('>d', data)[0]
  184.  
  185.     def unpack_fstring(self, n):
  186.         if n < 0:
  187.             raise ValueError, 'fstring size must be nonnegative'
  188.         i = self.__pos
  189.         j = i + (n+3)/4*4
  190.         if j > len(self.__buf):
  191.             raise EOFError
  192.         self.__pos = j
  193.         return self.__buf[i:i+n]
  194.  
  195.     unpack_fopaque = unpack_fstring
  196.  
  197.     def unpack_string(self):
  198.         n = self.unpack_uint()
  199.         return self.unpack_fstring(n)
  200.  
  201.     unpack_opaque = unpack_string
  202.     unpack_bytes = unpack_string
  203.  
  204.     def unpack_list(self, unpack_item):
  205.         list = []
  206.         while 1:
  207.             x = self.unpack_uint()
  208.             if x == 0: break
  209.             if x != 1:
  210.                 raise ConversionError, '0 or 1 expected, got ' + `x`
  211.             item = unpack_item()
  212.             list.append(item)
  213.         return list
  214.  
  215.     def unpack_farray(self, n, unpack_item):
  216.         list = []
  217.         for i in range(n):
  218.             list.append(unpack_item())
  219.         return list
  220.  
  221.     def unpack_array(self, unpack_item):
  222.         n = self.unpack_uint()
  223.         return self.unpack_farray(n, unpack_item)
  224.  
  225.  
  226. # test suite
  227. def _test():
  228.     p = Packer()
  229.     packtest = [
  230.         (p.pack_uint,    (9,)),
  231.         (p.pack_bool,    (None,)),
  232.         (p.pack_bool,    ('hello',)),
  233.         (p.pack_uhyper,  (45L,)),
  234.         (p.pack_float,   (1.9,)),
  235.         (p.pack_double,  (1.9,)),
  236.         (p.pack_string,  ('hello world',)),
  237.         (p.pack_list,    (range(5), p.pack_uint)),
  238.         (p.pack_array,   (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
  239.         ]
  240.     succeedlist = [1] * len(packtest)
  241.     count = 0
  242.     for method, args in packtest:
  243.         print 'pack test', count,
  244.         try:
  245.             apply(method, args)
  246.             print 'succeeded'
  247.         except ConversionError, var:
  248.             print 'ConversionError:', var.msg
  249.             succeedlist[count] = 0
  250.         count = count + 1
  251.     data = p.get_buffer()
  252.     # now verify
  253.     up = Unpacker(data)
  254.     unpacktest = [
  255.         (up.unpack_uint,   (), lambda x: x == 9),
  256.         (up.unpack_bool,   (), lambda x: not x),
  257.         (up.unpack_bool,   (), lambda x: x),
  258.         (up.unpack_uhyper, (), lambda x: x == 45L),
  259.         (up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
  260.         (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
  261.         (up.unpack_string, (), lambda x: x == 'hello world'),
  262.         (up.unpack_list,   (up.unpack_uint,), lambda x: x == range(5)),
  263.         (up.unpack_array,  (up.unpack_string,),
  264.          lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
  265.         ]
  266.     count = 0
  267.     for method, args, pred in unpacktest:
  268.         print 'unpack test', count,
  269.         try:
  270.             if succeedlist[count]:
  271.                 x = apply(method, args)
  272.                 print pred(x) and 'succeeded' or 'failed', ':', x
  273.             else:
  274.                 print 'skipping'
  275.         except ConversionError, var:
  276.             print 'ConversionError:', var.msg
  277.         count = count + 1
  278.  
  279.  
  280. if __name__ == '__main__':
  281.     _test()
  282.