mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Docstrings for common package
This commit is contained in:
@@ -17,33 +17,69 @@ logger = Logger("Cryptographer")
|
|||||||
|
|
||||||
|
|
||||||
class Cryptographer:
|
class Cryptographer:
|
||||||
|
"""
|
||||||
|
The :class:`Cryptographer` is the class in charge of all the cryptography in the tower.
|
||||||
|
"""
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def check_data_key_format(data, key):
|
def check_data_key_format(data, secret):
|
||||||
|
"""
|
||||||
|
Checks that the data and secret that will be used to by ``encrypt`` / ``decrypt`` are properly
|
||||||
|
formatted.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data(:mod:`str`): the data to be encrypted.
|
||||||
|
secret(:mod:`str`): the secret used to derive the encryption key.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`bool`: Whether or not the ``key`` and ``data`` are properly formatted.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: if either the ``key`` or ``data`` is not properly formatted.
|
||||||
|
"""
|
||||||
|
|
||||||
if len(data) % 2:
|
if len(data) % 2:
|
||||||
error = "Incorrect (Odd-length) value."
|
error = "Incorrect (Odd-length) value."
|
||||||
logger.error(error, data=data)
|
logger.error(error, data=data)
|
||||||
raise ValueError(error)
|
raise ValueError(error)
|
||||||
|
|
||||||
if not check_sha256_hex_format(key):
|
if not check_sha256_hex_format(secret):
|
||||||
error = "Key must be a 32-byte hex value (64 hex chars)."
|
error = "Secret must be a 32-byte hex value (64 hex chars)."
|
||||||
logger.error(error, key=key)
|
logger.error(error, secret=secret)
|
||||||
raise ValueError(error)
|
raise ValueError(error)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def encrypt(blob, key, rtype="hex"):
|
def encrypt(blob, secret, rtype=str):
|
||||||
if rtype not in ["hex", "bytes"]:
|
"""
|
||||||
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
Encrypts a given :mod:`Blob <apps.cli.blob.Blob>` data using ``CHACHA20POLY1305``.
|
||||||
|
|
||||||
Cryptographer.check_data_key_format(blob.data, key)
|
``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
blob (:mod:`Blob <apps.cli.blob.Blob>`): a ``Blob`` object containing a raw penalty transaction.
|
||||||
|
secret (:mod:`str`): a value to used to derive the encryption key. Should be the dispute txid.
|
||||||
|
rtype: the return type for the encrypted value. Can be either ``str`` or ``bytes``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`str` or :obj:`bytes`: The encrypted data in ``str`` or ``bytes``, depending on ``rtype``.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: if ``rtype`` is not ``str`` or ``bytes``
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not (isinstance(str, rtype) or isinstance(bytes, rtype)):
|
||||||
|
raise ValueError("Wrong return type. Return type must be str or bytes")
|
||||||
|
|
||||||
|
Cryptographer.check_data_key_format(blob.data, secret)
|
||||||
|
|
||||||
# Transaction to be encrypted
|
# Transaction to be encrypted
|
||||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||||
tx = unhexlify(blob.data)
|
tx = unhexlify(blob.data)
|
||||||
|
|
||||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||||
sk = sha256(unhexlify(key)).digest()
|
sk = sha256(unhexlify(secret)).digest()
|
||||||
nonce = bytearray(12)
|
nonce = bytearray(12)
|
||||||
|
|
||||||
logger.info("Encrypting blob.", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), blob=blob.data)
|
logger.info("Encrypting blob.", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), blob=blob.data)
|
||||||
@@ -52,21 +88,39 @@ class Cryptographer:
|
|||||||
cipher = ChaCha20Poly1305(sk)
|
cipher = ChaCha20Poly1305(sk)
|
||||||
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
|
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
|
||||||
|
|
||||||
if rtype == "hex":
|
if isinstance(str, rtype):
|
||||||
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
|
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
|
||||||
|
|
||||||
return encrypted_blob
|
return encrypted_blob
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||||
def decrypt(encrypted_blob, key, rtype="hex"):
|
def decrypt(encrypted_blob, secret, rtype=str):
|
||||||
if rtype not in ["hex", "bytes"]:
|
"""
|
||||||
|
Decrypts a given :mod:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>` using ``CHACHA20POLY1305``.
|
||||||
|
|
||||||
|
``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
encrypted_blob(:mod:`EncryptedBlob <pisa.encrypted_blob.EncryptedBlob>`): an ``EncryptedBlob`` potentially
|
||||||
|
containing a penalty transaction.
|
||||||
|
secret (:mod:`str`): a value to used to derive the decryption key. Should be the dispute txid.
|
||||||
|
rtype: the return type for the encrypted value. Can be either ``str`` or ``bytes``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`str` or :obj:`bytes`: The decrypted data in ``str`` or ``bytes``, depending on ``rtype``.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: if ``rtype`` is not ``str`` or ``bytes``
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not (isinstance(str, rtype) or isinstance(bytes, rtype)):
|
||||||
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
||||||
|
|
||||||
Cryptographer.check_data_key_format(encrypted_blob.data, key)
|
Cryptographer.check_data_key_format(encrypted_blob.data, secret)
|
||||||
|
|
||||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||||
sk = sha256(unhexlify(key)).digest()
|
sk = sha256(unhexlify(secret)).digest()
|
||||||
nonce = bytearray(12)
|
nonce = bytearray(12)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
@@ -84,7 +138,7 @@ class Cryptographer:
|
|||||||
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
|
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
|
||||||
|
|
||||||
# Change the blob encoding to hex depending on the rtype (default)
|
# Change the blob encoding to hex depending on the rtype (default)
|
||||||
if rtype == "hex":
|
if isinstance(str, rtype):
|
||||||
blob = hexlify(blob).decode("utf8")
|
blob = hexlify(blob).decode("utf8")
|
||||||
|
|
||||||
except InvalidTag:
|
except InvalidTag:
|
||||||
@@ -95,12 +149,37 @@ class Cryptographer:
|
|||||||
# NOTCOVERED
|
# NOTCOVERED
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def signature_format(data):
|
def signature_format(data):
|
||||||
|
"""
|
||||||
|
Serializes a given ``data`` in the right format to be signed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data(:mod:`str`): the data to be formatted.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:mod:`str`: The serialized data to be signed.
|
||||||
|
"""
|
||||||
|
|
||||||
# FIXME: This is temporary serialization. A proper one is required. Data need to be unhexlified too (can't atm)
|
# FIXME: This is temporary serialization. A proper one is required. Data need to be unhexlified too (can't atm)
|
||||||
return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||||
|
|
||||||
# Deserialize public key from der data.
|
# Deserialize public key from der data.
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load_public_key_der(pk_der):
|
def load_public_key_der(pk_der):
|
||||||
|
"""
|
||||||
|
Creates an :mod:`EllipticCurvePublicKey` object from a given ``DER`` encoded public key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pk_der(:mod:`str`): a public key encoded in ``DER`` format.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:mod:`EllipticCurvePublicKey`: An ``EllipticCurvePublicKey`` object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
UnsupportedAlgorithm: if the key algorithm is not supported.
|
||||||
|
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
|
||||||
|
TypeError: if the provided ``pk_der`` data is not a string.
|
||||||
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
pk = load_der_public_key(pk_der, backend=default_backend())
|
pk = load_der_public_key(pk_der, backend=default_backend())
|
||||||
return pk
|
return pk
|
||||||
@@ -109,16 +188,30 @@ class Cryptographer:
|
|||||||
logger.error("Could not deserialize the public key (unsupported algorithm).")
|
logger.error("Could not deserialize the public key (unsupported algorithm).")
|
||||||
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
logger.error("The provided data cannot be deserialized (wrong size or format)")
|
logger.error("The provided data cannot be deserialized (wrong size or format).")
|
||||||
|
|
||||||
except TypeError:
|
except TypeError:
|
||||||
logger.error("The provided data cannot be deserialized (wrong type)")
|
logger.error("The provided data cannot be deserialized (wrong type).")
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Deserialize private key from der data.
|
# Deserialize private key from der data.
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load_private_key_der(sk_der):
|
def load_private_key_der(sk_der):
|
||||||
|
"""
|
||||||
|
Creates an :mod:`EllipticCurvePrivateKey` object from a given ``DER`` encoded private key.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sk_der(:mod:`str`): a private key encoded in ``DER`` format.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:mod:`EllipticCurvePrivateKey`: An ``EllipticCurvePrivateKey`` object.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
UnsupportedAlgorithm: if the key algorithm is not supported.
|
||||||
|
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
|
||||||
|
TypeError: if the provided ``pk_der`` data is not a string.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
sk = load_der_private_key(sk_der, None, backend=default_backend())
|
sk = load_der_private_key(sk_der, None, backend=default_backend())
|
||||||
return sk
|
return sk
|
||||||
@@ -126,10 +219,31 @@ class Cryptographer:
|
|||||||
except UnsupportedAlgorithm:
|
except UnsupportedAlgorithm:
|
||||||
raise ValueError("Could not deserialize the private key (unsupported algorithm).")
|
raise ValueError("Could not deserialize the private key (unsupported algorithm).")
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
logger.error("The provided data cannot be deserialized (wrong size or format).")
|
||||||
|
|
||||||
|
except TypeError:
|
||||||
|
logger.error("The provided data cannot be deserialized (wrong type).")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def sign(data, sk, rtype="hex"):
|
def sign(data, sk, rtype=str):
|
||||||
if rtype not in ["hex", "bytes"]:
|
"""
|
||||||
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
Signs a given data using a given secret key using ECDSA.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data(:mod:`str`): the data to be signed.
|
||||||
|
sk(:mod:`EllipticCurvePrivateKey`): the ECDSA secret key used to signed the data.
|
||||||
|
rtype: the return type for the encrypted value. Can be either ``str`` or ``bytes``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`str` or :obj:`bytes`: The data signature in ``str`` or ``bytes``, depending on ``rtype``.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: if ``rtype`` is not ``str`` or ``bytes``
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not (isinstance(str, rtype) or isinstance(bytes, rtype)):
|
||||||
|
raise ValueError("Wrong return type. Return type must be str or bytes")
|
||||||
|
|
||||||
if not isinstance(sk, ec.EllipticCurvePrivateKey):
|
if not isinstance(sk, ec.EllipticCurvePrivateKey):
|
||||||
logger.error("Wrong public key.")
|
logger.error("Wrong public key.")
|
||||||
@@ -145,6 +259,20 @@ class Cryptographer:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def verify(message, signature, pk):
|
def verify(message, signature, pk):
|
||||||
|
"""
|
||||||
|
Verifies if a signature is valid for a given public key and message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message(:mod:`str`): the message that is supposed have been signed.
|
||||||
|
signature(:mod:`str`): the potential signature of the message.
|
||||||
|
pk(:mod:`EllipticCurvePublicKey`): the public key that is used to try to verify the signature.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:mod:`bool`: Whether or not the provided signature is valid for the given message and public key.
|
||||||
|
Returns ``False`` is the ``key`` is not in the right format or if either the ``message`` or ``pk`` cannot
|
||||||
|
be decoded.
|
||||||
|
"""
|
||||||
|
|
||||||
if not isinstance(pk, ec.EllipticCurvePublicKey):
|
if not isinstance(pk, ec.EllipticCurvePublicKey):
|
||||||
logger.error("Wrong public key.")
|
logger.error("Wrong public key.")
|
||||||
return False
|
return False
|
||||||
|
|||||||
@@ -2,4 +2,13 @@ import re
|
|||||||
|
|
||||||
|
|
||||||
def check_sha256_hex_format(value):
|
def check_sha256_hex_format(value):
|
||||||
|
"""
|
||||||
|
Checks if a given value is a 32-byte hex encoded string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value(:mod:`str`): the value to be checked.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:mod:`bool`: Wether or not the value matches the format.
|
||||||
|
"""
|
||||||
return isinstance(value, str) and re.search(r"^[0-9A-Fa-f]{64}$", value) is not None
|
return isinstance(value, str) and re.search(r"^[0-9A-Fa-f]{64}$", value) is not None
|
||||||
|
|||||||
Reference in New Issue
Block a user