From 98b3dcae7be62fd9978a58ffc222c2595bd925d9 Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 7 Nov 2019 17:52:24 -0500 Subject: [PATCH] Test that server validates signature properly --- test/unit/conftest.py | 41 ++++++++++++++++++++++++++--- test/unit/test_api.py | 46 ++++++++++++++++----------------- test/unit/test_inspector.py | 51 ++++++++++++++++++++++++++++++++++--- 3 files changed, 108 insertions(+), 30 deletions(-) diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 4f27d60..2027673 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -1,3 +1,4 @@ +import json import pytest import random import requests @@ -5,7 +6,12 @@ from time import sleep from shutil import rmtree from threading import Thread from hashlib import sha256 -from binascii import unhexlify +from binascii import hexlify, unhexlify + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization from pisa.conf import DB_PATH from apps.cli.blob import Blob @@ -48,6 +54,18 @@ def prng_seed(): random.seed(0) +@pytest.fixture(scope="module") +def generate_keypair(): + client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) + client_pk = ( + client_sk.public_key() + .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) + .decode("utf-8") + ) + + return client_sk, client_pk + + @pytest.fixture(scope="module") def db_manager(): manager = DBManager("test_db") @@ -73,6 +91,11 @@ def generate_blocks(n): generate_block() +def sign_appointment(sk, appointment): + data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") + return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") + + def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30): current_height = bitcoin_cli().getblockcount() @@ -91,6 +114,14 @@ def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30): cipher = "AES-GCM-128" hash_function = "SHA256" + # dummy keys for this test + client_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) + client_pk = ( + client_sk.public_key() + .public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) + .decode("utf-8") + ) + locator = sha256(unhexlify(dispute_txid)).hexdigest() blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function) @@ -107,7 +138,11 @@ def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30): "triggered": False, } - return appointment_data, dispute_tx + signature = sign_appointment(client_sk, appointment_data) + + data = {"appointment": appointment_data, "signature": signature, "public_key": client_pk} + + return data, dispute_tx def generate_dummy_appointment(start_time_offset=5, end_time_offset=30): @@ -115,7 +150,7 @@ def generate_dummy_appointment(start_time_offset=5, end_time_offset=30): start_time_offset=start_time_offset, end_time_offset=end_time_offset ) - return Appointment.from_dict(appointment_data), dispute_tx + return Appointment.from_dict(appointment_data["appointment"]), dispute_tx def generate_dummy_job(): diff --git a/test/unit/test_api.py b/test/unit/test_api.py index 14c0c3f..32e4724 100644 --- a/test/unit/test_api.py +++ b/test/unit/test_api.py @@ -17,40 +17,40 @@ locator_dispute_tx_map = {} @pytest.fixture -def new_appointment(): - appointment, dispute_tx = generate_dummy_appointment_data() - locator_dispute_tx_map[appointment["locator"]] = dispute_tx +def new_appt_data(): + appt_data, dispute_tx = generate_dummy_appointment_data() + locator_dispute_tx_map[appt_data["appointment"]["locator"]] = dispute_tx - return appointment + return appt_data -def add_appointment(appointment): - r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5) +def add_appointment(new_appt_data): + r = requests.post(url=PISA_API, json=json.dumps(new_appt_data), timeout=5) if r.status_code == 200: - appointments.append(appointment) + appointments.append(new_appt_data["appointment"]) return r -def test_add_appointment(run_api, run_bitcoind, new_appointment): +def test_add_appointment(run_api, run_bitcoind, new_appt_data): # Properly formatted appointment - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 200 # Incorrect appointment - new_appointment["dispute_delta"] = 0 - r = add_appointment(new_appointment) + new_appt_data["appointment"]["dispute_delta"] = 0 + r = add_appointment(new_appt_data) assert r.status_code == 400 -def test_request_appointment(new_appointment): +def test_request_appointment(new_appt_data): # First we need to add an appointment - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 200 # Next we can request it - r = requests.get(url=PISA_API + "/get_appointment?locator=" + new_appointment["locator"]) + r = requests.get(url=PISA_API + "/get_appointment?locator=" + new_appt_data["appointment"]["locator"]) assert r.status_code == 200 # Each locator may point to multiple appointments, check them all @@ -60,7 +60,7 @@ def test_request_appointment(new_appointment): appointment_status = [appointment.pop("status") for appointment in received_appointments] # Check that the appointment is within the received appoints - assert new_appointment in received_appointments + assert new_appt_data["appointment"] in received_appointments # Check that all the appointments are being watched assert all([status == "being_watched" for status in appointment_status]) @@ -76,28 +76,28 @@ def test_request_random_appointment(): assert all([status == "not_found" for status in appointment_status]) -def test_add_appointment_multiple_times(new_appointment, n=MULTIPLE_APPOINTMENTS): +def test_add_appointment_multiple_times(new_appt_data, n=MULTIPLE_APPOINTMENTS): # Multiple appointments with the same locator should be valid # DISCUSS: #34-store-identical-appointments for _ in range(n): - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 200 -def test_request_multiple_appointments_same_locator(new_appointment, n=MULTIPLE_APPOINTMENTS): +def test_request_multiple_appointments_same_locator(new_appt_data, n=MULTIPLE_APPOINTMENTS): for _ in range(n): - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 200 - test_request_appointment(new_appointment) + test_request_appointment(new_appt_data) -def test_add_too_many_appointment(new_appointment): +def test_add_too_many_appointment(new_appt_data): for _ in range(MAX_APPOINTMENTS - len(appointments)): - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 200 - r = add_appointment(new_appointment) + r = add_appointment(new_appt_data) assert r.status_code == 503 diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index 30b272d..297642c 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -1,11 +1,18 @@ -from binascii import unhexlify +import json +from binascii import hexlify, unhexlify +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec + +from apps.cli.pisa_cli import build_appointment from pisa import c_logger from pisa.errors import * from pisa.inspector import Inspector from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor from test.unit.conftest import get_random_value_hex + from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS c_logger.disabled = True @@ -18,6 +25,11 @@ WRONG_TYPES = [[], "", get_random_value_hex(32), 3.2, 2.0, (), object, {}, " " * WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()] +def sign_appointment(sk, appointment): + data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") + return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8") + + def test_check_locator(): # Right appointment type, size and format locator = get_random_value_hex(32) @@ -189,13 +201,42 @@ def test_check_hash_function(): assert Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_EMPTY_FIELD -def test_inspect(run_bitcoind): +def test_check_appointment_signature(generate_keypair): + client_sk, client_pk = generate_keypair + + dummy_appointment_request = { + "tx": get_random_value_hex(192), + "tx_id": get_random_value_hex(32), + "start_time": 1500, + "end_time": 50000, + "dispute_delta": 200, + } + dummy_appointment = build_appointment(**dummy_appointment_request) + + # Verify that an appointment signed by the client is valid + signature = sign_appointment(client_sk, dummy_appointment) + assert Inspector.check_appointment_signature(dummy_appointment, signature, client_pk) + + fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) + + # Create a bad signature to make sure inspector rejects it + bad_signature = sign_appointment(fake_sk, dummy_appointment) + assert ( + Inspector.check_appointment_signature(dummy_appointment, bad_signature, client_pk)[0] + == APPOINTMENT_INVALID_SIGNATURE + ) + + +def test_inspect(run_bitcoind, generate_keypair): # At this point every single check function has been already tested, let's test inspect with an invalid and a valid # appointments. + client_sk, client_pk = generate_keypair + # Invalid appointment, every field is empty appointment_data = dict() - appointment = inspector.inspect(appointment_data) + signature = sign_appointment(client_sk, appointment_data) + appointment = inspector.inspect(appointment_data, signature, client_pk) assert type(appointment) == tuple and appointment[0] != 0 # Valid appointment @@ -217,7 +258,9 @@ def test_inspect(run_bitcoind): "hash_function": hash_function, } - appointment = inspector.inspect(appointment_data) + signature = sign_appointment(client_sk, appointment_data) + + appointment = inspector.inspect(appointment_data, signature, client_pk) assert ( type(appointment) == Appointment