mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
Refactors Cryptographer to remove logs and avoid multi return types
Cryptographer now uses exceptions to report errors and does not use the log anymore
This commit is contained in:
@@ -7,8 +7,6 @@ from binascii import hexlify
|
||||
from coincurve import PrivateKey
|
||||
from requests.exceptions import ConnectionError, Timeout
|
||||
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.appointment import Appointment
|
||||
from common.cryptographer import Cryptographer
|
||||
@@ -18,8 +16,6 @@ from cli.exceptions import InvalidParameter, InvalidKey, TowerResponseError
|
||||
|
||||
from test.cli.unit.conftest import get_random_value_hex, get_config
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=teos_cli.LOG_PREFIX)
|
||||
|
||||
config = get_config()
|
||||
|
||||
# dummy keys for the tests
|
||||
|
||||
@@ -1,17 +1,12 @@
|
||||
import os
|
||||
from binascii import unhexlify
|
||||
from coincurve import PrivateKey, PublicKey
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from coincurve import PrivateKey, PublicKey
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.cryptographer import Cryptographer
|
||||
from test.common.unit.conftest import get_random_value_hex
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix="")
|
||||
|
||||
data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777"
|
||||
key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659"
|
||||
encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810"
|
||||
@@ -185,7 +180,7 @@ def test_sign_ground_truth():
|
||||
sig = Cryptographer.sign(message, sk)
|
||||
rpk = Cryptographer.recover_pk(message, sig)
|
||||
|
||||
assert Cryptographer.verify_rpk(PublicKey(unhexlify(c_lightning_rpk)), rpk)
|
||||
assert c_lightning_rpk == Cryptographer.get_compressed_pk(rpk)
|
||||
|
||||
|
||||
def test_sign_wrong_sk():
|
||||
@@ -221,7 +216,7 @@ def test_recover_pk_ground_truth():
|
||||
|
||||
rpk = Cryptographer.recover_pk(message, zsig)
|
||||
|
||||
assert Cryptographer.verify_rpk(PublicKey(unhexlify(org_pk)), rpk)
|
||||
assert org_pk == Cryptographer.get_compressed_pk(rpk)
|
||||
|
||||
|
||||
def test_recover_pk_wrong_inputs():
|
||||
@@ -241,27 +236,6 @@ def test_recover_pk_wrong_inputs():
|
||||
assert Cryptographer.recover_pk(message, bytes(104)) is None
|
||||
|
||||
|
||||
def test_verify_pk():
|
||||
sk, _ = generate_keypair()
|
||||
message = b"Test message"
|
||||
|
||||
zbase32_sig = Cryptographer.sign(message, sk)
|
||||
rpk = Cryptographer.recover_pk(message, zbase32_sig)
|
||||
|
||||
assert Cryptographer.verify_rpk(sk.public_key, rpk)
|
||||
|
||||
|
||||
def test_verify_pk_wrong():
|
||||
sk, _ = generate_keypair()
|
||||
sk2, _ = generate_keypair()
|
||||
message = b"Test message"
|
||||
|
||||
zbase32_sig = Cryptographer.sign(message, sk)
|
||||
rpk = Cryptographer.recover_pk(message, zbase32_sig)
|
||||
|
||||
assert not Cryptographer.verify_rpk(sk2.public_key, rpk)
|
||||
|
||||
|
||||
def test_get_compressed_pk():
|
||||
sk, pk = generate_keypair()
|
||||
compressed_pk = Cryptographer.get_compressed_pk(pk)
|
||||
|
||||
@@ -8,8 +8,6 @@ from coincurve import PrivateKey
|
||||
from cli.exceptions import TowerResponseError
|
||||
from cli import teos_cli, DATA_DIR, DEFAULT_CONF, CONF_FILE_NAME
|
||||
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.appointment import Appointment
|
||||
from common.cryptographer import Cryptographer
|
||||
@@ -25,7 +23,6 @@ from test.teos.e2e.conftest import (
|
||||
)
|
||||
|
||||
cli_config = get_config(DATA_DIR, CONF_FILE_NAME, DEFAULT_CONF)
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix="")
|
||||
|
||||
teos_base_endpoint = "http://{}:{}".format(cli_config.get("API_CONNECT"), cli_config.get("API_PORT"))
|
||||
teos_add_appointment_endpoint = "{}/add_appointment".format(teos_base_endpoint)
|
||||
@@ -257,7 +254,7 @@ def test_appointment_wrong_decryption_key(bitcoin_cli):
|
||||
# Check that the server has accepted the appointment
|
||||
signature = response_json.get("signature")
|
||||
rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
|
||||
assert Cryptographer.verify_rpk(teos_pk, rpk) is True
|
||||
assert teos_pk == Cryptographer.get_compressed_pk(rpk)
|
||||
assert response_json.get("locator") == appointment.locator
|
||||
|
||||
# Trigger the appointment
|
||||
|
||||
@@ -10,25 +10,21 @@ from bitcoind_mock.bitcoind import BitcoindMock
|
||||
from bitcoind_mock.conf import BTC_RPC_HOST, BTC_RPC_PORT
|
||||
from bitcoind_mock.transaction import create_dummy_transaction
|
||||
|
||||
from teos import DEFAULT_CONF
|
||||
from teos.carrier import Carrier
|
||||
from teos.tools import bitcoin_cli
|
||||
from teos.users_dbm import UsersDBM
|
||||
from teos.gatekeeper import Gatekeeper
|
||||
from teos import LOG_PREFIX, DEFAULT_CONF
|
||||
from teos.responder import TransactionTracker
|
||||
from teos.block_processor import BlockProcessor
|
||||
from teos.appointments_dbm import AppointmentsDBM
|
||||
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.appointment import Appointment
|
||||
from common.constants import LOCATOR_LEN_HEX
|
||||
from common.config_loader import ConfigLoader
|
||||
from common.cryptographer import Cryptographer
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
# Set params to connect to regtest for testing
|
||||
DEFAULT_CONF["BTC_RPC_PORT"]["value"] = 18443
|
||||
DEFAULT_CONF["BTC_NETWORK"]["value"] = "regtest"
|
||||
|
||||
@@ -2,19 +2,14 @@ import pytest
|
||||
from binascii import unhexlify
|
||||
|
||||
import teos.errors as errors
|
||||
from teos import LOG_PREFIX
|
||||
from teos.block_processor import BlockProcessor
|
||||
from teos.inspector import Inspector, InspectionFailed
|
||||
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.appointment import Appointment
|
||||
from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX
|
||||
|
||||
from test.teos.unit.conftest import get_random_value_hex, bitcoind_connect_params, get_config
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
NO_HEX_STRINGS = [
|
||||
"R" * LOCATOR_LEN_HEX,
|
||||
get_random_value_hex(LOCATOR_LEN_BYTES - 1) + "PP",
|
||||
|
||||
@@ -13,8 +13,6 @@ from teos.chain_monitor import ChainMonitor
|
||||
from teos.appointments_dbm import AppointmentsDBM
|
||||
from teos.block_processor import BlockProcessor
|
||||
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.cryptographer import Cryptographer
|
||||
|
||||
@@ -28,9 +26,6 @@ from test.teos.unit.conftest import (
|
||||
bitcoind_connect_params,
|
||||
)
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
|
||||
APPOINTMENTS = 5
|
||||
START_TIME_OFFSET = 1
|
||||
END_TIME_OFFSET = 1
|
||||
@@ -134,16 +129,16 @@ def test_add_appointment(watcher):
|
||||
added_appointment, sig = watcher.add_appointment(appointment, user_pk)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify_rpk(
|
||||
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
assert Cryptographer.get_compressed_pk(watcher.signing_key.public_key) == Cryptographer.get_compressed_pk(
|
||||
Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
)
|
||||
|
||||
# Check that we can also add an already added appointment (same locator)
|
||||
added_appointment, sig = watcher.add_appointment(appointment, user_pk)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify_rpk(
|
||||
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
assert Cryptographer.get_compressed_pk(watcher.signing_key.public_key) == Cryptographer.get_compressed_pk(
|
||||
Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
)
|
||||
|
||||
# If two appointments with the same locator from the same user are added, they are overwritten, but if they come
|
||||
@@ -153,8 +148,8 @@ def test_add_appointment(watcher):
|
||||
different_user_pk = get_random_value_hex(33)
|
||||
added_appointment, sig = watcher.add_appointment(appointment, different_user_pk)
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify_rpk(
|
||||
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
assert Cryptographer.get_compressed_pk(watcher.signing_key.public_key) == Cryptographer.get_compressed_pk(
|
||||
Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
)
|
||||
assert len(watcher.locator_uuid_map[appointment.locator]) == 2
|
||||
|
||||
@@ -172,8 +167,8 @@ def test_add_too_many_appointments(watcher):
|
||||
added_appointment, sig = watcher.add_appointment(appointment, user_pk)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify_rpk(
|
||||
watcher.signing_key.public_key, Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
assert Cryptographer.get_compressed_pk(watcher.signing_key.public_key) == Cryptographer.get_compressed_pk(
|
||||
Cryptographer.recover_pk(appointment.serialize(), sig)
|
||||
)
|
||||
|
||||
appointment, dispute_tx = generate_dummy_appointment(
|
||||
|
||||
Reference in New Issue
Block a user