mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-20 15:44:21 +01:00
pyln: Improve zbase32 encoding / decoding
- Adds bitarray filling so mesages of any length can be encoded, instead of forcing the encoding to be of messages with length multiple of 5. - Adds checks for encoding / decoding and raises expections if the inputs are not as expected. - Flags functions that are supposed to be internal as "private".
This commit is contained in:
committed by
Christian Decker
parent
375040a3d9
commit
1da29305fc
@@ -23,34 +23,68 @@ zbase32_revchars = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def bitarray_to_u5(barr):
|
def _message_to_bitarray(message):
|
||||||
assert len(barr) % 5 == 0
|
barr = bitstring.ConstBitStream(message)
|
||||||
|
padding_len = 5 - (len(barr) % 5)
|
||||||
|
if padding_len < 5:
|
||||||
|
# The bitarray length has to be multiple of 5. If not, it is right-padded with zeros.
|
||||||
|
barr = bitstring.ConstBitStream(bin="{}{}".format(barr.bin, '0' * padding_len))
|
||||||
|
return barr
|
||||||
|
|
||||||
|
|
||||||
|
def _bitarray_to_message(barr):
|
||||||
|
padding_len = len(barr) % 8
|
||||||
|
if padding_len > 0:
|
||||||
|
return bitstring.Bits(bin=barr.bin[:-padding_len]).bytes
|
||||||
|
else:
|
||||||
|
return barr.bytes
|
||||||
|
|
||||||
|
|
||||||
|
def _bitarray_to_u5(barr):
|
||||||
ret = []
|
ret = []
|
||||||
s = bitstring.ConstBitStream(barr)
|
while barr.pos != barr.len:
|
||||||
while s.pos != s.len:
|
ret.append(barr.read(5).uint)
|
||||||
ret.append(s.read(5).uint)
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def u5_to_bitarray(arr):
|
def _u5_to_bitarray(arr):
|
||||||
ret = bitstring.BitArray()
|
ret = bitstring.BitArray()
|
||||||
for a in arr:
|
for a in arr:
|
||||||
ret += bitstring.pack("uint:5", a)
|
ret += bitstring.pack("uint:5", a)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
def encode(b):
|
def is_zbase32_encoded(message):
|
||||||
uint5s = bitarray_to_u5(b)
|
if isinstance(message, str):
|
||||||
|
message = message.encode("ASCII")
|
||||||
|
elif not isinstance(message, bytes):
|
||||||
|
raise TypeError("message must be string or bytes")
|
||||||
|
return set(message).issubset(zbase32_chars)
|
||||||
|
|
||||||
|
|
||||||
|
def encode(message):
|
||||||
|
if isinstance(message, str):
|
||||||
|
message = message.encode('ASCII')
|
||||||
|
elif not isinstance(message, bytes):
|
||||||
|
raise TypeError("message must be string or bytes")
|
||||||
|
|
||||||
|
barr = _message_to_bitarray(message)
|
||||||
|
uint5s = _bitarray_to_u5(barr)
|
||||||
res = [zbase32_chars[c] for c in uint5s]
|
res = [zbase32_chars[c] for c in uint5s]
|
||||||
return bytes(res)
|
return bytes(res)
|
||||||
|
|
||||||
|
|
||||||
def decode(b):
|
def decode(message):
|
||||||
if isinstance(b, str):
|
if isinstance(message, str):
|
||||||
b = b.encode('ASCII')
|
message = message.encode('ASCII')
|
||||||
|
elif not isinstance(message, bytes):
|
||||||
|
raise TypeError("message must be string or bytes")
|
||||||
|
|
||||||
|
if not is_zbase32_encoded(message):
|
||||||
|
raise ValueError("message is not zbase32 encoded")
|
||||||
|
|
||||||
uint5s = []
|
uint5s = []
|
||||||
for c in b:
|
for c in message:
|
||||||
uint5s.append(zbase32_revchars[c])
|
uint5s.append(zbase32_revchars[c])
|
||||||
dec = u5_to_bitarray(uint5s)
|
dec = _u5_to_bitarray(uint5s)
|
||||||
return dec.bytes
|
return _bitarray_to_message(dec)
|
||||||
|
|||||||
Reference in New Issue
Block a user