"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

"""

import struct
from io import BytesIO
from functools import wraps
import warnings

# Emit the deprecation notice through the public warnings API.  The private
# helper warnings._deprecated() only exists on Python 3.12 and raises
# RuntimeError from 3.13 on, which would make this module unimportable
# everywhere else.  The message text matches what the helper produced.
warnings.warn(
    f"{__name__!r} is deprecated and slated for removal in Python 3.13",
    DeprecationWarning,
    stacklevel=2,
)

__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]


# exceptions
class Error(Exception):
    """Exception class for this module. Use:

    except xdrlib.Error as var:
        # var has the Error instance for the exception

    Public ivars:
        msg -- contains the message

    """

    def __init__(self, msg):
        self.msg = msg

    def __repr__(self):
        return repr(self.msg)

    def __str__(self):
        return str(self.msg)


class ConversionError(Error):
    """Raised when a value cannot be converted to or from its XDR form."""


def raise_conversion_error(function):
    """Wrap any raised struct.errors in a ConversionError.

    Decorator for one-argument Packer methods: a struct.error raised by the
    wrapped method is re-raised as ConversionError carrying the same message,
    with the original exception context suppressed.
    """

    @wraps(function)
    def result(self, value):
        try:
            return function(self, value)
        except struct.error as e:
            raise ConversionError(e.args[0]) from None
    return result


class Packer:
    """Pack various data representations into a buffer."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything packed so far and start with an empty buffer."""
        self.__buf = BytesIO()

    def get_buffer(self):
        """Return the bytes packed so far."""
        return self.__buf.getvalue()

    # backwards compatibility
    get_buf = get_buffer

    @raise_conversion_error
    def pack_uint(self, x):
        """Append x as a 4-byte big-endian unsigned integer.

        Raises ConversionError if struct cannot pack x.
        """
        self.__buf.write(struct.pack('>L', x))

    @raise_conversion_error
    def pack_int(self, x):
        """Append x as a 4-byte big-endian signed integer.

        Raises ConversionError if struct cannot pack x.
        """
        self.__buf.write(struct.pack('>l', x))

    pack_enum = pack_int

    def pack_bool(self, x):
        """Append the 4-byte word 1 if x is true, otherwise the word 0."""
        self.__buf.write(b'\0\0\0\1' if x else b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Append x as two 4-byte words, high word first (8 bytes total).

        Each word is masked to 32 bits, so negative values come out in
        two's complement form; that is why pack_hyper is the same function.
        Raises ConversionError if x does not support the integer
        shift/mask operations.
        """
        try:
            self.pack_uint(x >> 32 & 0xffffffff)
            self.pack_uint(x & 0xffffffff)
        except (TypeError, struct.error) as e:
            raise ConversionError(e.args[0]) from None

    pack_hyper = pack_uhyper

    @raise_conversion_error
    def pack_float(self, x):
        """Append x as a 4-byte big-endian float ('>f').

        Raises ConversionError if struct cannot pack x.
        """
        self.__buf.write(struct.pack('>f', x))

    @raise_conversion_error
    def pack_double(self, x):
        """Append x as an 8-byte big-endian float ('>d').

        Raises ConversionError if struct cannot pack x.
        """
        self.__buf.write(struct.pack('>d', x))

    def pack_fstring(self, n, s):
        """Append the first n bytes of s, zero-padded to a multiple of 4.

        If s is shorter than n the missing bytes are zero-filled as well.
        No length prefix is written.  Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        data = s[:n]
        padded_size = ((n + 3) // 4) * 4
        self.__buf.write(data + (padded_size - len(data)) * b'\0')

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Append len(s) as an unsigned int, followed by s padded to 4 bytes."""
        n = len(s)
        self.pack_uint(n)
        self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Append each item preceded by the word 1, then a terminating word 0.

        pack_item is called with each item and is expected to pack it.
        """
        for item in list:
            self.pack_uint(1)
            pack_item(item)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack every item of list with pack_item; no length is written.

        Raises ValueError if len(list) differs from n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for item in list:
            pack_item(item)

    def pack_array(self, list, pack_item):
        """Append len(list) as an unsigned int, then every item of list."""
        n = len(list)
        self.pack_uint(n)
        self.pack_farray(n, list, pack_item)


class Unpacker:
    """Unpacks various data representations from the given buffer."""

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind to position 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current read offset into the buffer."""
        return self.__pos

    def set_position(self, position):
        """Set the current read offset into the buffer (not validated)."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the read position is before the end of the buffer."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __unpack_fixed(self, fmt, size):
        """Decode one struct item of size bytes at the current position.

        The position is advanced only after the length check succeeds, so
        a failed read on a truncated buffer leaves the position unchanged
        and done() still reports the leftover bytes.  Raises EOFError if
        fewer than size bytes are available.
        """
        start = self.__pos
        end = start + size
        data = self.__buf[start:end]
        if len(data) < size:
            raise EOFError
        self.__pos = end
        return struct.unpack(fmt, data)[0]

    def unpack_uint(self):
        """Read a 4-byte big-endian unsigned integer; EOFError if truncated."""
        return self.__unpack_fixed('>L', 4)

    def unpack_int(self):
        """Read a 4-byte big-endian signed integer; EOFError if truncated."""
        return self.__unpack_fixed('>l', 4)

    unpack_enum = unpack_int

    def unpack_bool(self):
        """Read a 4-byte signed integer and return its truth value."""
        return bool(self.unpack_int())

    def unpack_uhyper(self):
        """Read an 8-byte big-endian unsigned integer; EOFError if truncated.

        All 8 bytes are read in one step so that a buffer holding only the
        high word does not leave the position half-advanced.
        """
        return self.__unpack_fixed('>Q', 8)

    def unpack_hyper(self):
        """Read an 8-byte big-endian signed (two's complement) integer."""
        x = self.unpack_uhyper()
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Read a 4-byte big-endian float ('>f'); EOFError if truncated."""
        return self.__unpack_fixed('>f', 4)

    def unpack_double(self):
        """Read an 8-byte big-endian float ('>d'); EOFError if truncated."""
        return self.__unpack_fixed('>d', 8)

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding up to a multiple of 4.

        Raises ValueError if n is negative and EOFError if the padded
        field extends past the end of the buffer (position is unchanged).
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n + 3) // 4 * 4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i + n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Read an unsigned int length, then that many bytes plus padding."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Read items, each preceded by the word 1, until a word 0 is read.

        unpack_item is called with no arguments for every item.  Returns
        the items as a list.  Raises ConversionError if a marker word is
        neither 0 nor 1.
        """
        items = []
        while True:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Call unpack_item() n times and return the results as a list."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Read an unsigned int count, then that many items via unpack_item."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)