view inbus.py @ 7:7bf7acf4225e
Added bool_unpacker.
| author |
Eric Hopper <hopper@omnifarious.org> |
| date |
Tue Feb 26 10:10:55 2008 -0800 (16 months ago) |
| parents |
59a9ca55098c |
| children |
|
line source
1 from struct import pack as _pack 2 from struct import unpack as _unpack 3 from functools import partial as _partial 5 def unpack(s, numtypes=1): 6 up, data_offset = create_unpacker(s, numtypes) 7 result = up(([], s, data_offset)) 8 return tuple(result[0]) 10 def unpack_off(s, numtypes=1): 11 up, data_offset = create_unpacker(s, numtypes) 12 result = up(([], s, data_offset)) 13 return tuple(result[0]), result[2] 15 def create_unpacker(s, numtypes=1): 17 numtypes = int(numtypes) 18 except (ValueError, TypeError): 19 raise TypeError("numtypes must be an integer") 21 raise TypeError("numtypes must be > 0") 22 if not isinstance(s, (str, buffer)): 23 raise TypeError("s must be a string or buffer") 24 def nullunpacker(dectup): 26 def composeunpacker(f1, f2): 30 unpacker = nullunpacker 32 for i in xrange(0, numtypes): 36 unpacker = composeunpacker(count_unpacker, unpacker) 38 if (len(s) > offset) and (s[offset] == '<'): 39 raise NotImplementedError("Length modifiers not yet supported for multi-precision integer ('m') type.") 40 unpacker = composeunpacker(multi_int_unpacker, unpacker) 42 if (len(s) > offset) and (s[offset] == '<'): 43 raise NotImplementedError("Length modifiers not yet supported for unsigned multi-precision integer ('w') type.") 44 unpacker = composeunpacker(multi_uint_unpacker, unpacker) 46 unpacker = composeunpacker(bool_unpacker, unpacker) 48 if (len(s) > offset) and (s[offset] == '<'): 49 raise NotImplementedError("Length modifiers not yet supported for binary blob ('b') type.") 50 unpacker = composeunpacker(blob_unpacker, unpacker) 52 if (len(s) > offset) and (s[offset] == '<'): 53 raise NotImplementedError("Length modifiers not yet supported for string ('s') type.") 54 unpacker = composeunpacker(string_unpacker, unpacker) 56 unpacker = composeunpacker(byte_unpacker, unpacker) 58 unpacker = composeunpacker(ubyte_unpacker, unpacker) 60 unpacker = composeunpacker(short_unpacker, unpacker) 62 unpacker = composeunpacker(ushort_unpacker, unpacker) 64 unpacker = composeunpacker(int_unpacker, unpacker) 66 unpacker = composeunpacker(uint_unpacker, unpacker) 68 unpacker = composeunpacker(long_unpacker, unpacker) 70 unpacker = composeunpacker(ulong_unpacker, unpacker) 72 unpacker = composeunpacker(double_unpacker, unpacker) 74 unpacker = composeunpacker(float_unpacker, unpacker) 76 raise NotImplementedError("The arbitrary size floating point ('g') type is not yet supported.") 78 unpacker = composeunpacker(variant_unpacker, unpacker) 79 elif tag in ['t', 'a', 'd']: 81 subunpacker, suboffset = make_tuple_unpacker(buffer(s, offset)) 83 subunpacker, suboffset = make_array_unpacker(buffer(s, offset)) 85 subunpacker, suboffset = make_dict_unpacker(buffer(s, offset)) 87 assert False, "Getting here should be impossible." 89 unpacker = composeunpacker(subunpacker, unpacker) 91 raise ValueError("Unknown type tag '%s' encountered." % (tag,)) 92 return unpacker, offset 94 def count_unpacker(dectup): 95 val, newoffset = unpackcount_off(dectup[1], dectup[2]) 97 return dectup[0], dectup[1], newoffset 99 def multi_int_unpacker(dectup): 100 result, s, offset = dectup 101 mlen, newoffset = unpackcount_off(dectup[1], dectup[2]) 102 if (newoffset + mlen) > len(s): 103 raise ValueError("The string passed in is too short to unpack.") 106 return (result, s, newoffset) 108 mint = unpackl(buffer(s, newoffset, mlen)) 109 if ord(s[newoffset]) & 0x80: 110 mint -= 2**(mlen * 8) 112 return (result, s, newoffset + mlen) 114 def multi_uint_unpacker(dectup): 115 result, s, offset = dectup 116 mlen, newoffset = unpackcount_off(dectup[1], dectup[2]) 117 if (newoffset + mlen) > len(s): 118 raise ValueError("The string passed in is too short to unpack.") 121 return (result, s, newoffset) 122 mint = unpackl(buffer(s, newoffset, mlen)) 124 return (result, s, newoffset + mlen) 126 def blob_unpacker(dectup): 127 result, s, offset = dectup 128 blen, newoffset = unpackcount_off(dectup[1], dectup[2]) 129 if (newoffset + blen) > len(s): 130 raise ValueError("The string passed in is too short to unpack.") 133 return result, s, newoffset 134 result.append(buffer(s, newoffset, blen)) 135 return result, s, newoffset + blen 137 def string_unpacker(dectup): 138 result, s, offset = dectup 139 slen, newoffset = unpackcount_off(dectup[1], dectup[2]) 140 if (newoffset + slen) > len(s): 141 raise ValueError("The string passed in is too short to unpack.") 144 return result, s, newoffset 145 result.append(unicode(buffer(s, newoffset, slen), 'utf-8')) 146 return result, s, newoffset + slen 148 def bool_unpacker(dectup): 149 result, s, offset = dectup 151 raise ValueError("The string passed in is too short for type spec.") 152 result.append(True if ord(s[offset]) != 0 else False) 153 return result, s, offset + 1 155 def pystruct_unpacker(spec, len, dectup): 156 result, s, offset = dectup 157 (upresult,) = _unpack(spec, buffer(s, offset, len)) 158 result.append(upresult) 159 return result, s, offset + len 161 byte_unpacker = _partial(pystruct_unpacker, '>b', 1) 162 ubyte_unpacker = _partial(pystruct_unpacker, '>b', 1) 163 short_unpacker = _partial(pystruct_unpacker, '>h', 2) 164 ushort_unpacker = _partial(pystruct_unpacker, '>H', 2) 165 int_unpacker = _partial(pystruct_unpacker, '>i', 4) 166 uint_unpacker = _partial(pystruct_unpacker, '>I', 4) 167 long_unpacker = _partial(pystruct_unpacker, '>q', 8) 168 ulong_unpacker = _partial(pystruct_unpacker, '>Q', 8) 169 float_unpacker = _partial(pystruct_unpacker, '>f', 4) 170 double_unpacker = _partial(pystruct_unpacker, '>d', 8) 172 def variant_unpacker(dectup): 173 result, s, offset = dectup 174 val, uplen = unpack_off(buffer(s, offset), 1) 177 return result, s, offset + uplen 179 def make_tuple_unpacker(s): 181 raise ValueError("Tuple type tag ('t') must be followed by '('") 183 def nullunpacker(dectup): 185 def composeunpacker(f1, f2): 187 return f1(f2(dectup)) 189 def tuplefinishunpacker(dectup): 190 return [tuple(dectup[0])], dectup[1], dectup[2] 191 unpacker = nullunpacker 192 while (offset < len(s)) and (s[offset] != ')'): 193 elunpacker, consumed = create_unpacker(buffer(s, offset)) 195 unpacker = composeunpacker(elunpacker, unpacker) 197 raise ValueError("Ran off the end of the string to unpack while parsing tuple") 198 return composeunpacker(tuplefinishunpacker, unpacker), offset + 1 200 def make_array_unpacker(s): 202 raise ValueError("Array type tag ('a') must be followed by '['") 204 elunpacker, consumed = create_unpacker(buffer(s, offset)) 206 if (offset >= len(s)) or (s[offset] != ']'): 207 raise ValueError("Array type tag must end with ']' after one type tag.") 208 def unpack_array(dectup): 209 result, s, offset = dectup 212 raise ValueError("End of string while parsing array.") 213 while s[offset] != '\0': 214 out, s, offset = elunpacker(([], s, offset + 1)) 216 raise ValueError("End of string while parsing array.") 219 return result, s, offset + 1 220 return unpack_array, offset + 1 222 def make_dict_unpacker(s): 224 raise ValueError("Dictionary type tag ('d') must be followed by '{'") 226 keyunpacker, consumed = create_unpacker(buffer(s, offset)) 229 raise ValueError("Dictionary type tags must contain two subtypes.") 230 valunpacker, consumed = create_unpacker(buffer(s, offset)) 232 if (offset >= len(s)) or (s[offset] != '}'): 233 print "offset = %r / consumed = %r / s = %r / len(s) = %r" % \
234 (offset, consumed, s, len(s)) 235 raise ValueError("Dictionary type tag must end with '}' after two type tag.") 236 def unpack_dict(dectup): 237 result, s, offset = dectup 240 raise ValueError("End of string while parsing dictionary.") 241 while s[offset] != '\0': 242 out, s, offset = valunpacker(keyunpacker(([], s, offset + 1))) 244 raise ValueError("End of string while parsing dictionary.") 245 dictionary[out[0]] = out[1] 246 result.append(dictionary) 247 return result, s, offset + 1 248 return unpack_dict, offset + 1 250 def packl(lnum, pad = 1): 252 raise RangeError("Cannot use packl to convert a negative integer " 257 l.append(lnum & 0xffffffffffffffffL) 263 lens = 8 * count % pad 264 pad = ((lens != 0) and (pad - lens)) or 0 265 l.append('>' + 'x' * pad + 'Q' * count) 269 l.append('>' + 'Q' * count) 271 s = _pack(*l).lstrip('\0') 273 if (lens % pad) != 0: 274 return '\0' * (pad - lens % pad) + s 284 upfmt = '>' + 'Q' * count8 + 'B' * count1 285 l = _unpack(upfmt, s) 286 for val in l[0:count8]: 289 for val in l[count8:]: 294 def packcount(count): 295 """packcount(non-negative integer) -> string representing the count 297 The count cannot be less than 0. The encoding is designed so that if 298 the count is a message length, a very small percentage of the total 299 message will be consumed by the count. 301 It is suggested that the message length itself not be counted as part 302 of the message length. This means that counts of 0 should be 303 sensible, and indicate a message with no content. 305 This function should NOT be used for values that are really arbitrary 306 sized integers. The maximum sized integer that this function can 307 represent is 4080 bits. Public key values will most likely exceed 308 this. To store a public key (or other arbitrary sized integer) value, 309 encode that value in some other way (perhaps using packl), then use 310 this function to encode the length of that encoding.""" 312 raise RangeError("A count cannot be negative!") 315 elif count < (223 + 8192): 320 return chr(upper + 223) + chr(lower) 323 assert len(s) % 2 == 0 324 countlen = len(s) // 2 326 raise RangeError("A count must take fewer than 4080 bits to " 328 return chr(255) + chr(countlen) + s 330 def unpackcount_off(s, off = 0): 331 """unpackcount(a string, offset in string of something created by packcount) 333 the tuple is (count, offset of byte after count). 335 See the documentation for packcount for a little more of an 336 explanation. A ValueError exception will be thrown for strings that 337 don't start with a valid count value.""" 342 elif fc < (223 + 32): 344 if 2 > (len(s) - off): 345 raise ValueError("The passed in string is too short, and isn't " 347 upper_5bits = fc - 223 348 lower_8bits = ord(s[off + 1]) 349 count = ((upper_5bits << 8) | lower_8bits) + 223 350 return (count, off + 2) 352 # Variable length count, length in second octet 353 if 2 > (len(s) - off): 354 raise ValueError("The passed in string is too short, and isn't " 356 length = ord(s[off + 1]) * 2 358 raise ValueError("The passed in string isn't a valid count!") 359 # Skip past length of count itself 361 # Enough octets for rest of count? 362 if length > (len(s) - off): 363 raise ValueError("The passed in string is too short, and isn't " 365 # Use the handy unpackl function to turn the octet string into a number 366 count = unpackl(s[off:(off + length)]) 367 # If count fits into an int, turn it into an int 370 return (count, off + length) 373 """unpackcount(a string beginning with something created by packcount) -> a tuple 374 the tuple is (count, remainder of s). 376 See the documentation for packcount for a little more of an 377 explanation. A ValueError exception will be thrown for strings that 378 don't start with a valid count value.""" 379 (count, newoff) = unpackcount_off(s, 0) 380 return (count, s[newoff:])