mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Merge pull request #103 from sr-gi/enable_user_sig
Enables user-side sig using the new signature algorithm
This commit is contained in:
@@ -81,7 +81,8 @@ jobs:
|
||||
. venv/bin/activate
|
||||
cp test/teos/e2e/teos-conf.py teos/conf.py
|
||||
python3 -m apps.generate_key -d ~/.teos/
|
||||
python3 -m apps.generate_key -n cli -d ~/.teos/
|
||||
python3 -m apps.generate_key -n cli -d ~/.teos_cli/
|
||||
cp ~/.teos/teos_pk.der ~/.teos_cli/
|
||||
|
||||
|
||||
# Run E2E tests
|
||||
|
||||
@@ -11,9 +11,9 @@ conf_fields = {
|
||||
"DATA_FOLDER": {"value": conf.DATA_FOLDER, "type": str},
|
||||
"CLIENT_LOG_FILE": {"value": conf.CLIENT_LOG_FILE, "type": str, "path": True},
|
||||
"APPOINTMENTS_FOLDER_NAME": {"value": conf.APPOINTMENTS_FOLDER_NAME, "type": str, "path": True},
|
||||
# "CLI_PUBLIC_KEY": {"value": conf.CLI_PUBLIC_KEY, "type": str, "path": True},
|
||||
# "CLI_PRIVATE_KEY": {"value": conf.CLI_PRIVATE_KEY, "type": str, "path": True},
|
||||
# "TEOS_PUBLIC_KEY": {"value": conf.TEOS_PUBLIC_KEY, "type": str, "path": True},
|
||||
"CLI_PUBLIC_KEY": {"value": conf.CLI_PUBLIC_KEY, "type": str, "path": True},
|
||||
"CLI_PRIVATE_KEY": {"value": conf.CLI_PRIVATE_KEY, "type": str, "path": True},
|
||||
"TEOS_PUBLIC_KEY": {"value": conf.TEOS_PUBLIC_KEY, "type": str, "path": True},
|
||||
}
|
||||
|
||||
# Expand user (~) if found and check fields are correct
|
||||
|
||||
@@ -7,3 +7,8 @@ DATA_FOLDER = "~/.teos_cli/"
|
||||
|
||||
CLIENT_LOG_FILE = "cli.log"
|
||||
APPOINTMENTS_FOLDER_NAME = "appointment_receipts"
|
||||
|
||||
# KEYS
|
||||
TEOS_PUBLIC_KEY = "teos_pk.der"
|
||||
CLI_PRIVATE_KEY = "cli_sk.der"
|
||||
CLI_PUBLIC_KEY = "cli_pk.der"
|
||||
|
||||
@@ -24,49 +24,57 @@ from common.tools import check_sha256_hex_format, check_locator_format, compute_
|
||||
logger = Logger(actor="Client", log_name_prefix=LOG_PREFIX)
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
# FIXME: creating a simpler load_keys for the alpha. Client keys will not be necessary. TEOS key is hardcoded.
|
||||
# def load_keys(teos_pk_path, cli_sk_path, cli_pk_path):
|
||||
# """
|
||||
# Loads all the keys required so sign, send, and verify the appointment.
|
||||
#
|
||||
# Args:
|
||||
# teos_pk_path (:obj:`str`): path to the TEOS public key file.
|
||||
# cli_sk_path (:obj:`str`): path to the client private key file.
|
||||
# cli_pk_path (:obj:`str`): path to the client public key file.
|
||||
#
|
||||
# Returns:
|
||||
# :obj:`tuple` or ``None``: a three item tuple containing a teos_pk object, cli_sk object and the cli_sk_der
|
||||
# encoded key if all keys can be loaded. ``None`` otherwise.
|
||||
# """
|
||||
#
|
||||
# teos_pk_der = Cryptographer.load_key_file(teos_pk_path)
|
||||
# teos_pk = Cryptographer.load_public_key_der(teos_pk_der)
|
||||
#
|
||||
# if teos_pk is None:
|
||||
# logger.error("TEOS's public key file not found. Please check your settings")
|
||||
# return None
|
||||
#
|
||||
# cli_sk_der = Cryptographer.load_key_file(cli_sk_path)
|
||||
# cli_sk = Cryptographer.load_private_key_der(cli_sk_der)
|
||||
#
|
||||
# if cli_sk is None:
|
||||
# logger.error("Client's private key file not found. Please check your settings")
|
||||
# return None
|
||||
#
|
||||
# cli_pk_der = Cryptographer.load_key_file(cli_pk_path)
|
||||
#
|
||||
# if cli_pk_der is None:
|
||||
# logger.error("Client's public key file not found. Please check your settings")
|
||||
# return None
|
||||
#
|
||||
# return teos_pk, cli_sk, cli_pk_der
|
||||
|
||||
def load_keys(teos_pk_path, cli_sk_path, cli_pk_path):
|
||||
"""
|
||||
Loads all the keys required so sign, send, and verify the appointment.
|
||||
|
||||
def load_keys():
|
||||
TEOS_PUBLIC_KEY = "0230053e39c53b8bcb43354a4ed886b8082af1d1e8fc14956e60ad0592bfdfab51"
|
||||
teos_pk = PublicKey(binascii.unhexlify(TEOS_PUBLIC_KEY))
|
||||
Args:
|
||||
teos_pk_path (:obj:`str`): path to the TEOS public key file.
|
||||
cli_sk_path (:obj:`str`): path to the client private key file.
|
||||
cli_pk_path (:obj:`str`): path to the client public key file.
|
||||
|
||||
return teos_pk
|
||||
Returns:
|
||||
:obj:`tuple` or ``None``: a three item tuple containing a teos_pk object, cli_sk object and the cli_sk_der
|
||||
encoded key if all keys can be loaded. ``None`` otherwise.
|
||||
"""
|
||||
|
||||
if teos_pk_path is None:
|
||||
logger.error("TEOS's public key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
if cli_sk_path is None:
|
||||
logger.error("Client's private key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
if cli_pk_path is None:
|
||||
logger.error("Client's public key file not found. Please check your settings")
|
||||
return None
|
||||
|
||||
try:
|
||||
teos_pk_der = Cryptographer.load_key_file(teos_pk_path)
|
||||
teos_pk = PublicKey(teos_pk_der)
|
||||
|
||||
except ValueError:
|
||||
logger.error("TEOS public key is invalid or cannot be parsed")
|
||||
return None
|
||||
|
||||
cli_sk_der = Cryptographer.load_key_file(cli_sk_path)
|
||||
cli_sk = Cryptographer.load_private_key_der(cli_sk_der)
|
||||
|
||||
if cli_sk is None:
|
||||
logger.error("Client private key is invalid or cannot be parsed")
|
||||
return None
|
||||
|
||||
try:
|
||||
cli_pk_der = Cryptographer.load_key_file(cli_pk_path)
|
||||
PublicKey(cli_pk_der)
|
||||
|
||||
except ValueError:
|
||||
logger.error("Client public key is invalid or cannot be parsed")
|
||||
return None
|
||||
|
||||
return teos_pk, cli_sk, cli_pk_der
|
||||
|
||||
|
||||
def add_appointment(args):
|
||||
@@ -95,18 +103,17 @@ def add_appointment(args):
|
||||
:obj:`bool`: True if the appointment is accepted by the tower and the receipt is properly stored, false if any
|
||||
error occurs during the process.
|
||||
"""
|
||||
# FIXME: creating a simpler load_keys for the alpha. Client keys will not be necessary. TEOS key is hardcoded.
|
||||
# teos_pk, cli_sk, cli_pk_der = load_keys(
|
||||
# config.get("TEOS_PUBLIC_KEY"), config.get("CLI_PRIVATE_KEY"), config.get("CLI_PUBLIC_KEY")
|
||||
# )
|
||||
#
|
||||
# try:
|
||||
# hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
#
|
||||
# except binascii.Error as e:
|
||||
# logger.error("Could not successfully encode public key as hex", error=str(e))
|
||||
# return False
|
||||
teos_pk = load_keys()
|
||||
|
||||
teos_pk, cli_sk, cli_pk_der = load_keys(
|
||||
config.get("TEOS_PUBLIC_KEY"), config.get("CLI_PRIVATE_KEY"), config.get("CLI_PUBLIC_KEY")
|
||||
)
|
||||
|
||||
try:
|
||||
hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
|
||||
except binascii.Error as e:
|
||||
logger.error("Could not successfully encode public key as hex", error=str(e))
|
||||
return False
|
||||
|
||||
if teos_pk is None:
|
||||
return False
|
||||
@@ -136,15 +143,12 @@ def add_appointment(args):
|
||||
return False
|
||||
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
# FIXME: getting rid of the client-side signature for the alpha. A proper authentication is required.
|
||||
# signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
#
|
||||
# if not (appointment and signature):
|
||||
# return False
|
||||
#
|
||||
# data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
|
||||
data = {"appointment": appointment.to_dict()}
|
||||
if not (appointment and signature):
|
||||
return False
|
||||
|
||||
data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
|
||||
|
||||
# Send appointment to the server.
|
||||
server_response = post_appointment(data)
|
||||
|
||||
@@ -52,8 +52,7 @@ dummy_appointment = Appointment.from_dict(dummy_appointment_full)
|
||||
|
||||
|
||||
def load_dummy_keys(*args):
|
||||
# return dummy_pk, dummy_sk, dummy_pk_der
|
||||
return dummy_pk
|
||||
return dummy_pk, dummy_sk, dummy_pk.format(compressed=True)
|
||||
|
||||
|
||||
def get_dummy_signature(*args):
|
||||
@@ -64,30 +63,39 @@ def get_bad_signature(*args):
|
||||
return Cryptographer.sign(dummy_appointment.serialize(), another_sk)
|
||||
|
||||
|
||||
# def test_load_keys():
|
||||
# # Let's first create a private key and public key files
|
||||
# private_key_file_path = "sk_test_file"
|
||||
# public_key_file_path = "pk_test_file"
|
||||
# with open(private_key_file_path, "wb") as f:
|
||||
# f.write(dummy_sk.to_der())
|
||||
# with open(public_key_file_path, "wb") as f:
|
||||
# f.write(dummy_pk_der)
|
||||
#
|
||||
# # Now we can test the function passing the using this files (we'll use the same pk for both)
|
||||
# r = teos_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path)
|
||||
# assert isinstance(r, tuple)
|
||||
# assert len(r) == 3
|
||||
#
|
||||
# # If any param does not match we should get None as result
|
||||
# assert teos_cli.load_keys(None, private_key_file_path, public_key_file_path) is None
|
||||
# assert teos_cli.load_keys(public_key_file_path, None, public_key_file_path) is None
|
||||
# assert teos_cli.load_keys(public_key_file_path, private_key_file_path, None) is None
|
||||
#
|
||||
# # The same should happen if we pass a public key where a private should be, for instance
|
||||
# assert teos_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None
|
||||
#
|
||||
# os.remove(private_key_file_path)
|
||||
# os.remove(public_key_file_path)
|
||||
def test_load_keys():
|
||||
# Let's first create a private key and public key files
|
||||
private_key_file_path = "sk_test_file"
|
||||
public_key_file_path = "pk_test_file"
|
||||
empty_file_path = "empty_file"
|
||||
with open(private_key_file_path, "wb") as f:
|
||||
f.write(dummy_sk.to_der())
|
||||
with open(public_key_file_path, "wb") as f:
|
||||
f.write(dummy_pk.format(compressed=True))
|
||||
with open(empty_file_path, "wb") as f:
|
||||
pass
|
||||
|
||||
# Now we can test the function passing the using this files (we'll use the same pk for both)
|
||||
r = teos_cli.load_keys(public_key_file_path, private_key_file_path, public_key_file_path)
|
||||
assert isinstance(r, tuple)
|
||||
assert len(r) == 3
|
||||
|
||||
# If any param does not match we should get None as result
|
||||
assert teos_cli.load_keys(None, private_key_file_path, public_key_file_path) is None
|
||||
assert teos_cli.load_keys(public_key_file_path, None, public_key_file_path) is None
|
||||
assert teos_cli.load_keys(public_key_file_path, private_key_file_path, None) is None
|
||||
|
||||
# The same should happen if we pass a public key where a private should be, for instance
|
||||
assert teos_cli.load_keys(private_key_file_path, public_key_file_path, private_key_file_path) is None
|
||||
|
||||
# Same if any of the files is empty
|
||||
assert teos_cli.load_keys(empty_file_path, private_key_file_path, public_key_file_path) is None
|
||||
assert teos_cli.load_keys(public_key_file_path, empty_file_path, public_key_file_path) is None
|
||||
assert teos_cli.load_keys(public_key_file_path, private_key_file_path, empty_file_path) is None
|
||||
|
||||
os.remove(private_key_file_path)
|
||||
os.remove(public_key_file_path)
|
||||
os.remove(empty_file_path)
|
||||
|
||||
|
||||
# TODO: 90-add-more-add-appointment-tests
|
||||
@@ -1,8 +1,9 @@
|
||||
import json
|
||||
import binascii
|
||||
from time import sleep
|
||||
from riemann.tx import Tx
|
||||
|
||||
from teos import config
|
||||
|
||||
from teos import HOST, PORT
|
||||
from apps.cli import teos_cli
|
||||
from common.blob import Blob
|
||||
@@ -20,6 +21,7 @@ from test.teos.e2e.conftest import (
|
||||
create_penalty_tx,
|
||||
run_teosd,
|
||||
)
|
||||
from apps.cli import config as cli_conf
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix="")
|
||||
|
||||
@@ -32,13 +34,6 @@ teos_cli.teos_api_port = PORT
|
||||
teosd_process = run_teosd()
|
||||
|
||||
|
||||
def get_teos_pk():
|
||||
teos_sk = Cryptographer.load_private_key_der(Cryptographer.load_key_file(config.get("TEOS_SECRET_KEY")))
|
||||
teos_pk = teos_sk.public_key
|
||||
|
||||
return teos_pk
|
||||
|
||||
|
||||
def broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, addr):
|
||||
# Broadcast the commitment transaction and mine a block
|
||||
bitcoin_cli.sendrawtransaction(commitment_tx)
|
||||
@@ -51,9 +46,7 @@ def get_appointment_info(locator):
|
||||
return teos_cli.get_appointment(locator)
|
||||
|
||||
|
||||
def test_appointment_life_cycle(monkeypatch, bitcoin_cli, create_txs):
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
def test_appointment_life_cycle(bitcoin_cli, create_txs):
|
||||
commitment_tx, penalty_tx = create_txs
|
||||
commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")
|
||||
appointment_data = build_appointment_data(bitcoin_cli, commitment_tx_id, penalty_tx)
|
||||
@@ -96,9 +89,7 @@ def test_appointment_life_cycle(monkeypatch, bitcoin_cli, create_txs):
|
||||
assert appointment_info[0].get("status") == "not_found"
|
||||
|
||||
|
||||
def test_appointment_malformed_penalty(monkeypatch, bitcoin_cli, create_txs):
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
def test_appointment_malformed_penalty(bitcoin_cli, create_txs):
|
||||
# Lets start by creating two valid transaction
|
||||
commitment_tx, penalty_tx = create_txs
|
||||
|
||||
@@ -140,17 +131,13 @@ def test_appointment_wrong_key(bitcoin_cli, create_txs):
|
||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(penalty_tx), get_random_value_hex(32))
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
|
||||
# teos_pk, cli_sk, cli_pk_der = teos_cli.load_keys(
|
||||
# cli_conf.get("TEOS_PUBLIC_KEY"), cli_conf.get("CLI_PRIVATE_KEY"), cli_conf.get("CLI_PUBLIC_KEY")
|
||||
# )
|
||||
# hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
#
|
||||
# signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
# data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
|
||||
# FIXME: Since the pk is now hardcoded for the alpha in the cli we cannot use load_keys here. We need to derive
|
||||
# the pk from the sk on disk.
|
||||
teos_pk = get_teos_pk()
|
||||
data = {"appointment": appointment.to_dict()}
|
||||
teos_pk, cli_sk, cli_pk_der = teos_cli.load_keys(
|
||||
cli_conf.get("TEOS_PUBLIC_KEY"), cli_conf.get("CLI_PRIVATE_KEY"), cli_conf.get("CLI_PUBLIC_KEY")
|
||||
)
|
||||
hex_pk_der = binascii.hexlify(cli_pk_der)
|
||||
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
data = {"appointment": appointment.to_dict(), "signature": signature, "public_key": hex_pk_der.decode("utf-8")}
|
||||
|
||||
# Send appointment to the server.
|
||||
response = teos_cli.post_appointment(data)
|
||||
@@ -176,9 +163,7 @@ def test_appointment_wrong_key(bitcoin_cli, create_txs):
|
||||
assert appointment_info[0].get("status") == "not_found"
|
||||
|
||||
|
||||
def test_two_identical_appointments(monkeypatch, bitcoin_cli, create_txs):
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
def test_two_identical_appointments(bitcoin_cli, create_txs):
|
||||
# Tests sending two identical appointments to the tower.
|
||||
# At the moment there are no checks for identical appointments, so both will be accepted, decrypted and kept until
|
||||
# the end.
|
||||
@@ -211,9 +196,7 @@ def test_two_identical_appointments(monkeypatch, bitcoin_cli, create_txs):
|
||||
assert info.get("penalty_rawtx") == penalty_tx
|
||||
|
||||
|
||||
def test_two_appointment_same_locator_different_penalty(monkeypatch, bitcoin_cli, create_txs):
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
def test_two_appointment_same_locator_different_penalty(bitcoin_cli, create_txs):
|
||||
# This tests sending an appointment with two valid transaction with the same locator.
|
||||
commitment_tx, penalty_tx1 = create_txs
|
||||
commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")
|
||||
@@ -245,11 +228,9 @@ def test_two_appointment_same_locator_different_penalty(monkeypatch, bitcoin_cli
|
||||
assert appointment_info[0].get("penalty_rawtx") == penalty_tx1
|
||||
|
||||
|
||||
def test_appointment_shutdown_teos_trigger_back_online(monkeypatch, create_txs, bitcoin_cli):
|
||||
def test_appointment_shutdown_teos_trigger_back_online(create_txs, bitcoin_cli):
|
||||
global teosd_process
|
||||
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
teos_pid = teosd_process.pid
|
||||
|
||||
commitment_tx, penalty_tx = create_txs
|
||||
@@ -285,11 +266,9 @@ def test_appointment_shutdown_teos_trigger_back_online(monkeypatch, create_txs,
|
||||
assert appointment_info[0].get("status") == "dispute_responded"
|
||||
|
||||
|
||||
def test_appointment_shutdown_teos_trigger_while_offline(monkeypatch, create_txs, bitcoin_cli):
|
||||
def test_appointment_shutdown_teos_trigger_while_offline(create_txs, bitcoin_cli):
|
||||
global teosd_process
|
||||
|
||||
monkeypatch.setattr(teos_cli, "load_keys", get_teos_pk)
|
||||
|
||||
teos_pid = teosd_process.pid
|
||||
|
||||
commitment_tx, penalty_tx = create_txs
|
||||
|
||||
Reference in New Issue
Block a user