inbus

annotate inbus.py @ 7:7bf7acf4225e

Added bool_unpacker.
author Eric Hopper <hopper@omnifarious.org>
date Tue, 26 Feb 2008 10:10:55 -0800
parents 59a9ca55098c
children
rev   line source
hopper@2 1 from struct import pack as _pack
hopper@2 2 from struct import unpack as _unpack
hopper@2 3 from functools import partial as _partial
hopper@2 4
def unpack(s, numtypes=1):
    """Decode the values described by the type tags at the start of `s`.

    `s` starts with `numtypes` type tags (parsed by create_unpacker) and
    the encoded data follows immediately after them.  The decoded values
    are returned as a tuple.
    """
    decoder, data_start = create_unpacker(s, numtypes)
    values, _, _ = decoder(([], s, data_start))
    return tuple(values)
hopper@2 9
def unpack_off(s, numtypes=1):
    """Like unpack, but also report where decoding stopped.

    Returns (tuple of decoded values, offset in `s` of the first
    character after the decoded data).
    """
    decoder, data_start = create_unpacker(s, numtypes)
    values, _, end_offset = decoder(([], s, data_start))
    return tuple(values), end_offset
hopper@2 14
def create_unpacker(s, numtypes=1):
    """Build a decoder for the `numtypes` type tags at the start of `s`.

    Returns (unpacker, offset).  `unpacker` takes and returns a
    (result list, string, offset) tuple: it appends one decoded value per
    type tag to the result list and advances the offset.  `offset` is the
    number of characters of `s` occupied by the type tags themselves.

    Raises TypeError if `numtypes` is not a non-negative integer or `s`
    is not a str/buffer, ValueError for an unknown tag or a type spec
    that ends too early, and NotImplementedError for the recognised but
    unsupported 'g' tag and '<' length modifiers.
    """
    try:
        numtypes = int(numtypes)
    except (ValueError, TypeError):
        raise TypeError("numtypes must be an integer")
    if numtypes < 0:
        raise TypeError("numtypes must be >= 0")
    if not isinstance(s, (str, buffer)):
        raise TypeError("s must be a string or buffer")

    # Tags decoded by one fixed unpacker function.
    simple = {
        'c': count_unpacker,
        'm': multi_int_unpacker,
        'w': multi_uint_unpacker,
        'y': bool_unpacker,
        'b': blob_unpacker,
        's': string_unpacker,
        'k': byte_unpacker,
        'p': ubyte_unpacker,
        'j': short_unpacker,
        'o': ushort_unpacker,
        'i': int_unpacker,
        'u': uint_unpacker,
        'l': long_unpacker,
        'n': ulong_unpacker,
        'f': double_unpacker,
        'h': float_unpacker,
        'v': variant_unpacker,
    }
    # Tags that may be followed by a '<' length modifier, which is
    # recognised but not implemented yet.
    sized_names = {
        'm': "multi-precision integer",
        'w': "unsigned multi-precision integer",
        'b': "binary blob",
        's': "string",
    }
    # Tags whose type spec continues with a bracketed sub-spec.
    containers = {
        't': make_tuple_unpacker,
        'a': make_array_unpacker,
        'd': make_dict_unpacker,
    }

    steps = []
    offset = 0
    for _ in xrange(numtypes):
        # Report a truncated type spec the same way as truncated data
        # instead of leaking an IndexError from s[offset].
        if offset >= len(s):
            raise ValueError("Ran off the end of the string while parsing "
                             "type tags.")
        tag = s[offset]
        offset += 1
        if tag in sized_names:
            if (len(s) > offset) and (s[offset] == '<'):
                raise NotImplementedError(
                    "Length modifiers not yet supported for %s ('%s') type."
                    % (sized_names[tag], tag))
        if tag in simple:
            steps.append(simple[tag])
        elif tag == 'g':
            raise NotImplementedError("The arbitrary size floating point ('g') type is not yet supported.")
        elif tag in containers:
            subunpacker, suboffset = containers[tag](buffer(s, offset))
            offset += suboffset
            steps.append(subunpacker)
        else:
            raise ValueError("Unknown type tag '%s' encountered." % (tag,))

    steps = tuple(steps)

    def unpacker(dectup):
        # Apply the per-tag unpackers in the order the tags appeared.
        for step in steps:
            dectup = step(dectup)
        return dectup

    return unpacker, offset
hopper@2 93
def count_unpacker(dectup):
    """Decode one packcount-encoded count and append it to the results."""
    result, s, offset = dectup
    count, after = unpackcount_off(s, offset)
    result.append(count)
    return result, s, after
hopper@2 98
hopper@2 99 def multi_int_unpacker(dectup):
hopper@2 100 result, s, offset = dectup
hopper@2 101 mlen, newoffset = unpackcount_off(dectup[1], dectup[2])
hopper@2 102 if (newoffset + mlen) > len(s):
hopper@2 103 raise ValueError("The string passed in is too short to unpack.")
hopper@2 104 if mlen <= 0:
hopper@2 105 result.append(0L)
hopper@2 106 return (result, s, newoffset)
hopper@2 107
hopper@2 108 mint = unpackl(buffer(s, newoffset, mlen))
hopper@2 109 if ord(s[newoffset]) & 0x80:
hopper@2 110 mint -= 2**(mlen * 8)
hopper@2 111 result.append(mint)
hopper@2 112 return (result, s, newoffset + mlen)
hopper@2 113
hopper@2 114 def multi_uint_unpacker(dectup):
hopper@2 115 result, s, offset = dectup
hopper@2 116 mlen, newoffset = unpackcount_off(dectup[1], dectup[2])
hopper@2 117 if (newoffset + mlen) > len(s):
hopper@2 118 raise ValueError("The string passed in is too short to unpack.")
hopper@2 119 if mlen <= 0:
hopper@2 120 result.append(0L)
hopper@2 121 return (result, s, newoffset)
hopper@2 122 mint = unpackl(buffer(s, newoffset, mlen))
hopper@2 123 result.append(mint)
hopper@2 124 return (result, s, newoffset + mlen)
hopper@2 125
def blob_unpacker(dectup):
    """Decode a binary blob ('b'): a packcount length plus that many octets.

    A non-empty blob is appended as a buffer over `s` (no copy); an empty
    blob is appended as ''.
    """
    result, s, offset = dectup
    size, start = unpackcount_off(s, offset)
    end = start + size
    if end > len(s):
        raise ValueError("The string passed in is too short to unpack.")
    if size > 0:
        result.append(buffer(s, start, size))
        return result, s, end
    result.append('')
    return result, s, start
hopper@2 136
def string_unpacker(dectup):
    """Decode a string ('s'): a packcount length plus UTF-8 encoded octets.

    A non-empty string is appended as a unicode object; an empty one is
    appended as ''.
    """
    result, s, offset = dectup
    size, start = unpackcount_off(s, offset)
    end = start + size
    if end > len(s):
        raise ValueError("The string passed in is too short to unpack.")
    if size > 0:
        encoded = buffer(s, start, size)
        result.append(unicode(encoded, 'utf-8'))
        return result, s, end
    result.append('')
    return result, s, start
hopper@2 147
def bool_unpacker(dectup):
    """Decode a boolean ('y') stored in one octet; any non-zero octet is True."""
    result, s, offset = dectup
    if not offset < len(s):
        raise ValueError("The string passed in is too short for type spec.")
    result.append(ord(s[offset]) != 0)
    return result, s, offset + 1
hopper@7 154
def pystruct_unpacker(spec, len, dectup):
    """Decode one fixed-width value using a struct format string.

    spec   -- struct format describing exactly one value (e.g. '>h').
    len    -- number of octets that value occupies in the data.
    dectup -- (result list, string, offset) decoding state.

    Appends the decoded value to the result list and returns the state
    advanced by `len` octets.  Raises ValueError if fewer than `len`
    octets remain, matching the other unpackers.
    """
    # Imported locally: the module only imports pack/unpack from struct.
    from struct import error as _struct_error

    result, s, offset = dectup
    end = offset + len
    try:
        # A short (or empty) slice makes struct complain; translate that
        # into the ValueError the rest of this module uses for truncation.
        (value,) = _unpack(spec, s[offset:end])
    except _struct_error:
        raise ValueError("The string passed in is too short to unpack.")
    result.append(value)
    return result, s, end
hopper@2 160
# Fixed-width big-endian unpackers: each binds a struct format and its
# width in octets to pystruct_unpacker.
byte_unpacker = _partial(pystruct_unpacker, '>b', 1)
# Unsigned byte needs the upper-case 'B' format; 'b' would decode values
# above 127 as negative numbers.
ubyte_unpacker = _partial(pystruct_unpacker, '>B', 1)
short_unpacker = _partial(pystruct_unpacker, '>h', 2)
ushort_unpacker = _partial(pystruct_unpacker, '>H', 2)
int_unpacker = _partial(pystruct_unpacker, '>i', 4)
uint_unpacker = _partial(pystruct_unpacker, '>I', 4)
long_unpacker = _partial(pystruct_unpacker, '>q', 8)
ulong_unpacker = _partial(pystruct_unpacker, '>Q', 8)
float_unpacker = _partial(pystruct_unpacker, '>f', 4)
double_unpacker = _partial(pystruct_unpacker, '>d', 8)
hopper@2 171
def variant_unpacker(dectup):
    """Decode a variant ('v'): one inline type tag followed by its value."""
    result, s, offset = dectup
    # The embedded value carries its own type spec, so decode it as a
    # self-contained message starting at the current offset.
    values, consumed = unpack_off(buffer(s, offset), 1)
    (value,) = values
    result.append(value)
    return result, s, offset + consumed
hopper@2 178
def make_tuple_unpacker(s):
    """Parse a tuple type spec of the form '(' tags... ')' at the start of `s`.

    Returns (unpacker, number of characters of `s` consumed, including
    both parentheses).  The unpacker decodes one value per element tag
    and appends them to the result list as a single tuple.

    Raises ValueError if `s` does not start with '(' or the closing ')'
    is missing.
    """
    if (len(s) < 1) or (s[0] != '('):
        raise ValueError("Tuple type tag ('t') must be followed by '('")
    offset = 1
    element_unpackers = []
    while (offset < len(s)) and (s[offset] != ')'):
        elunpacker, consumed = create_unpacker(buffer(s, offset))
        offset += consumed
        element_unpackers.append(elunpacker)
    if offset >= len(s):
        raise ValueError("Ran off the end of the string to unpack while parsing tuple")
    element_unpackers = tuple(element_unpackers)

    def unpack_tuple(dectup):
        result, data, pos = dectup
        # Decode the elements into their own list.  Collecting them in
        # the shared result list would fold any values decoded before
        # this tuple into the tuple itself.
        elements = []
        for elunpacker in element_unpackers:
            elements, data, pos = elunpacker((elements, data, pos))
        result.append(tuple(elements))
        return result, data, pos

    return unpack_tuple, offset + 1
hopper@3 199
def make_array_unpacker(s):
    """Parse an array type spec of the form '[' tag ']' at the start of `s`.

    Returns (unpacker, number of characters of `s` consumed, including
    both brackets).  In the data every element is preceded by a non-zero
    marker octet and the array is terminated by a zero octet; the
    unpacker appends the decoded elements to the results as one list.
    """
    if s[0] != '[':
        raise ValueError("Array type tag ('a') must be followed by '['")
    element_unpacker, taglen = create_unpacker(buffer(s, 1))
    close = 1 + taglen
    if not ((close < len(s)) and (s[close] == ']')):
        raise ValueError("Array type tag must end with ']' after one type tag.")

    def unpack_array(dectup):
        result, data, pos = dectup
        items = []
        while True:
            if len(data) <= pos:
                raise ValueError("End of string while parsing array.")
            if data[pos] == '\0':
                break
            # Skip the marker octet, then decode one element.
            decoded, data, pos = element_unpacker(([], data, pos + 1))
            items.extend(decoded)
        result.append(items)
        # Step over the terminating zero octet.
        return result, data, pos + 1

    return unpack_array, close + 1
