Merge branch 'master' into master

This commit is contained in:
Sergi Delgado Segura
2020-06-15 11:30:01 +02:00
committed by GitHub
15 changed files with 926 additions and 98 deletions

View File

@@ -22,6 +22,7 @@ DEFAULT_CONF = {
"DEFAULT_SUBSCRIPTION_DURATION": {"value": 4320, "type": int},
"EXPIRY_DELTA": {"value": 6, "type": int},
"MIN_TO_SELF_DELAY": {"value": 20, "type": int},
"LOCATOR_CACHE_SIZE": {"value": 6, "type": int},
"LOG_FILE": {"value": "teos.log", "type": str, "path": True},
"TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True},
"APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True},

View File

@@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify
from teos import LOG_PREFIX
import common.errors as errors
from teos.inspector import InspectionFailed
from teos.watcher import AppointmentLimitReached
from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered
from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure
from common.logger import Logger
@@ -192,6 +192,13 @@ class API:
rcode = HTTP_SERVICE_UNAVAILABLE
response = {"error": "appointment rejected"}
except AppointmentAlreadyTriggered:
rcode = HTTP_BAD_REQUEST
response = {
"error": "appointment rejected. The provided appointment has already been triggered",
"error_code": errors.APPOINTMENT_ALREADY_TRIGGERED,
}
logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)
return jsonify(response), rcode

View File

@@ -1,4 +1,5 @@
from common.logger import Logger
from common.exceptions import BasicException
from teos import LOG_PREFIX
from teos.tools import bitcoin_cli
@@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException
logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX)
class InvalidTransactionFormat(BasicException):
"""Raised when a transaction is not properly formatted"""
class BlockProcessor:
"""
The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication
@@ -89,17 +94,19 @@ class BlockProcessor:
raw_tx (:obj:`str`): the hex representation of the transaction.
Returns:
:obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
:obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
Returns ``None`` otherwise.
Raises:
:obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has invalid format.
"""
try:
tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx)
except JSONRPCException as e:
tx = None
logger.error("Cannot build transaction from decoded data", error=e.error)
msg = "Cannot build transaction from decoded data"
logger.error(msg, error=e.error)
raise InvalidTransactionFormat(msg)
return tx

View File

@@ -89,7 +89,13 @@ def main(command_line_conf):
db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH"))
responder = Responder(db_manager, gatekeeper, carrier, block_processor)
watcher = Watcher(
db_manager, gatekeeper, block_processor, responder, secret_key_der, config.get("MAX_APPOINTMENTS")
db_manager,
gatekeeper,
block_processor,
responder,
secret_key_der,
config.get("MAX_APPOINTMENTS"),
config.get("LOCATOR_CACHE_SIZE"),
)
# Create the chain monitor and start monitoring the chain

View File

