mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
tests - Adds LocatorCache unittests and updates existing ones to match
This commit is contained in:
@@ -4,13 +4,21 @@ from binascii import hexlify
|
||||
|
||||
from teos.api import API
|
||||
import common.errors as errors
|
||||
from teos.watcher import Watcher
|
||||
from teos.inspector import Inspector
|
||||
from teos.gatekeeper import UserInfo
|
||||
from teos.appointments_dbm import AppointmentsDBM
|
||||
from teos.responder import Responder, TransactionTracker
|
||||
from teos.extended_appointment import ExtendedAppointment
|
||||
from teos.watcher import Watcher, AppointmentAlreadyTriggered
|
||||
|
||||
from test.teos.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_keypair, get_config
|
||||
from test.teos.unit.conftest import (
|
||||
get_random_value_hex,
|
||||
generate_dummy_appointment,
|
||||
generate_keypair,
|
||||
get_config,
|
||||
create_dummy_transaction,
|
||||
compute_locator,
|
||||
)
|
||||
|
||||
from common.cryptographer import Cryptographer, hash_160
|
||||
from common.constants import (
|
||||
@@ -60,7 +68,15 @@ def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind):
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
responder = Responder(db_manager, gatekeeper, carrier, block_processor)
|
||||
watcher = Watcher(db_manager, gatekeeper, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS)
|
||||
watcher = Watcher(
|
||||
db_manager,
|
||||
gatekeeper,
|
||||
block_processor,
|
||||
responder,
|
||||
sk.to_der(),
|
||||
MAX_APPOINTMENTS,
|
||||
config.get("BLOCK_CACHE_SIZE"),
|
||||
)
|
||||
inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY"))
|
||||
api = API(config.get("API_HOST"), config.get("API_PORT"), inspector, watcher)
|
||||
|
||||
@@ -323,6 +339,74 @@ def test_add_appointment_update_smaller(api, client, appointment):
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1
|
||||
|
||||
|
||||
def test_add_appointment_in_cache(api, client):
|
||||
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
|
||||
|
||||
# Add the data to the cache
|
||||
dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
|
||||
api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
|
||||
|
||||
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
|
||||
|
||||
# Trying to add it again should fail, since it is already in the Responder
|
||||
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
|
||||
assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED
|
||||
|
||||
# The appointment should simply accepted if the data is not in the cache, since it cannot be triggered again
|
||||
del api.watcher.locator_cache.cache[appointment.locator]
|
||||
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
|
||||
|
||||
|
||||
def test_add_appointment_in_cache_cannot_decrypt(api, client):
|
||||
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
|
||||
appointment, dispute_tx = generate_dummy_appointment()
|
||||
appointment.encrypted_blob = appointment.encrypted_blob[::-1]
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
|
||||
|
||||
# Add the data to the cache
|
||||
dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
|
||||
api.watcher.locator_cache.cache[dispute_txid] = appointment.locator
|
||||
|
||||
# The appointment should be accepted
|
||||
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
|
||||
|
||||
|
||||
def test_add_appointment_in_cache_invalid_transaction(api, client):
|
||||
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
|
||||
|
||||
# We need to create the appointment manually
|
||||
dispute_tx = create_dummy_transaction()
|
||||
dispute_txid = dispute_tx.tx_id.hex()
|
||||
penalty_tx = create_dummy_transaction(dispute_txid)
|
||||
|
||||
locator = compute_locator(dispute_txid)
|
||||
dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20}
|
||||
encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id"))
|
||||
|
||||
appointment_data = {
|
||||
"locator": locator,
|
||||
"to_self_delay": dummy_appointment_data.get("to_self_delay"),
|
||||
"encrypted_blob": encrypted_blob,
|
||||
"user_id": get_random_value_hex(16),
|
||||
}
|
||||
|
||||
appointment = ExtendedAppointment.from_dict(appointment_data)
|
||||
api.watcher.locator_cache.cache[appointment.locator] = dispute_tx.tx_id.hex()
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
|
||||
|
||||
# Add the data to the cache
|
||||
api.watcher.locator_cache.cache[dispute_txid] = appointment.locator
|
||||
|
||||
# The appointment should be accepted
|
||||
r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
|
||||
|
||||
|
||||
def test_add_too_many_appointment(api, client):
|
||||
# Give slots to the user
|
||||
api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=200, subscription_expiry=0)
|
||||
|
||||
Reference in New Issue
Block a user