mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Test that server validates signature properly
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import pytest
|
||||
import random
|
||||
import requests
|
||||
@@ -5,7 +6,12 @@ from time import sleep
|
||||
from shutil import rmtree
|
||||
from threading import Thread
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
from binascii import hexlify, unhexlify
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from pisa.conf import DB_PATH
|
||||
from apps.cli.blob import Blob
|
||||
@@ -48,6 +54,18 @@ def prng_seed():
|
||||
random.seed(0)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def generate_keypair():
|
||||
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||
client_pk = (
|
||||
client_sk.public_key()
|
||||
.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
||||
.decode("utf-8")
|
||||
)
|
||||
|
||||
return client_sk, client_pk
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def db_manager():
|
||||
manager = DBManager("test_db")
|
||||
@@ -73,6 +91,11 @@ def generate_blocks(n):
|
||||
generate_block()
|
||||
|
||||
|
||||
def sign_appointment(sk, appointment):
|
||||
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
return hexlify(sk.sign(data, ec.ECDSA(hashes.SHA256()))).decode("utf-8")
|
||||
|
||||
|
||||
def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30):
|
||||
current_height = bitcoin_cli().getblockcount()
|
||||
|
||||
@@ -91,6 +114,14 @@ def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30):
|
||||
cipher = "AES-GCM-128"
|
||||
hash_function = "SHA256"
|
||||
|
||||
# dummy keys for this test
|
||||
client_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||
client_pk = (
|
||||
client_sk.public_key()
|
||||
.public_bytes(encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
|
||||
.decode("utf-8")
|
||||
)
|
||||
|
||||
locator = sha256(unhexlify(dispute_txid)).hexdigest()
|
||||
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
|
||||
|
||||
@@ -107,7 +138,11 @@ def generate_dummy_appointment_data(start_time_offset=5, end_time_offset=30):
|
||||
"triggered": False,
|
||||
}
|
||||
|
||||
return appointment_data, dispute_tx
|
||||
signature = sign_appointment(client_sk, appointment_data)
|
||||
|
||||
data = {"appointment": appointment_data, "signature": signature, "public_key": client_pk}
|
||||
|
||||
return data, dispute_tx
|
||||
|
||||
|
||||
def generate_dummy_appointment(start_time_offset=5, end_time_offset=30):
|
||||
@@ -115,7 +150,7 @@ def generate_dummy_appointment(start_time_offset=5, end_time_offset=30):
|
||||
start_time_offset=start_time_offset, end_time_offset=end_time_offset
|
||||
)
|
||||
|
||||
return Appointment.from_dict(appointment_data), dispute_tx
|
||||
return Appointment.from_dict(appointment_data["appointment"]), dispute_tx
|
||||
|
||||
|
||||
def generate_dummy_job():
|
||||
|
||||
Reference in New Issue
Block a user