diff --git a/apps/cli/pisa_cli.py b/apps/cli/pisa_cli.py index 251c53a..84f955a 100644 --- a/apps/cli/pisa_cli.py +++ b/apps/cli/pisa_cli.py @@ -4,17 +4,10 @@ import json import requests import time from sys import argv -from binascii import hexlify, unhexlify from getopt import getopt, GetoptError from requests import ConnectTimeout, ConnectionError from uuid import uuid4 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import load_pem_public_key, load_pem_private_key -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm - from apps.cli.blob import Blob from apps.cli.help import help_add_appointment, help_get_appointment from apps.cli import ( @@ -57,58 +50,21 @@ def generate_dummy_appointment(): print("\nData stored in dummy_appointment_data.json") -def sign_appointment(sk, appointment): - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") - - # Loads and returns Pisa keys from disk def load_key_file_data(file_name): try: - with open(file_name, "r") as key_file: - key_pem = key_file.read().encode("utf-8") - return key_pem + with open(file_name, "rb") as key_file: + key = key_file.read() + return key except FileNotFoundError: raise FileNotFoundError("File not found.") -# Deserialize public key from pem data. -def load_public_key(pk_pem): - try: - pisa_pk = load_pem_public_key(pk_pem, backend=default_backend()) - return pisa_pk - - except UnsupportedAlgorithm: - raise ValueError("Could not deserialize the public key (unsupported algorithm).") - - -# Deserialize private key from pem data. -def load_private_key(sk_pem): - try: - cli_sk = load_pem_private_key(sk_pem, None, backend=default_backend()) - return cli_sk - - except UnsupportedAlgorithm: - raise ValueError("Could not deserialize the private key (unsupported algorithm).") - - def compute_locator(tx_id): return tx_id[:LOCATOR_LEN_HEX] -# returning True or False accordingly. -def is_appointment_signature_valid(appointment, signature, pk): - try: - sig_bytes = unhexlify(signature.encode("utf-8")) - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - pk.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256())) - return True - - except InvalidSignature: - return False - - # Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it. def save_signed_appointment(appointment, signature): # Create the appointments directory if it doesn't already exist @@ -179,8 +135,8 @@ def add_appointment(args): ) try: - sk_pem = load_key_file_data(CLI_PRIVATE_KEY) - cli_sk = load_private_key(sk_pem) + sk_der = load_key_file_data(CLI_PRIVATE_KEY) + cli_sk = Cryptographer.load_private_key_der(sk_der) except ValueError: logger.error("Failed to deserialize the public key. It might be in an unsupported format.") @@ -194,19 +150,20 @@ def add_appointment(args): logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return False - signature = sign_appointment(cli_sk, appointment) + signature = Cryptographer.sign(Cryptographer.signature_format(appointment), cli_sk) + try: - cli_pk_pem = load_key_file_data(CLI_PUBLIC_KEY) + cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY) except FileNotFoundError: - logger.error("Client's private key file not found. Please check your settings.") + logger.error("Client's public key file not found. Please check your settings.") return False except IOError as e: logger.error("I/O error({}): {}".format(e.errno, e.strerror)) return False - data = {"appointment": appointment, "signature": signature, "public_key": cli_pk_pem.decode("utf-8")} + data = {"appointment": appointment, "signature": signature, "public_key": cli_pk_der.decode("utf-8")} appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":")) @@ -246,9 +203,9 @@ def add_appointment(args): signature = response_json["signature"] # verify that the returned signature is valid try: - pk_pem = load_key_file_data(PISA_PUBLIC_KEY) - pk = load_public_key(pk_pem) - is_sig_valid = is_appointment_signature_valid(appointment, signature, pk) + pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY) + pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der) + is_sig_valid = Cryptographer.verify(Cryptographer.signature_format(appointment), signature, pisa_pk) except ValueError: logger.error("Failed to deserialize the public key. It might be in an unsupported format.") diff --git a/common/cryptographer.py b/common/cryptographer.py index 5983e74..18cca23 100644 --- a/common/cryptographer.py +++ b/common/cryptographer.py @@ -1,8 +1,14 @@ +import json from hashlib import sha256 from binascii import unhexlify, hexlify -from cryptography.exceptions import InvalidTag -from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.serialization import load_der_public_key, load_der_private_key +from cryptography.exceptions import InvalidSignature from common.tools import check_sha256_hex_format from pisa.logger import Logger @@ -85,3 +91,63 @@ class Cryptographer: blob = None return blob + + # NOTCOVERED + @staticmethod + def signature_format(data): + # FIXME: This is temporary serialization. A proper one is required. Data need to be unhexlified too (can't atm) + return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8") + + # Deserialize public key from der data. + @staticmethod + def load_public_key_der(pk_der): + try: + pk = load_der_public_key(pk_der, backend=default_backend()) + return pk + + except UnsupportedAlgorithm: + raise ValueError("Could not deserialize the public key (unsupported algorithm).") + + # Deserialize private key from der data. + @staticmethod + def load_private_key_der(sk_der): + try: + sk = load_der_private_key(sk_der, None, backend=default_backend()) + return sk + + except UnsupportedAlgorithm: + raise ValueError("Could not deserialize the private key (unsupported algorithm).") + + @staticmethod + def sign(data, sk, rtype="hex"): + if rtype not in ["hex", "bytes"]: + raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'") + + if not isinstance(sk, ec.EllipticCurvePrivateKey): + logger.error("Wrong public key.") + return None + + else: + signature = sk.sign(data, ec.ECDSA(hashes.SHA256())) + + if rtype == "hex": + signature = hexlify(signature).decode("utf-8") + + return signature + + @staticmethod + def verify(message, signature, pk): + if not isinstance(pk, ec.EllipticCurvePublicKey): + logger.error("Wrong public key.") + return False + + if isinstance(signature, str): + signature = unhexlify(signature.encode("utf-8")) + + try: + pk.verify(signature, message, ec.ECDSA(hashes.SHA256())) + + return True + + except InvalidSignature: + return False diff --git a/pisa/api.py b/pisa/api.py index ae9070f..820e5df 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -1,7 +1,6 @@ import os import json from flask import Flask, request, abort, jsonify -from binascii import hexlify from pisa import HOST, PORT, logging from pisa.logger import Logger @@ -40,7 +39,7 @@ def add_appointment(): if appointment_added: rcode = HTTP_OK - response = {"locator": appointment.locator, "signature": hexlify(signature).decode("utf-8")} + response = {"locator": appointment.locator, "signature": signature} else: rcode = HTTP_SERVICE_UNAVAILABLE error = "appointment rejected" diff --git a/pisa/appointment.py b/pisa/appointment.py index aa56e3d..4c1f302 100644 --- a/pisa/appointment.py +++ b/pisa/appointment.py @@ -32,7 +32,7 @@ class Appointment: return appointment - def to_dict(self): + def to_dict(self, include_triggered=True): # ToDO: #3-improve-appointment-structure appointment = { "locator": self.locator, @@ -40,15 +40,16 @@ class Appointment: "end_time": self.end_time, "dispute_delta": self.dispute_delta, "encrypted_blob": self.encrypted_blob.data, - "triggered": self.triggered, } + if include_triggered: + appointment["triggered"] = self.triggered + return appointment def to_json(self): return json.dumps(self.to_dict(), sort_keys=True, separators=(",", ":")) def serialize(self): - data = self.to_dict() - data.pop("triggered") - return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8") + # FIXME: This is temporary serialization. A proper one is required + return self.to_dict(include_triggered=False) diff --git a/pisa/inspector.py b/pisa/inspector.py index 6694c6a..4675765 100644 --- a/pisa/inspector.py +++ b/pisa/inspector.py @@ -1,14 +1,8 @@ -import json import re from binascii import unhexlify -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import load_pem_public_key -from cryptography.exceptions import InvalidSignature - from common.constants import LOCATOR_LEN_HEX +from common.cryptographer import Cryptographer from pisa import errors import pisa.conf as conf @@ -200,7 +194,7 @@ class Inspector: @staticmethod # Verifies that the appointment signature is a valid signature with public key - def check_appointment_signature(appointment, signature, pk_pem): + def check_appointment_signature(appointment, signature, pk_der): message = None rcode = 0 @@ -208,13 +202,10 @@ class Inspector: rcode = errors.APPOINTMENT_EMPTY_FIELD message = "empty signature received" - try: - sig_bytes = unhexlify(signature.encode("utf-8")) - client_pk = load_pem_public_key(pk_pem.encode("utf-8"), backend=default_backend()) - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - client_pk.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256())) + pk = Cryptographer.load_public_key_der(unhexlify(pk_der.encode("utf-8"))) + valid_sig = Cryptographer.verify(Cryptographer.signature_format(appointment), signature, pk) - except InvalidSignature: + if not valid_sig: rcode = errors.APPOINTMENT_INVALID_SIGNATURE message = "invalid signature" diff --git a/pisa/watcher.py b/pisa/watcher.py index 7350a94..83771e6 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -2,11 +2,6 @@ from uuid import uuid4 from queue import Queue from threading import Thread -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.serialization import load_pem_private_key - from common.cryptographer import Cryptographer from common.constants import LOCATOR_LEN_HEX @@ -36,18 +31,14 @@ class Watcher: if pisa_sk_file is None: raise ValueError("No signing key provided. Please fix your pisa.conf") else: - with open(PISA_SECRET_KEY, "r") as key_file: - secret_key_pem = key_file.read().encode("utf-8") - self.signing_key = load_pem_private_key(secret_key_pem, password=None, backend=default_backend()) + with open(PISA_SECRET_KEY, "rb") as key_file: + secret_key_der = key_file.read() + self.signing_key = Cryptographer.load_private_key_der(secret_key_der) @staticmethod def compute_locator(tx_id): return tx_id[:LOCATOR_LEN_HEX] - def sign_appointment(self, appointment): - data = appointment.serialize() - return self.signing_key.sign(data, ec.ECDSA(hashes.SHA256())) - def add_appointment(self, appointment): # Rationale: # The Watcher will analyze every received block looking for appointment matches. If there is no work @@ -87,7 +78,8 @@ class Watcher: logger.info("New appointment accepted.", locator=appointment.locator) - signature = self.sign_appointment(appointment) + signature = Cryptographer.sign(Cryptographer.signature_format(appointment.to_dict()), self.signing_key) + else: appointment_added = False signature = None diff --git a/test/apps/cli/test_pisa_cli.py b/test/apps/cli/test_pisa_cli.py index 8390b25..0bfcc0c 100644 --- a/test/apps/cli/test_pisa_cli.py +++ b/test/apps/cli/test_pisa_cli.py @@ -34,23 +34,15 @@ dummy_appointment_request = { } dummy_appointment = build_appointment(**dummy_appointment_request) +# FIXME: USE CRYPTOGRAPHER + def sign_appointment(sk, appointment): data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") -def test_is_appointment_signature_valid(): - # Verify that an appointment signed by Pisa is valid - signature = sign_appointment(pisa_sk, dummy_appointment) - assert pisa_cli.is_appointment_signature_valid(dummy_appointment, signature, pisa_pk) - - # Test that a signature from a different key is indeed invalid - other_signature = sign_appointment(other_sk, dummy_appointment) - assert not pisa_cli.is_appointment_signature_valid(dummy_appointment, other_signature, pisa_pk) - - -def get_dummy_pisa_pk(pem_data): +def get_dummy_pisa_pk(der_data): return pisa_pk diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 9479557..0774272 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -1,4 +1,3 @@ -import json import pytest import random import requests @@ -8,7 +7,6 @@ from threading import Thread from binascii import hexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import serialization @@ -42,18 +40,6 @@ def prng_seed(): random.seed(0) -@pytest.fixture(scope="module") -def generate_keypair(): - client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) - client_pk = ( - client_sk.public_key() - .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) - .decode("utf-8") - ) - - return client_sk, client_pk - - @pytest.fixture(scope="session") def db_manager(): manager = DBManager("test_db") @@ -63,6 +49,13 @@ def db_manager(): rmtree("test_db") +def generate_keypair(): + client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) + client_pk = client_sk.public_key() + + return client_sk, client_pk + + def get_random_value_hex(nbytes): pseudo_random_value = random.getrandbits(8 * nbytes) prv_hex = "{:x}".format(pseudo_random_value) @@ -79,11 +72,6 @@ def generate_blocks(n): generate_block() -def sign_appointment(sk, appointment): - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") - - def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_time_offset=30): if real_height: current_height = bitcoin_cli().getblockcount() @@ -104,11 +92,9 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t } # dummy keys for this test - client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) - client_pk = ( - client_sk.public_key() - .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) - .decode("utf-8") + client_sk, client_pk = generate_keypair() + client_pk_der = client_pk.public_bytes( + encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) locator = Watcher.compute_locator(dispute_txid) @@ -124,9 +110,10 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t "encrypted_blob": encrypted_blob, } - signature = sign_appointment(client_sk, appointment_data) + signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk) + pk_hex = hexlify(client_pk_der).decode("utf-8") - data = {"appointment": appointment_data, "signature": signature, "public_key": client_pk} + data = {"appointment": appointment_data, "signature": signature, "public_key": pk_hex} return data, dispute_tx diff --git a/test/unit/test_api.py b/test/unit/test_api.py index 0150af4..7d68e95 100644 --- a/test/unit/test_api.py +++ b/test/unit/test_api.py @@ -150,7 +150,6 @@ def test_request_appointment_watcher(new_appt_data): appointment_status = [appointment.pop("status") for appointment in received_appointments] # Check that the appointment is within the received appoints - print("AAA", new_appt_data["appointment"], received_appointments) assert new_appt_data["appointment"] in received_appointments # Check that all the appointments are being watched diff --git a/test/unit/test_appointment.py b/test/unit/test_appointment.py index d4e32bb..2eff9e9 100644 --- a/test/unit/test_appointment.py +++ b/test/unit/test_appointment.py @@ -112,10 +112,11 @@ def test_from_dict(appointment_data): assert True +# This test is pretty worthless atm, it would make sense once we have a proper serialize function def test_serialize(appointment_data): appointment = Appointment.from_dict(appointment_data) assert appointment.triggered is False ser_appointment = appointment.serialize() - assert ser_appointment == json.dumps(appointment_data, sort_keys=True, separators=(",", ":")).encode("utf-8") + assert ser_appointment == appointment.to_dict(include_triggered=False) diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index 2603e61..f082d16 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -1,9 +1,8 @@ -import json from binascii import hexlify, unhexlify from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization from pisa import c_logger from pisa.errors import * @@ -12,9 +11,10 @@ from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor from pisa.conf import MIN_DISPUTE_DELTA -from test.unit.conftest import get_random_value_hex, generate_dummy_appointment_data +from test.unit.conftest import get_random_value_hex, generate_dummy_appointment_data, generate_keypair from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX +from common.cryptographer import Cryptographer c_logger.disabled = True @@ -42,11 +42,6 @@ WRONG_TYPES = [ WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(LOCATOR_LEN_BYTES)), 3.2, 2.0, (), object, {}, object()] -def sign_appointment(sk, appointment): - data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") - return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") - - def test_check_locator(): # Right appointment type, size and format locator = get_random_value_hex(LOCATOR_LEN_BYTES) @@ -174,8 +169,13 @@ def test_check_blob(): assert Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT -def test_check_appointment_signature(generate_keypair): - client_sk, client_pk = generate_keypair +def test_check_appointment_signature(): + # The inspector receives the public key as hex + client_sk, client_pk = generate_keypair() + client_pk_der = client_pk.public_bytes( + encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + client_pk_hex = hexlify(client_pk_der).decode("utf-8") dummy_appointment_data, _ = generate_dummy_appointment_data(real_height=False) assert Inspector.check_appointment_signature( @@ -185,22 +185,26 @@ def test_check_appointment_signature(generate_keypair): fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) # Create a bad signature to make sure inspector rejects it - bad_signature = sign_appointment(fake_sk, dummy_appointment_data["appointment"]) + bad_signature = Cryptographer.sign(Cryptographer.signature_format(dummy_appointment_data["appointment"]), fake_sk) assert ( - Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk)[0] + Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk_hex)[0] == APPOINTMENT_INVALID_SIGNATURE ) -def test_inspect(run_bitcoind, generate_keypair): +def test_inspect(run_bitcoind): # At this point every single check function has been already tested, let's test inspect with an invalid and a valid # appointments. - client_sk, client_pk = generate_keypair + client_sk, client_pk = generate_keypair() + client_pk_der = client_pk.public_bytes( + encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + client_pk_hex = hexlify(client_pk_der).decode("utf-8") # Invalid appointment, every field is empty appointment_data = dict() - signature = sign_appointment(client_sk, appointment_data) + signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk) appointment = inspector.inspect(appointment_data, signature, client_pk) assert type(appointment) == tuple and appointment[0] != 0 @@ -219,9 +223,9 @@ def test_inspect(run_bitcoind, generate_keypair): "encrypted_blob": encrypted_blob, } - signature = sign_appointment(client_sk, appointment_data) + signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk) - appointment = inspector.inspect(appointment_data, signature, client_pk) + appointment = inspector.inspect(appointment_data, signature, client_pk_hex) assert ( type(appointment) == Appointment diff --git a/test/unit/test_watcher.py b/test/unit/test_watcher.py index 960a57a..1688a3e 100644 --- a/test/unit/test_watcher.py +++ b/test/unit/test_watcher.py @@ -3,12 +3,6 @@ from uuid import uuid4 from threading import Thread from queue import Queue, Empty -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.exceptions import InvalidSignature - from pisa import c_logger from pisa.watcher import Watcher from pisa.responder import Responder @@ -17,6 +11,7 @@ from test.unit.conftest import generate_block, generate_blocks, generate_dummy_a from pisa.conf import EXPIRY_DELTA, PISA_SECRET_KEY, MAX_APPOINTMENTS from common.tools import check_sha256_hex_format +from common.cryptographer import Cryptographer c_logger.disabled = True @@ -25,10 +20,9 @@ START_TIME_OFFSET = 1 END_TIME_OFFSET = 1 TEST_SET_SIZE = 200 -with open(PISA_SECRET_KEY, "r") as key_file: - pubkey_pem = key_file.read().encode("utf-8") - # TODO: should use the public key file instead, but it is not currently exported in the configuration - signing_key = load_pem_private_key(pubkey_pem, password=None, backend=default_backend()) +with open(PISA_SECRET_KEY, "rb") as key_file_der: + sk_der = key_file_der.read() + signing_key = Cryptographer.load_private_key_der(sk_der) public_key = signing_key.public_key() @@ -65,16 +59,6 @@ def create_appointments(n): return appointments, locator_uuid_map, dispute_txs -def is_signature_valid(appointment, signature, pk): - # verify the signature - try: - data = appointment.serialize() - pk.verify(signature, data, ec.ECDSA(hashes.SHA256())) - except InvalidSignature: - return False - return True - - def test_init(watcher): assert type(watcher.appointments) is dict and len(watcher.appointments) == 0 assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0 @@ -107,19 +91,13 @@ def test_add_appointment(run_bitcoind, watcher): added_appointment, sig = watcher.add_appointment(appointment) assert added_appointment is True - assert is_signature_valid(appointment, sig, public_key) + assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key) # Check that we can also add an already added appointment (same locator) added_appointment, sig = watcher.add_appointment(appointment) assert added_appointment is True - assert is_signature_valid(appointment, sig, public_key) - - -def test_sign_appointment(watcher): - appointment, _ = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET) - signature = watcher.sign_appointment(appointment) - assert is_signature_valid(appointment, signature, public_key) + assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key) def test_add_too_many_appointments(watcher): @@ -133,7 +111,7 @@ def test_add_too_many_appointments(watcher): added_appointment, sig = watcher.add_appointment(appointment) assert added_appointment is True - assert is_signature_valid(appointment, sig, public_key) + assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key) appointment, dispute_tx = generate_dummy_appointment( start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET