mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Adapts tests to use userDB and registered_users content as dict
This commit is contained in:
@@ -328,11 +328,8 @@ def test_appointment_shutdown_teos_trigger_back_online(create_txs, bitcoin_cli):
|
||||
|
||||
assert teos_pid != teosd_process.pid
|
||||
|
||||
# FIXME: We have to cheat here since users are not kept between restarts atm
|
||||
sleep(1)
|
||||
teos_cli.register(compressed_cli_pk, teos_base_endpoint)
|
||||
|
||||
# Check that the appointment is still in the Watcher
|
||||
sleep(1)
|
||||
appointment_info = get_appointment_info(locator)
|
||||
|
||||
assert appointment_info is not None
|
||||
@@ -376,11 +373,8 @@ def test_appointment_shutdown_teos_trigger_while_offline(create_txs, bitcoin_cli
|
||||
teosd_process = run_teosd()
|
||||
assert teos_pid != teosd_process.pid
|
||||
|
||||
# FIXME: We have to cheat here since users are not kept between restarts atm
|
||||
sleep(1)
|
||||
teos_cli.register(compressed_cli_pk, teos_base_endpoint)
|
||||
|
||||
# The appointment should have been moved to the Responder
|
||||
sleep(1)
|
||||
appointment_info = get_appointment_info(locator)
|
||||
|
||||
assert appointment_info is not None
|
||||
|
||||
@@ -12,6 +12,8 @@ from bitcoind_mock.transaction import create_dummy_transaction
|
||||
|
||||
from teos.carrier import Carrier
|
||||
from teos.tools import bitcoin_cli
|
||||
from teos.users_dbm import UsersDBM
|
||||
from teos.gatekeeper import Gatekeeper
|
||||
from teos import LOG_PREFIX, DEFAULT_CONF
|
||||
from teos.responder import TransactionTracker
|
||||
from teos.block_processor import BlockProcessor
|
||||
@@ -62,6 +64,17 @@ def db_manager():
|
||||
rmtree("test_db")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def user_db_manager():
|
||||
manager = UsersDBM("test_user_db")
|
||||
# Add last know block for the Responder in the db
|
||||
|
||||
yield manager
|
||||
|
||||
manager.db.close()
|
||||
rmtree("test_user_db")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def carrier():
|
||||
return Carrier(bitcoind_connect_params)
|
||||
@@ -72,6 +85,11 @@ def block_processor():
|
||||
return BlockProcessor(bitcoind_connect_params)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def gatekeeper(user_db_manager):
|
||||
return Gatekeeper(user_db_manager, get_config().get("DEFAULT_SLOTS"))
|
||||
|
||||
|
||||
def generate_keypair():
|
||||
sk = PrivateKey()
|
||||
pk = sk.public_key
|
||||
|
||||
@@ -58,13 +58,12 @@ def get_all_db_manager():
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def api(db_manager, carrier, block_processor, run_bitcoind):
|
||||
def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind):
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
responder = Responder(db_manager, carrier, block_processor)
|
||||
watcher = Watcher(db_manager, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS, config.get("EXPIRY_DELTA"))
|
||||
|
||||
gatekeeper = Gatekeeper(config.get("DEFAULT_SLOTS"))
|
||||
api = API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, gatekeeper)
|
||||
|
||||
return api
|
||||
@@ -146,7 +145,7 @@ def test_register_json_no_inner_dict(client):
|
||||
|
||||
def test_add_appointment(api, client, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 1}
|
||||
|
||||
# Properly formatted appointment
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
@@ -159,7 +158,7 @@ def test_add_appointment(api, client, appointment):
|
||||
|
||||
def test_add_appointment_no_json(api, client, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 1}
|
||||
|
||||
# Properly formatted appointment
|
||||
r = client.post(add_appointment_endpoint, data="random_message")
|
||||
@@ -168,7 +167,7 @@ def test_add_appointment_no_json(api, client, appointment):
|
||||
|
||||
def test_add_appointment_json_no_inner_dict(api, client, appointment):
|
||||
# Simulate the user registration
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 1}
|
||||
|
||||
# Properly formatted appointment
|
||||
r = client.post(add_appointment_endpoint, json="random_message")
|
||||
@@ -204,7 +203,7 @@ def test_add_appointment_not_registered(api, client, appointment):
|
||||
|
||||
def test_add_appointment_registered_no_free_slots(api, client, appointment):
|
||||
# Empty the user slots
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 0
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 0}
|
||||
|
||||
# Properly formatted appointment
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
@@ -237,7 +236,7 @@ def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MU
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
|
||||
# Simulate registering enough slots
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = n
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": n}
|
||||
for _ in range(n):
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -257,7 +256,8 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment
|
||||
|
||||
# Add one slot per public key
|
||||
for pair in user_keys:
|
||||
api.gatekeeper.registered_users[hexlify(pair[1].format(compressed=True)).decode("utf-8")] = 2
|
||||
tmp_compressed_pk = hexlify(pair[1].format(compressed=True)).decode("utf-8")
|
||||
api.gatekeeper.registered_users[tmp_compressed_pk] = {"available_slots": 2}
|
||||
|
||||
# Send the appointments
|
||||
for compressed_pk, signature in zip(compressed_pks, signatures):
|
||||
@@ -271,7 +271,7 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment
|
||||
|
||||
def test_add_appointment_update_same_size(api, client, appointment):
|
||||
# Update an appointment by one of the same size and check that no additional slots are filled
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 1
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 1}
|
||||
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
# # Since we will replace the appointment, we won't added to appointments
|
||||
@@ -292,7 +292,7 @@ def test_add_appointment_update_same_size(api, client, appointment):
|
||||
|
||||
def test_add_appointment_update_bigger(api, client, appointment):
|
||||
# Update an appointment by one bigger, and check additional slots are filled
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 2
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 2}
|
||||
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
@@ -320,7 +320,7 @@ def test_add_appointment_update_bigger(api, client, appointment):
|
||||
|
||||
def test_add_appointment_update_smaller(api, client, appointment):
|
||||
# Update an appointment by one bigger, and check slots are freed
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 2
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 2}
|
||||
|
||||
# This should take 2 slots
|
||||
appointment.encrypted_blob.data = TWO_SLOTS_BLOTS
|
||||
@@ -341,7 +341,7 @@ def test_add_appointment_update_smaller(api, client, appointment):
|
||||
|
||||
def test_add_too_many_appointment(api, client):
|
||||
# Give slots to the user
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = 200
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 200}
|
||||
|
||||
free_appointment_slots = MAX_APPOINTMENTS - len(api.watcher.appointments)
|
||||
|
||||
|
||||
@@ -1,31 +1,33 @@
|
||||
import pytest
|
||||
|
||||
from teos.gatekeeper import Gatekeeper, IdentificationFailure, NotEnoughSlots
|
||||
from teos.gatekeeper import IdentificationFailure, NotEnoughSlots
|
||||
|
||||
from common.cryptographer import Cryptographer
|
||||
|
||||
from test.teos.unit.conftest import get_random_value_hex, generate_keypair
|
||||
|
||||
DEFAULT_SLOTS = 42
|
||||
gatekeeper = Gatekeeper(DEFAULT_SLOTS)
|
||||
from test.teos.unit.conftest import get_random_value_hex, generate_keypair, get_config
|
||||
|
||||
|
||||
def test_init():
|
||||
assert isinstance(gatekeeper.default_slots, int) and gatekeeper.default_slots == DEFAULT_SLOTS
|
||||
config = get_config()
|
||||
|
||||
|
||||
def test_init(gatekeeper):
|
||||
assert isinstance(gatekeeper.default_slots, int) and gatekeeper.default_slots == config.get("DEFAULT_SLOTS")
|
||||
assert isinstance(gatekeeper.registered_users, dict) and len(gatekeeper.registered_users) == 0
|
||||
|
||||
|
||||
def test_add_update_user():
|
||||
def test_add_update_user(gatekeeper):
|
||||
# add_update_user adds DEFAULT_SLOTS to a given user as long as the identifier is {02, 03}| 32-byte hex str
|
||||
user_pk = "02" + get_random_value_hex(32)
|
||||
|
||||
for _ in range(10):
|
||||
current_slots = gatekeeper.registered_users.get(user_pk)
|
||||
current_slots = current_slots if current_slots is not None else 0
|
||||
current_slots = current_slots.get("available_slots") if current_slots is not None else 0
|
||||
|
||||
gatekeeper.add_update_user(user_pk)
|
||||
|
||||
assert gatekeeper.registered_users.get(user_pk) == current_slots + DEFAULT_SLOTS
|
||||
assert gatekeeper.registered_users.get(user_pk).get("available_slots") == current_slots + config.get(
|
||||
"DEFAULT_SLOTS"
|
||||
)
|
||||
|
||||
# The same can be checked for multiple users
|
||||
for _ in range(10):
|
||||
@@ -33,10 +35,10 @@ def test_add_update_user():
|
||||
user_pk = "03" + get_random_value_hex(32)
|
||||
|
||||
gatekeeper.add_update_user(user_pk)
|
||||
assert gatekeeper.registered_users.get(user_pk) == DEFAULT_SLOTS
|
||||
assert gatekeeper.registered_users.get(user_pk).get("available_slots") == config.get("DEFAULT_SLOTS")
|
||||
|
||||
|
||||
def test_add_update_user_wrong_pk():
|
||||
def test_add_update_user_wrong_pk(gatekeeper):
|
||||
# Passing a wrong pk defaults to the errors in check_user_pk. We can try with one.
|
||||
wrong_pk = get_random_value_hex(32)
|
||||
|
||||
@@ -44,7 +46,7 @@ def test_add_update_user_wrong_pk():
|
||||
gatekeeper.add_update_user(wrong_pk)
|
||||
|
||||
|
||||
def test_add_update_user_wrong_pk_prefix():
|
||||
def test_add_update_user_wrong_pk_prefix(gatekeeper):
|
||||
# Prefixes must be 02 or 03, anything else should fail
|
||||
wrong_pk = "04" + get_random_value_hex(32)
|
||||
|
||||
@@ -52,7 +54,7 @@ def test_add_update_user_wrong_pk_prefix():
|
||||
gatekeeper.add_update_user(wrong_pk)
|
||||
|
||||
|
||||
def test_identify_user():
|
||||
def test_identify_user(gatekeeper):
|
||||
# Identify user should return a user_pk for registered users. It raises
|
||||
# IdentificationFailure for invalid parameters or non-registered users.
|
||||
|
||||
@@ -67,7 +69,7 @@ def test_identify_user():
|
||||
assert gatekeeper.identify_user(message.encode(), signature) == compressed_pk
|
||||
|
||||
|
||||
def test_identify_user_non_registered():
|
||||
def test_identify_user_non_registered(gatekeeper):
|
||||
# Non-registered user won't be identified
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
@@ -78,7 +80,7 @@ def test_identify_user_non_registered():
|
||||
gatekeeper.identify_user(message.encode(), signature)
|
||||
|
||||
|
||||
def test_identify_user_invalid_signature():
|
||||
def test_identify_user_invalid_signature(gatekeeper):
|
||||
# If the signature does not match the message given a public key, the user won't be identified
|
||||
message = "Hey, it's me"
|
||||
signature = get_random_value_hex(72)
|
||||
@@ -87,7 +89,7 @@ def test_identify_user_invalid_signature():
|
||||
gatekeeper.identify_user(message.encode(), signature)
|
||||
|
||||
|
||||
def test_identify_user_wrong():
|
||||
def test_identify_user_wrong(gatekeeper):
|
||||
# Wrong parameters shouldn't verify either
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
@@ -107,13 +109,13 @@ def test_identify_user_wrong():
|
||||
gatekeeper.identify_user(message, signature.encode())
|
||||
|
||||
|
||||
def test_fill_slots():
|
||||
def test_fill_slots(gatekeeper):
|
||||
# Free slots will decrease the slot count of a user as long as he has enough slots, otherwise raise NotEnoughSlots
|
||||
user_pk = "02" + get_random_value_hex(32)
|
||||
gatekeeper.add_update_user(user_pk)
|
||||
|
||||
gatekeeper.fill_slots(user_pk, DEFAULT_SLOTS - 1)
|
||||
assert gatekeeper.registered_users.get(user_pk) == 1
|
||||
gatekeeper.fill_slots(user_pk, config.get("DEFAULT_SLOTS") - 1)
|
||||
assert gatekeeper.registered_users.get(user_pk).get("available_slots") == 1
|
||||
|
||||
with pytest.raises(NotEnoughSlots):
|
||||
gatekeeper.fill_slots(user_pk, 2)
|
||||
@@ -123,13 +125,13 @@ def test_fill_slots():
|
||||
gatekeeper.fill_slots(get_random_value_hex(33), 2)
|
||||
|
||||
|
||||
def test_free_slots():
|
||||
def test_free_slots(gatekeeper):
|
||||
# Free slots simply adds slots to the user as long as it exists.
|
||||
user_pk = "03" + get_random_value_hex(32)
|
||||
gatekeeper.add_update_user(user_pk)
|
||||
gatekeeper.free_slots(user_pk, 42)
|
||||
|
||||
assert gatekeeper.registered_users.get(user_pk) == DEFAULT_SLOTS + 42
|
||||
assert gatekeeper.registered_users.get(user_pk).get("available_slots") == config.get("DEFAULT_SLOTS") + 42
|
||||
|
||||
# Just making sure it does not crash for non-registered user
|
||||
assert gatekeeper.free_slots(get_random_value_hex(33), 10) is None
|
||||
|
||||
Reference in New Issue
Block a user