mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Generalizes get_dummy_appointment (there were multiple instances of it in different tests)
Also refactors some unused imports and modifies the watcher tests to use .serialize instead of to_json
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
import pytest
|
||||
from uuid import uuid4
|
||||
from hashlib import sha256
|
||||
from threading import Thread
|
||||
from binascii import unhexlify
|
||||
from queue import Queue, Empty
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
@@ -12,17 +10,13 @@ from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
|
||||
from pisa import c_logger
|
||||
from apps.cli.blob import Blob
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.responder import Responder
|
||||
from pisa.conf import MAX_APPOINTMENTS
|
||||
from pisa.appointment import Appointment
|
||||
from pisa.tools import check_txid_format
|
||||
from test.simulator.utils import sha256d
|
||||
from test.simulator.transaction import TX
|
||||
from pisa.utils.auth_proxy import AuthServiceProxy
|
||||
from test.unit.conftest import generate_block, generate_blocks
|
||||
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, PISA_SECRET_KEY
|
||||
from test.unit.conftest import generate_block, generate_blocks, generate_dummy_appointment
|
||||
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, PISA_SECRET_KEY, \
|
||||
MAX_APPOINTMENTS
|
||||
|
||||
c_logger.disabled = True
|
||||
|
||||
@@ -42,37 +36,14 @@ def watcher(db_manager):
|
||||
return Watcher(db_manager)
|
||||
|
||||
|
||||
def generate_dummy_appointment():
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
|
||||
|
||||
dispute_tx = TX.create_dummy_transaction()
|
||||
dispute_txid = sha256d(dispute_tx)
|
||||
justice_tx = TX.create_dummy_transaction(dispute_txid)
|
||||
|
||||
start_time = bitcoin_cli.getblockcount() + 1
|
||||
end_time = start_time + 1
|
||||
dispute_delta = 20
|
||||
|
||||
cipher = "AES-GCM-128"
|
||||
hash_function = "SHA256"
|
||||
|
||||
locator = sha256(unhexlify(dispute_txid)).hexdigest()
|
||||
blob = Blob(justice_tx, cipher, hash_function)
|
||||
|
||||
encrypted_blob = blob.encrypt(dispute_txid)
|
||||
|
||||
appointment = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
|
||||
|
||||
return appointment, dispute_tx
|
||||
|
||||
|
||||
def create_appointments(n):
|
||||
locator_uuid_map = dict()
|
||||
appointments = dict()
|
||||
dispute_txs = []
|
||||
|
||||
for i in range(n):
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment, dispute_tx = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET,
|
||||
end_time_offset=END_TIME_OFFSET)
|
||||
uuid = uuid4().hex
|
||||
|
||||
appointments[uuid] = appointment
|
||||
@@ -85,7 +56,7 @@ def create_appointments(n):
|
||||
def is_signature_valid(appointment, signature, pk):
|
||||
# verify the signature
|
||||
try:
|
||||
data = appointment.to_json().encode('utf-8')
|
||||
data = appointment.serialize()
|
||||
pk.verify(signature, data, ec.ECDSA(hashes.SHA256()))
|
||||
except InvalidSignature:
|
||||
return False
|
||||
@@ -109,7 +80,8 @@ def test_add_appointment(run_bitcoind, watcher):
|
||||
|
||||
# We should be able to add appointments up to the limit
|
||||
for _ in range(10):
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment, dispute_tx = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET,
|
||||
end_time_offset=END_TIME_OFFSET)
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is True
|
||||
@@ -117,7 +89,8 @@ def test_add_appointment(run_bitcoind, watcher):
|
||||
|
||||
|
||||
def test_sign_appointment(watcher):
|
||||
appointment, _ = generate_dummy_appointment()
|
||||
appointment, _ = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET,
|
||||
end_time_offset=END_TIME_OFFSET)
|
||||
signature = watcher.sign_appointment(appointment)
|
||||
assert is_signature_valid(appointment, signature, public_key)
|
||||
|
||||
@@ -127,13 +100,15 @@ def test_add_too_many_appointments(watcher):
|
||||
watcher.appointments = dict()
|
||||
|
||||
for _ in range(MAX_APPOINTMENTS):
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment, dispute_tx = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET,
|
||||
end_time_offset=END_TIME_OFFSET)
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is True
|
||||
assert is_signature_valid(appointment, sig, public_key)
|
||||
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment, dispute_tx = generate_dummy_appointment(start_time_offset=START_TIME_OFFSET,
|
||||
end_time_offset=END_TIME_OFFSET)
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is False
|
||||
|
||||
Reference in New Issue
Block a user