Merge pull request #99 from sr-gi/lightning-signmessage-recoverpk

Lightning signmessage recoverpk
This commit is contained in:
Sergi Delgado Segura
2020-03-17 11:54:53 +01:00
committed by GitHub
20 changed files with 282 additions and 364 deletions

View File

@@ -18,21 +18,15 @@ jobs:
steps: steps:
- checkout - checkout
# Get bitcoin_sandbox
- run:
name: Get bitcoin_sandbox
command: git clone --single-branch --branch ln https://github.com/sr-gi/bitcoin_sandbox.git
# Download and cache dependencies # Download and cache dependencies
- restore_cache: - restore_cache:
keys: keys:
- v2-dependencies-{{ checksum "pisa/requirements.txt" }}-{{ checksum "pisa/requirements-dev.txt" }}-{{ checksum "apps/cli/requirements-dev.txt" }}-{{ checksum "bitcoin_sandbox/requirements.txt" }} - v1-dependencies-{{ checksum "pisa/requirements.txt" }}-{{ checksum "pisa/requirements-dev.txt" }}-{{ checksum "apps/cli/requirements-dev.txt" }}-{{ checksum "test/pisa/e2e/bitcoind_snap.sh" }}
# fallback to using the latest cache if no exact match is found
- v2-dependencies-
- run: - run:
name: Install dependencies name: Install dependencies
command: | command: |
sudo snap install `cat test/pisa/e2e/bitcoind_snap.sh`
pyenv local 3.7.0 pyenv local 3.7.0
python3 -m venv venv python3 -m venv venv
. venv/bin/activate . venv/bin/activate
@@ -40,23 +34,20 @@ jobs:
pip install -r pisa/requirements.txt pip install -r pisa/requirements.txt
pip install -r pisa/requirements-dev.txt pip install -r pisa/requirements-dev.txt
pip install -r apps/cli/requirements-dev.txt pip install -r apps/cli/requirements-dev.txt
pip install -r bitcoin_sandbox/requirements.txt
- save_cache: - save_cache:
paths: paths:
- ./venv - ./venv
key: v2-dependencies-{{ checksum "pisa/requirements.txt" }}-{{ checksum "pisa/requirements-dev.txt" }}-{{ checksum "apps/cli/requirements-dev.txt" }}-{{ checksum "bitcoin_sandbox/requirements.txt" }} - /snap
key: v1-dependencies-{{ checksum "pisa/requirements.txt" }}-{{ checksum "pisa/requirements-dev.txt" }}-{{ checksum "apps/cli/requirements-dev.txt" }}-{{ checksum "test/pisa/e2e/bitcoind_snap.sh" }}
# Build docker env for E2E testing # Run bitcoind for E2E testing (running it early so it has time to bootstrap)
- run: - run:
name: Build bitcoin_sandbox name: Run bitcoind
command: | command: |
cp test/pisa/e2e/bitcoin.conf bitcoin_sandbox/ mkdir -p /home/circleci/snap/bitcoin-core/common/.bitcoin/
cp test/pisa/e2e/sandbox-conf.py bitcoin_sandbox/bitcoin_sandbox/conf.py cp test/pisa/e2e/bitcoin.conf /home/circleci/snap/bitcoin-core/common/.bitcoin/
cp bitcoin_sandbox/docker/Dockerfile_ubuntu_no_ln bitcoin_sandbox/Dockerfile /snap/bin/bitcoin-core.daemon
. venv/bin/activate
cd bitcoin_sandbox && python -m bitcoin_sandbox.run_scenarios
# Run unit tests # Run unit tests
- run: - run:

View File

@@ -145,3 +145,14 @@ or
## the Eye of Satoshi's API ## the Eye of Satoshi's API
If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [tEOS-API.md](tEOS-API.md). If you wish to read about the underlying API, and how to write your own tool to interact with it, refer to [tEOS-API.md](tEOS-API.md).
## Are you reckless? Try me on mainnet
Would you like to try me on `mainnet` instead of `testnet`? Add `-s https://mainnet.teos.pisa.watch` to your calls, for example:
```
python wt_cli.py -s https://teosmainnet.pisa.watch add_appointment -f dummy_appointment_data.json
```
You can also change the config file to avoid specifying the server every time:
`DEFAULT_PISA_API_SERVER = "https://teosmainnet.pisa.watch"`

View File

@@ -1 +1,3 @@
responses responses
pytest
black

View File

@@ -5,9 +5,10 @@ import requests
import time import time
import binascii import binascii
from sys import argv from sys import argv
from uuid import uuid4
from coincurve import PublicKey
from getopt import getopt, GetoptError from getopt import getopt, GetoptError
from requests import ConnectTimeout, ConnectionError from requests import ConnectTimeout, ConnectionError
from uuid import uuid4
from apps.cli import config, LOG_PREFIX from apps.cli import config, LOG_PREFIX
from apps.cli.help import help_add_appointment, help_get_appointment from apps.cli.help import help_add_appointment, help_get_appointment
@@ -62,8 +63,8 @@ common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_
def load_keys(): def load_keys():
PISA_PUBLIC_KEY = "3056301006072a8648ce3d020106052b8104000a0342000430053e39c53b8bcb43354a4ed886b8082af1d1e8fc14956e60ad0592bfdfab511b7e309f6ac83b7495462196692e145bf7b1a321e96ec8fc4d678719c77342da" PISA_PUBLIC_KEY = "0230053e39c53b8bcb43354a4ed886b8082af1d1e8fc14956e60ad0592bfdfab51"
pisa_pk = Cryptographer.load_public_key_der(binascii.unhexlify(PISA_PUBLIC_KEY)) pisa_pk = PublicKey(binascii.unhexlify(PISA_PUBLIC_KEY))
return pisa_pk return pisa_pk
@@ -161,7 +162,8 @@ def add_appointment(args):
logger.error("The response does not contain the signature of the appointment") logger.error("The response does not contain the signature of the appointment")
return False return False
if not Cryptographer.verify(appointment.serialize(), signature, pisa_pk): rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
if not Cryptographer.verify_rpk(pisa_pk, rpk):
logger.error("The returned appointment's signature is invalid") logger.error("The returned appointment's signature is invalid")
return False return False

