inbus

view inbus.py @ 7:7bf7acf4225e

Added bool_unpacker.
author Eric Hopper <hopper@omnifarious.org>
date Tue Feb 26 10:10:55 2008 -0800 (16 months ago)
parents 59a9ca55098c
children
line source
1 from struct import pack as _pack
2 from struct import unpack as _unpack
3 from functools import partial as _partial
5 def unpack(s, numtypes=1):
6 up, data_offset = create_unpacker(s, numtypes)
7 result = up(([], s, data_offset))
8 return tuple(result[0])
10 def unpack_off(s, numtypes=1):
11 up, data_offset = create_unpacker(s, numtypes)
12 result = up(([], s, data_offset))
13 return tuple(result[0]), result[2]
15 def create_unpacker(s, numtypes=1):
16 try:
17 numtypes = int(numtypes)
18 except (ValueError, TypeError):
19 raise TypeError("numtypes must be an integer")
20 if numtypes < 0:
21 raise TypeError("numtypes must be > 0")
22 if not isinstance(s, (str, buffer)):
23 raise TypeError("s must be a string or buffer")
24 def nullunpacker(dectup):
25 return dectup
26 def composeunpacker(f1, f2):
27 def compose(dectup):
28 return f1(f2(dectup))
29 return compose
30 unpacker = nullunpacker
31 offset = 0
32 for i in xrange(0, numtypes):
33 tag = s[offset]
34 offset += 1
35 if tag == 'c':
36 unpacker = composeunpacker(count_unpacker, unpacker)
37 elif tag == 'm':
38 if (len(s) > offset) and (s[offset] == '<'):
39 raise NotImplementedError("Length modifiers not yet supported for multi-precision integer ('m') type.")
40 unpacker = composeunpacker(multi_int_unpacker, unpacker)
41 elif tag == 'w':
42 if (len(s) > offset) and (s[offset] == '<'):
43 raise NotImplementedError("Length modifiers not yet supported for unsigned multi-precision integer ('w') type.")
44 unpacker = composeunpacker(multi_uint_unpacker, unpacker)
45 elif tag == 'y':
46 unpacker = composeunpacker(bool_unpacker, unpacker)
47 elif tag == 'b':
48 if (len(s) > offset) and (s[offset] == '<'):
49 raise NotImplementedError("Length modifiers not yet supported for binary blob ('b') type.")
50 unpacker = composeunpacker(blob_unpacker, unpacker)
51 elif tag == 's':
52 if (len(s) > offset) and (s[offset] == '<'):
53 raise NotImplementedError("Length modifiers not yet supported for string ('s') type.")
54 unpacker = composeunpacker(string_unpacker, unpacker)
55 elif tag == 'k':
56 unpacker = composeunpacker(byte_unpacker, unpacker)
57 elif tag == 'p':
58 unpacker = composeunpacker(ubyte_unpacker, unpacker)
59 elif tag == 'j':
60 unpacker = composeunpacker(short_unpacker, unpacker)
61 elif tag == 'o':
62 unpacker = composeunpacker(ushort_unpacker, unpacker)
63 elif tag == 'i':
64 unpacker = composeunpacker(int_unpacker, unpacker)
65 elif tag == 'u':
66 unpacker = composeunpacker(uint_unpacker, unpacker)
67 elif tag == 'l':
68 unpacker = composeunpacker(long_unpacker, unpacker)
69 elif tag == 'n':
70 unpacker = composeunpacker(ulong_unpacker, unpacker)
71 elif tag == 'f':
72 unpacker = composeunpacker(double_unpacker, unpacker)
73 elif tag == 'h':
74 unpacker = composeunpacker(float_unpacker, unpacker)
75 elif tag == 'g':
76 raise NotImplementedError("The arbitrary size floating point ('g') type is not yet supported.")
77 elif tag == 'v':
78 unpacker = composeunpacker(variant_unpacker, unpacker)
79 elif tag in ['t', 'a', 'd']:
80 if tag == 't':
81 subunpacker, suboffset = make_tuple_unpacker(buffer(s, offset))
82 elif tag == 'a':
83 subunpacker, suboffset = make_array_unpacker(buffer(s, offset))
84 elif tag == 'd':
85 subunpacker, suboffset = make_dict_unpacker(buffer(s, offset))
86 else:
87 assert False, "Getting here should be impossible."
88 offset += suboffset
89 unpacker = composeunpacker(subunpacker, unpacker)
90 else:
91 raise ValueError("Unknown type tag '%s' encountered." % (tag,))
92 return unpacker, offset
94 def count_unpacker(dectup):
95 val, newoffset = unpackcount_off(dectup[1], dectup[2])
96 dectup[0].append(val)
97 return dectup[0], dectup[1], newoffset
99 def multi_int_unpacker(dectup):
100 result, s, offset = dectup
101 mlen, newoffset = unpackcount_off(dectup[1], dectup[2])
102 if (newoffset + mlen) > len(s):
103 raise ValueError("The string passed in is too short to unpack.")
104 if mlen <= 0:
105 result.append(0L)
106 return (result, s, newoffset)
108 mint = unpackl(buffer(s, newoffset, mlen))
109 if ord(s[newoffset]) & 0x80:
110 mint -= 2**(mlen * 8)
111 result.append(mint)
112 return (result, s, newoffset + mlen)
114 def multi_uint_unpacker(dectup):
115 result, s, offset = dectup
116 mlen, newoffset = unpackcount_off(dectup[1], dectup[2])
117 if (newoffset + mlen) > len(s):
118 raise ValueError("The string passed in is too short to unpack.")
119 if mlen <= 0:
120 result.append(0L)
121 return (result, s, newoffset)
122 mint = unpackl(buffer(s, newoffset, mlen))
123 result.append(mint)
124 return (result, s, newoffset + mlen)
126 def blob_unpacker(dectup):
127 result, s, offset = dectup
128 blen, newoffset = unpackcount_off(dectup[1], dectup[2])
129 if (newoffset + blen) > len(s):
130 raise ValueError("The string passed in is too short to unpack.")
131 if blen <= 0:
132 result.append('')
133 return result, s, newoffset
134 result.append(buffer(s, newoffset, blen))
135 return result, s, newoffset + blen
137 def string_unpacker(dectup):
138 result, s, offset = dectup
139 slen, newoffset = unpackcount_off(dectup[1], dectup[2])
140 if (newoffset + slen) > len(s):
141 raise ValueError("The string passed in is too short to unpack.")
142 if slen <= 0:
143 result.append('')
144 return result, s, newoffset
145 result.append(unicode(buffer(s, newoffset, slen), 'utf-8'))
146 return result, s, newoffset + slen
148 def bool_unpacker(dectup):
149 result, s, offset = dectup
150 if offset >= len(s):
151 raise ValueError("The string passed in is too short for type spec.")
152 result.append(True if ord(s[offset]) != 0 else False)
153 return result, s, offset + 1
155 def pystruct_unpacker(spec, len, dectup):
156 result, s, offset = dectup
157 (upresult,) = _unpack(spec, buffer(s, offset, len))
158 result.append(upresult)
159 return result, s, offset + len
161 byte_unpacker = _partial(pystruct_unpacker, '>b', 1)
162 ubyte_unpacker = _partial(pystruct_unpacker, '>b', 1)
163 short_unpacker = _partial(pystruct_unpacker, '>h', 2)
164 ushort_unpacker = _partial(pystruct_unpacker, '>H', 2)
165 int_unpacker = _partial(pystruct_unpacker, '>i', 4)
166 uint_unpacker = _partial(pystruct_unpacker, '>I', 4)
167 long_unpacker = _partial(pystruct_unpacker, '>q', 8)
168 ulong_unpacker = _partial(pystruct_unpacker, '>Q', 8)
169 float_unpacker = _partial(pystruct_unpacker, '>f', 4)
170 double_unpacker = _partial(pystruct_unpacker, '>d', 8)
172 def variant_unpacker(dectup):
173 result, s, offset = dectup
174 val, uplen = unpack_off(buffer(s, offset), 1)
175 (val,) = val
176 result.append(val)
177 return result, s, offset + uplen
179 def make_tuple_unpacker(s):
180 if s[0] != '(':
181 raise ValueError("Tuple type tag ('t') must be followed by '('")
182 offset = 1
183 def nullunpacker(dectup):
184 return dectup
185 def composeunpacker(f1, f2):
186 def compose(dectup):
187 return f1(f2(dectup))
188 return compose
189 def tuplefinishunpacker(dectup):
190 return [tuple(dectup[0])], dectup[1], dectup[2]
191 unpacker = nullunpacker
192 while (offset < len(s)) and (s[offset] != ')'):
193 elunpacker, consumed = create_unpacker(buffer(s, offset))
194 offset += consumed
195 unpacker = composeunpacker(elunpacker, unpacker)
196 if offset >= len(s):
197 raise ValueError("Ran off the end of the string to unpack while parsing tuple")
198 return composeunpacker(tuplefinishunpacker, unpacker), offset + 1
200 def make_array_unpacker(s):
201 if s[0] != '[':
202 raise ValueError("Array type tag ('a') must be followed by '['")
203 offset = 1
204 elunpacker, consumed = create_unpacker(buffer(s, offset))
205 offset += consumed
206 if (offset >= len(s)) or (s[offset] != ']'):
207 raise ValueError("Array type tag must end with ']' after one type tag.")
208 def unpack_array(dectup):
209 result, s, offset = dectup
210 array = []
211 if len(s) <= offset:
212 raise ValueError("End of string while parsing array.")
213 while s[offset] != '\0':
214 out, s, offset = elunpacker(([], s, offset + 1))
215 if len(s) <= offset:
216 raise ValueError("End of string while parsing array.")
217 array.extend(out)
218 result.append(array)
219 return result, s, offset + 1
220 return unpack_array, offset + 1
222 def make_dict_unpacker(s):
223 if s[0] != '{':
224 raise ValueError("Dictionary type tag ('d') must be followed by '{'")
225 offset = 1
226 keyunpacker, consumed = create_unpacker(buffer(s, offset))
227 offset += consumed
228 if offset >= len(s):
229 raise ValueError("Dictionary type tags must contain two subtypes.")
230 valunpacker, consumed = create_unpacker(buffer(s, offset))
231 offset += consumed
232 if (offset >= len(s)) or (s[offset] != '}'):
233 print "offset = %r / consumed = %r / s = %r / len(s) = %r" % \
234 (offset, consumed, s, len(s))
235 raise ValueError("Dictionary type tag must end with '}' after two type tag.")
236 def unpack_dict(dectup):
237 result, s, offset = dectup
238 dictionary = {}
239 if len(s) <= offset:
240 raise ValueError("End of string while parsing dictionary.")
241 while s[offset] != '\0':
242 out, s, offset = valunpacker(keyunpacker(([], s, offset + 1)))
243 if len(s) <= offset:
244 raise ValueError("End of string while parsing dictionary.")
245 dictionary[out[0]] = out[1]
246 result.append(dictionary)
247 return result, s, offset + 1
248 return unpack_dict, offset + 1
250 def packl(lnum, pad = 1):
251 if lnum < 0:
252 raise RangeError("Cannot use packl to convert a negative integer "
253 "to a string.")
254 count = 0
255 l = []
256 while lnum > 0:
257 l.append(lnum & 0xffffffffffffffffL)
258 count += 1
259 lnum >>= 64
260 if count <= 0:
261 return '\0' * pad
262 elif pad >= 8:
263 lens = 8 * count % pad
264 pad = ((lens != 0) and (pad - lens)) or 0
265 l.append('>' + 'x' * pad + 'Q' * count)
266 l.reverse()
267 return _pack(*l)
268 else:
269 l.append('>' + 'Q' * count)
270 l.reverse()
271 s = _pack(*l).lstrip('\0')
272 lens = len(s)
273 if (lens % pad) != 0:
274 return '\0' * (pad - lens % pad) + s
275 else:
276 return s
278 def unpackl(s):
279 n = 0L
280 if len(s) <= 0:
281 return n
282 count8 = len(s) // 8
283 count1 = len(s) % 8
284 upfmt = '>' + 'Q' * count8 + 'B' * count1
285 l = _unpack(upfmt, s)
286 for val in l[0:count8]:
287 n <<= 64
288 n |= val
289 for val in l[count8:]:
290 n <<= 8
291 n |= val
292 return n
294 def packcount(count):
295 """packcount(non-negative integer) -> string representing the count
297 The count cannot be less than 0. The encoding is designed so that if
298 the count is a message length, a very small percentage of the total
299 message will be consumed by the count.
301 It is suggested that the message length itself not be counted as part
302 of the message length. This means that counts of 0 should be
303 sensible, and indicate a message with no content.
305 This function should NOT be used for values that are really arbitrary
306 sized integers. The maximum sized integer that this function can
307 represent is 4080 bits. Public key values will most likely exceed
308 this. To store a public key (or other arbitrary sized integer) value,
309 encode that value in some other way (perhaps using packl), then use
310 this function to encode the length of that encoding."""
311 if count < 0:
312 raise RangeError("A count cannot be negative!")
313 if count < 223:
314 return chr(count)
315 elif count < (223 + 8192):
316 count -= 223
317 upper = count // 256
318 assert upper < 32
319 lower = count % 256
320 return chr(upper + 223) + chr(lower)
321 else:
322 s = packl(count, 2)
323 assert len(s) % 2 == 0
324 countlen = len(s) // 2
325 if countlen > 255:
326 raise RangeError("A count must take fewer than 4080 bits to "
327 "represent!")
328 return chr(255) + chr(countlen) + s
330 def unpackcount_off(s, off = 0):
331 """unpackcount(a string, offset in string of something created by packcount)
332 -> a tuple
333 the tuple is (count, offset of byte after count).
335 See the documentation for packcount for a little more of an
336 explanation. A ValueError exception will be thrown for strings that
337 don't start with a valid count value."""
338 fc = ord(s[off])
339 if fc < 223:
340 # One byte count
341 return (fc, off + 1)
342 elif fc < (223 + 32):
343 # Two byte count
344 if 2 > (len(s) - off):
345 raise ValueError("The passed in string is too short, and isn't "
346 "a valid count.")
347 upper_5bits = fc - 223
348 lower_8bits = ord(s[off + 1])
349 count = ((upper_5bits << 8) | lower_8bits) + 223
350 return (count, off + 2)
351 else:
352 # Variable length count, length in second octet
353 if 2 > (len(s) - off):
354 raise ValueError("The passed in string is too short, and isn't "
355 "a valid count.")
356 length = ord(s[off + 1]) * 2
357 if length <= 0:
358 raise ValueError("The passed in string isn't a valid count!")
359 # Skip past length of count itself
360 off += 2
361 # Enough octets for rest of count?
362 if length > (len(s) - off):
363 raise ValueError("The passed in string is too short, and isn't "
364 "a valid count.")
365 # Use the handy unpackl function to turn the octet string into a number
366 count = unpackl(s[off:(off + length)])
367 # If count fits into an int, turn it into an int
368 if count < 2**31:
369 count = int(count)
370 return (count, off + length)
372 def unpackcount(s):
373 """unpackcount(a string beginning with something created by packcount) -> a tuple
374 the tuple is (count, remainder of s).
376 See the documentation for packcount for a little more of an
377 explanation. A ValueError exception will be thrown for strings that
378 don't start with a valid count value."""
379 (count, newoff) = unpackcount_off(s, 0)
380 return (count, s[newoff:])