mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
Added signature verification to watcher's add_appointment test
This commit is contained in:
@@ -73,6 +73,7 @@ class Watcher:
|
|||||||
|
|
||||||
logger.info("New appointment accepted.", locator=appointment.locator)
|
logger.info("New appointment accepted.", locator=appointment.locator)
|
||||||
|
|
||||||
|
print(appointment.to_json().encode("utf-8"))
|
||||||
signature = self.signing_key.sign(
|
signature = self.signing_key.sign(
|
||||||
appointment.to_json().encode("utf-8"),
|
appointment.to_json().encode("utf-8"),
|
||||||
ec.ECDSA(hashes.SHA256())
|
ec.ECDSA(hashes.SHA256())
|
||||||
|
|||||||
@@ -6,6 +6,12 @@ from threading import Thread
|
|||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
from queue import Queue, Empty
|
from queue import Queue, Empty
|
||||||
|
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
|
||||||
from apps.cli.blob import Blob
|
from apps.cli.blob import Blob
|
||||||
from pisa.watcher import Watcher
|
from pisa.watcher import Watcher
|
||||||
from pisa.responder import Responder
|
from pisa.responder import Responder
|
||||||
@@ -16,7 +22,7 @@ from test.simulator.utils import sha256d
|
|||||||
from test.simulator.transaction import TX
|
from test.simulator.transaction import TX
|
||||||
from test.unit.conftest import generate_block
|
from test.unit.conftest import generate_block
|
||||||
from pisa.utils.auth_proxy import AuthServiceProxy
|
from pisa.utils.auth_proxy import AuthServiceProxy
|
||||||
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, SIGNING_KEY_FILE
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
|
|
||||||
@@ -24,6 +30,12 @@ APPOINTMENTS = 5
|
|||||||
START_TIME_OFFSET = 1
|
START_TIME_OFFSET = 1
|
||||||
END_TIME_OFFSET = 1
|
END_TIME_OFFSET = 1
|
||||||
|
|
||||||
|
with open(SIGNING_KEY_FILE, "r") as key_file:
|
||||||
|
pubkey_pem = key_file.read().encode("utf-8")
|
||||||
|
# TODO: should use the public key file instead, but it is not currently exported in the configuration
|
||||||
|
signing_key = load_pem_private_key(pubkey_pem, password=None, backend=default_backend())
|
||||||
|
public_key = signing_key.public_key()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def watcher():
|
def watcher():
|
||||||
@@ -92,6 +104,13 @@ def test_add_appointment(run_bitcoind, watcher):
|
|||||||
|
|
||||||
assert added_appointment is True
|
assert added_appointment is True
|
||||||
|
|
||||||
|
# verify the signature
|
||||||
|
try:
|
||||||
|
data = appointment.to_json().encode("utf-8")
|
||||||
|
public_key.verify(sig, data, ec.ECDSA(hashes.SHA256()))
|
||||||
|
except InvalidSignature:
|
||||||
|
assert False, "The appointment's signature is not correct"
|
||||||
|
|
||||||
|
|
||||||
def test_add_too_many_appointments(watcher):
|
def test_add_too_many_appointments(watcher):
|
||||||
# Any appointment on top of those should fail
|
# Any appointment on top of those should fail
|
||||||
|
|||||||
Reference in New Issue
Block a user