mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
teos - Adds LocatorCache
This commit is contained in:
@@ -7,6 +7,7 @@ APPOINTMENT_FIELD_TOO_SMALL = -5
|
||||
APPOINTMENT_FIELD_TOO_BIG = -6
|
||||
APPOINTMENT_WRONG_FIELD = -7
|
||||
APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8
|
||||
APPOINTMENT_ALREADY_TRIGGERED = -9
|
||||
|
||||
# Registration errors [-33, -64]
|
||||
REGISTRATION_MISSING_FIELD = -33
|
||||
|
||||
@@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify
|
||||
from teos import LOG_PREFIX
|
||||
import common.errors as errors
|
||||
from teos.inspector import InspectionFailed
|
||||
from teos.watcher import AppointmentLimitReached
|
||||
from teos.watcher import AppointmentLimitReached, AppointmentAlreadyTriggered
|
||||
from teos.gatekeeper import NotEnoughSlots, AuthenticationFailure
|
||||
|
||||
from common.logger import Logger
|
||||
@@ -192,6 +192,13 @@ class API:
|
||||
rcode = HTTP_SERVICE_UNAVAILABLE
|
||||
response = {"error": "appointment rejected"}
|
||||
|
||||
except AppointmentAlreadyTriggered:
|
||||
rcode = HTTP_BAD_REQUEST
|
||||
response = {
|
||||
"error": "appointment rejected. The provided appointment has already been triggered",
|
||||
"error_code": errors.APPOINTMENT_ALREADY_TRIGGERED,
|
||||
}
|
||||
|
||||
logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)
|
||||
return jsonify(response), rcode
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from common.logger import Logger
|
||||
from common.exceptions import BasicException
|
||||
|
||||
from teos import LOG_PREFIX
|
||||
from teos.tools import bitcoin_cli
|
||||
@@ -7,6 +8,10 @@ from teos.utils.auth_proxy import JSONRPCException
|
||||
logger = Logger(actor="BlockProcessor", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
|
||||
class InvalidTransactionFormat(BasicException):
|
||||
"""Raised when a transaction is not properly formatted"""
|
||||
|
||||
|
||||
class BlockProcessor:
|
||||
"""
|
||||
The :class:`BlockProcessor` contains methods related to the blockchain. Most of its methods require communication
|
||||
@@ -89,17 +94,19 @@ class BlockProcessor:
|
||||
raw_tx (:obj:`str`): the hex representation of the transaction.
|
||||
|
||||
Returns:
|
||||
:obj:`dict` or :obj:`None`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
|
||||
:obj:`dict`: The decoding of the given ``raw_tx`` if the transaction is well formatted.
|
||||
|
||||
Returns ``None`` otherwise.
|
||||
Raises:
|
||||
:obj:`InvalidTransactionFormat`: If the provided ``raw_tx`` has invalid format.
|
||||
"""
|
||||
|
||||
try:
|
||||
tx = bitcoin_cli(self.btc_connect_params).decoderawtransaction(raw_tx)
|
||||
|
||||
except JSONRPCException as e:
|
||||
tx = None
|
||||
logger.error("Cannot build transaction from decoded data", error=e.error)
|
||||
msg = "Cannot build transaction from decoded data"
|
||||
logger.error(msg, error=e.error)
|
||||
raise InvalidTransactionFormat(msg)
|
||||
|
||||
return tx
|
||||
|
||||
|
||||
111
teos/watcher.py
111
teos/watcher.py
@@ -12,6 +12,7 @@ from common.exceptions import InvalidParameter, SignatureError
|
||||
from teos import LOG_PREFIX
|
||||
from teos.cleaner import Cleaner
|
||||
from teos.extended_appointment import ExtendedAppointment
|
||||
from teos.block_processor import InvalidTransactionFormat
|
||||
|
||||
logger = Logger(actor="Watcher", log_name_prefix=LOG_PREFIX)
|
||||
|
||||
@@ -20,6 +21,10 @@ class AppointmentLimitReached(BasicException):
|
||||
"""Raised when the tower maximum appointment count has been reached"""
|
||||
|
||||
|
||||
class AppointmentAlreadyTriggered(BasicException):
|
||||
"""Raised an appointment is sent to the Watcher but that same data has already been sent to the Responder"""
|
||||
|
||||
|
||||
class LocatorCache:
|
||||
def __init__(self, blocks_in_cache):
|
||||
self.cache = dict()
|
||||
@@ -148,13 +153,48 @@ class Watcher:
|
||||
# The user_id needs to be added to the ExtendedAppointment once the former has been authenticated
|
||||
appointment.user_id = user_id
|
||||
|
||||
# The uuids are generated as the RIPMED160(locator||user_pubkey).
|
||||
# The uuids are generated as the RIPEMD160(locator||user_pubkey).
|
||||
# If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
|
||||
uuid = hash_160("{}{}".format(appointment.locator, user_id))
|
||||
|
||||
# Add the appointment to the Gatekeeper
|
||||
available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)
|
||||
|
||||
# Appointments that were triggered in blocks hold in the cache
|
||||
if appointment.locator in self.locator_cache.cache:
|
||||
# If this is a copy of an appointment we've already reacted to, the new appointment is rejected.
|
||||
if uuid in self.responder.trackers:
|
||||
message = "Appointment already in Responder"
|
||||
logger.info(message)
|
||||
raise AppointmentAlreadyTriggered(message)
|
||||
|
||||
try:
|
||||
breach = self.filter_breach(uuid, appointment, self.locator_cache.cache[appointment.locator])
|
||||
receipt = self.responder.handle_breach(
|
||||
uuid,
|
||||
breach["locator"],
|
||||
breach["dispute_txid"],
|
||||
breach["penalty_txid"],
|
||||
breach["penalty_rawtx"],
|
||||
self.appointments[uuid].get("user_id"),
|
||||
self.last_known_block,
|
||||
)
|
||||
|
||||
# At this point the appointment is accepted but data is only kept if it goes through the Responder
|
||||
# otherwise it is dropped.
|
||||
if receipt.delivered:
|
||||
self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
|
||||
self.db_manager.create_append_locator_map(appointment.locator, uuid)
|
||||
self.db_manager.create_triggered_appointment_flag(uuid)
|
||||
|
||||
except (EncryptionError, InvalidTransactionFormat):
|
||||
# If data inside the encrypted blob is invalid, the appointment is accepted but the data is dropped.
|
||||
# (same as with data that bounces in the Responder). This reduces the appointment slot count so it
|
||||
# could be used to discourage user misbehaviour.
|
||||
pass
|
||||
|
||||
# Regular appointments that have not been triggered (or not recently at least)
|
||||
else:
|
||||
self.appointments[uuid] = appointment.get_summary()
|
||||
|
||||
if appointment.locator in self.locator_uuid_map:
|
||||
@@ -198,6 +238,17 @@ class Watcher:
|
||||
self.last_known_block = self.block_processor.get_best_block_hash()
|
||||
self.db_manager.store_last_block_hash_watcher(self.last_known_block)
|
||||
|
||||
# Initialise the locator cache with the last ``cache_size`` blocks.
|
||||
target_block_hash = self.last_known_block
|
||||
for _ in range(self.locator_cache.cache_size):
|
||||
target_block = self.block_processor.get_block(target_block_hash)
|
||||
locators = {compute_locator(txid): txid for txid in target_block.get("tx")}
|
||||
self.locator_cache.cache.update(locators)
|
||||
self.locator_cache.blocks[target_block_hash] = locators
|
||||
target_block_hash = target_block.get("previousblockhash")
|
||||
|
||||
self.locator_cache.blocks = OrderedDict(reversed((list(self.locator_cache.blocks.items()))))
|
||||
|
||||
while True:
|
||||
block_hash = self.block_queue.get()
|
||||
block = self.block_processor.get_block(block_hash)
|
||||
@@ -210,8 +261,7 @@ class Watcher:
|
||||
self.locator_cache.blocks[block_hash] = locators
|
||||
logger.debug("Block added to cache", block_hash=block_hash)
|
||||
|
||||
# FIXME: change txids for locators?
|
||||
if len(self.appointments) > 0 and txids:
|
||||
if len(self.appointments) > 0 and locators:
|
||||
expired_appointments = self.gatekeeper.get_expired_appointments(block["height"])
|
||||
# Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
|
||||
expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
|
||||
@@ -279,7 +329,7 @@ class Watcher:
|
||||
if self.locator_cache.is_full():
|
||||
self.locator_cache.remove_older_block()
|
||||
|
||||
# Register the last processed block for the watcher
|
||||
# Register the last processed block for the Watcher
|
||||
self.db_manager.store_last_block_hash_watcher(block_hash)
|
||||
self.last_known_block = block.get("hash")
|
||||
self.block_queue.task_done()
|
||||
@@ -308,11 +358,37 @@ class Watcher:
|
||||
|
||||
return breaches
|
||||
|
||||
def filter_breach(self, uuid, appointment, dispute_txid):
|
||||
try:
|
||||
penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
|
||||
penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
|
||||
|
||||
except EncryptionError as e:
|
||||
logger.info("Transaction cannot be decrypted", uuid=uuid)
|
||||
raise e
|
||||
|
||||
except InvalidTransactionFormat as e:
|
||||
logger.info("The breach contained an invalid transaction")
|
||||
raise e
|
||||
|
||||
valid_breach = {
|
||||
"locator": appointment.locator,
|
||||
"dispute_txid": dispute_txid,
|
||||
"penalty_txid": penalty_tx.get("txid"),
|
||||
"penalty_rawtx": penalty_rawtx,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"Breach found for locator", locator=appointment.locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
|
||||
)
|
||||
|
||||
return valid_breach
|
||||
|
||||
def filter_breaches(self, breaches):
|
||||
"""
|
||||
Filters the valid from the invalid channel breaches.
|
||||
|
||||
The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen.
|
||||
The :obj:`Watcher` cannot know if an ``encrypted_blob`` contains a valid transaction until a breach is seen.
|
||||
Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder <teos.responder.Responder>`.
|
||||
|
||||
Args:
|
||||
@@ -337,30 +413,23 @@ class Watcher:
|
||||
|
||||
if appointment.encrypted_blob in decrypted_blobs:
|
||||
penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
|
||||
|
||||
else:
|
||||
try:
|
||||
penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
|
||||
|
||||
except EncryptionError:
|
||||
penalty_rawtx = None
|
||||
|
||||
penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
|
||||
decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx)
|
||||
|
||||
if penalty_tx is not None:
|
||||
valid_breaches[uuid] = {
|
||||
"locator": locator,
|
||||
"locator": appointment.locator,
|
||||
"dispute_txid": dispute_txid,
|
||||
"penalty_txid": penalty_tx.get("txid"),
|
||||
"penalty_rawtx": penalty_rawtx,
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"Breach found for locator", locator=locator, uuid=uuid, penalty_txid=penalty_tx.get("txid")
|
||||
else:
|
||||
try:
|
||||
valid_breach = self.filter_breach(uuid, appointment, dispute_txid)
|
||||
valid_breaches[uuid] = valid_breach
|
||||
decrypted_blobs[appointment.encrypted_blob] = (
|
||||
valid_breach["penalty_txid"],
|
||||
valid_breach["penalty_rawtx"],
|
||||
)
|
||||
|
||||
else:
|
||||
except (EncryptionError, InvalidTransactionFormat):
|
||||
invalid_breaches.append(uuid)
|
||||
|
||||
return valid_breaches, invalid_breaches
|
||||
|
||||
Reference in New Issue
Block a user