hopper@4 221
def make_dict_unpacker(s):
    """Parse a dictionary type spec '{' keytag valuetag '}' at the start of `s`.

    Returns (unpacker, number of characters of `s` consumed, including
    both braces).  In the data every key/value pair is preceded by a
    non-zero marker octet and the dictionary is terminated by a zero
    octet; the unpacker appends the decoded pairs to the results as one
    dict.

    Raises ValueError if the spec is malformed.
    """
    if s[0] != '{':
        raise ValueError("Dictionary type tag ('d') must be followed by '{'")
    offset = 1
    keyunpacker, consumed = create_unpacker(buffer(s, offset))
    offset += consumed
    if offset >= len(s):
        raise ValueError("Dictionary type tags must contain two subtypes.")
    valunpacker, consumed = create_unpacker(buffer(s, offset))
    offset += consumed
    # (A leftover debugging print used to run here before the raise.)
    if (offset >= len(s)) or (s[offset] != '}'):
        raise ValueError("Dictionary type tag must end with '}' after two type tag.")

    def unpack_dict(dectup):
        result, s, offset = dectup
        dictionary = {}
        if len(s) <= offset:
            raise ValueError("End of string while parsing dictionary.")
        while s[offset] != '\0':
            # Skip the marker octet, then decode the key followed by the
            # value into a fresh two-element list.
            out, s, offset = valunpacker(keyunpacker(([], s, offset + 1)))
            if len(s) <= offset:
                raise ValueError("End of string while parsing dictionary.")
            dictionary[out[0]] = out[1]
        result.append(dictionary)
        # Step over the terminating zero octet.
        return result, s, offset + 1

    return unpack_dict, offset + 1
hopper@5 249
def packl(lnum, pad = 1):
    """Pack a non-negative integer into a big-endian octet string.

    The result is zero-padded on the left so its length is a multiple of
    `pad`.  For `pad` below 8, leading zero octets are stripped first, so
    the result is the shortest such string; for `pad` of 8 or more the
    number is emitted as whole 64-bit words.  Zero packs to `pad` zero
    octets.

    Raises ValueError if `lnum` is negative.
    """
    if lnum < 0:
        # ValueError: the RangeError used here before is not defined
        # anywhere, so this path died with a NameError instead.
        raise ValueError("Cannot use packl to convert a negative integer "
                         "to a string.")
    # Split the number into 64-bit words, least significant first.
    words = []
    while lnum > 0:
        words.append(lnum & 0xffffffffffffffff)
        lnum >>= 64
    if not words:
        return '\0' * pad
    words.reverse()
    count = len(words)
    if pad >= 8:
        # Let struct emit the padding ('x') in front of the words.
        excess = (8 * count) % pad
        if excess:
            fill = pad - excess
        else:
            fill = 0
        return _pack('>' + 'x' * fill + 'Q' * count, *words)
    packed = _pack('>' + 'Q' * count, *words).lstrip('\0')
    excess = len(packed) % pad
    if excess:
        return '\0' * (pad - excess) + packed
    return packed