View File

@@ -14,7 +14,7 @@ from cryptography.hazmat.primitives.asymmetric import ec
def save_sk(sk, filename): def save_sk(sk, filename):
der = sk.private_bytes( der = sk.private_bytes(
encoding=serialization.Encoding.DER, encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )

View File

@@ -1,15 +1,67 @@
import pyzbase32
from hashlib import sha256 from hashlib import sha256
from binascii import unhexlify, hexlify from binascii import unhexlify, hexlify
from coincurve.utils import int_to_bytes
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm from coincurve import PrivateKey, PublicKey
from cryptography.hazmat.backends import default_backend from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.serialization import load_der_public_key, load_der_private_key
from cryptography.exceptions import InvalidSignature
from common.tools import check_sha256_hex_format from common.tools import check_sha256_hex_format
LN_MESSAGE_PREFIX = b"Lightning Signed Message:"
def sha256d(message):
"""
Compute the sha245d (double sha256) of a given by message.
Args:
message(:obj:`bytes`): the message to be used as input to the hash function.
Returns:
:obj:`bytes`: the sha256d of the given message.
"""
return sha256(sha256(message).digest()).digest()
def sigrec_encode(rsig_rid):
"""
Encodes a pk-recoverable signature to be used in LN. ```rsig_rid`` can be obtained trough
``PrivateKey.sign_recoverable``. The required format has the recovery id as the last byte, and for signing LN
messages we need it as the first.
From: https://twitter.com/rusty_twit/status/1182102005914800128
Args:
rsig_rid(:obj:`bytes`): the signature to be encoded.
Returns:
:obj:`bytes`: the encoded signature.
"""
rsig, rid = rsig_rid[:64], rsig_rid[64]
sigrec = int_to_bytes(rid + 31) + rsig
return sigrec
def sigrec_decode(sigrec):
"""
Decodes a pk-recoverable signature in the format used by LN to be input to ``PublicKey.from_signature_and_message``.
Args:
sigrec(:obj:`bytes`): the signature to be decoded.
Returns:
:obj:`bytes`: the decoded signature.
"""
rid, rsig = int_to_bytes(sigrec[0] - 31), sigrec[1:]
rsig_rid = rsig + rid
return rsig_rid
# FIXME: Common has not log file, so it needs to log in the same log as the caller. This is a temporary fix. # FIXME: Common has not log file, so it needs to log in the same log as the caller. This is a temporary fix.
logger = None logger = None
@@ -47,7 +99,7 @@ class Cryptographer:
return True return True
@staticmethod @staticmethod
def encrypt(blob, secret, rtype="str"): def encrypt(blob, secret):
""" """
Encrypts a given :mod:`Blob <common.cli.blob.Blob>` data using ``CHACHA20POLY1305``. Encrypts a given :mod:`Blob <common.cli.blob.Blob>` data using ``CHACHA20POLY1305``.
@@ -56,18 +108,11 @@ class Cryptographer:
Args: Args:
blob (:mod:`Blob <common.cli.blob.Blob>`): a ``Blob`` object containing a raw penalty transaction. blob (:mod:`Blob <common.cli.blob.Blob>`): a ``Blob`` object containing a raw penalty transaction.
secret (:mod:`str`): a value to used to derive the encryption key. Should be the dispute txid. secret (:mod:`str`): a value to used to derive the encryption key. Should be the dispute txid.
rtype(:mod:`str`): the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``.
Returns: Returns:
:obj:`str` or :obj:`bytes`: The encrypted data in ``str`` or ``bytes``, depending on ``rtype``. :obj:`str`: The encrypted data (hex encoded).
Raises:
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
""" """
if rtype not in ["str", "bytes"]:
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'")
Cryptographer.check_data_key_format(blob.data, secret) Cryptographer.check_data_key_format(blob.data, secret)
# Transaction to be encrypted # Transaction to be encrypted
@@ -83,36 +128,27 @@ class Cryptographer:
# Encrypt the data # Encrypt the data
cipher = ChaCha20Poly1305(sk) cipher = ChaCha20Poly1305(sk)
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None) encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
if rtype == "str":
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
return encrypted_blob return encrypted_blob
@staticmethod @staticmethod
# ToDo: #20-test-tx-decrypting-edge-cases # ToDo: #20-test-tx-decrypting-edge-cases
def decrypt(encrypted_blob, secret, rtype="str"): def decrypt(encrypted_blob, secret):
""" """
Decrypts a given :mod:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` using ``CHACHA20POLY1305``. Decrypts a given :mod:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` using ``CHACHA20POLY1305``.
``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``. ``SHA256(secret)`` is used as ``key``, and ``0 (12-byte)`` as ``iv``.
Args: Args:
encrypted_blob(:mod:`EncryptedBlob <comnmon.encrypted_blob.EncryptedBlob>`): an ``EncryptedBlob`` potentially encrypted_blob(:mod:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>`): an ``EncryptedBlob`` potentially
containing a penalty transaction. containing a penalty transaction.
secret (:mod:`str`): a value to used to derive the decryption key. Should be the dispute txid. secret (:mod:`str`): a value to used to derive the decryption key. Should be the dispute txid.
rtype(:mod:`str`): the return type for the decrypted value. Can be either ``'str'`` or ``'bytes'``.
Returns: Returns:
:obj:`str` or :obj:`bytes`: The decrypted data in ``str`` or ``bytes``, depending on ``rtype``. :obj:`str`: The decrypted data (hex encoded).
Raises:
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
""" """
if rtype not in ["str", "bytes"]:
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'")
Cryptographer.check_data_key_format(encrypted_blob.data, secret) Cryptographer.check_data_key_format(encrypted_blob.data, secret)
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte) # sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
@@ -132,10 +168,7 @@ class Cryptographer:
try: try:
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None) blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
blob = hexlify(blob).decode("utf8")
# Change the blob encoding to hex depending on the rtype (default)
if rtype == "str":
blob = hexlify(blob).decode("utf8")
except InvalidTag: except InvalidTag:
blob = None blob = None
@@ -172,61 +205,25 @@ class Cryptographer:
logger.error("I/O error({}): {}".format(e.errno, e.strerror)) logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return None return None
@staticmethod
def load_public_key_der(pk_der):
"""
Creates an :mod:`EllipticCurvePublicKey` object from a given ``DER`` encoded public key.
Args:
pk_der(:mod:`str`): a public key encoded in ``DER`` format.
Returns:
:mod:`EllipticCurvePublicKey`: An ``EllipticCurvePublicKey`` object.
Raises:
UnsupportedAlgorithm: if the key algorithm is not supported.
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
TypeError: if the provided ``pk_der`` data is not a string.
"""
try:
pk = load_der_public_key(pk_der, backend=default_backend())
return pk
except UnsupportedAlgorithm:
logger.error("Could not deserialize the public key (unsupported algorithm)")
except ValueError:
logger.error("The provided data cannot be deserialized (wrong size or format)")
except TypeError:
logger.error("The provided data cannot be deserialized (wrong type)")
return None
@staticmethod @staticmethod
def load_private_key_der(sk_der): def load_private_key_der(sk_der):
""" """
Creates an :mod:`EllipticCurvePrivateKey` object from a given ``DER`` encoded private key. Creates a :mod:`PrivateKey` object from a given ``DER`` encoded private key.
Args: Args:
sk_der(:mod:`str`): a private key encoded in ``DER`` format. sk_der(:mod:`str`): a private key encoded in ``DER`` format.
Returns: Returns:
:mod:`EllipticCurvePrivateKey`: An ``EllipticCurvePrivateKey`` object. :mod:`PrivateKey`: A ``PrivateKey`` object.
Raises: Raises:
UnsupportedAlgorithm: if the key algorithm is not supported.
ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format). ValueError: if the provided ``pk_der`` data cannot be deserialized (wrong size or format).
TypeError: if the provided ``pk_der`` data is not a string. TypeError: if the provided ``pk_der`` data is not a string.
""" """
try: try:
sk = load_der_private_key(sk_der, None, backend=default_backend()) sk = PrivateKey.from_der(sk_der)
return sk return sk
except UnsupportedAlgorithm:
logger.error("Could not deserialize the private key (unsupported algorithm)")
except ValueError: except ValueError:
logger.error("The provided data cannot be deserialized (wrong size or format)") logger.error("The provided data cannot be deserialized (wrong size or format)")
@@ -236,64 +233,85 @@ class Cryptographer:
return None return None
@staticmethod @staticmethod
def sign(data, sk, rtype="str"): def sign(message, sk):
""" """
Signs a given data using a given secret key using ECDSA. Signs a given data using a given secret key using ECDSA.
Args: Args:
data(:mod:`bytes`): the data to be signed. message(:obj:`bytes`): the data to be signed.
sk(:mod:`EllipticCurvePrivateKey`): the ECDSA secret key used to signed the data. sk(:obj:`PrivateKey`): the ECDSA secret key used to signed the data.
rtype: the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``.
Returns: Returns:
:obj:`str` or :obj:`bytes`: The data signature in ``str`` or ``bytes``, depending on ``rtype``. :obj:`str`: The zbase32 signature of the given message.
Raises:
ValueError: if ``rtype`` is not ``'str'`` or ``'bytes'``
""" """
if rtype not in ["str", "bytes"]: if not isinstance(message, bytes):
raise ValueError("Wrong return type. Return type must be 'str' or 'bytes'") logger.error("The message must be bytes. {} received".format(type(message)))
return None
if not isinstance(sk, ec.EllipticCurvePrivateKey): if not isinstance(sk, PrivateKey):
logger.error("The value passed as sk is not a private key (EllipticCurvePrivateKey)") logger.error("The value passed as sk is not a private key (EllipticCurvePrivateKey)")
return None return None
else: rsig_rid = sk.sign_recoverable(LN_MESSAGE_PREFIX + message, hasher=sha256d)
signature = sk.sign(data, ec.ECDSA(hashes.SHA256())) sigrec = sigrec_encode(rsig_rid)
zb32_sig = pyzbase32.encode_bytes(sigrec).decode()
if rtype == "str": return zb32_sig
signature = hexlify(signature).decode("utf-8")
return signature
@staticmethod @staticmethod
def verify(message, signature, pk): def recover_pk(message, zb32_sig):
""" """
Verifies if a signature is valid for a given public key and message. Recovers an ECDSA public key from a given message and zbase32 signature.
Args: Args:
message(:mod:`bytes`): the message that is supposed have been signed. message(:obj:`bytes`): the data to be signed.
signature(:mod:`str`): the potential signature of the message. zb32_sig(:obj:`str`): the zbase32 signature of the message.
pk(:mod:`EllipticCurvePublicKey`): the public key that is used to try to verify the signature.
Returns: Returns:
:mod:`bool`: Whether or not the provided signature is valid for the given message and public key. :obj:`PublicKey`: The recovered public key.
Returns ``False`` is the ``key`` is not in the right format or if either the ``message`` or ``pk`` cannot
be decoded.
""" """
if not isinstance(pk, ec.EllipticCurvePublicKey): if not isinstance(message, bytes):
logger.error("The value passed as pk is not a public key (EllipticCurvePublicKey)") logger.error("The message must be bytes. {} received".format(type(message)))
return False return None
if isinstance(signature, str): if not isinstance(zb32_sig, str):
signature = unhexlify(signature) logger.error("The zbase32_sig must be str. {} received".format(type(zb32_sig)))
return None
sigrec = pyzbase32.decode_bytes(zb32_sig)
rsig_recid = sigrec_decode(sigrec)
try: try:
pk.verify(signature, message, ec.ECDSA(hashes.SHA256())) pk = PublicKey.from_signature_and_message(rsig_recid, LN_MESSAGE_PREFIX + message, hasher=sha256d)
return pk
return True except ValueError as e:
# Several errors fit here: Signature length != 65, wrong recover id and failed to parse signature.
# All of them return raise ValueError.
logger.error(str(e))
return None
except InvalidSignature: except Exception as e:
return False if "failed to recover ECDSA public key" in str(e):
logger.error("Cannot recover public key from signature".format(type(rsig_recid)))
else:
logger.error("Unknown exception", error=e)
return None
@staticmethod
def verify_rpk(pk, rpk):
"""
Verifies that that a recovered public key matches a given one.
Args:
pk(:obj:`PublicKey`): a given public key (provided by the user).
rpk(:obj:`PublicKey`): a public key recovered via ``recover_pk``.
Returns:
:obj:`bool`: True if the public keys match, False otherwise.
"""
return pk.point() == rpk.point()

View File

@@ -3,7 +3,7 @@ from binascii import unhexlify
import common.cryptographer import common.cryptographer
from common.constants import LOCATOR_LEN_HEX from common.constants import LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer, PublicKey
from pisa import errors, LOG_PREFIX from pisa import errors, LOG_PREFIX
from common.logger import Logger from common.logger import Logger
@@ -337,14 +337,14 @@ class Inspector:
@staticmethod @staticmethod
# Verifies that the appointment signature is a valid signature with public key # Verifies that the appointment signature is a valid signature with public key
def check_appointment_signature(appointment_data, signature, pk_der): def check_appointment_signature(appointment_data, signature, pk):
""" """
Checks if the provided user signature is correct. Checks if the provided user signature is correct.
Args: Args:
appointment_data (:obj:`dict`): the appointment that was signed by the user. appointment_data (:obj:`dict`): the appointment that was signed by the user.
signature (:obj:`str`): the user's signature (hex encoded). signature (:obj:`str`): the user's signature (hex encoded).
pk_der (:obj:`str`): the user's public key (hex encoded, DER format). pk (:obj:`str`): the user's public key (hex encoded).
Returns: Returns:
:obj:`tuple`: A tuple (return code, message) as follows: :obj:`tuple`: A tuple (return code, message) as follows:
@@ -363,13 +363,19 @@ class Inspector:
rcode = errors.APPOINTMENT_EMPTY_FIELD rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty signature received" message = "empty signature received"
elif pk_der is None: elif pk is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty public key received" message = "empty public key received"
elif re.match(r"^[0-9A-Fa-f]{66}$", pk) is None:
rcode = errors.APPOINTMENT_WRONG_FIELD
message = "public key must be a hex encoded 33-byte long value"
else: else:
pk = Cryptographer.load_public_key_der(unhexlify(pk_der)) appointment = Appointment.from_dict(appointment_data)
valid_sig = Cryptographer.verify(Appointment.from_dict(appointment_data).serialize(), signature, pk) rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
pk = PublicKey(unhexlify(pk))
valid_sig = Cryptographer.verify_rpk(pk, rpk)
if not valid_sig: if not valid_sig:
rcode = errors.APPOINTMENT_INVALID_SIGNATURE rcode = errors.APPOINTMENT_INVALID_SIGNATURE

View File

@@ -1,5 +1,7 @@
zmq zmq
flask flask
cryptography cryptography
coincurve
pyzbase32
requests requests
plyvel plyvel

View File

@@ -51,7 +51,7 @@ class Watcher:
config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve config (:obj:`dict`): a dictionary containing all the configuration parameters. Used locally to retrieve
``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``. ``MAX_APPOINTMENTS`` and ``EXPIRY_DELTA``.
db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database. db_manager (:obj:`DBManager <pisa.db_manager>`): A db manager instance to interact with the database.
signing_key (:mod:`EllipticCurvePrivateKey`): a private key used to sign accepted appointments. signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
Raises: Raises:
ValueError: if `pisa_sk_file` is not found. ValueError: if `pisa_sk_file` is not found.

View File

@@ -1,12 +1,8 @@
import responses
import json
import os import os
import json
import shutil import shutil
from binascii import hexlify import responses
from coincurve import PrivateKey
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
import common.cryptographer import common.cryptographer
from common.logger import Logger from common.logger import Logger
@@ -21,18 +17,9 @@ from test.apps.cli.unit.conftest import get_random_value_hex
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=wt_cli.LOG_PREFIX) common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=wt_cli.LOG_PREFIX)
# dummy keys for the tests # dummy keys for the tests
dummy_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) dummy_sk = PrivateKey()
dummy_pk = dummy_sk.public_key() dummy_pk = dummy_sk.public_key
another_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) another_sk = PrivateKey()
dummy_sk_der = dummy_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
dummy_pk_der = dummy_pk.public_bytes(
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Replace the key in the module with a key we control for the tests # Replace the key in the module with a key we control for the tests
@@ -69,14 +56,6 @@ def load_dummy_keys(*args):
return dummy_pk return dummy_pk
def get_dummy_pisa_pk_der(*args):
return dummy_pk_der
def get_dummy_hex_pk_der(*args):
return hexlify(get_dummy_pisa_pk_der())
def get_dummy_signature(*args): def get_dummy_signature(*args):
return Cryptographer.sign(dummy_appointment.serialize(), dummy_sk) return Cryptographer.sign(dummy_appointment.serialize(), dummy_sk)
@@ -90,7 +69,7 @@ def get_bad_signature(*args):
# private_key_file_path = "sk_test_file" # private_key_file_path = "sk_test_file"
# public_key_file_path = "pk_test_file" # public_key_file_path = "pk_test_file"
# with open(private_key_file_path, "wb") as f: # with open(private_key_file_path, "wb") as f:
# f.write(dummy_sk_der) # f.write(dummy_sk.to_der())
# with open(public_key_file_path, "wb") as f: # with open(public_key_file_path, "wb") as f:
# f.write(dummy_pk_der) # f.write(dummy_pk_der)
# #

View File

@@ -1,9 +1,10 @@
import os import os
import binascii from binascii import unhexlify
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from coincurve import PrivateKey, PublicKey
import common.cryptographer import common.cryptographer
from common.blob import Blob from common.blob import Blob
from common.logger import Logger from common.logger import Logger
@@ -22,34 +23,12 @@ WRONG_TYPES = [None, 2134, 14.56, str(), list(), dict()]
def generate_keypair(): def generate_keypair():
sk = ec.generate_private_key(ec.SECP256K1, default_backend()) sk = PrivateKey()
pk = sk.public_key() pk = sk.public_key
sk_der = sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
return sk, pk return sk, pk
def generate_keypair_der():
sk, pk = generate_keypair()
sk_der = sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pk_der = pk.public_bytes(
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
return sk_der, pk_der
def test_check_data_key_format_wrong_data(): def test_check_data_key_format_wrong_data():
data = get_random_value_hex(64)[:-1] data = get_random_value_hex(64)[:-1]
key = get_random_value_hex(32) key = get_random_value_hex(32)
@@ -105,29 +84,12 @@ def test_encrypt_wrong_key_size():
assert True assert True
def test_encrypt_hex(): def test_encrypt():
blob = Blob(data) blob = Blob(data)
assert Cryptographer.encrypt(blob, key) == encrypted_data assert Cryptographer.encrypt(blob, key) == encrypted_data
def test_encrypt_bytes():
blob = Blob(data)
byte_blob = Cryptographer.encrypt(blob, key, rtype="bytes")
assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(encrypted_data)
def test_encrypt_wrong_return():
# Any other type but "hex" (default) or "bytes" should fail
try:
Cryptographer.encrypt(Blob(data), key, rtype="random_value")
assert False
except ValueError:
assert True
def test_decrypt_invalid_tag(): def test_decrypt_invalid_tag():
random_key = get_random_value_hex(32) random_key = get_random_value_hex(32)
random_encrypted_data = get_random_value_hex(64) random_encrypted_data = get_random_value_hex(64)
@@ -165,27 +127,11 @@ def test_decrypt_wrong_key_size():
assert True assert True
def test_decrypt_hex(): def test_decrypt():
# Valid data should run with no InvalidTag and verify # Valid data should run with no InvalidTag and verify
assert Cryptographer.decrypt(EncryptedBlob(encrypted_data), key) == data assert Cryptographer.decrypt(EncryptedBlob(encrypted_data), key) == data
def test_decrypt_bytes():
# We can also get the decryption in bytes
byte_blob = Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="bytes")
assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(data)
def test_decrypt_wrong_return():
# Any other type but "hex" (default) or "bytes" should fail
try:
Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="random_value")
assert False
except ValueError:
assert True
def test_load_key_file(): def test_load_key_file():
dummy_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) dummy_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
dummy_sk_der = dummy_sk.private_bytes( dummy_sk_der = dummy_sk.private_bytes(
@@ -210,19 +156,6 @@ def test_load_key_file():
assert Cryptographer.load_key_file(0) is None and Cryptographer.load_key_file(None) is None assert Cryptographer.load_key_file(0) is None and Cryptographer.load_key_file(None) is None
def test_load_public_key_der():
# load_public_key_der expects a byte encoded data. Any other should fail and return None
for wtype in WRONG_TYPES:
assert Cryptographer.load_public_key_der(wtype) is None
# On the other hand, any random formatter byte array would also fail (zeros for example)
assert Cryptographer.load_public_key_der(bytes(32)) is None
# A proper formatted key should load
_, pk_der = generate_keypair_der()
assert Cryptographer.load_public_key_der(pk_der) is not None
def test_load_private_key_der(): def test_load_private_key_der():
# load_private_key_der expects a byte encoded data. Any other should fail and return None # load_private_key_der expects a byte encoded data. Any other should fail and return None
for wtype in WRONG_TYPES: for wtype in WRONG_TYPES:
@@ -232,82 +165,93 @@ def test_load_private_key_der():
assert Cryptographer.load_private_key_der(bytes(32)) is None assert Cryptographer.load_private_key_der(bytes(32)) is None
# A proper formatted key should load # A proper formatted key should load
sk_der, _ = generate_keypair_der() sk_der = generate_keypair()[0].to_der()
assert Cryptographer.load_private_key_der(sk_der) is not None assert Cryptographer.load_private_key_der(sk_der) is not None
def test_sign_wrong_rtype():
# Calling sign with an rtype different than 'str' or 'bytes' should fail
for wtype in WRONG_TYPES:
try:
Cryptographer.sign(b"", "", rtype=wtype)
assert False
except ValueError:
assert True
def test_sign_wrong_sk():
# If a sk is not passed, sign will return None
for wtype in WRONG_TYPES:
assert Cryptographer.sign(b"", wtype) is None
def test_sign(): def test_sign():
# Otherwise we should get a signature # Otherwise we should get a signature
sk, _ = generate_keypair() sk, _ = generate_keypair()
message = b"" message = b""
assert Cryptographer.sign(message, sk) is not None assert Cryptographer.sign(message, sk) is not None
assert isinstance(Cryptographer.sign(message, sk), str)
# Check that the returns work
assert isinstance(Cryptographer.sign(message, sk, rtype="str"), str)
assert isinstance(Cryptographer.sign(message, sk, rtype="bytes"), bytes)
def test_verify_wrong_pk(): def test_sign_ground_truth():
# If a pk is not passed, verify will return None # Generate a signature that has been verified by c-lightning.
raw_sk = "24e9a981580d27d9277071a8381542e89a7c124868c4e862a13595dc75c6922f"
sk = PrivateKey.from_hex(raw_sk)
c_lightning_rpk = "0235293db86c6aaa74aff69ebacad8471d5242901ea9f6a0341a8dca331875e62c"
message = b"Test message"
sig = Cryptographer.sign(message, sk)
rpk = Cryptographer.recover_pk(message, sig)
assert Cryptographer.verify_rpk(PublicKey(unhexlify(c_lightning_rpk)), rpk)
def test_sign_wrong_sk():
# If a sk is not passed, sign will return None
for wtype in WRONG_TYPES: for wtype in WRONG_TYPES:
assert Cryptographer.sign("", wtype) is None assert Cryptographer.sign(b"", wtype) is None
def test_verify_random_values(): def test_recover_pk():
# Random values shouldn't verify
sk, pk = generate_keypair()
message = binascii.unhexlify(get_random_value_hex(32))
signature = get_random_value_hex(32)
assert Cryptographer.verify(message, signature, pk) is False
def test_verify_wrong_pair():
# Verifying with a wrong keypair must fail
sk, _ = generate_keypair() sk, _ = generate_keypair()
_, pk = generate_keypair() message = b"Test message"
message = binascii.unhexlify(get_random_value_hex(32)) zbase32_sig = Cryptographer.sign(message, sk)
signature = get_random_value_hex(32) rpk = Cryptographer.recover_pk(message, zbase32_sig)
assert Cryptographer.verify(message, signature, pk) is False assert isinstance(rpk, PublicKey)
def test_verify_wrong_message(): def test_recover_pk_ground_truth():
# Verifying with a wrong keypair must fail # Use a message a signature generated by c-lightning and see if we recover the proper key
sk, pk = generate_keypair() message = b"Test message"
org_pk = "02b821c749295d5c24f6166ae77d8353eaa36fc4e47326670c6d2522cbd344bab9"
zsig = "rbwewwyr4zem3w5t39fd1xyeamfzbmfgztwm4b613ybjtmoeod5kazaxqo3akn3ae75bqi3aqeds8cs6n43w4p58ft34itjnnb61bp54"
message = binascii.unhexlify(get_random_value_hex(32)) rpk = Cryptographer.recover_pk(message, zsig)
signature = Cryptographer.sign(message, sk)
wrong_message = binascii.unhexlify(get_random_value_hex(32)) assert Cryptographer.verify_rpk(PublicKey(unhexlify(org_pk)), rpk)
assert Cryptographer.verify(wrong_message, signature, pk) is False
def test_verify(): def test_recover_pk_wrong_inputs():
# A properly generated signature should verify str_message = "Test message"
sk, pk = generate_keypair() message = bytes(20)
message = binascii.unhexlify(get_random_value_hex(32)) str_sig = "aaaaaaaa"
signature = Cryptographer.sign(message, sk) sig = bytes(20)
assert Cryptographer.verify(message, signature, pk) is True # Wrong input type
assert Cryptographer.recover_pk(message, str_sig) is None
assert Cryptographer.recover_pk(str_message, str_sig) is None
assert Cryptographer.recover_pk(str_message, sig) is None
assert Cryptographer.recover_pk(message, str_sig) is None
# Wrong input size or format
assert Cryptographer.recover_pk(message, sig) is None
assert Cryptographer.recover_pk(message, bytes(104)) is None
def test_verify_pk():
sk, _ = generate_keypair()
message = b"Test message"
zbase32_sig = Cryptographer.sign(message, sk)
rpk = Cryptographer.recover_pk(message, zbase32_sig)
assert Cryptographer.verify_rpk(sk.public_key, rpk)
def test_verify_pk_wrong():
sk, _ = generate_keypair()
sk2, _ = generate_keypair()
message = b"Test message"
zbase32_sig = Cryptographer.sign(message, sk)
rpk = Cryptographer.recover_pk(message, zbase32_sig)
assert not Cryptographer.verify_rpk(sk2.public_key, rpk)

View File

@@ -3,6 +3,7 @@ import pytest
import logging import logging
from copy import deepcopy from copy import deepcopy
# FIXME: Import from pisa. Common should not import anything from cli nor pisa.
from pisa import conf_fields from pisa import conf_fields
from common.constants import LOCATOR_LEN_BYTES from common.constants import LOCATOR_LEN_BYTES
@@ -46,7 +47,7 @@ def test_check_locator_format():
get_random_value_hex(LOCATOR_LEN_BYTES - 1), get_random_value_hex(LOCATOR_LEN_BYTES - 1),
] ]
for wtype in wrong_inputs: for wtype in wrong_inputs:
assert check_sha256_hex_format(wtype) is False assert check_locator_format(wtype) is False
for _ in range(100): for _ in range(100):
assert check_locator_format(get_random_value_hex(LOCATOR_LEN_BYTES)) is True assert check_locator_format(get_random_value_hex(LOCATOR_LEN_BYTES)) is True

View File

@@ -2,6 +2,7 @@
dnsseed=0 dnsseed=0
# [debug] # [debug]
daemon=1
regtest=1 regtest=1
debug=1 debug=1
logips=1 logips=1
@@ -14,9 +15,9 @@ rpcallowip=0.0.0.0/0
rpcservertimeout=300 rpcservertimeout=300
# [zmq] # [zmq]
zmqpubhashblock=tcp://0.0.0.0:28332 zmqpubhashblock=tcp://0.0.0.0:28335
zmqpubrawblock=tcp://0.0.0.0:28332 zmqpubrawblock=tcp://0.0.0.0:28335
zmqpubrawtx=tcp://0.0.0.0:28333 zmqpubrawtx=tcp://0.0.0.0:28336
# [blockchain] # [blockchain]
txindex=1 txindex=1
@@ -24,4 +25,4 @@ txindex=1
# There are some parameters that only work in the specific on regtest if specified in the regtest section # There are some parameters that only work in the specific on regtest if specified in the regtest section
[regtest] [regtest]
rpcbind=0.0.0.0 rpcbind=0.0.0.0
rpcport=18443 rpcport=18445

View File

@@ -0,0 +1 @@
bitcoin-core --channel=0.19/stable

View File

@@ -1,19 +0,0 @@
# Copy this file with your own configuration and save it as conf.py
# Docker
DOCK_NETWORK_NAME = "pisa_net"
DOCK_NETWORK_SUBNET = "172.16.0.0/16"
DOCK_NETWORK_GW = "172.16.0.1"
DOCK_CONTAINER_NAME_PREFIX = "btc_n"
DOCK_IMAGE_NAME = "sandbox_btc"
DOCKER_INI_PORT_MAPPING = 22000
DOCKER_RPC_PORT_MAPPING = 18444
DOCKER_ZMQ_BLOCK_PORT_MAPPING = 28334
# Log
LOG_FILE = "bitcoin_sandbox.log"
# Graphs
BITCOIN_GRAPH_FILE = "./graphs/basic3.graphml"
LN_GRAPH_FILE = "./graphs/basic3_ln.graphml"
DEFAULT_LN_GRAPH_WEIGHT = 10000

View File

@@ -34,7 +34,7 @@ pisad_process = run_pisad()
def get_pisa_pk(): def get_pisa_pk():
pisa_sk = Cryptographer.load_private_key_der(Cryptographer.load_key_file(config.get("PISA_SECRET_KEY"))) pisa_sk = Cryptographer.load_private_key_der(Cryptographer.load_key_file(config.get("PISA_SECRET_KEY")))
pisa_pk = pisa_sk.public_key() pisa_pk = pisa_sk.public_key
return pisa_pk return pisa_pk
@@ -159,7 +159,8 @@ def test_appointment_wrong_key(bitcoin_cli, create_txs):
# Check that the server has accepted the appointment # Check that the server has accepted the appointment
signature = response_json.get("signature") signature = response_json.get("signature")
assert signature is not None assert signature is not None
assert Cryptographer.verify(appointment.serialize(), signature, pisa_pk) is True rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
assert Cryptographer.verify_rpk(pisa_pk, rpk) is True
assert response_json.get("locator") == appointment.locator assert response_json.get("locator") == appointment.locator
# Trigger the appointment # Trigger the appointment

View File

@@ -5,11 +5,8 @@ import requests
from time import sleep from time import sleep
from shutil import rmtree from shutil import rmtree
from threading import Thread from threading import Thread
from binascii import hexlify
from cryptography.hazmat.backends import default_backend from coincurve import PrivateKey
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from common.blob import Blob from common.blob import Blob
from pisa.responder import TransactionTracker from pisa.responder import TransactionTracker
@@ -58,10 +55,10 @@ def db_manager():
def generate_keypair(): def generate_keypair():
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) sk = PrivateKey()
client_pk = client_sk.public_key() pk = sk.public_key
return client_sk, client_pk return sk, pk
def get_random_value_hex(nbytes): def get_random_value_hex(nbytes):
@@ -106,9 +103,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
# dummy keys for this test # dummy keys for this test
client_sk, client_pk = generate_keypair() client_sk, client_pk = generate_keypair()
client_pk_der = client_pk.public_bytes( client_pk_hex = client_pk.format().hex()
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
locator = compute_locator(dispute_txid) locator = compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx")) blob = Blob(dummy_appointment_data.get("tx"))
@@ -124,9 +119,8 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
} }
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk) signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk)
pk_hex = hexlify(client_pk_der).decode("utf-8")
data = {"appointment": appointment_data, "signature": signature, "public_key": pk_hex} data = {"appointment": appointment_data, "signature": signature, "public_key": client_pk_hex}
return data, dispute_tx.hex() return data, dispute_tx.hex()

View File

@@ -3,7 +3,6 @@ import pytest
import requests import requests
from time import sleep from time import sleep
from threading import Thread from threading import Thread
from cryptography.hazmat.primitives import serialization
from pisa.api import API from pisa.api import API
from pisa.watcher import Watcher from pisa.watcher import Watcher
@@ -36,13 +35,8 @@ config = get_config()
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def run_api(db_manager): def run_api(db_manager):
sk, pk = generate_keypair() sk, pk = generate_keypair()
sk_der = sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) watcher = Watcher(db_manager, Responder(db_manager), sk.to_der(), get_config())
chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
watcher.awake() watcher.awake()
chain_monitor.monitor_chain() chain_monitor.monitor_chain()

View File

@@ -1,8 +1,4 @@
from binascii import hexlify, unhexlify from binascii import unhexlify
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from pisa.errors import * from pisa.errors import *
from pisa.inspector import Inspector from pisa.inspector import Inspector
@@ -176,17 +172,14 @@ def test_check_blob():
def test_check_appointment_signature(): def test_check_appointment_signature():
# The inspector receives the public key as hex # The inspector receives the public key as hex
client_sk, client_pk = generate_keypair() client_sk, client_pk = generate_keypair()
client_pk_der = client_pk.public_bytes( client_pk_hex = client_pk.format().hex()
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
client_pk_hex = hexlify(client_pk_der).decode("utf-8")
dummy_appointment_data, _ = generate_dummy_appointment_data(real_height=False) dummy_appointment_data, _ = generate_dummy_appointment_data(real_height=False)
assert Inspector.check_appointment_signature( assert Inspector.check_appointment_signature(
dummy_appointment_data["appointment"], dummy_appointment_data["signature"], dummy_appointment_data["public_key"] dummy_appointment_data["appointment"], dummy_appointment_data["signature"], dummy_appointment_data["public_key"]
) )
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) fake_sk, _ = generate_keypair()
# Create a bad signature to make sure inspector rejects it # Create a bad signature to make sure inspector rejects it
bad_signature = Cryptographer.sign( bad_signature = Cryptographer.sign(
@@ -203,10 +196,7 @@ def test_inspect(run_bitcoind):
# appointments. # appointments.
client_sk, client_pk = generate_keypair() client_sk, client_pk = generate_keypair()
client_pk_der = client_pk.public_bytes( client_pk_hex = client_pk.format().hex()
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
client_pk_hex = hexlify(client_pk_der).decode("utf-8")
# Valid appointment # Valid appointment
locator = get_random_value_hex(LOCATOR_LEN_BYTES) locator = get_random_value_hex(LOCATOR_LEN_BYTES)

View File

@@ -2,8 +2,7 @@ import pytest
from uuid import uuid4 from uuid import uuid4
from shutil import rmtree from shutil import rmtree
from threading import Thread from threading import Thread
from cryptography.hazmat.primitives.asymmetric import ec from coincurve import PrivateKey
from cryptography.hazmat.primitives import serialization
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.responder import Responder from pisa.responder import Responder
@@ -36,11 +35,6 @@ TEST_SET_SIZE = 200
signing_key, public_key = generate_keypair() signing_key, public_key = generate_keypair()
sk_der = signing_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
@@ -56,7 +50,7 @@ def temp_db_manager():
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def watcher(db_manager): def watcher(db_manager):
watcher = Watcher(db_manager, Responder(db_manager), sk_der, get_config()) watcher = Watcher(db_manager, Responder(db_manager), signing_key.to_der(), get_config())
chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue) chain_monitor = ChainMonitor(watcher.block_queue, watcher.responder.block_queue)
chain_monitor.monitor_chain() chain_monitor.monitor_chain()
@@ -96,7 +90,7 @@ def test_init(run_bitcoind, watcher):
assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0 assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue.empty() assert watcher.block_queue.empty()
assert isinstance(watcher.config, dict) assert isinstance(watcher.config, dict)
assert isinstance(watcher.signing_key, ec.EllipticCurvePrivateKey) assert isinstance(watcher.signing_key, PrivateKey)
assert isinstance(watcher.responder, Responder) assert isinstance(watcher.responder, Responder)
@@ -109,13 +103,17 @@ def test_add_appointment(watcher):
added_appointment, sig = watcher.add_appointment(appointment) added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True assert added_appointment is True
assert Cryptographer.verify(appointment.serialize(), sig, public_key) assert Cryptographer.verify_rpk(
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
)
# Check that we can also add an already added appointment (same locator) # Check that we can also add an already added appointment (same locator)
added_appointment, sig = watcher.add_appointment(appointment) added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True assert added_appointment is True
assert Cryptographer.verify(appointment.serialize(), sig, public_key) assert Cryptographer.verify_rpk(
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
)
def test_add_too_many_appointments(watcher): def test_add_too_many_appointments(watcher):
@@ -129,7 +127,9 @@ def test_add_too_many_appointments(watcher):
added_appointment, sig = watcher.add_appointment(appointment) added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True assert added_appointment is True
assert Cryptographer.verify(appointment.serialize(), sig, public_key) assert Cryptographer.verify_rpk(
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
)
appointment, dispute_tx = generate_dummy_appointment( appointment, dispute_tx = generate_dummy_appointment(
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET