Merge pull request #65 from orbitalturtle/pisa-cli-modularity

Refactor add_appointment cli code
This commit is contained in:
Sergi Delgado Segura
2020-01-15 10:14:49 +01:00
committed by GitHub
7 changed files with 380 additions and 136 deletions

View File

@@ -1,6 +1,6 @@
# pisa-cli # pisa_cli
`pisa-cli` is a command line interface to interact with the PISA server, written in Python3. `pisa_cli` is a command line interface to interact with the PISA server, written in Python3.
## Dependencies ## Dependencies
Refer to [DEPENDENCIES.md](DEPENDENCIES.md) Refer to [DEPENDENCIES.md](DEPENDENCIES.md)
@@ -11,7 +11,7 @@ Refer to [INSTALL.md](INSTALL.md)
## Usage ## Usage
python pisa-cli.py [global options] command [command options] [arguments] python pisa_cli.py [global options] command [command options] [arguments]
#### Global options #### Global options
@@ -54,7 +54,7 @@ The API will return a `text/plain` HTTP response code `200/OK` if the appointmen
#### Usage #### Usage
python pisa-cli add_appointment [command options] <appointment>/<path_to_appointment_file> python pisa_cli add_appointment [command options] <appointment>/<path_to_appointment_file>
if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded if `-f, --file` **is** specified, then the command expects a path to a json file instead of a json encoded
string as parameter. string as parameter.
@@ -100,7 +100,7 @@ if `-f, --file` **is** specified, then the command expects a path to a json file
#### Usage #### Usage
python pisa-cli get_appointment <appointment_locator> python pisa_cli get_appointment <appointment_locator>
@@ -109,18 +109,18 @@ if `-f, --file` **is** specified, then the command expects a path to a json file
Shows the list of commands or help about how to run a specific command. Shows the list of commands or help about how to run a specific command.
#### Usage #### Usage
python pisa-cli help python pisa_cli help
or or
python pisa-cli help command python pisa_cli help command
## Example ## Example
1. Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not corresopond to a valid transaction. However it can be used to interact with the PISA API. 1. Generate a new dummy appointment. **Note:** this appointment will never be fulfilled (it will eventually expire) since it does not corresopond to a valid transaction. However it can be used to interact with the PISA API.
``` ```
python pisa-cli.py generate_dummy_appointment python pisa_cli.py generate_dummy_appointment
``` ```
That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`. That will create a json file that follows the appointment data structure filled with dummy data and store it in `dummy_appointment_data.json`.
@@ -128,7 +128,7 @@ or
2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions. 2. Send the appointment to the PISA API. Which will then start monitoring for matching transactions.
``` ```
python pisa-cli.py add_appointment -f dummy_appointment_data.json python pisa_cli.py add_appointment -f dummy_appointment_data.json
``` ```
This returns a appointment locator that can be used to get updates about this appointment from PISA. This returns a appointment locator that can be used to get updates about this appointment from PISA.
@@ -136,7 +136,7 @@ or
3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command: 3. Test that PISA is still watching the appointment by replacing the appointment locator received into the following command:
``` ```
python pisa-cli.py get_appointment <appointment_locator> python pisa_cli.py get_appointment <appointment_locator>
``` ```
## PISA API ## PISA API

View File

@@ -9,8 +9,8 @@ from getopt import getopt, GetoptError
from requests import ConnectTimeout, ConnectionError from requests import ConnectTimeout, ConnectionError
from uuid import uuid4 from uuid import uuid4
from apps.cli.blob import Blob
from apps.cli.help import help_add_appointment, help_get_appointment from apps.cli.help import help_add_appointment, help_get_appointment
from apps.cli.blob import Blob
from apps.cli import ( from apps.cli import (
DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_SERVER,
DEFAULT_PISA_API_PORT, DEFAULT_PISA_API_PORT,
@@ -22,9 +22,8 @@ from apps.cli import (
from common.logger import Logger from common.logger import Logger
from common.appointment import Appointment from common.appointment import Appointment
from common.constants import LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.tools import check_sha256_hex_format from common.tools import check_sha256_hex_format, compute_locator
HTTP_OK = 200 HTTP_OK = 200
@@ -46,11 +45,13 @@ def generate_dummy_appointment():
"to_self_delay": 20, "to_self_delay": 20,
} }
print("Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)) logger.info(
"Generating dummy appointment data:" "\n\n" + json.dumps(dummy_appointment_data, indent=4, sort_keys=True)
)
json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w")) json.dump(dummy_appointment_data, open("dummy_appointment_data.json", "w"))
print("\nData stored in dummy_appointment_data.json") logger.info("\nData stored in dummy_appointment_data.json")
# Loads and returns Pisa keys from disk # Loads and returns Pisa keys from disk
@@ -61,11 +62,12 @@ def load_key_file_data(file_name):
return key return key
except FileNotFoundError: except FileNotFoundError:
raise FileNotFoundError("File not found.") logger.error("Client's key file not found. Please check your settings.")
return False
except IOError as e:
def compute_locator(tx_id): logger.error("I/O error({}): {}".format(e.errno, e.strerror))
return tx_id[:LOCATOR_LEN_HEX] return False
# Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it. # Makes sure that the folder APPOINTMENTS_FOLDER_NAME exists, then saves the appointment and signature in it.
@@ -85,12 +87,81 @@ def save_signed_appointment(appointment, signature):
def add_appointment(args): def add_appointment(args):
appointment_data = None # Get appointment data from user.
appointment_data = parse_add_appointment_args(args)
if appointment_data is None:
logger.error("The provided appointment JSON is empty")
return False
valid_txid = check_sha256_hex_format(appointment_data.get("tx_id"))
if not valid_txid:
logger.error("The provided txid is not valid")
return False
tx_id = appointment_data.get("tx_id")
tx = appointment_data.get("tx")
if None not in [tx_id, tx]:
appointment_data["locator"] = compute_locator(tx_id)
appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(tx), tx_id)
else:
logger.error("Appointment data is missing some fields.")
return False
appointment = Appointment.from_dict(appointment_data)
signature = get_appointment_signature(appointment)
hex_pk_der = get_pk()
if not (appointment and signature and hex_pk_der):
return False
data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":"))
# Send appointment to the server.
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
response_json = post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json)
if response_json is None:
return False
signature = response_json.get("signature")
# Check that the server signed the appointment as it should.
if signature is None:
logger.error("The response does not contain the signature of the appointment.")
return False
valid = check_signature(signature, appointment)
if not valid:
logger.error("The returned appointment's signature is invalid")
return False
logger.info("Appointment accepted and signed by Pisa")
# all good, store appointment and signature
try:
save_signed_appointment(appointment.to_dict(), signature)
except OSError as e:
logger.error("There was an error while saving the appointment", error=e)
return False
return True
# Parse arguments passed to add_appointment and handle them accordingly.
# Returns appointment data.
def parse_add_appointment_args(args):
use_help = "Use 'help add_appointment' for help of how to use the command" use_help = "Use 'help add_appointment' for help of how to use the command"
if not args: if not args:
logger.error("No appointment data provided. " + use_help) logger.error("No appointment data provided. " + use_help)
return False return None
arg_opt = args.pop(0) arg_opt = args.pop(0)
@@ -102,7 +173,7 @@ def add_appointment(args):
fin = args.pop(0) fin = args.pop(0)
if not os.path.isfile(fin): if not os.path.isfile(fin):
logger.error("Can't find file", filename=fin) logger.error("Can't find file", filename=fin)
return False return None
try: try:
with open(fin) as f: with open(fin) as f:
@@ -110,63 +181,19 @@ def add_appointment(args):
except IOError as e: except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror) logger.error("I/O error", errno=e.errno, error=e.strerror)
return False return None
else: else:
appointment_data = json.loads(arg_opt) appointment_data = json.loads(arg_opt)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("Non-JSON encoded data provided as appointment. " + use_help) logger.error("Non-JSON encoded data provided as appointment. " + use_help)
return False return None
if not appointment_data: return appointment_data
logger.error("The provided JSON is empty")
return False
valid_locator = check_sha256_hex_format(appointment_data.get("tx_id"))
if not valid_locator:
logger.error("The provided locator is not valid")
return False
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
appointment = Appointment.from_dict(appointment_data)
try:
sk_der = load_key_file_data(CLI_PRIVATE_KEY)
cli_sk = Cryptographer.load_private_key_der(sk_der)
except ValueError:
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
return False
except FileNotFoundError:
logger.error("Client's private key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
try:
cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)
hex_pk_der = binascii.hexlify(cli_pk_der)
except FileNotFoundError:
logger.error("Client's public key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
# FIXME: Exceptions for hexlify need to be covered
data = {"appointment": appointment, "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
appointment_json = json.dumps(data, sort_keys=True, separators=(",", ":"))
# Sends appointment data to add_appointment endpoint to be processed by the server.
def post_data_to_add_appointment_endpoint(add_appointment_endpoint, appointment_json):
logger.info("Sending appointment to PISA") logger.info("Sending appointment to PISA")
try: try:
@@ -176,15 +203,15 @@ def add_appointment(args):
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("The response was not valid JSON") logger.error("The response was not valid JSON")
return False return None
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout") logger.error("Can't connect to pisa API. Connection timeout")
return False return None
except ConnectionError: except ConnectionError:
logger.error("Can't connect to pisa API. Server cannot be reached") logger.error("Can't connect to pisa API. Server cannot be reached")
return False return None
if r.status_code != HTTP_OK: if r.status_code != HTTP_OK:
if "error" not in response_json: if "error" not in response_json:
@@ -196,14 +223,17 @@ def add_appointment(args):
status_code=r.status_code, status_code=r.status_code,
description=error, description=error,
) )
return False return None
if "signature" not in response_json: if "signature" not in response_json:
logger.error("The response does not contain the signature of the appointment") logger.error("The response does not contain the signature of the appointment")
return False return None
signature = response_json["signature"] return response_json
# verify that the returned signature is valid
# Verify that the signature returned from the watchtower is valid.
def check_signature(signature, appointment):
try: try:
pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY) pisa_pk_der = load_key_file_data(PISA_PUBLIC_KEY)
pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der) pisa_pk = Cryptographer.load_public_key_der(pisa_pk_der)
@@ -212,7 +242,7 @@ def add_appointment(args):
logger.error("Failed to deserialize the public key. It might be in an unsupported format") logger.error("Failed to deserialize the public key. It might be in an unsupported format")
return False return False
is_sig_valid = Cryptographer.verify(appointment.serialize(), signature, pisa_pk) return Cryptographer.verify(appointment.serialize(), signature, pisa_pk)
except FileNotFoundError: except FileNotFoundError:
logger.error("Pisa's public key file not found. Please check your settings") logger.error("Pisa's public key file not found. Please check your settings")
@@ -222,21 +252,6 @@ def add_appointment(args):
logger.error("I/O error", errno=e.errno, error=e.strerror) logger.error("I/O error", errno=e.errno, error=e.strerror)
return False return False
if not is_sig_valid:
logger.error("The returned appointment's signature is invalid")
return False
logger.info("Appointment accepted and signed by Pisa")
# all good, store appointment and signature
try:
save_signed_appointment(appointment, signature)
except OSError as e:
logger.error("There was an error while saving the appointment", error=e)
return False
return True
def get_appointment(args): def get_appointment(args):
if not args: if not args:
@@ -260,8 +275,9 @@ def get_appointment(args):
try: try:
r = requests.get(url=get_appointment_endpoint + parameters, timeout=5) r = requests.get(url=get_appointment_endpoint + parameters, timeout=5)
logger.info("Appointment response returned from server: " + str(r))
return True
print(json.dumps(r.json(), indent=4, sort_keys=True))
except ConnectTimeout: except ConnectTimeout:
logger.error("Can't connect to pisa API. Connection timeout") logger.error("Can't connect to pisa API. Connection timeout")
return False return False
@@ -270,7 +286,47 @@ def get_appointment(args):
logger.error("Can't connect to pisa API. Server cannot be reached") logger.error("Can't connect to pisa API. Server cannot be reached")
return False return False
return True
def get_appointment_signature(appointment):
try:
sk_der = load_key_file_data(CLI_PRIVATE_KEY)
cli_sk = Cryptographer.load_private_key_der(sk_der)
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
return signature
except ValueError:
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
return False
except FileNotFoundError:
logger.error("Client's private key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
def get_pk():
try:
cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)
hex_pk_der = binascii.hexlify(cli_pk_der)
return hex_pk_der
except FileNotFoundError:
logger.error("Client's public key file not found. Please check your settings")
return False
except IOError as e:
logger.error("I/O error", errno=e.errno, error=e.strerror)
return False
except binascii.Error as e:
logger.error("Could not successfully encode public key as hex: ", e)
return False
def show_usage(): def show_usage():

View File

@@ -1,4 +1,5 @@
import re import re
from common.constants import LOCATOR_LEN_HEX
def check_sha256_hex_format(value): def check_sha256_hex_format(value):
@@ -12,3 +13,15 @@ def check_sha256_hex_format(value):
:mod:`bool`: Whether or not the value matches the format. :mod:`bool`: Whether or not the value matches the format.
""" """
return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None return isinstance(value, str) and re.match(r"^[0-9A-Fa-f]{64}$", value) is not None
def compute_locator(tx_id):
"""
Computes an appointment locator given a transaction id.
Args:
tx_id (:obj:`str`): the transaction id used to compute the locator.
Returns:
(:obj:`str`): The computed locator.
"""
return tx_id[:LOCATOR_LEN_HEX]

View File

@@ -3,7 +3,7 @@ from queue import Queue
from threading import Thread from threading import Thread
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.constants import LOCATOR_LEN_HEX from common.tools import compute_locator
from common.logger import Logger from common.logger import Logger
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
@@ -71,20 +71,6 @@ class Watcher:
if not isinstance(responder, Responder): if not isinstance(responder, Responder):
self.responder = Responder(db_manager) self.responder = Responder(db_manager)
@staticmethod
def compute_locator(tx_id):
"""
Computes an appointment locator given a transaction id.
Args:
tx_id (:obj:`str`): the transaction id used to compute the locator.
Returns:
(:obj:`str`): The computed locator.
"""
return tx_id[:LOCATOR_LEN_HEX]
def add_appointment(self, appointment): def add_appointment(self, appointment):
""" """
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached. Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
@@ -238,7 +224,7 @@ class Watcher:
found. found.
""" """
potential_locators = {Watcher.compute_locator(txid): txid for txid in txids} potential_locators = {compute_locator(txid): txid for txid in txids}
# Check is any of the tx_ids in the received block is an actual match # Check is any of the tx_ids in the received block is an actual match
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys()) intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())

View File

@@ -1,23 +1,41 @@
import responses import responses
import json import json
import os
import shutil
from binascii import hexlify from binascii import hexlify
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from common.appointment import Appointment
from common.cryptographer import Cryptographer
import apps.cli.pisa_cli as pisa_cli import apps.cli.pisa_cli as pisa_cli
from test.apps.cli.unit.conftest import get_random_value_hex from test.apps.cli.unit.conftest import get_random_value_hex
# TODO: should find a way of doing without this
from apps.cli.pisa_cli import build_appointment
# dummy keys for the tests # dummy keys for the tests
pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) pisa_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_pk = pisa_sk.public_key() pisa_pk = pisa_sk.public_key()
other_sk = ec.generate_private_key(ec.SECP256K1, default_backend()) other_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
pisa_sk_der = pisa_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
pisa_pk_der = pisa_pk.public_bytes(
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
)
other_sk_der = other_sk.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
# Replace the key in the module with a key we control for the tests # Replace the key in the module with a key we control for the tests
pisa_cli.pisa_public_key = pisa_pk pisa_cli.pisa_public_key = pisa_pk
# Replace endpoint with dummy one # Replace endpoint with dummy one
@@ -32,18 +50,47 @@ dummy_appointment_request = {
"end_time": 50000, "end_time": 50000,
"to_self_delay": 200, "to_self_delay": 200,
} }
dummy_appointment = build_appointment(**dummy_appointment_request)
# FIXME: USE CRYPTOGRAPHER # This is the format appointment turns into once it hits "add_appointment"
dummy_appointment_full = {
"locator": get_random_value_hex(32),
"start_time": 1500,
"end_time": 50000,
"to_self_delay": 200,
"encrypted_blob": get_random_value_hex(120),
}
dummy_appointment = Appointment.from_dict(dummy_appointment_full)
def sign_appointment(sk, appointment): def get_dummy_pisa_sk_der(*args):
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8") return pisa_sk_der
return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8")
def get_dummy_pisa_pk(der_data): def get_dummy_pisa_pk_der(*args):
return pisa_pk return pisa_pk_der
def get_dummy_hex_pk_der(*args):
return hexlify(get_dummy_pisa_pk_der())
def get_dummy_signature(*args):
sk = Cryptographer.load_private_key_der(pisa_sk_der)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def get_bad_signature(*args):
sk = Cryptographer.load_private_key_der(other_sk_der)
return Cryptographer.sign(dummy_appointment.serialize(), sk)
def valid_sig(*args):
return True
def invalid_sig(*args):
return False
@responses.activate @responses.activate
@@ -51,10 +98,12 @@ def test_add_appointment(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested # Simulate a request to add_appointment for dummy_appointment, make sure that the right endpoint is requested
# and the return value is True # and the return value is True
# make sure the test uses the right dummy key instead of loading it from disk # Make sure the test uses the dummy signature
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_dummy_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", valid_sig)
response = {"locator": dummy_appointment["locator"], "signature": sign_appointment(pisa_sk, dummy_appointment)} response = {"locator": dummy_appointment.to_dict()["locator"], "signature": get_dummy_signature()}
request_url = "http://{}/".format(pisa_endpoint) request_url = "http://{}/".format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200) responses.add(responses.POST, request_url, json=response, status=200)
@@ -72,12 +121,14 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
# Simulate a request to add_appointment for dummy_appointment, but sign with a different key, # Simulate a request to add_appointment for dummy_appointment, but sign with a different key,
# make sure that the right endpoint is requested, but the return value is False # make sure that the right endpoint is requested, but the return value is False
# make sure the test uses the right dummy key instead of loading it from disk # Make sure the test uses the bad dummy signature
monkeypatch.setattr(pisa_cli, "load_public_key", get_dummy_pisa_pk) monkeypatch.setattr(pisa_cli, "get_appointment_signature", get_bad_signature)
monkeypatch.setattr(pisa_cli, "get_pk", get_dummy_hex_pk_der)
monkeypatch.setattr(pisa_cli, "check_signature", invalid_sig)
response = { response = {
"locator": dummy_appointment["locator"], "locator": dummy_appointment.to_dict()["locator"],
"signature": sign_appointment(other_sk, dummy_appointment), # signing with a different key "signature": get_bad_signature(), # Sign with a bad key
} }
request_url = "http://{}/".format(pisa_endpoint) request_url = "http://{}/".format(pisa_endpoint)
@@ -85,4 +136,141 @@ def test_add_appointment_with_invalid_signature(monkeypatch):
result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)]) result = pisa_cli.add_appointment([json.dumps(dummy_appointment_request)])
assert not result assert result is False
def test_load_key_file_data():
# If file exists and has data in it, function should work.
with open("key_test_file", "w+b") as f:
f.write(pisa_sk_der)
appt_data = pisa_cli.load_key_file_data("key_test_file")
assert appt_data
os.remove("key_test_file")
# If file doesn't exist, function should fail.
appt_data = pisa_cli.load_key_file_data("nonexistent_file")
assert not appt_data
def test_save_signed_appointment(monkeypatch):
monkeypatch.setattr(pisa_cli, "APPOINTMENTS_FOLDER_NAME", "test_appointments")
pisa_cli.save_signed_appointment(dummy_appointment.to_dict(), get_dummy_signature())
# In folder "Appointments," grab all files and print them.
files = os.listdir("test_appointments")
found = False
for f in files:
if dummy_appointment.to_dict().get("locator") in f:
found = True
assert found
# If "appointments" directory doesn't exist, function should create it.
assert os.path.exists("test_appointments")
# Delete test directory once we're done.
shutil.rmtree("test_appointments")
def test_parse_add_appointment_args():
# If no args are passed, function should fail.
appt_data = pisa_cli.parse_add_appointment_args(None)
assert not appt_data
# If file doesn't exist, function should fail.
appt_data = pisa_cli.parse_add_appointment_args(["-f", "nonexistent_file"])
assert not appt_data
# If file exists and has data in it, function should work.
with open("appt_test_file", "w") as f:
json.dump(dummy_appointment_request, f)
appt_data = pisa_cli.parse_add_appointment_args(["-f", "appt_test_file"])
assert appt_data
os.remove("appt_test_file")
# If appointment json is passed in, function should work.
appt_data = pisa_cli.parse_add_appointment_args([json.dumps(dummy_appointment_request)])
assert appt_data
@responses.activate
def test_post_data_to_add_appointment_endpoint():
response = {
"locator": dummy_appointment.to_dict()["locator"],
"signature": Cryptographer.sign(dummy_appointment.serialize(), pisa_sk),
}
request_url = "http://{}/".format(pisa_endpoint)
responses.add(responses.POST, request_url, json=response, status=200)
response = pisa_cli.post_data_to_add_appointment_endpoint(request_url, json.dumps(dummy_appointment_request))
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
assert response
def test_check_signature(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)
valid = pisa_cli.check_signature(get_dummy_signature(), dummy_appointment)
assert valid
valid = pisa_cli.check_signature(get_bad_signature(), dummy_appointment)
assert not valid
@responses.activate
def test_get_appointment():
# Response of get_appointment endpoint is an appointment with status added to it.
dummy_appointment_full["status"] = "being_watched"
response = dummy_appointment_full
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator={}".format(response.get("locator"))
responses.add(responses.GET, request_url, json=response, status=200)
result = pisa_cli.get_appointment([response.get("locator")])
assert len(responses.calls) == 1
assert responses.calls[0].request.url == request_url
assert result
@responses.activate
def test_get_appointment_err():
locator = get_random_value_hex(32)
# Test that get_appointment handles a connection error appropriately.
request_url = "http://{}/".format(pisa_endpoint) + "get_appointment?locator=".format(locator)
responses.add(responses.GET, request_url, body=ConnectionError())
assert not pisa_cli.get_appointment([locator])
def test_get_appointment_signature(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_sk_der)
signature = pisa_cli.get_appointment_signature(dummy_appointment)
assert isinstance(signature, str)
def test_get_pk(monkeypatch):
# Make sure the test uses the right dummy key instead of loading it from disk
monkeypatch.setattr(pisa_cli, "load_key_file_data", get_dummy_pisa_pk_der)
pk = pisa_cli.get_pk()
assert isinstance(pk, bytes)

View File

@@ -16,6 +16,7 @@ from pisa.watcher import Watcher
from pisa.tools import bitcoin_cli from pisa.tools import bitcoin_cli
from pisa.db_manager import DBManager from pisa.db_manager import DBManager
from common.appointment import Appointment from common.appointment import Appointment
from common.tools import compute_locator
from bitcoind_mock.utils import sha256d from bitcoind_mock.utils import sha256d
from bitcoind_mock.transaction import TX from bitcoind_mock.transaction import TX
@@ -103,7 +104,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
locator = Watcher.compute_locator(dispute_txid) locator = compute_locator(dispute_txid)
blob = Blob(dummy_appointment_data.get("tx")) blob = Blob(dummy_appointment_data.get("tx"))
encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id")) encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id"))

View File

@@ -16,7 +16,7 @@ from test.pisa.unit.conftest import (
) )
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS
from common.tools import check_sha256_hex_format from common.tools import check_sha256_hex_format, compute_locator
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
@@ -46,7 +46,7 @@ def txids():
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def locator_uuid_map(txids): def locator_uuid_map(txids):
return {Watcher.compute_locator(txid): uuid4().hex for txid in txids} return {compute_locator(txid): uuid4().hex for txid in txids}
def create_appointments(n): def create_appointments(n):
@@ -219,7 +219,7 @@ def test_filter_valid_breaches(watcher):
dummy_appointment, _ = generate_dummy_appointment() dummy_appointment, _ = generate_dummy_appointment()
dummy_appointment.encrypted_blob.data = encrypted_blob dummy_appointment.encrypted_blob.data = encrypted_blob
dummy_appointment.locator = Watcher.compute_locator(dispute_txid) dummy_appointment.locator = compute_locator(dispute_txid)
uuid = uuid4().hex uuid = uuid4().hex
appointments = {uuid: dummy_appointment} appointments = {uuid: dummy_appointment}