@@ -1,5 +1,7 @@
from queue import Queue
from threading import Thread
from collections import OrderedDict
from readerwriterlock import rwlock
from common.logger import Logger
from common.tools import compute_locator
@@ -11,6 +13,7 @@ from common.exceptions import InvalidParameter, SignatureError
from teos import LOG_PREFIX
from teos.cleaner import Cleaner
from teos.extended_appointment import ExtendedAppointment
from teos.block_processor import InvalidTransactionFormat
logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX)
@@ -19,6 +22,140 @@ class AppointmentLimitReached(BasicException):
"""Raised when the tower maximum appointment count has been reached"""
class AppointmentAlreadyTriggered(BasicException):
"""Raised when an appointment is sent to the Watcher but that same data has already been sent to the Responder"""
class LocatorCache:
    """
    The LocatorCache keeps the data about the last ``cache_size`` blocks around so appointments can be checked against
    it. The data is indexed by locator and it's mainly built during the normal ``Watcher`` operation so no extra steps
    are normally needed.

    Args:
        blocks_in_cache (:obj:`int`): the number of blocks to keep in the cache.

    Attributes:
        cache (:obj:`dict`): a dictionary of ``locator:dispute_txid`` pairs that received appointments are checked
            against.
        blocks (:obj:`OrderedDict`): an ordered dictionary of the last ``blocks_in_cache`` blocks
            (``block_hash:locators``). Used to keep track of what data belongs to what block, so data can be pruned
            accordingly. Also needed to rebuild the cache in case of reorgs.
        cache_size (:obj:`int`): the size of the cache in blocks.
        rw_lock: a readers-writer lock guarding ``cache`` and ``blocks`` so lookups and block updates can run from
            different threads.
    """

    def __init__(self, blocks_in_cache):
        self.cache = dict()
        self.blocks = OrderedDict()
        self.cache_size = blocks_in_cache
        # NOTE: rwlock locks are not reentrant, so methods must never call each other while holding a lock.
        self.rw_lock = rwlock.RWLockWrite()

    def init(self, last_known_block, block_processor):
        """
        Sets the initial state of the locator cache.

        Args:
            last_known_block (:obj:`str`): the last known block by the ``Watcher``.
            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
        """
        # This is needed as a separate method from __init__ since it has to be run right before starting to watch.
        # Not doing so would imply storing temporary variables in the Watcher and initialising the cache as None.
        target_block_hash = last_known_block
        for _ in range(self.cache_size):
            # In some setups, like regtest, it could be the case that there are not enough previous blocks.
            # In those cases we pull as many as we can (up to cache_size).
            if not target_block_hash:
                break

            target_block = block_processor.get_block(target_block_hash)
            if not target_block:
                break

            locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
            self.cache.update(locator_txid_map)
            self.blocks[target_block_hash] = list(locator_txid_map.keys())
            target_block_hash = target_block.get("previousblockhash")

        # Blocks are traversed backwards (tip first), so reverse to keep them in chain order (oldest first).
        self.blocks = OrderedDict(reversed(list(self.blocks.items())))

    def get_txid(self, locator):
        """
        Gets a txid from the locator cache.

        Args:
            locator (:obj:`str`): the locator to look up in the cache.

        Returns:
            :obj:`str` or :obj:`None`: The txid linked to the given locator if found. ``None`` otherwise.
        """
        with self.rw_lock.gen_rlock():
            txid = self.cache.get(locator)
        return txid

    def update(self, block_hash, locator_txid_map):
        """
        Updates the cache with data from a new block. Removes the oldest block if the cache is full after the addition.

        Args:
            block_hash (:obj:`str`): the hash of the new block.
            locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of
                transaction ids.
        """
        with self.rw_lock.gen_wlock():
            self.cache.update(locator_txid_map)
            self.blocks[block_hash] = list(locator_txid_map.keys())
            logger.debug("Block added to cache", block_hash=block_hash)

        # is_full / remove_oldest_block take their own locks, so they must be called after releasing the write lock
        # (the underlying rwlock is not reentrant).
        if self.is_full():
            self.remove_oldest_block()

    def is_full(self):
        """ Returns whether the cache is full or not """
        with self.rw_lock.gen_rlock():
            full = len(self.blocks) > self.cache_size
        return full

    def remove_oldest_block(self):
        """ Removes the oldest block from the cache """
        with self.rw_lock.gen_wlock():
            block_hash, locators = self.blocks.popitem(last=False)
            for locator in locators:
                del self.cache[locator]
            logger.debug("Block removed from cache", block_hash=block_hash)

    def fix(self, last_known_block, block_processor):
        """
        Fixes the cache after a reorg has been detected by feeding the most recent ``cache_size`` blocks to it.

        Args:
            last_known_block (:obj:`str`): the last known block hash after the reorg.
            block_processor (:obj:`teos.block_processor.BlockProcessor`): a ``BlockProcessor`` instance.
        """
        tmp_cache = LocatorCache(self.cache_size)

        # We assume there are no reorgs back to genesis. If so, this would raise some log warnings and the cache will
        # be filled with fewer than cache_size blocks.
        target_block_hash = last_known_block
        for _ in range(tmp_cache.cache_size):
            # Stop as soon as a block cannot be pulled (unknown hash, or we walked past the oldest available block).
            # Keeping on iterating would just repeat the same failing lookup.
            if not target_block_hash:
                break

            target_block = block_processor.get_block(target_block_hash)
            if not target_block:
                break

            # Compute the locator:txid pair for every transaction in the block and update both the cache and
            # the block mapping.
            locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")}
            tmp_cache.cache.update(locator_txid_map)
            tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys())
            target_block_hash = target_block.get("previousblockhash")

        # Atomically swap the rebuilt state in (blocks reversed into chain order, oldest first).
        with self.rw_lock.gen_wlock():
            self.blocks = OrderedDict(reversed(list(tmp_cache.blocks.items())))
            self.cache = tmp_cache.cache
