Merge branch 'master' into isolate-config

This commit is contained in:
Sergi Delgado Segura
2020-01-15 10:15:48 +01:00
committed by GitHub
7 changed files with 380 additions and 136 deletions

View File

@@ -1,23 +1,41 @@
import responses
import json
import os
import shutil
from binascii import hexlify
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from common.appointment import Appointment
from common.cryptographer import Cryptographer
import apps.cli.pisa_cli as pisa_cli
from test.apps.cli.unit.conftest import get_random_value_hex
# TODO: should find a way of doing without this
from apps.cli.pisa_cli import build_appointment
# dummy keys for the tests
pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_pk = pisa_sk.public_key()
other_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_sk_der = pisa_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pisa_pk_der = pisa_pk.public_bytes(
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
other_sk_der = other_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
# Replace the key in the module with a key we control for the tests
pisa_cli.pisa_public_key = pisa_pk
# Replace endpoint with dummy one
@@ -32,18 +50,47 @@ dummy_appointment_request = {
"end_time": 50000,
"to_self_delay": 200,
}
dummy_appointment = build_appointment(**dummy_appointment_request)
# FIXME: USE CRYPTOGRAPHER
# This is the format appointment turns into once it hits "add_appointment"
dummy_appointment_full = {
"locator": get_random_value_hex(32),
"start_time": 1500,
"end_time": 50000,
"to_self_delay": 200,
"encrypted_blob": get_random_value_hex(120),
}
dummy_appointment = Appointment.from_dict(dummy_appointment_full)
def sign_appointment(sk, appointment):
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8")
return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8")
def get_dummy_pisa_sk_der(*args):
return pisa_sk_der
def get_dummy_pisa_pk(der_data):
return pisa_pk
def get_dummy_pisa_pk_der(*args):
return pisa_pk_der
def get_dummy_hex_pk_der(*args):
return hexlify(get_dummy_pisa_pk_der())
def get_dummy_signature(*args):
sk = Cryptographer.load_private_key_der(pisa_sk_der)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def get_bad_signature(*args):
sk = Cryptographer.load_private_key_der(other_sk_der)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def valid_sig(*args):
return True
def invalid_sig(*args):
return False
@responses.activate
@@ -51,10 +98,12 @@ def test_add_appointment(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True
# make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk)
# Make sure the test uses the dummy signature
monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
response = {"locator": dummy_appointment["locator"], "signature": sign_appointment(pisa_sk, dummy_appointment)}
response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()}
request_url = "http://{}/".format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200)
@@ -72,12 +121,14 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False
# make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk)
# Make sure the test uses the bad dummy signature
monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig)
response = {
"locator": dummy_appointment["locator"],
"signature": sign_appointment(other_sk, dummy_appointment), # signing with a different key
"locator": dummy_appointment.to_dict()["locator"],
"signature": get_bad_signature(), # Sign with a bad key
}
request_url = "http://{}/".format(pisa_endpoint)
@@ -85,4 +136,141 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert not result
assert result is False
def test_load_key_file_data():
# If file exists and has data in it, function should work.
with open("key_test_file", "w+b") as f:
f.write(pisa_sk_der)
appt_data = pisa_cli.load_key_file_data("key_test_file")
assert appt_data
os.remove("key_test_file")
# If file doesn't exist, function should fail.
appt_data = pisa_cli.load_key_file_data("nonexistent_file")
assert not appt_data
def test_save_signed_appointment(monkeypatch):
monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments")
pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature())
# In folder "Appointments," grab all files and print them.
files = os.listdir("test_appointments")
found = False
for f in files:
if dummy_appointment.to_dict().get("locator") in f:
found = True
assert found
# If "appointments" directory doesn't exist, function should create it.
assert os.path.exists("test_appointments")
# Delete test directory once we're done.
shutil.rmtree("test_appointments")
def test_parse_add_appointment_args():
# If no args are passed, function should fail.
appt_data = pisa_cli.parse_add_appointment_args(None)
assert not appt_data
# If file doesn't exist, function should fail.
appt_data = pisa_cli.parse_add_appointment_args(["-f", "nonexistent_file"])
assert not appt_data
# If file exists and has data in it, function should work.
with open("appt_test_file", "w") as f:
json.dump(dummy_appointment_request, f)
appt_data = pisa_cli.parse_add_appointment_args(["-f", "appt_test_file"])
assert appt_data
os.remove("appt_test_file")
# If appointment json is passed in, function should work.
appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
assert appt_data
@responses.activate
def test_post_data_to_add_appointment_endpoint():
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk),
}
request_url = "http://{}/".format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200)
response = pisa_cli.post_data_to_add_appointment_endpoint(request_url, json.dumps(dummy_appointment_request))
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
assert response
def test_check_signature(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)
valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment)
assert valid
valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment)
assert not valid
@responses.activate
def test_get_appointment():
# Response of get_appointment endpoint is an appointment with status added to it.
dummy_appointment_full["status"] = "being_watched"
response = dummy_appointment_full
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator"))
responses.add(responses.GET, request_url, json=response, status=200)
result = pisa_cli.get_appointment([response.get("locator")])
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
assert result
@responses.activate
def test_get_appointment_err():
locator = get_random_value_hex(32)
# Test that get_appointment handles a connection error appropriately.
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator)
responses.add(responses.GET, request_url, body=ConnectionError())
assert not pisa_cli.get_appointment([locator])
def test_get_appointment_signature(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)
signature = pisa_cli.get_appointment_signature(dummy_appointment)
assert isinstance(signature, str)
def test_get_pk(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)
pk = pisa_cli.get_pk()
assert isinstance(pk, bytes)

View File

@@ -16,6 +16,7 @@ from pisa.watcher import Watcher
from pisa.tools import bitcoin_cli
from pisa.db_manager import DBManager
from common.appointment import Appointment
from common.tools import compute_locator
from bitcoind_mock.utils import sha256d
from bitcoind_mock.transaction import TX
@@ -103,7 +104,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
locator = Watcher.compute_locator(dispute_txid)
locator = compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx"))
encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id"))

View File

@@ -17,7 +17,7 @@ from test.pisa.unit.conftest import (
)
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
from common.tools import check_sha256_hex_format
from common.tools import check_sha256_hex_format, compute_locator
from common.cryptographer import Cryptographer
@@ -47,7 +47,7 @@ def txids():
@pytest.fixture(scope="module")
def locator_uuid_map(txids):
return {Watcher.compute_locator(txid): uuid4().hex for txid in txids}
return {compute_locator(txid): uuid4().hex for txid in txids}
def create_appointments(n):
@@ -220,7 +220,7 @@ def test_filter_valid_breaches(watcher):
dummy_appointment, _ = generate_dummy_appointment()
dummy_appointment.encrypted_blob.data = encrypted_blob
dummy_appointment.locator = Watcher.compute_locator(dispute_txid)
dummy_appointment.locator = compute_locator(dispute_txid)
uuid = uuid4().hex
appointments = {uuid: dummy_appointment}