From b712e3df0cdf04c609a52520d6640cefa291ac65 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 21 Feb 2020 13:08:33 +0100 Subject: [PATCH] Changes sign/verify functions to match the ones used by lnd and c-lightning - Replaces cryptography for coincurve for the signature / recovery operations - Fixes return types of ecrypt, decrypt to be str (gets rid of rtype) --- common/cryptographer.py | 230 ++++++++++++++++++++++------------------ 1 file changed, 124 insertions(+), 106 deletions(-) diff --git a/common/cryptographer.py b/common/cryptographer.py index c701f5f..3e65099 100644 --- a/common/cryptographer.py +++ b/common/cryptographer.py @@ -1,15 +1,67 @@ +import pyzbase32 from hashlib import sha256 from binascii import unhexlify, hexlify - -from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import ec +from coincurve.utils import int_to_bytes +from coincurve import PrivateKey, PublicKey +from cryptography.exceptions import InvalidTag from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 -from cryptography.hazmat.primitives.serialization import load_der_public_key, load_der_private_key -from cryptography.exceptions import InvalidSignature + from common.tools import check_sha256_hex_format +LN_MESSAGE_PREFIX = b"Lightning Signed Message:" + + +def sha256d(message): + """ + Compute the sha245d (double sha256) of a given by message. + + Args: + message(:obj:`bytes`): the message to be used as input to the hash function. + + Returns: + :obj:`bytes`: the sha256d of the given message. + """ + + return sha256(sha256(message).digest()).digest() + + +def sigrec_encode(rsig_rid): + """ + Encodes a pk-recoverable signature to be used in LN. ```rsig_rid`` can be obtained trough + ``PrivateKey.sign_recoverable``. The required format has the recovery id as the last byte, and for signing LN + messages we need it as the first. + From: https://twitter.com/rusty_twit/status/1182102005914800128 + + Args: + rsig_rid(:obj:`bytes`): the signature to be encoded. + + Returns: + :obj:`bytes`: the encoded signature. + """ + + rsig, rid = rsig_rid[:64], rsig_rid[64] + sigrec = int_to_bytes(rid + 31) + rsig + + return sigrec + + +def sigrec_decode(sigrec): + """ + Decodes a pk-recoverable signature in the format used by LN to be input to ``PublicKey.from_signature_and_message``. + + Args: + sigrec(:obj:`bytes`): the signature to be decoded. + + Returns: + :obj:`bytes`: the decoded signature. + """ + + rid, rsig = int_to_bytes(sigrec[0] - 31), sigrec[1:] + rsig_rid = rsig + rid + + return rsig_rid + + # FIXME: Common has not log file, so it needs to log in the same log as the caller. This is a temporary fix. logger = None @@ -47,7 +99,7 @@ class Cryptographer: return True @staticmethod - def encrypt(blob, secret, rtype="str"): + def encrypt(blob, secret): """ Encrypts a given :mod:`Blob ` data using ``CHACHA20POLY1305``. @@ -56,18 +108,11 @@ class Cryptographer: Args: blob (:mod:`Blob `): a ``Blob`` object containing a raw penalty transaction. secret (:mod:`str`): a value to used to derive the encryption key. Should be the dispute txid. - rtype(:mod:`str`): the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``. Returns: - :obj:`str` or :obj:`bytes`: The encrypted data in ``str`` or ``bytes``, depending on ``rtype``. - - Raises: - ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'`` + :obj:`str`: The encrypted data (hex encoded). """ - if rtype not in ["str", "bytes"]: - raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'") - Cryptographer.check_data_key_format(blob.data, secret) # Transaction to be encrypted @@ -83,36 +128,27 @@ class Cryptographer: # Encrypt the data cipher = ChaCha20Poly1305(sk) encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None) - - if rtype == "str": - encrypted_blob = hexlify(encrypted_blob).decode("utf8") + encrypted_blob = hexlify(encrypted_blob).decode("utf8") return encrypted_blob @staticmethod # ToDo: #20-test-tx-decrypting-edge-cases - def decrypt(encrypted_blob, secret, rtype="str"): + def decrypt(encrypted_blob, secret): """ Decrypts a given :mod:`EncryptedBlob ` using ``CHACHA20POLY1305``. ``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``. Args: - encrypted_blob(:mod:`EncryptedBlob `): an ``EncryptedBlob`` potentially + encrypted_blob(:mod:`EncryptedBlob `): an ``EncryptedBlob`` potentially containing a penalty transaction. secret (:mod:`str`): a value to used to derive the decryption key. Should be the dispute txid. - rtype(:mod:`str`): the return type for the decrypted value. Can be either ``'str'`` or ``'bytes'``. Returns: - :obj:`str` or :obj:`bytes`: The decrypted data in ``str`` or ``bytes``, depending on ``rtype``. - - Raises: - ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'`` + :obj:`str`: The decrypted data (hex encoded). """ - if rtype not in ["str", "bytes"]: - raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'") - Cryptographer.check_data_key_format(encrypted_blob.data, secret) # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) @@ -132,10 +168,7 @@ class Cryptographer: try: blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None) - - # Change the blob encoding to hex depending on the rtype (default) - if rtype == "str": - blob = hexlify(blob).decode("utf8") + blob = hexlify(blob).decode("utf8") except InvalidTag: blob = None @@ -172,61 +205,25 @@ class Cryptographer: logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return None - @staticmethod - def load_public_key_der(pk_der): - """ - Creates an :mod:`EllipticCurvePublicKey` object from a given ``DER`` encoded public key. - - Args: - pk_der(:mod:`str`): a public key encoded in ``DER`` format. - - Returns: - :mod:`EllipticCurvePublicKey`: An ``EllipticCurvePublicKey`` object. - - Raises: - UnsupportedAlgorithm: if the key algorithm is not supported. - ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format). - TypeError: if the provided ``pk_der`` data is not a string. - """ - - try: - pk = load_der_public_key(pk_der, backend=default_backend()) - return pk - - except UnsupportedAlgorithm: - logger.error("Could not deserialize the public key (unsupported algorithm)") - - except ValueError: - logger.error("The provided data cannot be deserialized (wrong size or format)") - - except TypeError: - logger.error("The provided data cannot be deserialized (wrong type)") - - return None - @staticmethod def load_private_key_der(sk_der): """ - Creates an :mod:`EllipticCurvePrivateKey` object from a given ``DER`` encoded private key. + Creates a :mod:`PrivateKey` object from a given ``DER`` encoded private key. Args: sk_der(:mod:`str`): a private key encoded in ``DER`` format. Returns: - :mod:`EllipticCurvePrivateKey`: An ``EllipticCurvePrivateKey`` object. + :mod:`PrivateKey`: A ``PrivateKey`` object. Raises: - UnsupportedAlgorithm: if the key algorithm is not supported. ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format). TypeError: if the provided ``pk_der`` data is not a string. """ try: - sk = load_der_private_key(sk_der, None, backend=default_backend()) + sk = PrivateKey.from_der(sk_der) return sk - except UnsupportedAlgorithm: - logger.error("Could not deserialize the private key (unsupported algorithm)") - except ValueError: logger.error("The provided data cannot be deserialized (wrong size or format)") @@ -236,64 +233,85 @@ class Cryptographer: return None @staticmethod - def sign(data, sk, rtype="str"): + def sign(message, sk): """ Signs a given data using a given secret key using ECDSA. Args: - data(:mod:`bytes`): the data to be signed. - sk(:mod:`EllipticCurvePrivateKey`): the ECDSA secret key used to signed the data. - rtype: the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``. + message(:obj:`bytes`): the data to be signed. + sk(:obj:`PrivateKey`): the ECDSA secret key used to signed the data. Returns: - :obj:`str` or :obj:`bytes`: The data signature in ``str`` or ``bytes``, depending on ``rtype``. - - Raises: - ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'`` + :obj:`str`: The zbase32 signature of the given message. """ - if rtype not in ["str", "bytes"]: - raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'") + if not isinstance(message, bytes): + logger.error("The message must be bytes. {} received".format(type(message))) + return None - if not isinstance(sk, ec.EllipticCurvePrivateKey): + if not isinstance(sk, PrivateKey): logger.error("The value passed as sk is not a private key (EllipticCurvePrivateKey)") return None - else: - signature = sk.sign(data, ec.ECDSA(hashes.SHA256())) + rsig_rid = sk.sign_recoverable(LN_MESSAGE_PREFIX + message, hasher=sha256d) + sigrec = sigrec_encode(rsig_rid) + zb32_sig = pyzbase32.encode_bytes(sigrec).decode() - if rtype == "str": - signature = hexlify(signature).decode("utf-8") - - return signature + return zb32_sig @staticmethod - def verify(message, signature, pk): + def recover_pk(message, zb32_sig): """ - Verifies if a signature is valid for a given public key and message. + Recovers an ECDSA public key from a given message and zbase32 signature. Args: - message(:mod:`bytes`): the message that is supposed have been signed. - signature(:mod:`str`): the potential signature of the message. - pk(:mod:`EllipticCurvePublicKey`): the public key that is used to try to verify the signature. + message(:obj:`bytes`): the data to be signed. + zb32_sig(:obj:`str`): the zbase32 signature of the message. Returns: - :mod:`bool`: Whether or not the provided signature is valid for the given message and public key. - Returns ``False`` is the ``key`` is not in the right format or if either the ``message`` or ``pk`` cannot - be decoded. + :obj:`PublicKey`: The recovered public key. """ - if not isinstance(pk, ec.EllipticCurvePublicKey): - logger.error("The value passed as pk is not a public key (EllipticCurvePublicKey)") - return False + if not isinstance(message, bytes): + logger.error("The message must be bytes. {} received".format(type(message))) + return None - if isinstance(signature, str): - signature = unhexlify(signature) + if not isinstance(zb32_sig, str): + logger.error("The zbase32_sig must be str. {} received".format(type(zb32_sig))) + return None + + sigrec = pyzbase32.decode_bytes(zb32_sig) + rsig_recid = sigrec_decode(sigrec) try: - pk.verify(signature, message, ec.ECDSA(hashes.SHA256())) + pk = PublicKey.from_signature_and_message(rsig_recid, LN_MESSAGE_PREFIX + message, hasher=sha256d) + return pk - return True + except ValueError as e: + # Several errors fit here: Signature length != 65, wrong recover id and failed to parse signature. + # All of them return raise ValueError. + logger.error(str(e)) + return None - except InvalidSignature: - return False + except Exception as e: + if "failed to recover ECDSA public key" in str(e): + logger.error("Cannot recover public key from signature".format(type(rsig_recid))) + else: + logger.error("Unknown exception", error=e) + + return None + + @staticmethod + def verify_rpk(pk, rpk): + """ + Verifies that that a recovered public key matches a given one. + + Args: + pk(:obj:`PublicKey`): a given public key (provided by the user). + rpk(:obj:`PublicKey`): a public key recovered via ``recover_pk``. + + Returns: + :obj:`bool`: True if the public keys match, False otherwise. + """ + + return pk.point() == rpk.point()