pyln: Add type-annotations to plugin.py

This should help users that have type-checking enabled.
This commit is contained in:
Christian Decker
2020-06-26 16:39:02 +02:00
committed by Rusty Russell
parent d27da4d152
commit 49ec800a07
3 changed files with 267 additions and 145 deletions

View File

@@ -4,6 +4,7 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from .primitives import Secret, PrivateKey, PublicKey
from hashlib import sha256
import coincurve
import os
@@ -55,64 +56,6 @@ def decryptWithAD(k, n, ad, ciphertext):
return chacha.decrypt(n, ciphertext, ad)
class PrivateKey(object):
def __init__(self, rawkey):
if not isinstance(rawkey, bytes):
raise TypeError(f"rawkey must be bytes, {type(rawkey)} received")
elif len(rawkey) != 32:
raise ValueError(f"rawkey must be 32-byte long. {len(rawkey)} received")
self.rawkey = rawkey
self.key = coincurve.PrivateKey(rawkey)
def serializeCompressed(self):
return self.key.secret
def public_key(self):
return PublicKey(self.key.public_key)
class Secret(object):
def __init__(self, raw):
assert(len(raw) == 32)
self.raw = raw
def __str__(self):
return "Secret[0x{}]".format(self.raw.hex())
class PublicKey(object):
def __init__(self, innerkey):
# We accept either 33-bytes raw keys, or an EC PublicKey as returned
# by coincurve
if isinstance(innerkey, bytes):
if innerkey[0] in [2, 3] and len(innerkey) == 33:
innerkey = coincurve.PublicKey(innerkey)
else:
raise ValueError(
"Byte keys must be 33-byte long starting from either 02 or 03"
)
elif not isinstance(innerkey, coincurve.keys.PublicKey):
raise ValueError(
"Key must either be bytes or coincurve.keys.PublicKey"
)
self.key = innerkey
def serializeCompressed(self):
return self.key.format(compressed=True)
def __str__(self):
return "PublicKey[0x{}]".format(
self.serializeCompressed().hex()
)
def Keypair(object):
def __init__(self, priv, pub):
self.priv, self.pub = priv, pub
class Sha256Mixer(object):
def __init__(self, base):
self.hash = sha256(base).digest()
@@ -174,7 +117,7 @@ class LightningConnection(object):
h.hash = self.handshake['h']
h.update(self.handshake['e'].public_key().serializeCompressed())
es = ecdh(self.handshake['e'], self.remote_pubkey)
t = hkdf(salt=self.chaining_key, ikm=es.raw, info=b'')
t = hkdf(salt=self.chaining_key, ikm=es.data, info=b'')
assert(len(t) == 64)
self.chaining_key, temp_k1 = t[:32], t[32:]
c = encryptWithAD(temp_k1, self.nonce(0), h.digest(), b'')
@@ -194,7 +137,7 @@ class LightningConnection(object):
h.update(re.serializeCompressed())
es = ecdh(self.local_privkey, re)
self.handshake['re'] = re
t = hkdf(salt=self.chaining_key, ikm=es.raw, info=b'')
t = hkdf(salt=self.chaining_key, ikm=es.data, info=b'')
self.chaining_key, temp_k1 = t[:32], t[32:]
try:
@@ -210,7 +153,7 @@ class LightningConnection(object):
h.hash = self.handshake['h']
h.update(self.handshake['e'].public_key().serializeCompressed())
ee = ecdh(self.handshake['e'], self.handshake['re'])
t = hkdf(salt=self.chaining_key, ikm=ee.raw, info=b'')
t = hkdf(salt=self.chaining_key, ikm=ee.data, info=b'')
assert(len(t) == 64)
self.chaining_key, self.temp_k2 = t[:32], t[32:]
c = encryptWithAD(self.temp_k2, self.nonce(0), h.digest(), b'')
@@ -231,7 +174,7 @@ class LightningConnection(object):
h.update(re.serializeCompressed())
ee = ecdh(self.handshake['e'], re)
self.chaining_key, self.temp_k2 = hkdf_two_keys(
salt=self.chaining_key, ikm=ee.raw
salt=self.chaining_key, ikm=ee.data
)
try:
decryptWithAD(self.temp_k2, self.nonce(0), h.digest(), c)
@@ -249,7 +192,7 @@ class LightningConnection(object):
se = ecdh(self.local_privkey, self.re)
self.chaining_key, self.temp_k3 = hkdf_two_keys(
salt=self.chaining_key, ikm=se.raw
salt=self.chaining_key, ikm=se.data
)
t = encryptWithAD(self.temp_k3, self.nonce(0), h.digest(), b'')
m = b'\x00' + c + t
@@ -272,7 +215,7 @@ class LightningConnection(object):
se = ecdh(self.handshake['e'], self.remote_pubkey)
self.chaining_key, self.temp_k3 = hkdf_two_keys(
se.raw, self.chaining_key
se.data, self.chaining_key
)
decryptWithAD(self.temp_k3, self.nonce(0), h.digest(), t)
self.rn, self.sn = 0, 0