class Watcher:
"""
The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower.
@@ -41,6 +178,8 @@ class Watcher:
responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
blocks_in_cache (:obj:`int`): the number of blocks to keep in cache so recently triggered appointments can be
covered.
Attributes:
appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment
@@ -60,13 +199,14 @@ class Watcher:
signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
last_known_block (:obj:`str`): the last block known by the ``Watcher``.
locator_cache (:obj:`LocatorCache`): a cache of locators for the last ``blocks_in_cache`` blocks.
Raises:
:obj:`InvalidKey <common.exceptions.InvalidKey>`: if teos sk cannot be loaded.
"""
def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments):
def __init__(self, db_manager, gatekeeper, block_processor, responder, sk_der, max_appointments, blocks_in_cache):
self.appointments = dict()
self.locator_uuid_map = dict()
self.block_queue = Queue()
@@ -77,6 +217,7 @@ class Watcher:
self.max_appointments = max_appointments
self.signing_key = Cryptographer.load_private_key_der(sk_der)
self.last_known_block = db_manager.load_last_block_hash_watcher()
self.locator_cache = LocatorCache(blocks_in_cache)
def awake(self):
"""Starts a new thread to monitor the blockchain for channel breaches"""
@@ -126,24 +267,55 @@ class Watcher:
# The user_id needs to be added to the ExtendedAppointment once the former has been authenticated
appointment.user_id = user_id
# The uuids are generated as the RIPMED160(locator||user_pubkey).
# The uuids are generated as the RIPEMD160(locator||user_pubkey).
# If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
uuid = hash_160("{}{}".format(appointment.locator, user_id))
# If this is a copy of an appointment we've already reacted to, the new appointment is rejected.
if uuid in self.responder.trackers:
message = "Appointment already in Responder"
logger.info(message)
raise AppointmentAlreadyTriggered(message)
# Add the appointment to the Gatekeeper
available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)
self.appointments[uuid] = appointment.get_summary()
if appointment.locator in self.locator_uuid_map:
# If the uuid is already in the map it means this is an update.
if uuid not in self.locator_uuid_map[appointment.locator]:
self.locator_uuid_map[appointment.locator].append(uuid)
# Appointments that were triggered in blocks held in the cache
dispute_txid = self.locator_cache.get_txid(appointment.locator)
if dispute_txid:
try:
penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid)
receipt = self.responder.handle_breach(
uuid, appointment.locator, dispute_txid, penalty_txid, penalty_rawtx, user_id, self.last_known_block
)
# At this point the appointment is accepted but data is only kept if it goes through the Responder.
# Otherwise it is dropped.
if receipt.delivered:
self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
self.db_manager.create_append_locator_map(appointment.locator, uuid)
self.db_manager.create_triggered_appointment_flag(uuid)
except (EncryptionError, InvalidTransactionFormat):
# If data inside the encrypted blob is invalid, the appointment is accepted but the data is dropped.
# (same as with data that bounces in the Responder). This reduces the appointment slot count so it
# could be used to discourage user misbehaviour.
pass
# Regular appointments that have not been triggered (or, at least, not recently)
else:
# Otherwise two users have sent an appointment with the same locator, so we need to store both.
self.locator_uuid_map[appointment.locator] = [uuid]
self.appointments[uuid] = appointment.get_summary()
self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
self.db_manager.create_append_locator_map(appointment.locator, uuid)
if appointment.locator in self.locator_uuid_map:
# If the uuid is already in the map it means this is an update.
if uuid not in self.locator_uuid_map[appointment.locator]:
self.locator_uuid_map[appointment.locator].append(uuid)
else:
# Otherwise two users have sent an appointment with the same locator, so we need to store both.
self.locator_uuid_map[appointment.locator] = [uuid]
self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
self.db_manager.create_append_locator_map(appointment.locator, uuid)
try:
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
@@ -157,6 +329,7 @@ class Watcher:
return {
"locator": appointment.locator,
"start_block": self.last_known_block,
"signature": signature,
"available_slots": available_slots,
"subscription_expiry": self.gatekeeper.registered_users[user_id].subscription_expiry,
@@ -175,14 +348,24 @@ class Watcher:
self.last_known_block = self.block_processor.get_best_block_hash()
self.db_manager.store_last_block_hash_watcher(self.last_known_block)
# Initialise the locator cache with the last ``cache_size`` blocks.
self.locator_cache.init(self.last_known_block, self.block_processor)
while True:
block_hash = self.block_queue.get()
block = self.block_processor.get_block(block_hash)
logger.info("New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"))
if len(self.appointments) > 0 and block is not None:
txids = block.get("tx")
# If a reorg is detected, the cache is fixed to cover the last `cache_size` blocks of the new chain
if self.last_known_block != block.get("previousblockhash"):
self.locator_cache.fix(block_hash, self.block_processor)
txids = block.get("tx")
# Compute the locator for every transaction in the block and add them to the cache
locator_txid_map = {compute_locator(txid): txid for txid in txids}
self.locator_cache.update(block_hash, locator_txid_map)
if len(self.appointments) > 0 and locator_txid_map:
expired_appointments = self.gatekeeper.get_expired_appointments(block["height"])
# Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
@@ -196,7 +379,7 @@ class Watcher:
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
)
valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(txids))
valid_breaches, invalid_breaches = self.filter_breaches(self.get_breaches(locator_txid_map))
triggered_flags = []
appointments_to_delete = []
@@ -246,28 +429,27 @@ class Watcher:
if len(self.appointments) != 0:
logger.info("No more pending appointments")
# Register the last processed block for the watcher
# Register the last processed block for the Watcher
self.db_manager.store_last_block_hash_watcher(block_hash)
self.last_known_block = block.get("hash")
self.block_queue.task_done()
def get_breaches(self, txids):
def get_breaches(self, locator_txid_map):
"""
Gets a list of channel breaches given the list of transaction ids.
Gets a dictionary of channel breaches given a map of locator:dispute_txid.
Args:
txids (:obj:`list`): the list of transaction ids included in the last received block.
locator_txid_map (:obj:`dict`): the dictionary of locators (locator:txid) derived from a list of
transaction ids.
Returns:
:obj:`dict`: A dictionary (``locator:txid``) with all the breaches found. An empty dictionary if none are
found.
"""
potential_locators = {compute_locator(txid): txid for txid in txids}
# Check if any of the tx_ids in the received block is an actual match
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
breaches = {locator: potential_locators[locator] for locator in intersection}
intersection = set(self.locator_uuid_map.keys()).intersection(locator_txid_map.keys())
breaches = {locator: locator_txid_map[locator] for locator in intersection}
if len(breaches) > 0:
logger.info("List of breaches", breaches=breaches)
@@ -277,21 +459,59 @@ class Watcher:
return breaches
def check_breach(self, uuid, appointment, dispute_txid):
"""
Checks if a breach is valid. Valid breaches should decrypt to a valid transaction.
Args:
uuid (:obj:`str`): the uuid of the appointment that was triggered by the breach.
appointment (:obj:`teos.extended_appointment.ExtendedAppointment`): the appointment data.
dispute_txid (:obj:`str`): the id of the transaction that triggered the breach.
Returns:
:obj:`tuple`: A tuple containing the penalty txid and the raw penalty tx.
Raises:
:obj:`EncryptionError`: If the encrypted blob from the provided appointment cannot be decrypted with the
key derived from the breach transaction id.
:obj:`InvalidTransactionFormat`: If the decrypted data does not have a valid transaction format.
"""
try:
penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
except EncryptionError as e:
logger.info("Transaction cannot be decrypted", uuid=uuid)
raise e
except InvalidTransactionFormat as e:
logger.info("The breach contained an invalid transaction", uuid=uuid)
raise e
logger.info(
"Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
)
return penalty_tx.get("txid"), penalty_rawtx
def filter_breaches(self, breaches):
"""
Filters the valid from the invalid channel breaches.
The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen.
The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen.
Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder <teos.responder.Responder>`.
Args:
breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``).
Returns:
:obj:`dict`: A dictionary containing all the breaches flagged either as valid or invalid.
The structure is as follows:
:obj:`tuple`: A dictionary and a list. The former contains the valid breaches, while the latter contains the
invalid ones.
``{locator, dispute_txid, penalty_txid, penalty_rawtx, valid_breach}``
The valid breaches dictionary has the following structure:
``{locator, dispute_txid, penalty_txid, penalty_rawtx}``
"""
valid_breaches = {}
@@ -305,31 +525,26 @@ class Watcher:
appointment = ExtendedAppointment.from_dict(self.db_manager.load_watcher_appointment(uuid))
if appointment.encrypted_blob in decrypted_blobs:
penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
else:
try:
penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
except EncryptionError:
penalty_rawtx = None
penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx)
if penalty_tx is not None:
penalty_txid, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
valid_breaches[uuid] = {
"locator": locator,
"locator": appointment.locator,
"dispute_txid": dispute_txid,
"penalty_txid": penalty_tx.get("txid"),
"penalty_txid": penalty_txid,
"penalty_rawtx": penalty_rawtx,
}
logger.info(
"Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
)
else:
invalid_breaches.append(uuid)
try:
penalty_txid, penalty_rawtx = self.check_breach(uuid, appointment, dispute_txid)
valid_breaches[uuid] = {
"locator": appointment.locator,
"dispute_txid": dispute_txid,
"penalty_txid": penalty_txid,
"penalty_rawtx": penalty_rawtx,
}
decrypted_blobs[appointment.encrypted_blob] = (penalty_txid, penalty_rawtx)
except (EncryptionError, InvalidTransactionFormat):
invalid_breaches.append(uuid)
return valid_breaches, invalid_breaches