mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 06:04:21 +01:00
Changes sign/verify functions to match the ones used by lnd and c-lightning
- Replaces cryptography for coincurve for the signature / recovery operations - Fixes return types of ecrypt, decrypt to be str (gets rid of rtype)
This commit is contained in:
@@ -1,15 +1,67 @@
|
||||
import pyzbase32
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify, hexlify
|
||||
|
||||
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from coincurve.utils import int_to_bytes
|
||||
from coincurve import PrivateKey, PublicKey
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||
from cryptography.hazmat.primitives.serialization import load_der_public_key, load_der_private_key
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from common.tools import check_sha256_hex_format
|
||||
|
||||
LN_MESSAGE_PREFIX = b"Lightning Signed Message:"
|
||||
|
||||
|
||||
def sha256d(message):
|
||||
"""
|
||||
Compute the sha245d (double sha256) of a given by message.
|
||||
|
||||
Args:
|
||||
message(:obj:`bytes`): the message to be used as input to the hash function.
|
||||
|
||||
Returns:
|
||||
:obj:`bytes`: the sha256d of the given message.
|
||||
"""
|
||||
|
||||
return sha256(sha256(message).digest()).digest()
|
||||
|
||||
|
||||
def sigrec_encode(rsig_rid):
|
||||
"""
|
||||
Encodes a pk-recoverable signature to be used in LN. ```rsig_rid`` can be obtained trough
|
||||
``PrivateKey.sign_recoverable``. The required format has the recovery id as the last byte, and for signing LN
|
||||
messages we need it as the first.
|
||||
From: https://twitter.com/rusty_twit/status/1182102005914800128
|
||||
|
||||
Args:
|
||||
rsig_rid(:obj:`bytes`): the signature to be encoded.
|
||||
|
||||
Returns:
|
||||
:obj:`bytes`: the encoded signature.
|
||||
"""
|
||||
|
||||
rsig, rid = rsig_rid[:64], rsig_rid[64]
|
||||
sigrec = int_to_bytes(rid + 31) + rsig
|
||||
|
||||
return sigrec
|
||||
|
||||
|
||||
def sigrec_decode(sigrec):
|
||||
"""
|
||||
Decodes a pk-recoverable signature in the format used by LN to be input to ``PublicKey.from_signature_and_message``.
|
||||
|
||||
Args:
|
||||
sigrec(:obj:`bytes`): the signature to be decoded.
|
||||
|
||||
Returns:
|
||||
:obj:`bytes`: the decoded signature.
|
||||
"""
|
||||
|
||||
rid, rsig = int_to_bytes(sigrec[0] - 31), sigrec[1:]
|
||||
rsig_rid = rsig + rid
|
||||
|
||||
return rsig_rid
|
||||
|
||||
|
||||
# FIXME: Common has not log file, so it needs to log in the same log as the caller. This is a temporary fix.
|
||||
logger = None
|
||||
|
||||
@@ -47,7 +99,7 @@ class Cryptographer:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def encrypt(blob, secret, rtype="str"):
|
||||
def encrypt(blob, secret):
|
||||
"""
|
||||
Encrypts a given :mod:`Blob <common.cli.blob.Blob>` data using ``CHACHA20POLY1305``.
|
||||
|
||||
@@ -56,18 +108,11 @@ class Cryptographer:
|
||||
Args:
|
||||
blob (:mod:`Blob <common.cli.blob.Blob>`): a ``Blob`` object containing a raw penalty transaction.
|
||||
secret (:mod:`str`): a value to used to derive the encryption key. Should be the dispute txid.
|
||||
rtype(:mod:`str`): the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``.
|
||||
|
||||
Returns:
|
||||
:obj:`str` or :obj:`bytes`: The encrypted data in ``str`` or ``bytes``, depending on ``rtype``.
|
||||
|
||||
Raises:
|
||||
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
|
||||
:obj:`str`: The encrypted data (hex encoded).
|
||||
"""
|
||||
|
||||
if rtype not in ["str", "bytes"]:
|
||||
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'")
|
||||
|
||||
Cryptographer.check_data_key_format(blob.data, secret)
|
||||
|
||||
# Transaction to be encrypted
|
||||
@@ -83,36 +128,27 @@ class Cryptographer:
|
||||
# Encrypt the data
|
||||
cipher = ChaCha20Poly1305(sk)
|
||||
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
|
||||
|
||||
if rtype == "str":
|
||||
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
|
||||
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
|
||||
|
||||
return encrypted_blob
|
||||
|
||||
@staticmethod
|
||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||
def decrypt(encrypted_blob, secret, rtype="str"):
|
||||
def decrypt(encrypted_blob, secret):
|
||||
"""
|
||||
Decrypts a given :mod:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` using ``CHACHA20POLY1305``.
|
||||
|
||||
``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``.
|
||||
|
||||
Args:
|
||||
encrypted_blob(:mod:`EncryptedBlob <comnmon.encrypted_blob.EncryptedBlob>`): an ``EncryptedBlob`` potentially
|
||||
encrypted_blob(:mod:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>`): an ``EncryptedBlob`` potentially
|
||||
containing a penalty transaction.
|
||||
secret (:mod:`str`): a value to used to derive the decryption key. Should be the dispute txid.
|
||||
rtype(:mod:`str`): the return type for the decrypted value. Can be either ``'str'`` or ``'bytes'``.
|
||||
|
||||
Returns:
|
||||
:obj:`str` or :obj:`bytes`: The decrypted data in ``str`` or ``bytes``, depending on ``rtype``.
|
||||
|
||||
Raises:
|
||||
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
|
||||
:obj:`str`: The decrypted data (hex encoded).
|
||||
"""
|
||||
|
||||
if rtype not in ["str", "bytes"]:
|
||||
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'")
|
||||
|
||||
Cryptographer.check_data_key_format(encrypted_blob.data, secret)
|
||||
|
||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||
@@ -132,10 +168,7 @@ class Cryptographer:
|
||||
|
||||
try:
|
||||
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
|
||||
|
||||
# Change the blob encoding to hex depending on the rtype (default)
|
||||
if rtype == "str":
|
||||
blob = hexlify(blob).decode("utf8")
|
||||
blob = hexlify(blob).decode("utf8")
|
||||
|
||||
except InvalidTag:
|
||||
blob = None
|
||||
@@ -172,61 +205,25 @@ class Cryptographer:
|
||||
logger.error("I/O error({}): {}".format(e.errno, e.strerror))
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def load_public_key_der(pk_der):
|
||||
"""
|
||||
Creates an :mod:`EllipticCurvePublicKey` object from a given ``DER`` encoded public key.
|
||||
|
||||
Args:
|
||||
pk_der(:mod:`str`): a public key encoded in ``DER`` format.
|
||||
|
||||
Returns:
|
||||
:mod:`EllipticCurvePublicKey`: An ``EllipticCurvePublicKey`` object.
|
||||
|
||||
Raises:
|
||||
UnsupportedAlgorithm: if the key algorithm is not supported.
|
||||
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
|
||||
TypeError: if the provided ``pk_der`` data is not a string.
|
||||
"""
|
||||
|
||||
try:
|
||||
pk = load_der_public_key(pk_der, backend=default_backend())
|
||||
return pk
|
||||
|
||||
except UnsupportedAlgorithm:
|
||||
logger.error("Could not deserialize the public key (unsupported algorithm)")
|
||||
|
||||
except ValueError:
|
||||
logger.error("The provided data cannot be deserialized (wrong size or format)")
|
||||
|
||||
except TypeError:
|
||||
logger.error("The provided data cannot be deserialized (wrong type)")
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def load_private_key_der(sk_der):
|
||||
"""
|
||||
Creates an :mod:`EllipticCurvePrivateKey` object from a given ``DER`` encoded private key.
|
||||
Creates a :mod:`PrivateKey` object from a given ``DER`` encoded private key.
|
||||
|
||||
Args:
|
||||
sk_der(:mod:`str`): a private key encoded in ``DER`` format.
|
||||
|
||||
Returns:
|
||||
:mod:`EllipticCurvePrivateKey`: An ``EllipticCurvePrivateKey`` object.
|
||||
:mod:`PrivateKey`: A ``PrivateKey`` object.
|
||||
|
||||
Raises:
|
||||
UnsupportedAlgorithm: if the key algorithm is not supported.
|
||||
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
|
||||
TypeError: if the provided ``pk_der`` data is not a string.
|
||||
"""
|
||||
try:
|
||||
sk = load_der_private_key(sk_der, None, backend=default_backend())
|
||||
sk = PrivateKey.from_der(sk_der)
|
||||
return sk
|
||||
|
||||
except UnsupportedAlgorithm:
|
||||
logger.error("Could not deserialize the private key (unsupported algorithm)")
|
||||
|
||||
except ValueError:
|
||||
logger.error("The provided data cannot be deserialized (wrong size or format)")
|
||||
|
||||
@@ -236,64 +233,85 @@ class Cryptographer:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def sign(data, sk, rtype="str"):
|
||||
def sign(message, sk):
|
||||
"""
|
||||
Signs a given data using a given secret key using ECDSA.
|
||||
|
||||
Args:
|
||||
data(:mod:`bytes`): the data to be signed.
|
||||
sk(:mod:`EllipticCurvePrivateKey`): the ECDSA secret key used to signed the data.
|
||||
rtype: the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``.
|
||||
message(:obj:`bytes`): the data to be signed.
|
||||
sk(:obj:`PrivateKey`): the ECDSA secret key used to signed the data.
|
||||
|
||||
Returns:
|
||||
:obj:`str` or :obj:`bytes`: The data signature in ``str`` or ``bytes``, depending on ``rtype``.
|
||||
|
||||
Raises:
|
||||
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
|
||||
:obj:`str`: The zbase32 signature of the given message.
|
||||
"""
|
||||
|
||||
if rtype not in ["str", "bytes"]:
|
||||
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'")
|
||||
if not isinstance(message, bytes):
|
||||
logger.error("The message must be bytes. {} received".format(type(message)))
|
||||
return None
|
||||
|
||||
if not isinstance(sk, ec.EllipticCurvePrivateKey):
|
||||
if not isinstance(sk, PrivateKey):
|
||||
logger.error("The value passed as sk is not a private key (EllipticCurvePrivateKey)")
|
||||
return None
|
||||
|
||||
else:
|
||||
signature = sk.sign(data, ec.ECDSA(hashes.SHA256()))
|
||||
rsig_rid = sk.sign_recoverable(LN_MESSAGE_PREFIX + message, hasher=sha256d)
|
||||
sigrec = sigrec_encode(rsig_rid)
|
||||
zb32_sig = pyzbase32.encode_bytes(sigrec).decode()
|
||||
|
||||
if rtype == "str":
|
||||
signature = hexlify(signature).decode("utf-8")
|
||||
|
||||
return signature
|
||||
return zb32_sig
|
||||
|
||||
@staticmethod
|
||||
def verify(message, signature, pk):
|
||||
def recover_pk(message, zb32_sig):
|
||||
"""
|
||||
Verifies if a signature is valid for a given public key and message.
|
||||
Recovers an ECDSA public key from a given message and zbase32 signature.
|
||||
|
||||
Args:
|
||||
message(:mod:`bytes`): the message that is supposed have been signed.
|
||||
signature(:mod:`str`): the potential signature of the message.
|
||||
pk(:mod:`EllipticCurvePublicKey`): the public key that is used to try to verify the signature.
|
||||
message(:obj:`bytes`): the data to be signed.
|
||||
zb32_sig(:obj:`str`): the zbase32 signature of the message.
|
||||
|
||||
Returns:
|
||||
:mod:`bool`: Whether or not the provided signature is valid for the given message and public key.
|
||||
Returns ``False`` is the ``key`` is not in the right format or if either the ``message`` or ``pk`` cannot
|
||||
be decoded.
|
||||
:obj:`PublicKey`: The recovered public key.
|
||||
"""
|
||||
|
||||
if not isinstance(pk, ec.EllipticCurvePublicKey):
|
||||
logger.error("The value passed as pk is not a public key (EllipticCurvePublicKey)")
|
||||
return False
|
||||
if not isinstance(message, bytes):
|
||||
logger.error("The message must be bytes. {} received".format(type(message)))
|
||||
return None
|
||||
|
||||
if isinstance(signature, str):
|
||||
signature = unhexlify(signature)
|
||||
if not isinstance(zb32_sig, str):
|
||||
logger.error("The zbase32_sig must be str. {} received".format(type(zb32_sig)))
|
||||
return None
|
||||
|
||||
sigrec = pyzbase32.decode_bytes(zb32_sig)
|
||||
rsig_recid = sigrec_decode(sigrec)
|
||||
|
||||
try:
|
||||
pk.verify(signature, message, ec.ECDSA(hashes.SHA256()))
|
||||
pk = PublicKey.from_signature_and_message(rsig_recid, LN_MESSAGE_PREFIX + message, hasher=sha256d)
|
||||
return pk
|
||||
|
||||
return True
|
||||
except ValueError as e:
|
||||
# Several errors fit here: Signature length != 65, wrong recover id and failed to parse signature.
|
||||
# All of them return raise ValueError.
|
||||
logger.error(str(e))
|
||||
return None
|
||||
|
||||
except InvalidSignature:
|
||||
return False
|
||||
except Exception as e:
|
||||
if "failed to recover ECDSA public key" in str(e):
|
||||
logger.error("Cannot recover public key from signature".format(type(rsig_recid)))
|
||||
else:
|
||||
logger.error("Unknown exception", error=e)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def verify_rpk(pk, rpk):
|
||||
"""
|
||||
Verifies that that a recovered public key matches a given one.
|
||||
|
||||
Args:
|
||||
pk(:obj:`PublicKey`): a given public key (provided by the user).
|
||||
rpk(:obj:`PublicKey`): a public key recovered via ``recover_pk``.
|
||||
|
||||
Returns:
|
||||
:obj:`bool`: True if the public keys match, False otherwise.
|
||||
"""
|
||||
|
||||
return pk.point() == rpk.point()
|
||||
|
||||
Reference in New Issue
Block a user