Merge pull request #149 from sr-gi/138-locator-cache

Adds locator cache to the Watcher
Authored by Sergi Delgado Segura on 2020-06-15 11:29:17 +02:00, committed by GitHub
15 changed files with 926 additions and 98 deletions
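For context: the tower indexes appointments by locator, the first 16 bytes (32 hex characters) of the dispute txid, as computed by common.tools.compute_locator. The LocatorCache added here keeps the locators of the last few blocks around, so an appointment whose trigger transaction was mined moments before submission can be routed straight to the Responder instead of being missed. A minimal sketch of the idea (illustrative only; route_appointment is a hypothetical helper, not part of this PR):

LOCATOR_LEN_HEX = 32


def compute_locator(txid):
    # A locator is a truncated txid; the tower only learns the full txid once
    # the dispute transaction shows up in a block.
    return txid[:LOCATOR_LEN_HEX]


def route_appointment(locator, cache):
    # cache maps locator -> dispute_txid for the transactions in recent blocks
    dispute_txid = cache.get(locator)
    if dispute_txid:
        return "responder", dispute_txid  # the breach is already on-chain
    return "watcher", None  # keep watching for the locator in future blocks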

View File

@@ -113,19 +113,20 @@ def add_appointment(appointment_data, user_sk, teos_id, teos_url):
     add_appointment_endpoint = "{}/add_appointment".format(teos_url)
     response = process_post_response(post_request(data, add_appointment_endpoint))

-    signature = response.get("signature")
+    tower_signature = response.get("signature")
     # Check that the server signed the appointment as it should.
-    if not signature:
+    if not tower_signature:
         raise TowerResponseError("The response does not contain the signature of the appointment")

-    rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
+    rpk = Cryptographer.recover_pk(appointment.serialize(), tower_signature)
     if teos_id != Cryptographer.get_compressed_pk(rpk):
         raise TowerResponseError("The returned appointment's signature is invalid")

     logger.info("Appointment accepted and signed by the Eye of Satoshi")
     logger.info("Remaining slots: {}".format(response.get("available_slots")))
+    logger.info("Start block: {}".format(response.get("start_block")))

-    return appointment, signature
+    return appointment, tower_signature


 def get_appointment(locator, user_sk, teos_id, teos_url):
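With this change the tower's /add_appointment response also reports the block from which the appointment is watched. A sketch of the response shape, matching the dict returned by the Watcher further down in this diff (field values are made up):

response = {
    "locator": "4a5e1e4baab89f3a32518a88c31bc87f",  # 16-byte locator, hex encoded
    "start_block": "<hash of the last block known by the Watcher>",
    "signature": "<tower signature over the appointment receipt>",
    "available_slots": 98,
    "subscription_expiry": 4320,  # illustrative value
}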

View File

@@ -7,6 +7,7 @@ APPOINTMENT_FIELD_TOO_SMALL = -5
 APPOINTMENT_FIELD_TOO_BIG = -6
 APPOINTMENT_WRONG_FIELD = -7
 APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8
+APPOINTMENT_ALREADY_TRIGGERED = -9

 # Registration errors [-33, -64]
 REGISTRATION_MISSING_FIELD = -33

View File

@@ -4,4 +4,5 @@ cryptography==2.8
 coincurve
 pyzbase32
 requests
 plyvel
+readerwriterlock
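The new readerwriterlock dependency backs the cache's concurrency control: API threads take a read lock to look up locators while the block-processing thread takes the write lock to mutate the cache. A minimal sketch of the pattern as used by the LocatorCache below (RWLockWrite is the writer-priority variant, so block updates are not starved by API reads):

from readerwriterlock import rwlock

lock = rwlock.RWLockWrite()  # writer-priority read/write lock
shared = dict()


def lookup(key):
    with lock.gen_rlock():  # many readers may hold the lock concurrently
        return shared.get(key)


def store(key, value):
    with lock.gen_wlock():  # writers get exclusive access
        shared[key] = value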

View File

@@ -21,6 +21,7 @@ DEFAULT_CONF = {
     "DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int},
     "EXPIRY_DELTA": {"value": 6, "type": int},
     "MIN_TO_SELF_DELAY": {"value": 20, "type": int},
+    "LOCATOR_CACHE_SIZE": {"value": 6, "type": int},
     "LOG_FILE": {"value": "teos.log", "type": str, "path": True},
     "TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True},
     "APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True},

View File

@@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify
 from teos import LOG_PREFIX
 import common.errors as errors
 from teos.inspector import InspectionFailed
-from teos.watcher import AppointmentLimitReached
+from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered
 from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure
 from common.logger import Logger

@@ -192,6 +192,13 @@ class API:
             rcode = HTTP_SERVICE_UNAVAILABLE
             response = {"error": "appointment rejected"}
+        except AppointmentAlreadyTriggered:
+            rcode = HTTP_BAD_REQUEST
+            response = {
+                "error": "appointment rejected. The provided appointment has already been triggered",
+                "error_code": errors.APPOINTMENT_ALREADY_TRIGGERED,
+            }

         logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)
         return jsonify(response), rcode
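Client side, the new rejection surfaces as an HTTP 400 carrying error code -9. A hedged sketch of how a client might handle it (send_appointment is a hypothetical helper; the CLI's actual handling is not part of this diff):

import requests

APPOINTMENT_ALREADY_TRIGGERED = -9  # mirrors common/errors.py above


def send_appointment(data, endpoint):
    r = requests.post(endpoint, json=data)
    if r.status_code == 400 and r.json().get("error_code") == APPOINTMENT_ALREADY_TRIGGERED:
        # The dispute was already seen and handled, so re-sending the same
        # appointment will never be accepted.
        raise RuntimeError(r.json().get("error"))
    return r.json()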

View File

@@ -1,4 +1,5 @@
 from common.logger import Logger
+from common.exceptions import BasicException

 from teos import LOG_PREFIX
 from teos.tools import bitcoin_cli

@@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException
 logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX)


+class InvalidTransactionFormat(BasicException):
+    """Raised when a transaction is not properly formatted"""
+
+
 class BlockProcessor:
     """
     The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication

@@ -89,17 +94,19 @@ class BlockProcessor:
             raw_tx (:obj:`str`): the hex representation of the transaction.

         Returns:
-            :obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
-            Returns ``None`` otherwise.
+            :obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
+
+        Raises:
+            :obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has an invalid format.
         """

         try:
             tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx)
         except JSONRPCException as e:
-            tx = None
-            logger.error("Cannot build transaction from decoded data", error=e.error)
+            msg = "Cannot build transaction from decoded data"
+            logger.error(msg, error=e.error)
+            raise InvalidTransactionFormat(msg)

         return tx
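Callers of decode_raw_transaction must now catch an exception instead of checking for None. A small sketch of the updated calling convention (try_decode is a hypothetical wrapper):

from teos.block_processor import InvalidTransactionFormat


def try_decode(block_processor, raw_tx):
    try:
        return block_processor.decode_raw_transaction(raw_tx)
    except InvalidTransactionFormat:
        # Previously signaled by a None return; callers now handle (or
        # propagate) the exception explicitly, as the Watcher does below.
        return None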

View File

@@ -84,7 +84,13 @@ def main(command_line_conf):
     db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH"))
     responder = Responder(db_manager, gatekeeper, carrier, block_processor)
     watcher = Watcher(
-        db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS")
+        db_manager,
+        gatekeeper,
+        block_processor,
+        responder,
+        secret_key_der,
+        config.get("MAX_APPOINTMENTS"),
+        config.get("LOCATOR_CACHE_SIZE"),
     )

     # Create the chain monitor and start monitoring the chain

View File

@@ -1,5 +1,7 @@
 from queue import Queue
 from threading import Thread
+from collections import OrderedDict
+from readerwriterlock import rwlock

 from common.logger import Logger
 from common.tools import compute_locator

@@ -11,6 +13,7 @@ from common.exceptions import InvalidParameter, SignatureError
 from teos import LOG_PREFIX
 from teos.cleaner import Cleaner
 from teos.extended_appointment import ExtendedAppointment
+from teos.block_processor import InvalidTransactionFormat

 logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX)
@@ -19,6 +22,140 @@ class AppointmentLimitReached(BasicException):
     """Raised when the tower maximum appointment count has been reached"""


+class AppointmentAlreadyTriggered(BasicException):
+    """Raised when an appointment is sent to the Watcher but that same data has already been sent to the Responder"""
+
+
+class LocatorCache:
+    """
+    The LocatorCache keeps data about the last ``cache_size`` blocks around so appointments can be checked against
+    it. The data is indexed by locator and is mainly built during normal ``Watcher`` operation, so no extra steps
+    are usually needed.
+
+    Args:
+        blocks_in_cache (:obj:`int`): the number of blocks to keep in the cache.
+
+    Attributes:
+        cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pairs that received appointments are checked
+            against.
+        blocks (:obj:`OrderedDict`): an ordered dictionary of the last ``blocks_in_cache`` blocks
+            (``block_hash:locators``). Used to keep track of which data belongs to which block, so data can be pruned
+            accordingly. Also needed to rebuild the cache in case of reorgs.
+        cache_size (:obj:`int`): the size of the cache in blocks.
+    """
+
+    def __init__(self, blocks_in_cache):
+        self.cache = dict()
+        self.blocks = OrderedDict()
+        self.cache_size = blocks_in_cache
+        self.rw_lock = rwlock.RWLockWrite()
+    def init(self, last_known_block, block_processor):
+        """
+        Sets the initial state of the locator cache.
+
+        Args:
+            last_known_block (:obj:`str`): the last block known by the ``Watcher``.
+            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
+        """
+
+        # This is a separate method from __init__ since the cache has to be initialised right before the Watcher
+        # starts watching. Not doing so would mean storing temporary variables in the Watcher and initialising the
+        # cache as None.
+        target_block_hash = last_known_block
+        for _ in range(self.cache_size):
+            # In some setups, like regtest, it could be the case that there are not enough previous blocks.
+            # In those cases we pull as many as we can (up to cache_size).
+            if not target_block_hash:
+                break
+
+            target_block = block_processor.get_block(target_block_hash)
+            if not target_block:
+                break
+
+            locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
+            self.cache.update(locator_txid_map)
+            self.blocks[target_block_hash] = list(locator_txid_map.keys())
+            target_block_hash = target_block.get("previousblockhash")
+
+        self.blocks = OrderedDict(reversed(list(self.blocks.items())))
+    def get_txid(self, locator):
+        """
+        Gets a txid from the locator cache.
+
+        Args:
+            locator (:obj:`str`): the locator to look up in the cache.
+
+        Returns:
+            :obj:`str` or :obj:`None`: The txid linked to the given locator if found. ``None`` otherwise.
+        """
+
+        with self.rw_lock.gen_rlock():
+            txid = self.cache.get(locator)
+
+        return txid
+    def update(self, block_hash, locator_txid_map):
+        """
+        Updates the cache with data from a new block. Removes the oldest block if the cache is full after the
+        addition.
+
+        Args:
+            block_hash (:obj:`str`): the hash of the new block.
+            locator_txid_map (:obj:`dict`): the dictionary of locators (``locator:txid``) derived from a list of
+                transaction ids.
+        """
+
+        with self.rw_lock.gen_wlock():
+            self.cache.update(locator_txid_map)
+            self.blocks[block_hash] = list(locator_txid_map.keys())
+            logger.debug("Block added to cache", block_hash=block_hash)
+
+        if self.is_full():
+            self.remove_oldest_block()
+
+    def is_full(self):
+        """Returns whether the cache is full or not."""
+        with self.rw_lock.gen_rlock():
+            full = len(self.blocks) > self.cache_size
+
+        return full
+    def remove_oldest_block(self):
+        """Removes the oldest block from the cache."""
+        with self.rw_lock.gen_wlock():
+            block_hash, locators = self.blocks.popitem(last=False)
+            for locator in locators:
+                del self.cache[locator]
+
+            logger.debug("Block removed from cache", block_hash=block_hash)
+    def fix(self, last_known_block, block_processor):
+        """
+        Fixes the cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it.
+
+        Args:
+            last_known_block (:obj:`str`): the last known block hash after the reorg.
+            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
+        """
+
+        tmp_cache = LocatorCache(self.cache_size)
+
+        # We assume there are no reorgs back to genesis. If there were, this would raise some log warnings and the
+        # cache would be filled with fewer than cache_size blocks.
+        target_block_hash = last_known_block
+        for _ in range(tmp_cache.cache_size):
+            target_block = block_processor.get_block(target_block_hash)
+            if target_block:
+                # Compute the locator:txid pair for every transaction in the block and update both the cache and
+                # the block mapping.
+                locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
+                tmp_cache.cache.update(locator_txid_map)
+                tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys())
+                target_block_hash = target_block.get("previousblockhash")
+
+        with self.rw_lock.gen_wlock():
+            self.blocks = OrderedDict(reversed(list(tmp_cache.blocks.items())))
+            self.cache = tmp_cache.cache
+
+
 class Watcher:
     """
     The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower.
@@ -41,6 +178,8 @@ class Watcher:
         responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
         sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
         max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
+        blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be
+            covered.

     Attributes:
         appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment
@@ -60,13 +199,14 @@ class Watcher:
         signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
         max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
         last_known_block (:obj:`str`): the last block known by the ``Watcher``.
+        locator_cache (:obj:`LocatorCache`): a cache of locators for the last ``blocks_in_cache`` blocks.

     Raises:
         :obj:`InvalidKey <common.exceptions.InvalidKey>`: if teos sk cannot be loaded.
     """

-    def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments):
+    def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache):
         self.appointments = dict()
         self.locator_uuid_map = dict()
         self.block_queue = Queue()
@@ -77,6 +217,7 @@ class Watcher:
         self.max_appointments = max_appointments
         self.signing_key = Cryptographer.load_private_key_der(sk_der)
         self.last_known_block = db_manager.load_last_block_hash_watcher()
+        self.locator_cache = LocatorCache(blocks_in_cache)

     def awake(self):
         """Starts a new thread to monitor the blockchain for channel breaches"""
@@ -126,24 +267,55 @@ class Watcher:
         # The user_id needs to be added to the ExtendedAppointment once the former has been authenticated
         appointment.user_id = user_id

-        # The uuids are generated as the RIPMED160(locator||user_pubkey).
+        # The uuids are generated as the RIPEMD160(locator||user_pubkey).
         # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
         uuid = hash_160("{}{}".format(appointment.locator, user_id))

+        # If this is a copy of an appointment we've already reacted to, the new appointment is rejected.
+        if uuid in self.responder.trackers:
+            message = "Appointment already in Responder"
+            logger.info(message)
+            raise AppointmentAlreadyTriggered(message)
+
         # Add the appointment to the Gatekeeper
         available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)

-        self.appointments[uuid] = appointment.get_summary()
-
-        if appointment.locator in self.locator_uuid_map:
-            # If the uuid is already in the map it means this is an update.
-            if uuid not in self.locator_uuid_map[appointment.locator]:
-                self.locator_uuid_map[appointment.locator].append(uuid)
+        # Appointments that were triggered in blocks held in the cache
+        dispute_txid = self.locator_cache.get_txid(appointment.locator)
+        if dispute_txid:
+            try:
+                penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid)
+                receipt = self.responder.handle_breach(
+                    uuid, appointment.locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, self.last_known_block
+                )
+
+                # At this point the appointment is accepted but data is only kept if it goes through the Responder.
+                # Otherwise it is dropped.
+                if receipt.delivered:
+                    self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
+                    self.db_manager.create_append_locator_map(appointment.locator, uuid)
+                    self.db_manager.create_triggered_appointment_flag(uuid)
+
+            except (EncryptionError, InvalidTransactionFormat):
+                # If the data inside the encrypted blob is invalid, the appointment is accepted but the data is
+                # dropped (same as with data that bounces in the Responder). This reduces the appointment slot
+                # count, so it could be used to discourage user misbehaviour.
+                pass
+
+        # Regular appointments that have not been triggered (or, at least, not recently)
         else:
-            # Otherwise two users have sent an appointment with the same locator, so we need to store both.
-            self.locator_uuid_map[appointment.locator] = [uuid]
+            self.appointments[uuid] = appointment.get_summary()

-        self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
-        self.db_manager.create_append_locator_map(appointment.locator, uuid)
+            if appointment.locator in self.locator_uuid_map:
+                # If the uuid is already in the map it means this is an update.
+                if uuid not in self.locator_uuid_map[appointment.locator]:
+                    self.locator_uuid_map[appointment.locator].append(uuid)
+            else:
+                # Otherwise two users have sent an appointment with the same locator, so we need to store both.
+                self.locator_uuid_map[appointment.locator] = [uuid]
+
+            self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
+            self.db_manager.create_append_locator_map(appointment.locator, uuid)

         try:
             signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
@@ -157,6 +329,7 @@ class Watcher:
         return {
             "locator": appointment.locator,
+            "start_block": self.last_known_block,
             "signature": signature,
             "available_slots": available_slots,
             "subscription_expiry": self.gatekeeper.registered_users[user_id].subscription_expiry,
@@ -175,14 +348,24 @@ class Watcher:
             self.last_known_block = self.block_processor.get_best_block_hash()
             self.db_manager.store_last_block_hash_watcher(self.last_known_block)

+        # Initialise the locator cache with the last ``cache_size`` blocks.
+        self.locator_cache.init(self.last_known_block, self.block_processor)
+
         while True:
             block_hash = self.block_queue.get()
             block = self.block_processor.get_block(block_hash)
             logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))

-            if len(self.appointments) > 0 and block is not None:
-                txids = block.get("tx")
+            # If a reorg is detected, the cache is fixed to cover the last ``cache_size`` blocks of the new chain
+            if self.last_known_block != block.get("previousblockhash"):
+                self.locator_cache.fix(block_hash, self.block_processor)
+
+            txids = block.get("tx")
+            # Compute the locator for every transaction in the block and add them to the cache
+            locator_txid_map = {compute_locator(txid): txid for txid in txids}
+            self.locator_cache.update(block_hash, locator_txid_map)
+
+            if len(self.appointments) > 0 and locator_txid_map:
                 expired_appointments = self.gatekeeper.get_expired_appointments(block["height"])
                 # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
                 expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
@@ -196,7 +379,7 @@ class Watcher:
                     expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
                 )

-                valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids))
+                valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locator_txid_map))

                 triggered_flags = []
                 appointments_to_delete = []
@@ -246,28 +429,27 @@ class Watcher:
             if len(self.appointments) != 0:
                 logger.info("No more pending appointments")

-            # Register the last processed block for the watcher
+            # Register the last processed block for the Watcher
             self.db_manager.store_last_block_hash_watcher(block_hash)
             self.last_known_block = block.get("hash")
             self.block_queue.task_done()

-    def get_breaches(self, txids):
+    def get_breaches(self, locator_txid_map):
         """
-        Gets a list of channel breaches given the list of transaction ids.
+        Gets a dictionary of channel breaches given a map of ``locator:dispute_txid``.

         Args:
-            txids (:obj:`list`): the list of transaction ids included in the last received block.
+            locator_txid_map (:obj:`dict`): the dictionary of locators (``locator:txid``) derived from a list of
+                transaction ids.

         Returns:
             :obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are
             found.
         """

-        potential_locators = {compute_locator(txid): txid for txid in txids}
-
-        # Check if any of the txids in the received block is an actual match
-        intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
-        breaches = {locator: potential_locators[locator] for locator in intersection}
+        # Check if any of the txids in the received block is an actual match
+        intersection = set(self.locator_uuid_map.keys()).intersection(locator_txid_map.keys())
+        breaches = {locator: locator_txid_map[locator] for locator in intersection}

         if len(breaches) > 0:
             logger.info("List of breaches", breaches=breaches)
@@ -277,21 +459,59 @@ class Watcher:
         return breaches

+    def check_breach(self, uuid, appointment, dispute_txid):
+        """
+        Checks if a breach is valid. Valid breaches should decrypt to a valid transaction.
+
+        Args:
+            uuid (:obj:`str`): the uuid of the appointment that was triggered by the breach.
+            appointment (:obj:`teos.extended_appointment.ExtendedAppointment`): the appointment data.
+            dispute_txid (:obj:`str`): the id of the transaction that triggered the breach.
+
+        Returns:
+            :obj:`tuple`: A tuple containing the penalty txid and the raw penalty tx.
+
+        Raises:
+            :obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the
+                key derived from the breach transaction id.
+            :obj:`InvalidTransactionFormat`: If the decrypted data does not have a valid transaction format.
+        """
+
+        try:
+            penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
+            penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
+
+        except EncryptionError as e:
+            logger.info("Transaction cannot be decrypted", uuid=uuid)
+            raise e
+
+        except InvalidTransactionFormat as e:
+            logger.info("The breach contained an invalid transaction", uuid=uuid)
+            raise e
+
+        logger.info(
+            "Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
+        )
+
+        return penalty_tx.get("txid"), penalty_rawtx
+
     def filter_breaches(self, breaches):
         """
         Filters the valid from the invalid channel breaches.

-        The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen.
+        The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen.
         Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder <teos.responder.Responder>`.

         Args:
             breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``).

         Returns:
-            :obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid.
-            The structure is as follows:
-            ``{locator, dispute_txid, penalty_txid, penalty_rawtx, valid_breach}``
+            :obj:`tuple`: A dictionary and a list. The former contains the valid breaches, while the latter contains
+                the invalid ones.
+
+            The valid breaches dictionary has the following structure:
+            ``{locator, dispute_txid, penalty_txid, penalty_rawtx}``
         """

         valid_breaches = {}
@@ -305,31 +525,26 @@ class Watcher:
                 appointment = ExtendedAppointment.from_dict(self.db_manager.load_watcher_appointment(uuid))

                 if appointment.encrypted_blob in decrypted_blobs:
-                    penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
-                else:
-                    try:
-                        penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
-                    except EncryptionError:
-                        penalty_rawtx = None
-
-                    penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
-                    decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx)
-
-                if penalty_tx is not None:
+                    penalty_txid, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
                     valid_breaches[uuid] = {
-                        "locator": locator,
+                        "locator": appointment.locator,
                         "dispute_txid": dispute_txid,
-                        "penalty_txid": penalty_tx.get("txid"),
+                        "penalty_txid": penalty_txid,
                         "penalty_rawtx": penalty_rawtx,
                     }
-
-                    logger.info(
-                        "Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
-                    )
                 else:
-                    invalid_breaches.append(uuid)
+                    try:
+                        penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid)
+                        valid_breaches[uuid] = {
+                            "locator": appointment.locator,
+                            "dispute_txid": dispute_txid,
+                            "penalty_txid": penalty_txid,
+                            "penalty_rawtx": penalty_rawtx,
+                        }
+                        decrypted_blobs[appointment.encrypted_blob] = (penalty_txid, penalty_rawtx)
+
+                    except (EncryptionError, InvalidTransactionFormat):
+                        invalid_breaches.append(uuid)

         return valid_breaches, invalid_breaches
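Taken together, the cache's lifecycle inside do_watch boils down to the following condensed sketch of the code above (incoming_blocks stands in for the Watcher's block queue):

cache = LocatorCache(blocks_in_cache=6)
cache.init(last_known_block, block_processor)  # warm up with the most recent blocks

for block_hash, block in incoming_blocks:
    if last_known_block != block.get("previousblockhash"):
        # Reorg detected: rebuild the cache from the new tip backwards
        cache.fix(block_hash, block_processor)

    locator_txid_map = {compute_locator(txid): txid for txid in block.get("tx")}
    cache.update(block_hash, locator_txid_map)  # evicts the oldest block once full
    last_known_block = block_hash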

View File

@@ -11,6 +11,7 @@ from common.config_loader import ConfigLoader
 getcontext().prec = 10

+utxos = []
+

 @pytest.fixture(scope="session")

@@ -37,11 +38,13 @@ def prng_seed():
 def setup_node(bitcoin_cli):
     # This method will create a new address and mine bitcoin so the node can be used for testing
     new_addr = bitcoin_cli.getnewaddress()
-    bitcoin_cli.generatetoaddress(106, new_addr)
+    bitcoin_cli.generatetoaddress(200, new_addr)


 def create_txs(bitcoin_cli, n=1):
-    utxos = bitcoin_cli.listunspent()
+    global utxos
+    if not utxos:
+        utxos = bitcoin_cli.listunspent()

     if len(utxos) < n:
         raise ValueError("There are not enough UTXOs.")

View File

@@ -40,6 +40,9 @@ teosd_process = run_teosd()
 teos_id, user_sk, user_id = teos_cli.load_keys(cli_config.get("TEOS_PUBLIC_KEY"), cli_config.get("CLI_PRIVATE_KEY"))

+appointments_in_watcher = 0
+appointments_in_responder = 0
+

 def broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, addr):
     # Broadcast the commitment transaction and mine a block
@@ -78,6 +81,8 @@ def test_commands_non_registered(bitcoin_cli):
 def test_commands_registered(bitcoin_cli):
+    global appointments_in_watcher
+
     # Test registering and trying again
     teos_cli.register(user_id, teos_base_endpoint)
@@ -93,9 +98,12 @@ def test_commands_registered(bitcoin_cli):
     r = get_appointment_info(appointment_data.get("locator"))
     assert r.get("locator") == appointment.locator
     assert r.get("appointment") == appointment.to_dict()
+    appointments_in_watcher += 1
 def test_appointment_life_cycle(bitcoin_cli):
+    global appointments_in_watcher, appointments_in_responder
+
     # First of all we need to register
     response = teos_cli.register(user_id, teos_base_endpoint)
     available_slots = response.get("available_slots")
@@ -106,6 +114,7 @@ def test_appointment_life_cycle(bitcoin_cli):
     appointment_data = build_appointment_data(commitment_tx_id, penalty_tx)
     locator = compute_locator(commitment_tx_id)
     appointment, signature = add_appointment(appointment_data)
+    appointments_in_watcher += 1

     # Get the information from the tower to check that it matches
     appointment_info = get_appointment_info(locator)
@@ -117,7 +126,7 @@ def test_appointment_life_cycle(bitcoin_cli):
     all_appointments = get_all_appointments()
     watching = all_appointments.get("watcher_appointments")
     responding = all_appointments.get("responder_trackers")
-    assert len(watching) == 1 and len(responding) == 0
+    assert len(watching) == appointments_in_watcher and len(responding) == 0

     # Trigger a breach and check again
     new_addr = bitcoin_cli.getnewaddress()
@@ -125,11 +134,13 @@ def test_appointment_life_cycle(bitcoin_cli):
     appointment_info = get_appointment_info(locator)
     assert appointment_info.get("status") == "dispute_responded"
     assert appointment_info.get("locator") == locator
+    appointments_in_watcher -= 1
+    appointments_in_responder += 1

     all_appointments = get_all_appointments()
     watching = all_appointments.get("watcher_appointments")
     responding = all_appointments.get("responder_trackers")
-    assert len(watching) == 0 and len(responding) == 1
+    assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder

     # It can also be checked by ensuring that the penalty transaction made it to the network
     penalty_tx_id = bitcoin_cli.decoderawtransaction(penalty_tx).get("txid")
@@ -144,6 +155,7 @@ def test_appointment_life_cycle(bitcoin_cli):
     # Now let's mine some blocks so the appointment reaches its end. We need 100 + EXPIRY_DELTA - 1
     bitcoin_cli.generatetoaddress(100 + teos_config.get("EXPIRY_DELTA") - 1, new_addr)
+    appointments_in_responder -= 1

     # The appointment is no longer in the tower
     with pytest.raises(TowerResponseError):
@@ -152,10 +164,14 @@ def test_appointment_life_cycle(bitcoin_cli):
     # Check that the appointment is not in the Gatekeeper by checking the available slots (should have increased by 1)
     # We can do so by topping up the subscription (FIXME: find a better way to check this).
     response = teos_cli.register(user_id, teos_base_endpoint)
-    assert response.get("available_slots") == available_slots + teos_config.get("DEFAULT_SLOTS") + 1
+    assert (
+        response.get("available_slots")
+        == available_slots + teos_config.get("DEFAULT_SLOTS") + 1 - appointments_in_watcher - appointments_in_responder
+    )


 def test_multiple_appointments_life_cycle(bitcoin_cli):
+    global appointments_in_watcher, appointments_in_responder
+
     # Tests that get_all_appointments returns all the appointments the tower is storing at various stages in the
     # appointment lifecycle.
     appointments = []
@@ -180,6 +196,7 @@ def test_multiple_appointments_life_cycle(bitcoin_cli):
     # Send all of them to the watchtower.
     for appt in appointments:
         add_appointment(appt.get("appointment_data"))
+        appointments_in_watcher += 1

     # Two of these appointments are breached, and the watchtower responds to them.
     breached_appointments = []
@@ -188,13 +205,15 @@ def test_multiple_appointments_life_cycle(bitcoin_cli):
         broadcast_transaction_and_mine_block(bitcoin_cli, appointments[i]["commitment_tx"], new_addr)
         bitcoin_cli.generatetoaddress(1, new_addr)
         breached_appointments.append(appointments[i]["locator"])
+        appointments_in_watcher -= 1
+        appointments_in_responder += 1

     sleep(1)

     # Test that they all show up in get_all_appointments at the correct stages.
     all_appointments = get_all_appointments()
     watching = all_appointments.get("watcher_appointments")
     responding = all_appointments.get("responder_trackers")
-    assert len(watching) == 3 and len(responding) == 2
+    assert len(watching) == appointments_in_watcher and len(responding) == appointments_in_responder

     responder_locators = [appointment["locator"] for uuid, appointment in responding.items()]
     assert set(responder_locators) == set(breached_appointments)
@@ -389,6 +408,73 @@ def test_two_appointment_same_locator_different_penalty_different_users(bitcoin_
     assert appointment_info.get("appointment").get("penalty_tx") == appointment1_data.get("penalty_tx")


+def test_add_appointment_trigger_on_cache(bitcoin_cli):
+    # This tests sending an appointment whose trigger is in the cache
+    commitment_tx, penalty_tx = create_txs(bitcoin_cli)
+    commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")
+    appointment_data = build_appointment_data(commitment_tx_id, penalty_tx)
+    locator = compute_locator(commitment_tx_id)
+
+    # Let's send the commitment to the network and mine a block
+    broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress())
+
+    # Send the data to the tower and request it back. It should have gone straight to the Responder
+    add_appointment(appointment_data)
+    assert get_appointment_info(locator).get("status") == "dispute_responded"
+
+
+def test_add_appointment_invalid_trigger_on_cache(bitcoin_cli):
+    # This tests sending an invalid appointment whose trigger is in the cache
+    commitment_tx, penalty_tx = create_txs(bitcoin_cli)
+    commitment_tx_id = bitcoin_cli.decoderawtransaction(commitment_tx).get("txid")
+
+    # We can just flip the justice tx so it is invalid
+    appointment_data = build_appointment_data(commitment_tx_id, penalty_tx[::-1])
+    locator = compute_locator(commitment_tx_id)
+
+    # Let's send the commitment to the network and mine a block
+    broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress())
+    sleep(1)
+
+    # Send the data to the tower and request it back. It should get accepted but the data will be dropped.
+    add_appointment(appointment_data)
+    with pytest.raises(TowerResponseError):
+        get_appointment_info(locator)
+
+
+def test_add_appointment_trigger_on_cache_cannot_decrypt(bitcoin_cli):
+    commitment_tx, penalty_tx = create_txs(bitcoin_cli)
+
+    # Let's send the commitment to the network and mine a block
+    broadcast_transaction_and_mine_block(bitcoin_cli, commitment_tx, bitcoin_cli.getnewaddress())
+    sleep(1)
+
+    # The appointment data is built using a random 32-byte value.
+    appointment_data = build_appointment_data(get_random_value_hex(32), penalty_tx)
+
+    # We cannot use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually.
+    appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid"))
+    appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32))
+    appointment = Appointment.from_dict(appointment_data)
+
+    signature = Cryptographer.sign(appointment.serialize(), user_sk)
+    data = {"appointment": appointment.to_dict(), "signature": signature}
+
+    # Send the appointment to the server.
+    response = teos_cli.post_request(data, teos_add_appointment_endpoint)
+    response_json = teos_cli.process_post_response(response)
+
+    # Check that the server has accepted the appointment
+    tower_signature = response_json.get("signature")
+    rpk = Cryptographer.recover_pk(appointment.serialize(), tower_signature)
+    assert teos_id == Cryptographer.get_compressed_pk(rpk)
+    assert response_json.get("locator") == appointment.locator
+
+    # The appointment should have been immediately dropped
+    with pytest.raises(TowerResponseError):
+        get_appointment_info(appointment_data["locator"])
+
+
 def test_appointment_shutdown_teos_trigger_back_online(bitcoin_cli):
     global teosd_process

View File

@@ -4,13 +4,21 @@ from binascii import hexlify
 from teos.api import API
 import common.errors as errors
-from teos.watcher import Watcher
 from teos.inspector import Inspector
 from teos.gatekeeper import UserInfo
 from teos.appointments_dbm import AppointmentsDBM
 from teos.responder import Responder, TransactionTracker
+from teos.extended_appointment import ExtendedAppointment
+from teos.watcher import Watcher, AppointmentAlreadyTriggered

-from test.teos.unit.conftest import get_random_value_hex, generate_dummy_appointment, generate_keypair, get_config
+from test.teos.unit.conftest import (
+    get_random_value_hex,
+    generate_dummy_appointment,
+    generate_keypair,
+    get_config,
+    create_dummy_transaction,
+    compute_locator,
+)

 from common.cryptographer import Cryptographer, hash_160
 from common.constants import (
@@ -60,7 +68,15 @@ def api(db_manager, carrier, block_processor, gatekeeper, run_bitcoind):
     sk, pk = generate_keypair()
     responder = Responder(db_manager, gatekeeper, carrier, block_processor)
-    watcher = Watcher(db_manager, gatekeeper, block_processor, responder, sk.to_der(), MAX_APPOINTMENTS)
+    watcher = Watcher(
+        db_manager,
+        gatekeeper,
+        block_processor,
+        responder,
+        sk.to_der(),
+        MAX_APPOINTMENTS,
+        config.get("LOCATOR_CACHE_SIZE"),
+    )

     inspector = Inspector(block_processor, config.get("MIN_TO_SELF_DELAY"))
     api = API(config.get("API_HOST"), config.get("API_PORT"), inspector, watcher)
@@ -157,6 +173,7 @@ def test_add_appointment(api, client, appointment):
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
     assert r.status_code == HTTP_OK
     assert r.json.get("available_slots") == 0
+    assert r.json.get("start_block") == api.watcher.last_known_block


 def test_add_appointment_no_json(api, client, appointment):
@@ -242,6 +259,7 @@ def test_add_appointment_multiple_times_same_user(api, client, appointment, n=MU
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
     assert r.status_code == HTTP_OK
     assert r.json.get("available_slots") == n - 1
+    assert r.json.get("start_block") == api.watcher.last_known_block

     # Since all updates came from the same user, only the last one is stored
     assert len(api.watcher.locator_uuid_map[appointment.locator]) == 1
@@ -264,6 +282,7 @@ def test_add_appointment_multiple_times_different_users(api, client, appointment
         r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": signature}, compressed_pk)
         assert r.status_code == HTTP_OK
         assert r.json.get("available_slots") == 1
+        assert r.json.get("start_block") == api.watcher.last_known_block

     # Check that all the appointments have been added and that there are no duplicates
     assert len(set(api.watcher.locator_uuid_map[appointment.locator])) == n
@@ -275,14 +294,22 @@ def test_add_appointment_update_same_size(api, client, appointment):
     appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )

     # The user has no additional slots, but it should be able to update
     # Let's just reverse the encrypted blob for example
     appointment.encrypted_blob = appointment.encrypted_blob[::-1]
     appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )


 def test_add_appointment_update_bigger(api, client, appointment):
@@ -297,7 +324,11 @@ def test_add_appointment_update_bigger(api, client, appointment):
     appointment.encrypted_blob = TWO_SLOTS_BLOTS
     appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )

     # Check that it'll fail if not enough slots are available
     # Double the size from before
@@ -314,13 +345,101 @@ def test_add_appointment_update_smaller(api, client, appointment):
     appointment.encrypted_blob = TWO_SLOTS_BLOTS
     appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )

     # Let's update with one just small enough
     appointment.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2)
     appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
     r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
-    assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 1
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )
+
+
+def test_add_appointment_in_cache(api, client):
+    api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
+    appointment, dispute_tx = generate_dummy_appointment()
+    appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
+
+    # Add the data to the cache
+    dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
+    api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
+
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )
+
+    # Trying to add it again should fail, since it is already in the Responder
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED
+
+    # The appointment would be rejected even if the data is not in the cache, provided it has been triggered
+    del api.watcher.locator_cache.cache[appointment.locator]
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert r.status_code == HTTP_BAD_REQUEST and r.json.get("error_code") == errors.APPOINTMENT_ALREADY_TRIGGERED
+
+
+def test_add_appointment_in_cache_cannot_decrypt(api, client):
+    api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
+    appointment, dispute_tx = generate_dummy_appointment()
+    appointment.encrypted_blob = appointment.encrypted_blob[::-1]
+    appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
+
+    # Add the data to the cache (locator -> dispute_txid)
+    dispute_txid = api.watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
+    api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
+
+    # The appointment should be accepted
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )
+
+
+def test_add_appointment_in_cache_invalid_transaction(api, client):
+    api.watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=0)
+
+    # We need to create the appointment manually
+    dispute_tx = create_dummy_transaction()
+    dispute_txid = dispute_tx.tx_id.hex()
+    penalty_tx = create_dummy_transaction(dispute_txid)
+
+    locator = compute_locator(dispute_txid)
+    dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20}
+    encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id"))
+
+    appointment_data = {
+        "locator": locator,
+        "to_self_delay": dummy_appointment_data.get("to_self_delay"),
+        "encrypted_blob": encrypted_blob,
+        "user_id": get_random_value_hex(16),
+    }
+
+    appointment = ExtendedAppointment.from_dict(appointment_data)
+    appointment_signature = Cryptographer.sign(appointment.serialize(), user_sk)
+
+    # Add the data to the cache (locator -> dispute_txid)
+    api.watcher.locator_cache.cache[appointment.locator] = dispute_txid
+
+    # The appointment should be accepted
+    r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)
+    assert (
+        r.status_code == HTTP_OK
+        and r.json.get("available_slots") == 0
+        and r.json.get("start_block") == api.watcher.last_known_block
+    )
+
+
 def test_add_too_many_appointment(api, client):
@@ -337,7 +456,7 @@ def test_add_too_many_appointment(api, client):
         r = add_appointment(client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, user_id)

         if i < free_appointment_slots:
-            assert r.status_code == HTTP_OK
+            assert r.status_code == HTTP_OK and r.json.get("start_block") == api.watcher.last_known_block
         else:
             assert r.status_code == HTTP_SERVICE_UNAVAILABLE

View File

@@ -1,3 +1,5 @@
+import pytest
+from teos.watcher import InvalidTransactionFormat
 from test.teos.unit.conftest import get_random_value_hex, generate_block, generate_blocks, fork

@@ -46,7 +48,8 @@ def test_decode_raw_transaction(block_processor):
 def test_decode_raw_transaction_invalid(block_processor):
     # Same but with an invalid one
-    assert block_processor.decode_raw_transaction(hex_tx[::-1]) is None
+    with pytest.raises(InvalidTransactionFormat):
+        block_processor.decode_raw_transaction(hex_tx[::-1])


 def test_get_missed_blocks(block_processor):

View File

@@ -102,6 +102,7 @@ def test_update_states_empty_list(db_manager, gatekeeper, carrier, block_process
         responder=Responder(db_manager, gatekeeper, carrier, block_processor),
         sk_der=generate_keypair()[0].to_der(),
         max_appointments=config.get("MAX_APPOINTMENTS"),
+        blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"),
     )

     missed_blocks_watcher = []
@@ -123,6 +124,7 @@ def test_update_states_responder_misses_more(run_bitcoind, db_manager, gatekeepe
         responder=Responder(db_manager, gatekeeper, carrier, block_processor),
         sk_der=generate_keypair()[0].to_der(),
         max_appointments=config.get("MAX_APPOINTMENTS"),
+        blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"),
     )

     blocks = []
@@ -148,6 +150,7 @@ def test_update_states_watcher_misses_more(db_manager, gatekeeper, carrier, bloc
         responder=Responder(db_manager, gatekeeper, carrier, block_processor),
         sk_der=generate_keypair()[0].to_der(),
         max_appointments=config.get("MAX_APPOINTMENTS"),
+        blocks_in_cache=config.get("LOCATOR_CACHE_SIZE"),
     )

     blocks = []
View File
@@ -1,6 +1,7 @@
import pytest import pytest
from uuid import uuid4 from uuid import uuid4
from shutil import rmtree from shutil import rmtree
from copy import deepcopy
from threading import Thread from threading import Thread
from coincurve import PrivateKey from coincurve import PrivateKey
@@ -9,22 +10,32 @@ from teos.tools import bitcoin_cli
from teos.responder import Responder
from teos.gatekeeper import UserInfo
from teos.chain_monitor import ChainMonitor
from teos.block_processor import BlockProcessor
from teos.appointments_dbm import AppointmentsDBM
from teos.extended_appointment import ExtendedAppointment
from teos.gatekeeper import Gatekeeper, AuthenticationFailure, NotEnoughSlots
from teos.watcher import (
Watcher,
AppointmentLimitReached,
LocatorCache,
EncryptionError,
InvalidTransactionFormat,
AppointmentAlreadyTriggered,
)
from common.tools import compute_locator
from common.cryptographer import Cryptographer
from test.teos.unit.conftest import (
generate_blocks_w_delay,
generate_blocks,
generate_dummy_appointment,
get_random_value_hex,
generate_keypair,
get_config,
bitcoind_feed_params,
bitcoind_connect_params,
create_dummy_transaction,
)
APPOINTMENTS = 5
@@ -55,7 +66,15 @@ def watcher(db_manager, gatekeeper):
carrier = Carrier(bitcoind_connect_params)
responder = Responder(db_manager, gatekeeper, carrier, block_processor)
watcher = Watcher(
db_manager,
gatekeeper,
block_processor,
responder,
signing_key.to_der(),
MAX_APPOINTMENTS,
config.get("LOCATOR_CACHE_SIZE"),
)
chain_monitor = ChainMonitor(
watcher.block_queue, watcher.responder.block_queue, block_processor, bitcoind_feed_params
@@ -91,7 +110,198 @@ def create_appointments(n):
return appointments, locator_uuid_map, dispute_txs
def test_locator_cache_init_not_enough_blocks(run_bitcoind, block_processor):
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
# Make sure there are at least 3 blocks
block_count = block_processor.get_block_count()
if block_count < 3:
generate_blocks_w_delay(3 - block_count)
# Simulate there are only 3 blocks
third_block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(2)
locator_cache.init(third_block_hash, block_processor)
assert len(locator_cache.blocks) == 3
for k, v in locator_cache.blocks.items():
assert block_processor.get_block(k)
def test_locator_cache_init(block_processor):
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
# Generate enough blocks so the cache can start full
generate_blocks(2 * locator_cache.cache_size)
locator_cache.init(block_processor.get_best_block_hash(), block_processor)
assert len(locator_cache.blocks) == locator_cache.cache_size
for k, v in locator_cache.blocks.items():
assert block_processor.get_block(k)
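For reference, the initialisation behaviour both tests above rely on can be sketched as follows. This is a minimal sketch, not the repo's exact code: it assumes `blocks` is an `OrderedDict`, reuses `compute_locator` from `common.tools`, and elides the reader-writer lock the real class holds.
from collections import OrderedDict
from common.tools import compute_locator

def cache_init(locator_cache, last_known_block, block_processor):
    # Walk backwards from the tip, caching up to cache_size blocks
    # (fewer if the chain is shorter, as in the not-enough-blocks test)
    target_block_hash = last_known_block
    for _ in range(locator_cache.cache_size):
        target_block = block_processor.get_block(target_block_hash)
        if not target_block:
            break
        locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
        locator_cache.cache.update(locator_txid_map)
        locator_cache.blocks[target_block_hash] = locator_txid_map
        target_block_hash = target_block.get("previousblockhash")
    # Reverse so blocks ends up ordered oldest to newest
    locator_cache.blocks = OrderedDict(reversed(list(locator_cache.blocks.items())))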
def test_get_txid():
# Not much to test here, this is shadowing dict.get
locator = get_random_value_hex(16)
txid = get_random_value_hex(32)
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
locator_cache.cache[locator] = txid
assert locator_cache.get_txid(locator) == txid
# A random locator should fail
assert locator_cache.get_txid(get_random_value_hex(16)) is None
def test_update_cache():
# Update should add data about a new block in the cache. If the cache is full, the oldest block is dropped.
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
block_hash = get_random_value_hex(32)
txs = [get_random_value_hex(32) for _ in range(10)]
locator_txid_map = {compute_locator(txid): txid for txid in txs}
# Cache is empty
assert block_hash not in locator_cache.blocks
for locator in locator_txid_map.keys():
assert locator not in locator_cache.cache
# The data has been added to the cache
locator_cache.update(block_hash, locator_txid_map)
assert block_hash in locator_cache.blocks
for locator in locator_txid_map.keys():
assert locator in locator_cache.cache
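The behaviour asserted here boils down to something like the sketch below, assuming `blocks` preserves insertion order and `cache` is the flat locator-to-txid map (locking elided):
def update(self, block_hash, locator_txid_map):
    # Merge the new block's locators into the flat map and remember the block
    self.cache.update(locator_txid_map)
    self.blocks[block_hash] = locator_txid_map
    # Adding a block to a full cache evicts the oldest one
    if self.is_full():
        self.remove_oldest_block()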
def test_update_cache_full():
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
block_hashes = []
big_map = {}
for i in range(locator_cache.cache_size):
block_hash = get_random_value_hex(32)
txs = [get_random_value_hex(32) for _ in range(10)]
locator_txid_map = {compute_locator(txid): txid for txid in txs}
locator_cache.update(block_hash, locator_txid_map)
if i == 0:
first_block_hash = block_hash
first_locator_txid_map = locator_txid_map
else:
block_hashes.append(block_hash)
big_map.update(locator_txid_map)
# The cache is now full.
assert first_block_hash in locator_cache.blocks
for locator in first_locator_txid_map.keys():
assert locator in locator_cache.cache
# Add one more
block_hash = get_random_value_hex(32)
txs = [get_random_value_hex(32) for _ in range(10)]
locator_txid_map = {compute_locator(txid): txid for txid in txs}
locator_cache.update(block_hash, locator_txid_map)
# The first block is not there anymore, but the rest are there
assert first_block_hash not in locator_cache.blocks
for locator in first_locator_txid_map.keys():
assert locator not in locator_cache.cache
for block_hash in block_hashes:
assert block_hash in locator_cache.blocks
for locator in big_map.keys():
assert locator in locator_cache.cache
def test_locator_cache_is_full(block_processor):
# Empty cache
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
for _ in range(locator_cache.cache_size):
locator_cache.blocks[uuid4().hex] = 0
assert not locator_cache.is_full()
locator_cache.blocks[uuid4().hex] = 0
assert locator_cache.is_full()
def test_locator_remove_oldest_block(block_processor):
# Empty cache
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
# Add some blocks to the cache
for _ in range(locator_cache.cache_size):
txid = get_random_value_hex(32)
locator = txid[:16]
locator_cache.blocks[get_random_value_hex(32)] = {locator: txid}
locator_cache.cache[locator] = txid
blocks_in_cache = locator_cache.blocks
oldest_block_hash = list(blocks_in_cache.keys())[0]
oldest_block_data = blocks_in_cache.get(oldest_block_hash)
rest_of_blocks = list(blocks_in_cache.keys())[1:]
locator_cache.remove_oldest_block()
# Oldest block data is not in the cache
assert oldest_block_hash not in locator_cache.blocks
for locator in oldest_block_data:
assert locator not in locator_cache.cache
# The rest of data is in the cache
assert set(rest_of_blocks).issubset(locator_cache.blocks)
for block_hash in rest_of_blocks:
for locator in locator_cache.blocks[block_hash]:
assert locator in locator_cache.cache
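The two tests above pin down the eviction pair. A sketch consistent with them, assuming `blocks` preserves insertion order:
def is_full(self):
    # The cache rests at exactly cache_size blocks, so it is only "full"
    # once a new block pushes it past that limit
    return len(self.blocks) > self.cache_size

def remove_oldest_block(self):
    # The first key is the oldest block; drop it and all its locators
    oldest_block_hash = next(iter(self.blocks))
    locator_txid_map = self.blocks.pop(oldest_block_hash)
    for locator in locator_txid_map:
        self.cache.pop(locator, None)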
def test_fix_cache(block_processor):
# This tests how a reorg will create a new version of the cache
# Let's start by setting up a full cache. We'll mine ``cache_size`` blocks to make sure it's full
generate_blocks_w_delay(config.get("LOCATOR_CACHE_SIZE"))
locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
locator_cache.init(block_processor.get_best_block_hash(), block_processor)
assert len(locator_cache.blocks) == locator_cache.cache_size
# Now let's fake a reorg of less than ``cache_size``. We'll go two blocks into the past.
current_tip = block_processor.get_best_block_hash()
current_tip_locators = locator_cache.blocks[current_tip]
current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash")
current_tip_parent_locators = locator_cache.blocks[current_tip_parent]
fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash")
locator_cache.fix(fake_tip, block_processor)
# The last two blocks are not in the cache, nor are any of their locators
assert current_tip not in locator_cache.blocks and current_tip_parent not in locator_cache.blocks
for locator in current_tip_parent_locators + current_tip_locators:
assert locator not in locator_cache.cache
# The fake tip is the new tip, and two additional blocks are at the bottom
assert fake_tip in locator_cache.blocks and list(locator_cache.blocks.keys())[-1] == fake_tip
assert len(locator_cache.blocks) == locator_cache.cache_size
# Test the same for a full-cache reorg. We can simulate this by adding more blocks than the cache can fit and
# triggering a fix. We'll use a new cache to compare with the old.
old_cache_blocks = deepcopy(locator_cache.blocks)
generate_blocks_w_delay(config.get("LOCATOR_CACHE_SIZE") * 2)
locator_cache.fix(block_processor.get_best_block_hash(), block_processor)
# None of the data from the old cache is in the new cache
for block_hash, locators in old_cache_blocks.items():
assert block_hash not in locator_cache.blocks
for locator in locators:
assert locator not in locator_cache.cache
# The data in the new cache corresponds to the last ``cache_size`` blocks.
block_count = block_processor.get_block_count()
for i in range(block_count, block_count - locator_cache.cache_size, -1):
block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1)
assert block_hash in locator_cache.blocks
for locator in locator_cache.blocks[block_hash]:
assert locator in locator_cache.cache
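One way to realise the repair this test describes is to rebuild from the new tip and swap the state in, which handles both the short reorg and the deeper-than-cache reorg uniformly. A sketch along the lines of the init outlined earlier, not necessarily the repo's exact method:
def fix(self, last_known_block, block_processor):
    # Blocks that only existed on the reorged branch never make it into
    # the rebuilt state, so their locators are dropped automatically
    tmp_cache = LocatorCache(self.cache_size)
    tmp_cache.init(last_known_block, block_processor)
    self.blocks = tmp_cache.blocks
    self.cache = tmp_cache.cache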
def test_watcher_init(watcher):
assert isinstance(watcher.appointments, dict) and len(watcher.appointments) == 0
assert isinstance(watcher.locator_uuid_map, dict) and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue.empty()
@@ -101,6 +311,7 @@ def test_init(run_bitcoind, watcher):
assert isinstance(watcher.responder, Responder)
assert isinstance(watcher.max_appointments, int)
assert isinstance(watcher.signing_key, PrivateKey)
assert isinstance(watcher.locator_cache, LocatorCache)
def test_add_appointment_non_registered(watcher):
@@ -171,6 +382,102 @@ def test_add_appointment(watcher):
assert len(watcher.locator_uuid_map[appointment.locator]) == 2
def test_add_appointment_in_cache(watcher):
# Generate an appointment and add the dispute txid to the cache
user_sk, user_pk = generate_keypair()
user_id = Cryptographer.get_compressed_pk(user_pk)
watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10)
appointment, dispute_tx = generate_dummy_appointment()
dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
watcher.locator_cache.cache[appointment.locator] = dispute_txid
# Try to add the appointment
response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk))
# The appointment is accepted but it's not in the Watcher
assert (
response
and response.get("locator") == appointment.locator
and Cryptographer.get_compressed_pk(watcher.signing_key.public_key)
== Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature")))
)
assert not watcher.locator_uuid_map.get(appointment.locator)
# It went to the Responder straightaway
assert appointment.locator in [tracker.get("locator") for tracker in watcher.responder.trackers.values()]
# Trying to send it again should fail since it is already in the Responder
with pytest.raises(AppointmentAlreadyTriggered):
watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk))
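This test and the two that follow all exercise the same cache-hit branch of `Watcher.add_appointment`. A rough sketch of that branch; `hand_to_responder` is a hypothetical stand-in for the real Responder hand-off, and the tracker check mirrors the assertion used in these tests:
def hand_to_responder(watcher, uuid, appointment, dispute_txid, penalty_txid, penalty_rawtx):
    # Hypothetical placeholder for the real Responder plumbing
    ...

def trigger_from_cache(watcher, uuid, appointment):
    # A cache hit means the dispute transaction was mined in a recent block
    dispute_txid = watcher.locator_cache.get_txid(appointment.locator)
    if dispute_txid is None:
        return False  # normal path: store the appointment and watch for it
    # Reject retries for appointments the Responder is already tracking
    tracked = [t.get("locator") for t in watcher.responder.trackers.values()]
    if appointment.locator in tracked:
        raise AppointmentAlreadyTriggered("The appointment is already in the Responder")
    try:
        penalty_txid, penalty_rawtx = watcher.check_breach(uuid, appointment, dispute_txid)
        hand_to_responder(watcher, uuid, appointment, dispute_txid, penalty_txid, penalty_rawtx)
    except (EncryptionError, InvalidTransactionFormat):
        # Accepted and signed for, but dropped: the same outcome as an
        # invalid appointment whose dispute shows up on-chain
        pass
    return True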
def test_add_appointment_in_cache_invalid_blob(watcher):
# Generate an appointment with an invalid transaction and add the dispute txid to the cache
user_sk, user_pk = generate_keypair()
user_id = Cryptographer.get_compressed_pk(user_pk)
watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10)
# We need to create the appointment manually
dispute_tx = create_dummy_transaction()
dispute_txid = dispute_tx.tx_id.hex()
penalty_tx = create_dummy_transaction(dispute_txid)
locator = compute_locator(dispute_txid)
dummy_appointment_data = {"tx": penalty_tx.hex(), "tx_id": dispute_txid, "to_self_delay": 20}
encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx")[::-1], dummy_appointment_data.get("tx_id"))
appointment_data = {
"locator": locator,
"to_self_delay": dummy_appointment_data.get("to_self_delay"),
"encrypted_blob": encrypted_blob,
"user_id": get_random_value_hex(16),
}
appointment = ExtendedAppointment.from_dict(appointment_data)
watcher.locator_cache.cache[appointment.locator] = dispute_tx.tx_id.hex()
# Try to add the appointment
response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk))
# The appointment is accepted but dropped (same as an invalid appointment that gets triggered)
assert (
response
and response.get("locator") == appointment.locator
and Cryptographer.get_compressed_pk(watcher.signing_key.public_key)
== Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature")))
)
assert not watcher.locator_uuid_map.get(appointment.locator)
assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()]
def test_add_appointment_in_cache_invalid_transaction(watcher):
# Generate an appointment that cannot be decrypted and add the dispute txid to the cache
user_sk, user_pk = generate_keypair()
user_id = Cryptographer.get_compressed_pk(user_pk)
watcher.gatekeeper.registered_users[user_id] = UserInfo(available_slots=1, subscription_expiry=10)
appointment, dispute_tx = generate_dummy_appointment()
appointment.encrypted_blob = appointment.encrypted_blob[::-1]
dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
watcher.locator_cache.cache[appointment.locator] = dispute_txid
# Try to add the appointment
response = watcher.add_appointment(appointment, Cryptographer.sign(appointment.serialize(), user_sk))
# The appointment is accepted but dropped (same as an invalid appointment that gets triggered)
assert (
response
and response.get("locator") == appointment.locator
and Cryptographer.get_compressed_pk(watcher.signing_key.public_key)
== Cryptographer.get_compressed_pk(Cryptographer.recover_pk(appointment.serialize(), response.get("signature")))
)
assert not watcher.locator_uuid_map.get(appointment.locator)
assert appointment.locator not in [tracker.get("locator") for tracker in watcher.responder.trackers.values()]
def test_add_too_many_appointments(watcher):
# Simulate the user is registered
user_sk, user_pk = generate_keypair()
@@ -246,9 +553,37 @@ def test_do_watch(watcher, temp_db_manager):
# FIXME: We should also add cases where the transactions are invalid. bitcoind_mock needs to be extended for this.
def test_do_watch_cache_update(watcher):
# Test that data is properly added to and removed from the cache
for _ in range(10):
blocks_in_cache = watcher.locator_cache.blocks
oldest_block_hash = list(blocks_in_cache.keys())[0]
oldest_block_data = blocks_in_cache.get(oldest_block_hash)
rest_of_blocks = list(blocks_in_cache.keys())[1:]
assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size
generate_blocks_w_delay(1)
# The oldest block is gone but the rest remain
assert oldest_block_hash not in watcher.locator_cache.blocks
assert set(rest_of_blocks).issubset(watcher.locator_cache.blocks.keys())
# The locators of the oldest block are gone but the rest remain
for locator in oldest_block_data:
assert locator not in watcher.locator_cache.cache
for block_hash in rest_of_blocks:
for locator in watcher.locator_cache.blocks[block_hash]:
assert locator in watcher.locator_cache.cache
# The size of the cache is the same
assert len(watcher.locator_cache.blocks) == watcher.locator_cache.cache_size
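The per-block maintenance this test observes can be pictured roughly as below; `on_new_block` is a hypothetical condensation of the relevant part of `Watcher.do_watch` (breach handling and persistence elided):
from common.tools import compute_locator

def on_new_block(watcher, block_hash, block):
    # Map every txid in the block to its locator and feed it to the cache;
    # update() evicts the oldest cached block once the cache is full
    locator_txid_map = {compute_locator(txid): txid for txid in block.get("tx")}
    watcher.locator_cache.update(block_hash, locator_txid_map)
    # The same map doubles as the lookup table for breach detection
    breaches = watcher.get_breaches(locator_txid_map)
    return breaches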
def test_get_breaches(watcher, txids, locator_uuid_map):
watcher.locator_uuid_map = locator_uuid_map
locators_txid_map = {compute_locator(txid): txid for txid in txids}
potential_breaches = watcher.get_breaches(locators_txid_map)
# All the txids must breach
assert locator_uuid_map.keys() == potential_breaches.keys()
@@ -258,38 +593,50 @@ def test_get_breaches_random_data(watcher, locator_uuid_map):
# The likelihood of finding a potential breach with random data should be negligible
watcher.locator_uuid_map = locator_uuid_map
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
locators_txid_map = {compute_locator(txid): txid for txid in txids}
potential_breaches = watcher.get_breaches(locators_txid_map)
# None of the txids should breach
assert len(potential_breaches) == 0
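Given these two tests, `get_breaches` with its new map-based signature amounts to a set intersection (a sketch consistent with the assertions above):
def get_breaches(self, locator_txid_map):
    # Locators present both in a watched appointment and in the new block
    breached = set(self.locator_uuid_map).intersection(locator_txid_map)
    return {locator: locator_txid_map[locator] for locator in breached}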
def test_check_breach(watcher):
    # A breach will be flagged as valid only if the encrypted blob can be properly decrypted and the resulting data
    # matches a transaction format.
    uuid = uuid4().hex
    appointment, dispute_tx = generate_dummy_appointment()
    dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
    penalty_txid, penalty_rawtx = watcher.check_breach(uuid, appointment, dispute_txid)
    assert Cryptographer.encrypt(penalty_rawtx, dispute_txid) == appointment.encrypted_blob
def test_check_breach_random_data(watcher):
    # If a breach triggers an appointment with random data as the encrypted blob, the check should fail.
    uuid = uuid4().hex
    appointment, dispute_tx = generate_dummy_appointment()
    dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
    # Set the blob to something "random"
    appointment.encrypted_blob = get_random_value_hex(200)
    with pytest.raises(EncryptionError):
        watcher.check_breach(uuid, appointment, dispute_txid)
def test_check_breach_invalid_transaction(watcher):
    # If the breach triggers an appointment with data that can be decrypted but does not match a transaction, it
    # should fail.
    uuid = uuid4().hex
    appointment, dispute_tx = generate_dummy_appointment()
    dispute_txid = watcher.block_processor.decode_raw_transaction(dispute_tx).get("txid")
    # Set the blob to something "random"
    appointment.encrypted_blob = Cryptographer.encrypt(get_random_value_hex(200), dispute_txid)
    with pytest.raises(InvalidTransactionFormat):
        watcher.check_breach(uuid, appointment, dispute_txid)
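Together, the three tests above fix `check_breach`'s contract: decrypt the blob with the dispute txid, then decode the result, letting either step raise. A method-shaped sketch, assuming `Cryptographer.decrypt` raises `EncryptionError` on undecryptable blobs, as the random-data test suggests:
def check_breach(self, uuid, appointment, dispute_txid):
    # Raises EncryptionError if the blob cannot be decrypted with this txid
    penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
    # Raises InvalidTransactionFormat if the plaintext is not a transaction
    penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
    return penalty_tx.get("txid"), penalty_rawtx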
def test_filter_valid_breaches(watcher):
@@ -323,3 +670,30 @@ def test_filter_valid_breaches(watcher):
# We have "triggered" a single breach and it was valid.
assert len(invalid_breaches) == 0 and len(valid_breaches) == 1
def test_filter_breaches_random_data(watcher):
appointments = {}
locator_uuid_map = {}
breaches = {}
for i in range(TEST_SET_SIZE):
dummy_appointment, _ = generate_dummy_appointment()
uuid = uuid4().hex
appointments[uuid] = {"locator": dummy_appointment.locator, "user_id": dummy_appointment.user_id}
watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_dict())
watcher.db_manager.create_append_locator_map(dummy_appointment.locator, uuid)
locator_uuid_map[dummy_appointment.locator] = [uuid]
if i % 2:
dispute_txid = get_random_value_hex(32)
breaches[dummy_appointment.locator] = dispute_txid
watcher.locator_uuid_map = locator_uuid_map
watcher.appointments = appointments
valid_breaches, invalid_breaches = watcher.filter_breaches(breaches)
# We have "triggered" TEST_SET_SIZE/2 breaches, all of them invalid.
assert len(valid_breaches) == 0 and len(invalid_breaches) == TEST_SET_SIZE / 2
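Seen next to `test_filter_valid_breaches`, this test suggests `filter_breaches` is a loop over `check_breach` that buckets results by outcome. A sketch; `load_appointment` is a hypothetical stand-in for the database read, and slot accounting is left out:
def filter_breaches(self, breaches):
    valid_breaches, invalid_breaches = {}, {}
    for locator, dispute_txid in breaches.items():
        for uuid in self.locator_uuid_map[locator]:
            appointment = load_appointment(self, uuid)  # hypothetical DB read
            try:
                penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid)
                valid_breaches[uuid] = {
                    "locator": locator,
                    "dispute_txid": dispute_txid,
                    "penalty_txid": penalty_txid,
                    "penalty_rawtx": penalty_rawtx,
                }
            except (EncryptionError, InvalidTransactionFormat):
                invalid_breaches[uuid] = {"locator": locator, "dispute_txid": dispute_txid}
    return valid_breaches, invalid_breaches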
View File
@@ -18,6 +18,7 @@ def add_appointment(plugin, tower_id, tower, appointment_dict, signature):
response = send_appointment(tower_id, tower, appointment_dict, signature)
plugin.log(f"Appointment accepted and signed by {tower_id}")
plugin.log(f"Remaining slots: {response.get('available_slots')}")
plugin.log(f"Start block: {response.get('start_block')}")
# # TODO: Not storing the whole appointments for now. The node can recreate all the data if needed.
# # DISCUSS: It may be worth checking that the available slots match instead of blindly trusting.