hopper@2 277
hopper@2 278 def unpackl(s):
hopper@2 279 n = 0L
hopper@2 280 if len(s) <= 0:
hopper@2 281 return n
hopper@2 282 count8 = len(s) // 8
hopper@2 283 count1 = len(s) % 8
hopper@2 284 upfmt = '>' + 'Q' * count8 + 'B' * count1
hopper@2 285 l = _unpack(upfmt, s)
hopper@2 286 for val in l[0:count8]:
hopper@2 287 n <<= 64
hopper@2 288 n |= val
hopper@2 289 for val in l[count8:]:
hopper@2 290 n <<= 8
hopper@2 291 n |= val
hopper@2 292 return n
hopper@2 293
def packcount(count):
    """packcount(non-negative integer) -> string representing the count

    The count cannot be less than 0.  The encoding is designed so that if
    the count is a message length, a very small percentage of the total
    message will be consumed by the count.

    It is suggested that the message length itself not be counted as part
    of the message length.  This means that counts of 0 should be
    sensible, and indicate a message with no content.

    This function should NOT be used for values that are really arbitrary
    sized integers.  The maximum sized integer that this function can
    represent is 4080 bits.  Public key values will most likely exceed
    this.  To store a public key (or other arbitrary sized integer) value,
    encode that value in some other way (perhaps using packl), then use
    this function to encode the length of that encoding.

    Encoding: counts below 223 take one octet; counts below 223 + 8192
    take two octets (first octet 223..254); anything larger is written as
    octet 255, a length octet (in units of two octets), and the count
    packed by packl.

    Raises ValueError if the count is negative or too large to encode."""
    if count < 0:
        # ValueError: the RangeError used here before is not defined
        # anywhere, so this path died with a NameError instead.
        raise ValueError("A count cannot be negative!")
    if count < 223:
        return chr(count)
    if count < (223 + 8192):
        upper, lower = divmod(count - 223, 256)
        assert upper < 32
        return chr(upper + 223) + chr(lower)
    s = packl(count, 2)
    assert len(s) % 2 == 0
    countlen = len(s) // 2
    if countlen > 255:
        raise ValueError("A count must take fewer than 4080 bits to "
                         "represent!")
    return chr(255) + chr(countlen) + s
hopper@2 329
def unpackcount_off(s, off = 0):
    """unpackcount_off(a string, offset in string of something created by packcount)
    -> a tuple
    the tuple is (count, offset of byte after count).

    See the documentation for packcount for a little more of an
    explanation.  A ValueError exception will be thrown for strings that
    don't hold a valid count value at the given offset, including strings
    that end at or before that offset."""
    # Without this check an empty or exhausted string raised IndexError
    # rather than the documented ValueError.
    if off >= len(s):
        raise ValueError("The passed in string is too short, and isn't "
                         "a valid count.")
    fc = ord(s[off])
    if fc < 223:
        # One byte count
        return (fc, off + 1)

    # Every other form needs at least a second octet.
    if 2 > (len(s) - off):
        raise ValueError("The passed in string is too short, and isn't "
                         "a valid count.")
    second = ord(s[off + 1])
    if fc < (223 + 32):
        # Two byte count: 5 bits from the first octet, 8 from the second
        upper_5bits = fc - 223
        count = ((upper_5bits << 8) | second) + 223
        return (count, off + 2)

    # Variable length count, length (in units of two octets) in second octet
    length = second * 2
    if length <= 0:
        raise ValueError("The passed in string isn't a valid count!")
    # Skip past length of count itself
    off += 2
    # Enough octets for rest of count?
    if length > (len(s) - off):
        raise ValueError("The passed in string is too short, and isn't "
                         "a valid count.")
    # Use the handy unpackl function to turn the octet string into a number
    count = unpackl(s[off:(off + length)])
    # If count fits into an int, turn it into an int
    if count < 2**31:
        count = int(count)
    return (count, off + length)
hopper@2 371
def unpackcount(s):
    """unpackcount(a string beginning with something created by packcount)
    -> a tuple
    the tuple is (count, remainder of s).

    The count is decoded by unpackcount_off starting at offset 0; see
    packcount for a description of the encoding.  A ValueError exception
    will be thrown for strings that don't start with a valid count
    value."""
    count, consumed = unpackcount_off(s, 0)
    remainder = s[consumed:]
    return (count, remainder)