Merge pull request #1 from talaia-labs/master

updating master
This commit is contained in:
2020-04-09 07:01:39 +02:00
committed by GitHub
53 changed files with 3497 additions and 2127 deletions

View File

@@ -1,5 +1,4 @@
import os
from teos.utils.auth_proxy import AuthServiceProxy
HOST = "0.0.0.0"
PORT = 9814
@@ -10,17 +9,19 @@ LOG_PREFIX = "teos"
# Default conf fields
DEFAULT_CONF = {
"BTC_RPC_USER": {"value": "user", "type": str},
"BTC_RPC_PASSWD": {"value": "passwd", "type": str},
"BTC_RPC_PASSWORD": {"value": "passwd", "type": str},
"BTC_RPC_CONNECT": {"value": "127.0.0.1", "type": str},
"BTC_RPC_PORT": {"value": 8332, "type": int},
"BTC_NETWORK": {"value": "mainnet", "type": str},
"FEED_PROTOCOL": {"value": "tcp", "type": str},
"FEED_CONNECT": {"value": "127.0.0.1", "type": str},
"FEED_PORT": {"value": 28332, "type": int},
"MAX_APPOINTMENTS": {"value": 100, "type": int},
"MAX_APPOINTMENTS": {"value": 1000000, "type": int},
"DEFAULT_SLOTS": {"value": 100, "type": int},
"EXPIRY_DELTA": {"value": 6, "type": int},
"MIN_TO_SELF_DELAY": {"value": 20, "type": int},
"LOG_FILE": {"value": "teos.log", "type": str, "path": True},
"TEOS_SECRET_KEY": {"value": "teos_sk.der", "type": str, "path": True},
"DB_PATH": {"value": "appointments", "type": str, "path": True},
"APPOINTMENTS_DB_PATH": {"value": "appointments", "type": str, "path": True},
"USERS_DB_PATH": {"value": "users", "type": str, "path": True},
}

View File

@@ -1,13 +1,22 @@
import os
import json
import logging
from math import ceil
from flask import Flask, request, abort, jsonify
import teos.errors as errors
from teos import HOST, PORT, LOG_PREFIX
from common.logger import Logger
from common.appointment import Appointment
from teos.inspector import InspectionFailed
from teos.gatekeeper import NotEnoughSlots, IdentificationFailure
from common.constants import HTTP_OK, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, LOCATOR_LEN_HEX
from common.logger import Logger
from common.cryptographer import hash_160
from common.constants import (
HTTP_OK,
HTTP_BAD_REQUEST,
HTTP_SERVICE_UNAVAILABLE,
HTTP_NOT_FOUND,
ENCRYPTED_BLOB_MAX_SIZE_HEX,
)
# ToDo: #5-add-async-to-api
@@ -15,90 +24,220 @@ app = Flask(__name__)
logger = Logger(actor="API", log_name_prefix=LOG_PREFIX)
# NOTCOVERED: not sure how to monkey path this one. May be related to #77
def get_remote_addr():
"""
Gets the remote client ip address. The HTTP_X_REAL_IP field is tried first in case the server is behind a reverse
proxy.
Returns:
:obj:`str`: the IP address of the client.
"""
# Getting the real IP if the server is behind a reverse proxy
remote_addr = request.environ.get("HTTP_X_REAL_IP")
if not remote_addr:
remote_addr = request.environ.get("REMOTE_ADDR")
return remote_addr
# NOTCOVERED: not sure how to monkey path this one. May be related to #77
def get_request_data_json(request):
"""
Gets the content of a json POST request and makes sure it decodes to a dictionary.
Args:
request (:obj:`Request`): the request sent by the user.
Returns:
:obj:`dict`: the dictionary parsed from the json request.
Raises:
:obj:`TypeError`: if the request is not json encoded or it does not decodes to a dictionary.
"""
if request.is_json:
request_data = request.get_json()
if isinstance(request_data, dict):
return request_data
else:
raise TypeError("Invalid request content")
else:
raise TypeError("Request is not json encoded")
class API:
"""
The :class:`API` is in charge of the interface between the user and the tower. It handles and server user requests.
The :class:`API` is in charge of the interface between the user and the tower. It handles and serves user requests.
Args:
inspector (:obj:`Inspector <teos.inspector.Inspector>`): an ``Inspector`` instance to check the correctness of
the received data.
the received appointment data.
watcher (:obj:`Watcher <teos.watcher.Watcher>`): a ``Watcher`` instance to pass the requests to.
gatekeeper (:obj:`Watcher <teos.gatekeeper.Gatekeeper>`): a `Gatekeeper` instance in charge to control the user
access.
"""
def __init__(self, inspector, watcher):
def __init__(self, inspector, watcher, gatekeeper):
self.inspector = inspector
self.watcher = watcher
self.gatekeeper = gatekeeper
self.app = app
# Adds all the routes to the functions listed above.
routes = {
"/register": (self.register, ["POST"]),
"/add_appointment": (self.add_appointment, ["POST"]),
"/get_appointment": (self.get_appointment, ["POST"]),
"/get_all_appointments": (self.get_all_appointments, ["GET"]),
}
for url, params in routes.items():
app.add_url_rule(url, view_func=params[0], methods=params[1])
def register(self):
"""
Registers a user by creating a subscription.
Registration is pretty straightforward for now, since it does not require payments.
The amount of slots cannot be requested by the user yet either. This is linked to the previous point.
Users register by sending a public key to the proper endpoint. This is exploitable atm, but will be solved when
payments are introduced.
Returns:
:obj:`tuple`: A tuple containing the response (:obj:`str`) and response code (:obj:`int`). For accepted
requests, the ``rcode`` is always 200 and the response contains a json with the public key and number of
slots in the subscription. For rejected requests, the ``rcode`` is a 404 and the value contains an
application error, and an error message. Error messages can be found at :mod:`Errors <teos.errors>`.
"""
remote_addr = get_remote_addr()
logger.info("Received register request", from_addr="{}".format(remote_addr))
# Check that data type and content are correct. Abort otherwise.
try:
request_data = get_request_data_json(request)
except TypeError as e:
logger.info("Received invalid register request", from_addr="{}".format(remote_addr))
return abort(HTTP_BAD_REQUEST, e)
client_pk = request_data.get("public_key")
if client_pk:
try:
rcode = HTTP_OK
available_slots = self.gatekeeper.add_update_user(client_pk)
response = {"public_key": client_pk, "available_slots": available_slots}
except ValueError as e:
rcode = HTTP_BAD_REQUEST
error = "Error {}: {}".format(errors.REGISTRATION_MISSING_FIELD, str(e))
response = {"error": error}
else:
rcode = HTTP_BAD_REQUEST
error = "Error {}: public_key not found in register message".format(errors.REGISTRATION_WRONG_FIELD_FORMAT)
response = {"error": error}
logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)
return jsonify(response), rcode
def add_appointment(self):
"""
Main endpoint of the Watchtower.
The client sends requests (appointments) to this endpoint to request a job to the Watchtower. Requests must be
json encoded and contain an ``appointment`` field and optionally a ``signature`` and ``public_key`` fields.
json encoded and contain an ``appointment`` and ``signature`` fields.
Returns:
:obj:`tuple`: A tuple containing the response (``json``) and response code (``int``). For accepted
appointments, the ``rcode`` is always 0 and the response contains the signed receipt. For rejected
appointments, the ``rcode`` is a negative value and the response contains the error message. Error messages
can be found at :mod:`Errors <teos.errors>`.
:obj:`tuple`: A tuple containing the response (:obj:`str`) and response code (:obj:`int`). For accepted
appointments, the ``rcode`` is always 200 and the response contains the receipt signature (json). For
rejected appointments, the ``rcode`` is a 404 and the value contains an application error, and an error
message. Error messages can be found at :mod:`Errors <teos.errors>`.
"""
# Getting the real IP if the server is behind a reverse proxy
remote_addr = request.environ.get("HTTP_X_REAL_IP")
if not remote_addr:
remote_addr = request.environ.get("REMOTE_ADDR")
remote_addr = get_remote_addr()
logger.info("Received add_appointment request", from_addr="{}".format(remote_addr))
# FIXME: Logging every request so we can get better understanding of bugs in the alpha
logger.debug("Request details", data="{}".format(request.data))
# Check that data type and content are correct. Abort otherwise.
try:
request_data = get_request_data_json(request)
if request.is_json:
# Check content type once if properly defined
request_data = json.loads(request.get_json())
appointment = self.inspector.inspect(
request_data.get("appointment"), request_data.get("signature"), request_data.get("public_key")
)
except TypeError as e:
return abort(HTTP_BAD_REQUEST, e)
error = None
response = None
# We kind of have the chicken an the egg problem here. Data must be verified and the signature must be checked:
# - If we verify the data first, we may encounter that the signature is wrong and wasted some time.
# - If we check the signature first, we may need to verify some of the information or expose to build
# appointments with potentially wrong data, which may be exploitable.
#
# The first approach seems safer since it only implies a bunch of pretty quick checks.
if type(appointment) == Appointment:
appointment_added, signature = self.watcher.add_appointment(appointment)
try:
appointment = self.inspector.inspect(request_data.get("appointment"))
user_pk = self.gatekeeper.identify_user(appointment.serialize(), request_data.get("signature"))
if appointment_added:
rcode = HTTP_OK
response = {"locator": appointment.locator, "signature": signature}
# Check if the appointment is an update. Updates will return a summary.
appointment_uuid = hash_160("{}{}".format(appointment.locator, user_pk))
appointment_summary = self.watcher.get_appointment_summary(appointment_uuid)
else:
rcode = HTTP_SERVICE_UNAVAILABLE
error = "appointment rejected"
if appointment_summary:
used_slots = ceil(appointment_summary.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX)
required_slots = ceil(len(appointment.encrypted_blob.data) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
slot_diff = required_slots - used_slots
elif type(appointment) == tuple:
rcode = HTTP_BAD_REQUEST
error = "appointment rejected. Error {}: {}".format(appointment[0], appointment[1])
# For updates we only reserve the slot difference provided the new one is bigger.
required_slots = slot_diff if slot_diff > 0 else 0
else:
# We should never end up here, since inspect only returns appointments or tuples. Just in case.
rcode = HTTP_BAD_REQUEST
error = "appointment rejected. Request does not match the standard"
# For regular appointments 1 slot is reserved per ENCRYPTED_BLOB_MAX_SIZE_HEX block.
slot_diff = 0
required_slots = ceil(len(appointment.encrypted_blob.data) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
else:
# Slots are reserved before adding the appointments to prevent race conditions.
# DISCUSS: It may be worth using signals here to avoid race conditions anyway.
self.gatekeeper.fill_slots(user_pk, required_slots)
appointment_added, signature = self.watcher.add_appointment(appointment, user_pk)
if appointment_added:
# If the appointment is added and the update is smaller than the original, the difference is given back.
if slot_diff < 0:
self.gatekeeper.free_slots(user_pk, abs(slot_diff))
rcode = HTTP_OK
response = {
"locator": appointment.locator,
"signature": signature,
"available_slots": self.gatekeeper.registered_users[user_pk].get("available_slots"),
}
else:
# If the appointment is not added the reserved slots are given back
self.gatekeeper.free_slots(user_pk, required_slots)
rcode = HTTP_SERVICE_UNAVAILABLE
response = {"error": "appointment rejected"}
except InspectionFailed as e:
rcode = HTTP_BAD_REQUEST
error = "appointment rejected. Request is not json encoded"
response = None
error = "appointment rejected. Error {}: {}".format(e.erno, e.reason)
response = {"error": error}
logger.info(
"Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response, error=error
)
except (IdentificationFailure, NotEnoughSlots):
rcode = HTTP_BAD_REQUEST
error = "appointment rejected. Error {}: {}".format(
errors.APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS,
"Invalid signature or user does not have enough slots available",
)
response = {"error": error}
if error is None:
return jsonify(response), rcode
else:
return jsonify({"error": error}), rcode
logger.info("Sending response and disconnecting", from_addr="{}".format(remote_addr), response=response)
return jsonify(response), rcode
# FIXME: THE NEXT TWO API ENDPOINTS ARE FOR TESTING AND SHOULD BE REMOVED / PROPERLY MANAGED BEFORE PRODUCTION!
# ToDo: #17-add-api-keys
def get_appointment(self):
"""
Gives information about a given appointment state in the Watchtower.
@@ -106,7 +245,9 @@ class API:
The information is requested by ``locator``.
Returns:
:obj:`dict`: A json formatted dictionary containing information about the requested appointment.
:obj:`str`: A json formatted dictionary containing information about the requested appointment.
Returns not found if the user does not have the requested appointment or the locator is invalid.
A ``status`` flag is added to the data provided by either the :obj:`Watcher <teos.watcher.Watcher>` or the
:obj:`Responder <teos.responder.Responder>` that signals the status of the appointment.
@@ -117,44 +258,54 @@ class API:
"""
# Getting the real IP if the server is behind a reverse proxy
remote_addr = request.environ.get("HTTP_X_REAL_IP")
if not remote_addr:
remote_addr = request.environ.get("REMOTE_ADDR")
remote_addr = get_remote_addr()
locator = request.args.get("locator")
response = []
# Check that data type and content are correct. Abort otherwise.
try:
request_data = get_request_data_json(request)
logger.info("Received get_appointment request", from_addr="{}".format(remote_addr), locator=locator)
except TypeError as e:
logger.info("Received invalid get_appointment request", from_addr="{}".format(remote_addr))
return abort(HTTP_BAD_REQUEST, e)
# ToDo: #15-add-system-monitor
if not isinstance(locator, str) or len(locator) != LOCATOR_LEN_HEX:
response.append({"locator": locator, "status": "not_found"})
return jsonify(response)
locator = request_data.get("locator")
locator_map = self.watcher.db_manager.load_locator_map(locator)
triggered_appointments = self.watcher.db_manager.load_all_triggered_flags()
try:
self.inspector.check_locator(locator)
logger.info("Received get_appointment request", from_addr="{}".format(remote_addr), locator=locator)
if locator_map is not None:
for uuid in locator_map:
if uuid not in triggered_appointments:
appointment_data = self.watcher.db_manager.load_watcher_appointment(uuid)
message = "get appointment {}".format(locator).encode()
signature = request_data.get("signature")
user_pk = self.gatekeeper.identify_user(message, signature)
if appointment_data is not None:
appointment_data["status"] = "being_watched"
response.append(appointment_data)
triggered_appointments = self.watcher.db_manager.load_all_triggered_flags()
uuid = hash_160("{}{}".format(locator, user_pk))
tracker_data = self.watcher.db_manager.load_responder_tracker(uuid)
# If the appointment has been triggered, it should be in the locator (default else just in case).
if uuid in triggered_appointments:
appointment_data = self.watcher.db_manager.load_responder_tracker(uuid)
if appointment_data:
rcode = HTTP_OK
response = {"locator": locator, "status": "dispute_responded", "appointment": appointment_data}
else:
rcode = HTTP_NOT_FOUND
response = {"locator": locator, "status": "not_found"}
if tracker_data is not None:
tracker_data["status"] = "dispute_responded"
response.append(tracker_data)
# Otherwise it should be either in the watcher, or not in the system.
else:
appointment_data = self.watcher.db_manager.load_watcher_appointment(uuid)
if appointment_data:
rcode = HTTP_OK
response = {"locator": locator, "status": "being_watched", "appointment": appointment_data}
else:
rcode = HTTP_NOT_FOUND
response = {"locator": locator, "status": "not_found"}
else:
response.append({"locator": locator, "status": "not_found"})
except (InspectionFailed, IdentificationFailure):
rcode = HTTP_NOT_FOUND
response = {"locator": locator, "status": "not_found"}
response = jsonify(response)
return response
return jsonify(response), rcode
def get_all_appointments(self):
"""
@@ -163,10 +314,8 @@ class API:
This endpoint should only be accessible by the administrator. Requests are only allowed from localhost.
Returns:
:obj:`dict`: A json formatted dictionary containing all the appointments hold by the
:obj:`Watcher <teos.watcher.Watcher>` (``watcher_appointments``) and by the
:obj:`Responder <teos.responder.Responder>` (``responder_trackers``).
:obj:`str`: A json formatted dictionary containing all the appointments hold by the ``Watcher``
(``watcher_appointments``) and by the ``Responder>`` (``responder_trackers``).
"""
# ToDo: #15-add-system-monitor
@@ -185,19 +334,10 @@ class API:
def start(self):
"""
This function starts the Flask server used to run the API. Adds all the routes to the functions listed above.
This function starts the Flask server used to run the API.
"""
routes = {
"/": (self.add_appointment, ["POST"]),
"/get_appointment": (self.get_appointment, ["GET"]),
"/get_all_appointments": (self.get_all_appointments, ["GET"]),
}
for url, params in routes.items():
app.add_url_rule(url, view_func=params[0], methods=params[1])
# Setting Flask log to ERROR only so it does not mess with out logging. Also disabling flask initial messages
# Setting Flask log to ERROR only so it does not mess with our logging. Also disabling flask initial messages
logging.getLogger("werkzeug").setLevel(logging.ERROR)
os.environ["WERKZEUG_RUN_MAIN"] = "true"

508
teos/appointments_dbm.py Normal file
View File

@@ -0,0 +1,508 @@
import json
import plyvel
from teos.db_manager import DBManager
from teos import LOG_PREFIX
from common.logger import Logger
logger = Logger(actor="AppointmentsDBM", log_name_prefix=LOG_PREFIX)
WATCHER_PREFIX = "w"
WATCHER_LAST_BLOCK_KEY = "bw"
RESPONDER_PREFIX = "r"
RESPONDER_LAST_BLOCK_KEY = "br"
LOCATOR_MAP_PREFIX = "m"
TRIGGERED_APPOINTMENTS_PREFIX = "ta"
class AppointmentsDBM(DBManager):
"""
The :class:`AppointmentsDBM` is in charge of interacting with the appointments database (``LevelDB``).
Keys and values are stored as bytes in the database but processed as strings by the manager.
The database is split in six prefixes:
- ``WATCHER_PREFIX``, defined as ``b'w``, is used to store :obj:`Watcher <teos.watcher.Watcher>` appointments.
- ``RESPONDER_PREFIX``, defines as ``b'r``, is used to store :obj:`Responder <teos.responder.Responder>` trackers.
- ``WATCHER_LAST_BLOCK_KEY``, defined as ``b'bw``, is used to store the last block hash known by the :obj:`Watcher <teos.watcher.Watcher>`.
- ``RESPONDER_LAST_BLOCK_KEY``, defined as ``b'br``, is used to store the last block hash known by the :obj:`Responder <teos.responder.Responder>`.
- ``LOCATOR_MAP_PREFIX``, defined as ``b'm``, is used to store the ``locator:uuid`` maps.
- ``TRIGGERED_APPOINTMENTS_PREFIX``, defined as ``b'ta``, is used to stored triggered appointments (appointments that have been handed to the :obj:`Responder <teos.responder.Responder>`.)
Args:
db_path (:obj:`str`): the path (relative or absolute) to the system folder containing the database. A fresh
database will be created if the specified path does not contain one.
Raises:
:obj:`ValueError`: If the provided ``db_path`` is not a string.
:obj:`plyvel.Error`: If the db is currently unavailable (being used by another process).
"""
def __init__(self, db_path):
if not isinstance(db_path, str):
raise ValueError("db_path must be a valid path/name")
try:
super().__init__(db_path)
except plyvel.Error as e:
if "LOCK: Resource temporarily unavailable" in str(e):
logger.info("The db is already being used by another process (LOCK)")
raise e
def load_appointments_db(self, prefix):
"""
Loads all data from the appointments database given a prefix. Two prefixes are defined: ``WATCHER_PREFIX`` and
``RESPONDER_PREFIX``.
Args:
prefix (:obj:`str`): the prefix of the data to load.
Returns:
:obj:`dict`: A dictionary containing the requested data (appointments or trackers) indexed by ``uuid``.
Returns an empty dictionary if no data is found.
"""
data = {}
for k, v in self.db.iterator(prefix=prefix.encode("utf-8")):
# Get uuid and appointment_data from the db
uuid = k[len(prefix) :].decode("utf-8")
data[uuid] = json.loads(v)
return data
def get_last_known_block(self, key):
"""
Loads the last known block given a key.
Args:
key (:obj:`str`): the identifier of the db to look into (either ``WATCHER_LAST_BLOCK_KEY`` or
``RESPONDER_LAST_BLOCK_KEY``).
Returns:
:obj:`str` or :obj:`None`: A 16-byte hex-encoded str representing the last known block hash.
Returns ``None`` if the entry is not found.
"""
last_block = self.db.get(key.encode("utf-8"))
if last_block:
last_block = last_block.decode("utf-8")
return last_block
def load_watcher_appointment(self, uuid):
"""
Loads an appointment from the database using ``WATCHER_PREFIX`` as prefix to the given ``uuid``.
Args:
uuid (:obj:`str`): the appointment's unique identifier.
Returns:
:obj:`dict`: A dictionary containing the appointment data if they ``key`` is found.
Returns ``None`` otherwise.
"""
try:
data = self.load_entry(uuid, prefix=WATCHER_PREFIX)
data = json.loads(data)
except (TypeError, json.decoder.JSONDecodeError):
data = None
return data
def load_responder_tracker(self, uuid):
"""
Loads a tracker from the database using ``RESPONDER_PREFIX`` as a prefix to the given ``uuid``.
Args:
uuid (:obj:`str`): the tracker's unique identifier.
Returns:
:obj:`dict`: A dictionary containing the tracker data if they ``key`` is found.
Returns ``None`` otherwise.
"""
try:
data = self.load_entry(uuid, prefix=RESPONDER_PREFIX)
data = json.loads(data)
except (TypeError, json.decoder.JSONDecodeError):
data = None
return data
def load_watcher_appointments(self, include_triggered=False):
"""
Loads all the appointments from the database (all entries with the ``WATCHER_PREFIX`` prefix).
Args:
include_triggered (:obj:`bool`): whether to include the appointments flagged as triggered or not. ``False``
by default.
Returns:
:obj:`dict`: A dictionary with all the appointments stored in the database. An empty dictionary if there
are none.
"""
appointments = self.load_appointments_db(prefix=WATCHER_PREFIX)
triggered_appointments = self.load_all_triggered_flags()
if not include_triggered:
not_triggered = list(set(appointments.keys()).difference(triggered_appointments))
appointments = {uuid: appointments[uuid] for uuid in not_triggered}
return appointments
def load_responder_trackers(self):
"""
Loads all the trackers from the database (all entries with the ``RESPONDER_PREFIX`` prefix).
Returns:
:obj:`dict`: A dictionary with all the trackers stored in the database. An empty dictionary is there are
none.
"""
return self.load_appointments_db(prefix=RESPONDER_PREFIX)
def store_watcher_appointment(self, uuid, appointment):
"""
Stores an appointment in the database using the ``WATCHER_PREFIX`` prefix.
Args:
uuid (:obj:`str`): the identifier of the appointment to be stored.
appointment (:obj:`dict`): an appointment encoded as dictionary.
Returns:
:obj:`bool`: True if the appointment was stored in the db. False otherwise.
"""
try:
self.create_entry(uuid, json.dumps(appointment), prefix=WATCHER_PREFIX)
logger.info("Adding appointment to Watchers's db", uuid=uuid)
return True
except json.JSONDecodeError:
logger.info("Could't add appointment to db. Wrong appointment format.", uuid=uuid, appoinent=appointment)
return False
except TypeError:
logger.info("Could't add appointment to db.", uuid=uuid, appoinent=appointment)
return False
def store_responder_tracker(self, uuid, tracker):
"""
Stores a tracker in the database using the ``RESPONDER_PREFIX`` prefix.
Args:
uuid (:obj:`str`): the identifier of the appointment to be stored.
tracker (:obj:`dict`): a tracker encoded as dictionary.
Returns:
:obj:`bool`: True if the tracker was stored in the db. False otherwise.
"""
try:
self.create_entry(uuid, json.dumps(tracker), prefix=RESPONDER_PREFIX)
logger.info("Adding tracker to Responder's db", uuid=uuid)
return True
except json.JSONDecodeError:
logger.info("Could't add tracker to db. Wrong tracker format.", uuid=uuid, tracker=tracker)
return False
except TypeError:
logger.info("Could't add tracker to db.", uuid=uuid, tracker=tracker)
return False
def load_locator_map(self, locator):
"""
Loads the ``locator:uuid`` map of a given ``locator`` from the database.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string representing the appointment locator.
Returns:
:obj:`dict` or :obj:`None`: The requested ``locator:uuid`` map if found.
Returns ``None`` otherwise.
"""
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
locator_map = self.db.get(key)
if locator_map is not None:
locator_map = json.loads(locator_map.decode("utf-8"))
else:
logger.info("Locator not found in the db", locator=locator)
return locator_map
def create_append_locator_map(self, locator, uuid):
"""
Creates (or appends to if already exists) a ``locator:uuid`` map.
If the map already exists, the new ``uuid`` is appended to the existing ones (if it is not already there).
Args:
locator (:obj:`str`): a 16-byte hex-encoded string used as the key of the map.
uuid (:obj:`str`): a 16-byte hex-encoded unique id to create (or add to) the map.
"""
locator_map = self.load_locator_map(locator)
if locator_map is not None:
if uuid not in locator_map:
locator_map.append(uuid)
logger.info("Updating locator map", locator=locator, uuid=uuid)
else:
logger.info("UUID already in the map", locator=locator, uuid=uuid)
else:
locator_map = [uuid]
logger.info("Creating new locator map", locator=locator, uuid=uuid)
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
self.db.put(key, json.dumps(locator_map).encode("utf-8"))
def update_locator_map(self, locator, locator_map):
"""
Updates a ``locator:uuid`` map in the database by deleting one of it's uuid. It will only work as long as
the given ``locator_map`` is a subset of the current one and it's not empty.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string used as the key of the map.
locator_map (:obj:`list`): a list of uuids to replace the current one on the db.
"""
current_locator_map = self.load_locator_map(locator)
if set(locator_map).issubset(current_locator_map) and len(locator_map) != 0:
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
self.db.put(key, json.dumps(locator_map).encode("utf-8"))
else:
logger.error("Trying to update a locator_map with completely different, or empty, data")
def delete_locator_map(self, locator):
"""
Deletes a ``locator:uuid`` map.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string identifying the map to delete.
Returns:
:obj:`bool`: True if the locator map was deleted from the database or it was non-existent, False otherwise.
"""
try:
self.delete_entry(locator, prefix=LOCATOR_MAP_PREFIX)
logger.info("Deleting locator map from db", locator=locator)
return True
except TypeError:
logger.info("Couldn't delete locator map from db, locator has wrong type", locator=locator)
return False
def delete_watcher_appointment(self, uuid):
"""
Deletes an appointment from the database.
Args:
uuid (:obj:`str`): a 16-byte hex-encoded string identifying the appointment to be deleted.
Returns:
:obj:`bool`: True if the appointment was deleted from the database or it was non-existent, False otherwise.
"""
try:
self.delete_entry(uuid, prefix=WATCHER_PREFIX)
logger.info("Deleting appointment from Watcher's db", uuid=uuid)
return True
except TypeError:
logger.info("Couldn't delete appointment from db, uuid has wrong type", uuid=uuid)
return False
def batch_delete_watcher_appointments(self, uuids):
"""
Deletes an appointment from the database.
Args:
uuids (:obj:`list`): a list of 16-byte hex-encoded strings identifying the appointments to be deleted.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((WATCHER_PREFIX + uuid).encode("utf-8"))
logger.info("Deleting appointment from Watcher's db", uuid=uuid)
def delete_responder_tracker(self, uuid):
"""
Deletes a tracker from the database.
Args:
uuid (:obj:`str`): a 16-byte hex-encoded string identifying the tracker to be deleted.
Returns:
:obj:`bool`: True if the tracker was deleted from the database or it was non-existent, False otherwise.
"""
try:
self.delete_entry(uuid, prefix=RESPONDER_PREFIX)
logger.info("Deleting tracker from Responder's db", uuid=uuid)
return True
except TypeError:
logger.info("Couldn't delete tracker from db, uuid has wrong type", uuid=uuid)
return False
def batch_delete_responder_trackers(self, uuids):
"""
Deletes an appointment from the database.
Args:
uuids (:obj:`list`): a list of 16-byte hex-encoded strings identifying the trackers to be deleted.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((RESPONDER_PREFIX + uuid).encode("utf-8"))
logger.info("Deleting appointment from Responder's db", uuid=uuid)
def load_last_block_hash_watcher(self):
"""
Loads the last known block hash of the :obj:`Watcher <teos.watcher.Watcher>` from the database.
Returns:
:obj:`str` or :obj:`None`: A 32-byte hex-encoded string representing the last known block hash if found.
Returns ``None`` otherwise.
"""
return self.get_last_known_block(WATCHER_LAST_BLOCK_KEY)
def load_last_block_hash_responder(self):
"""
Loads the last known block hash of the :obj:`Responder <teos.responder.Responder>` from the database.
Returns:
:obj:`str` or :obj:`None`: A 32-byte hex-encoded string representing the last known block hash if found.
Returns ``None`` otherwise.
"""
return self.get_last_known_block(RESPONDER_LAST_BLOCK_KEY)
def store_last_block_hash_watcher(self, block_hash):
"""
Stores a block hash as the last known block of the :obj:`Watcher <teos.watcher.Watcher>`.
Args:
block_hash (:obj:`str`): the block hash to be stored (32-byte hex-encoded)
Returns:
:obj:`bool`: True if the block hash was stored in the db. False otherwise.
"""
try:
self.create_entry(WATCHER_LAST_BLOCK_KEY, block_hash)
return True
except (TypeError, json.JSONDecodeError):
return False
def store_last_block_hash_responder(self, block_hash):
"""
Stores a block hash as the last known block of the :obj:`Responder <teos.responder.Responder>`.
Args:
block_hash (:obj:`str`): the block hash to be stored (32-byte hex-encoded)
Returns:
:obj:`bool`: True if the block hash was stored in the db. False otherwise.
"""
try:
self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash)
return True
except (TypeError, json.JSONDecodeError):
return False
def create_triggered_appointment_flag(self, uuid):
"""
Creates a flag that signals that an appointment has been triggered.
Args:
uuid (:obj:`str`): the identifier of the flag to be created.
"""
self.db.put((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"), "".encode("utf-8"))
logger.info("Flagging appointment as triggered", uuid=uuid)
def batch_create_triggered_appointment_flag(self, uuids):
"""
Creates a flag that signals that an appointment has been triggered for every appointment in the given list
Args:
uuids (:obj:`list`): a list of identifiers for the appointments to flag.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.put((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"), b"")
logger.info("Flagging appointment as triggered", uuid=uuid)
def load_all_triggered_flags(self):
"""
Loads all the appointment triggered flags from the database.
Returns:
:obj:`list`: a list of all the uuids of the triggered appointments.
"""
return [
k.decode()[len(TRIGGERED_APPOINTMENTS_PREFIX) :]
for k, v in self.db.iterator(prefix=TRIGGERED_APPOINTMENTS_PREFIX.encode("utf-8"))
]
def delete_triggered_appointment_flag(self, uuid):
"""
Deletes a flag that signals that an appointment has been triggered.
Args:
uuid (:obj:`str`): the identifier of the flag to be removed.
Returns:
:obj:`bool`: True if the flag was deleted from the database or it was non-existent, False otherwise.
"""
try:
self.delete_entry(uuid, prefix=TRIGGERED_APPOINTMENTS_PREFIX)
logger.info("Removing triggered flag from appointment appointment", uuid=uuid)
return True
except TypeError:
logger.info("Couldn't delete triggered flag from db, uuid has wrong type", uuid=uuid)
return False
def batch_delete_triggered_appointment_flag(self, uuids):
"""
Deletes a list of flag signaling that some appointment have been triggered.
Args:
uuids (:obj:`list`): the identifier of the flag to be removed.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"))
logger.info("Removing triggered flag from appointment appointment", uuid=uuid)

View File

@@ -26,7 +26,11 @@ class Builder:
locator_uuid_map = {}
for uuid, data in appointments_data.items():
appointments[uuid] = {"locator": data.get("locator"), "end_time": data.get("end_time")}
appointments[uuid] = {
"locator": data.get("locator"),
"end_time": data.get("end_time"),
"size": len(data.get("encrypted_blob")),
}
if data.get("locator") in locator_uuid_map:
locator_uuid_map[data.get("locator")].append(uuid)
@@ -94,8 +98,9 @@ class Builder:
@staticmethod
def update_states(watcher, missed_blocks_watcher, missed_blocks_responder):
"""
Updates the states of both the :mod:`Watcher <teos.watcher.Watcher>` and the :mod:`Responder <teos.responder.Responder>`.
If both have pending blocks to process they need to be updates at the same time, block by block.
Updates the states of both the :mod:`Watcher <teos.watcher.Watcher>` and the
:mod:`Responder <teos.responder.Responder>`. If both have pending blocks to process they need to be updated at
the same time, block by block.
If only one instance has to be updated, ``populate_block_queue`` should be used.

View File

@@ -1,7 +1,7 @@
from teos import LOG_PREFIX
from teos.rpc_errors import *
from common.logger import Logger
from teos.tools import bitcoin_cli
import teos.rpc_errors as rpc_errors
from teos.utils.auth_proxy import JSONRPCException
from teos.errors import UNKNOWN_JSON_RPC_EXCEPTION, RPC_TX_REORGED_AFTER_BROADCAST
@@ -36,12 +36,12 @@ class Receipt:
class Carrier:
"""
The :class:`Carrier` is the class in charge of interacting with ``bitcoind`` to send/get transactions. It uses
:obj:`Receipt` objects to report about the sending outcome.
The :class:`Carrier` is in charge of interacting with ``bitcoind`` to send/get transactions. It uses :obj:`Receipt`
objects to report about the sending outcome.
Args:
btc_connect_params (:obj:`dict`): a dictionary with the parameters to connect to bitcoind
(rpc user, rpc passwd, host and port)
(rpc user, rpc password, host and port)
Attributes:
issued_receipts (:obj:`dict`): a dictionary of issued receipts to prevent resending the same transaction over
@@ -81,17 +81,17 @@ class Carrier:
except JSONRPCException as e:
errno = e.error.get("code")
# Since we're pushing a raw transaction to the network we can face several rejections
if errno == RPC_VERIFY_REJECTED:
if errno == rpc_errors.RPC_VERIFY_REJECTED:
# DISCUSS: 37-transaction-rejection
receipt = Receipt(delivered=False, reason=RPC_VERIFY_REJECTED)
receipt = Receipt(delivered=False, reason=rpc_errors.RPC_VERIFY_REJECTED)
logger.error("Transaction couldn't be broadcast", error=e.error)
elif errno == RPC_VERIFY_ERROR:
elif errno == rpc_errors.RPC_VERIFY_ERROR:
# DISCUSS: 37-transaction-rejection
receipt = Receipt(delivered=False, reason=RPC_VERIFY_ERROR)
receipt = Receipt(delivered=False, reason=rpc_errors.RPC_VERIFY_ERROR)
logger.error("Transaction couldn't be broadcast", error=e.error)
elif errno == RPC_VERIFY_ALREADY_IN_CHAIN:
elif errno == rpc_errors.RPC_VERIFY_ALREADY_IN_CHAIN:
logger.info("Transaction is already in the blockchain. Getting confirmation count", txid=txid)
# If the transaction is already in the chain, we get the number of confirmations and watch the tracker
@@ -100,7 +100,9 @@ class Carrier:
if tx_info is not None:
confirmations = int(tx_info.get("confirmations"))
receipt = Receipt(delivered=True, confirmations=confirmations, reason=RPC_VERIFY_ALREADY_IN_CHAIN)
receipt = Receipt(
delivered=True, confirmations=confirmations, reason=rpc_errors.RPC_VERIFY_ALREADY_IN_CHAIN
)
else:
# There's a really unlikely edge case where a transaction can be reorged between receiving the
@@ -108,12 +110,12 @@ class Carrier:
# mempool, which again is really unlikely.
receipt = Receipt(delivered=False, reason=RPC_TX_REORGED_AFTER_BROADCAST)
elif errno == RPC_DESERIALIZATION_ERROR:
elif errno == rpc_errors.RPC_DESERIALIZATION_ERROR:
# Adding this here just for completeness. We should never end up here. The Carrier only sends txs
# handed by the Responder, who receives them from the Watcher, who checks that the tx can be properly
# deserialized
logger.info("Transaction cannot be deserialized".format(txid))
receipt = Receipt(delivered=False, reason=RPC_DESERIALIZATION_ERROR)
receipt = Receipt(delivered=False, reason=rpc_errors.RPC_DESERIALIZATION_ERROR)
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
@@ -133,23 +135,22 @@ class Carrier:
Returns:
:obj:`dict` or :obj:`None`: A dictionary with the transaction data if the transaction can be found on the
chain.
Returns ``None`` otherwise.
chain. ``None`` otherwise.
"""
try:
tx_info = bitcoin_cli(self.btc_connect_params).getrawtransaction(txid, 1)
return tx_info
except JSONRPCException as e:
tx_info = None
# While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the tracker
if e.error.get("code") == RPC_INVALID_ADDRESS_OR_KEY:
# reorged while we were querying bitcoind to get the confirmation count. In that case we just restart
# the tracker
if e.error.get("code") == rpc_errors.RPC_INVALID_ADDRESS_OR_KEY:
logger.info("Transaction not found in mempool nor blockchain", txid=txid)
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logger.error("JSONRPCException", method="Carrier.get_transaction", error=e.error)
return tx_info
return None

View File

@@ -10,8 +10,8 @@ logger = Logger(actor="ChainMonitor", log_name_prefix=LOG_PREFIX)
class ChainMonitor:
"""
The :class:`ChainMonitor` is the class in charge of monitoring the blockchain (via ``bitcoind``) to detect new
blocks on top of the best chain. If a new best block is spotted, the chain monitor will notify the
The :class:`ChainMonitor` is in charge of monitoring the blockchain (via ``bitcoind``) to detect new blocks on top
of the best chain. If a new best block is spotted, the chain monitor will notify the
:obj:`Watcher <teos.watcher.Watcher>` and the :obj:`Responder <teos.responder.Responder>` using ``Queues``.
The :class:`ChainMonitor` monitors the chain using two methods: ``zmq`` and ``polling``. Blocks are only notified
@@ -34,7 +34,6 @@ class ChainMonitor:
watcher_queue (:obj:`Queue`): a queue to send new best tips to the :obj:`Watcher <teos.watcher.Watcher>`.
responder_queue (:obj:`Queue`): a queue to send new best tips to the
:obj:`Responder <teos.responder.Responder>`.
polling_delta (:obj:`int`): time between polls (in seconds).
max_block_window_size (:obj:`int`): max size of last_tips.
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a blockProcessor instance.
@@ -75,7 +74,6 @@ class ChainMonitor:
Args:
block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
block_hash (:obj:`str`): the new block hash to be sent to the subscribers.
"""
self.watcher_queue.put(block_hash)
@@ -90,7 +88,7 @@ class ChainMonitor:
block_hash (:obj:`block_hash`): the new best tip.
Returns:
(:obj:`bool`): ``True`` is the state was successfully updated, ``False`` otherwise.
:obj:`bool`: True is the state was successfully updated, False otherwise.
"""
if block_hash != self.best_tip and block_hash not in self.last_tips:

View File

@@ -7,7 +7,7 @@ logger = Logger(actor="Cleaner", log_name_prefix=LOG_PREFIX)
class Cleaner:
"""
The :class:`Cleaner` is the class in charge of removing expired/completed data from the tower.
The :class:`Cleaner` is in charge of removing expired/completed data from the tower.
Mutable objects (like dicts) are passed-by-reference in Python, so no return is needed for the Cleaner.
"""
@@ -15,15 +15,16 @@ class Cleaner:
@staticmethod
def delete_appointment_from_memory(uuid, appointments, locator_uuid_map):
"""
Deletes an appointment from memory (appointments and locator_uuid_map dictionaries). If the given appointment
does not share locator with any other, the map will completely removed, otherwise, the uuid will be removed from
the map.
Deletes an appointment from memory (``appointments`` and ``locator_uuid_map`` dictionaries). If the given
appointment does not share locator with any other, the map will completely removed, otherwise, the uuid will be
removed from the map.
Args:
uuid (:obj:`str`): the identifier of the appointment to be deleted.
appointments (:obj:`dict`): the appointments dictionary from where the appointment should be removed.
locator_uuid_map (:obj:`dict`): the locator:uuid map from where the appointment should also be removed.
"""
locator = appointments[uuid].get("locator")
# Delete the appointment
@@ -43,8 +44,8 @@ class Cleaner:
Args:
uuid (:obj:`str`): the identifier of the appointment to be deleted.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
db_manager.delete_watcher_appointment(uuid)
@@ -61,8 +62,8 @@ class Cleaner:
Args:
uuids (:obj:`list`): a list of identifiers to be removed from the map.
locator (:obj:`str`): the identifier of the map to be either updated or deleted.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
locator_map = db_manager.load_locator_map(locator)
@@ -95,8 +96,8 @@ class Cleaner:
appointments.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map for the :obj:`Watcher <teos.watcher.Watcher>`
appointments.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
locator_maps_to_update = {}
@@ -123,8 +124,9 @@ class Cleaner:
"""
Deletes a completed appointment from memory (:obj:`Watcher <teos.watcher.Watcher>`) and disk.
Currently, an appointment is only completed if it cannot make it to the (:obj:`Responder <teos.responder.Responder>`),
otherwise, it will be flagged as triggered and removed once the tracker is completed.
Currently, an appointment is only completed if it cannot make it to the
(:obj:`Responder <teos.responder.Responder>`), otherwise, it will be flagged as triggered and removed once the
tracker is completed.
Args:
completed_appointments (:obj:`list`): a list of appointments to be deleted.
@@ -132,9 +134,10 @@ class Cleaner:
appointments.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map for the :obj:`Watcher <teos.watcher.Watcher>`
appointments.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
locator_maps_to_update = {}
for uuid in completed_appointments:
@@ -160,7 +163,7 @@ class Cleaner:
@staticmethod
def flag_triggered_appointments(triggered_appointments, appointments, locator_uuid_map, db_manager):
"""
Deletes a list of triggered appointment from memory (:obj:`Watcher <teos.watcher.Watcher>`) and flags them as
Deletes a list of triggered appointment from memory (:obj:`Watcher <teos.watcher.Watcher>`) and flags them as
triggered on disk.
Args:
@@ -169,8 +172,8 @@ class Cleaner:
appointments.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map for the :obj:`Watcher <teos.watcher.Watcher>`
appointments.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
for uuid in triggered_appointments:
@@ -190,8 +193,8 @@ class Cleaner:
<teos.responder.Responder>` trackers.
completed_trackers (:obj:`dict`): a dict of completed trackers to be deleted (uuid:confirmations).
height (:obj:`int`): the block height at which the trackers were completed.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
"""
locator_maps_to_update = {}

View File

@@ -1,34 +1,11 @@
import json
import plyvel
from teos import LOG_PREFIX
from common.logger import Logger
logger = Logger(actor="DBManager", log_name_prefix=LOG_PREFIX)
WATCHER_PREFIX = "w"
WATCHER_LAST_BLOCK_KEY = "bw"
RESPONDER_PREFIX = "r"
RESPONDER_LAST_BLOCK_KEY = "br"
LOCATOR_MAP_PREFIX = "m"
TRIGGERED_APPOINTMENTS_PREFIX = "ta"
class DBManager:
"""
The :class:`DBManager` is the class in charge of interacting with the appointments database (``LevelDB``).
The :class:`DBManager` is in charge of interacting with a database (``LevelDB``).
Keys and values are stored as bytes in the database but processed as strings by the manager.
The database is split in six prefixes:
- ``WATCHER_PREFIX``, defined as ``b'w``, is used to store :obj:`Watcher <teos.watcher.Watcher>` appointments.
- ``RESPONDER_PREFIX``, defines as ``b'r``, is used to store :obj:`Responder <teos.responder.Responder>` trackers.
- ``WATCHER_LAST_BLOCK_KEY``, defined as ``b'bw``, is used to store the last block hash known by the :obj:`Watcher <teos.watcher.Watcher>`.
- ``RESPONDER_LAST_BLOCK_KEY``, defined as ``b'br``, is used to store the last block hash known by the :obj:`Responder <teos.responder.Responder>`.
- ``LOCATOR_MAP_PREFIX``, defined as ``b'm``, is used to store the ``locator:uuid`` maps.
- ``TRIGGERED_APPOINTMENTS_PREFIX``, defined as ``b'ta``, is used to stored triggered appointments (appointments that have been handed to the :obj:`Responder <teos.responder.Responder>`.)
Args:
db_path (:obj:`str`): the path (relative or absolute) to the system folder containing the database. A fresh
database will be create if the specified path does not contain one.
@@ -42,57 +19,7 @@ class DBManager:
if not isinstance(db_path, str):
raise ValueError("db_path must be a valid path/name")
try:
self.db = plyvel.DB(db_path)
except plyvel.Error as e:
if "create_if_missing is false" in str(e):
logger.info("No db found. Creating a fresh one")
self.db = plyvel.DB(db_path, create_if_missing=True)
elif "LOCK: Resource temporarily unavailable" in str(e):
logger.info("The db is already being used by another process (LOCK)")
raise e
def load_appointments_db(self, prefix):
"""
Loads all data from the appointments database given a prefix. Two prefixes are defined: ``WATCHER_PREFIX`` and
``RESPONDER_PREFIX``.
Args:
prefix (:obj:`str`): the prefix of the data to load.
Returns:
:obj:`dict`: A dictionary containing the requested data (appointments or trackers) indexed by ``uuid``.
Returns an empty dictionary if no data is found.
"""
data = {}
for k, v in self.db.iterator(prefix=prefix.encode("utf-8")):
# Get uuid and appointment_data from the db
uuid = k[len(prefix) :].decode("utf-8")
data[uuid] = json.loads(v)
return data
def get_last_known_block(self, key):
"""
Loads the last known block given a key (either ``WATCHER_LAST_BLOCK_KEY`` or ``RESPONDER_LAST_BLOCK_KEY``).
Returns:
:obj:`str` or :obj:`None`: A 16-byte hex-encoded str representing the last known block hash.
Returns ``None`` if the entry is not found.
"""
last_block = self.db.get(key.encode("utf-8"))
if last_block:
last_block = last_block.decode("utf-8")
return last_block
self.db = plyvel.DB(db_path, create_if_missing=True)
def create_entry(self, key, value, prefix=None):
"""
@@ -102,8 +29,20 @@ class DBManager:
key (:obj:`str`): the key of the new entry, used to identify it.
value (:obj:`str`): the data stored under the given ``key``.
prefix (:obj:`str`): an optional prefix added to the ``key``.
Raises:
(:obj:`TypeError`) if key, value or prefix are not strings.
"""
if not isinstance(key, str):
raise TypeError("Key must be str")
if not isinstance(value, str):
raise TypeError("Value must be str")
if not isinstance(prefix, str) and prefix is not None:
raise TypeError("Prefix (if set) must be str")
if isinstance(prefix, str):
key = prefix + key
@@ -112,348 +51,55 @@ class DBManager:
self.db.put(key, value)
def load_entry(self, key):
def load_entry(self, key, prefix=None):
"""
Loads an entry from the database given a ``key``.
Loads an entry from the database given a ``key`` (and optionally a ``prefix``).
Args:
key (:obj:`str`): the key that identifies the entry to be loaded.
prefix (:obj:`str`): an optional prefix added to the ``key``.
Returns:
:obj:`dict` or :obj:`None`: A dictionary containing the requested data (an appointment or a tracker).
:obj:`bytes` or :obj:`None`: A byte-array containing the requested data.
Returns ``None`` if the entry is not found.
Raises:
(:obj:`TypeError`) if key or prefix are not strings.
"""
data = self.db.get(key.encode("utf-8"))
data = json.loads(data) if data is not None else data
return data
if not isinstance(key, str):
raise TypeError("Key must be str")
if not isinstance(prefix, str) and prefix is not None:
raise TypeError("Prefix (if set) must be str")
if isinstance(prefix, str):
key = prefix + key
return self.db.get(key.encode("utf-8"))
def delete_entry(self, key, prefix=None):
"""
Deletes an entry from the database given an ``key`` (and optionally a ``prefix``)
Deletes an entry from the database given an ``key`` (and optionally a ``prefix``).
Args:
key (:obj:`str`): the key that identifies the data to be deleted.
prefix (:obj:`str`): an optional prefix to be prepended to the ``key``.
Raises:
(:obj:`TypeError`) if key or prefix are not strings.
"""
if not isinstance(key, str):
raise TypeError("Key must be str")
if not isinstance(prefix, str) and prefix is not None:
raise TypeError("Prefix (if set) must be str")
if isinstance(prefix, str):
key = prefix + key
key = key.encode("utf-8")
self.db.delete(key)
def load_watcher_appointment(self, key):
"""
Loads an appointment from the database using ``WATCHER_PREFIX`` as prefix to the given ``key``.
Returns:
:obj:`dict`: A dictionary containing the appointment data if they ``key`` is found.
Returns ``None`` otherwise.
"""
return self.load_entry(WATCHER_PREFIX + key)
def load_responder_tracker(self, key):
"""
Loads a tracker from the database using ``RESPONDER_PREFIX`` as a prefix to the given ``key``.
Returns:
:obj:`dict`: A dictionary containing the tracker data if they ``key`` is found.
Returns ``None`` otherwise.
"""
return self.load_entry(RESPONDER_PREFIX + key)
def load_watcher_appointments(self, include_triggered=False):
"""
Loads all the appointments from the database (all entries with the ``WATCHER_PREFIX`` prefix).
Args:
include_triggered (:obj:`bool`): Whether to include the appointments flagged as triggered or not. ``False``
by default.
Returns:
:obj:`dict`: A dictionary with all the appointments stored in the database. An empty dictionary is there
are none.
"""
appointments = self.load_appointments_db(prefix=WATCHER_PREFIX)
triggered_appointments = self.load_all_triggered_flags()
if not include_triggered:
not_triggered = list(set(appointments.keys()).difference(triggered_appointments))
appointments = {uuid: appointments[uuid] for uuid in not_triggered}
return appointments
def load_responder_trackers(self):
"""
Loads all the trackers from the database (all entries with the ``RESPONDER_PREFIX`` prefix).
Returns:
:obj:`dict`: A dictionary with all the trackers stored in the database. An empty dictionary is there are
none.
"""
return self.load_appointments_db(prefix=RESPONDER_PREFIX)
def store_watcher_appointment(self, uuid, appointment):
"""
Stores an appointment in the database using the ``WATCHER_PREFIX`` prefix.
Args:
uuid (:obj:`str`): the identifier of the appointment to be stored.
appointment (:obj: `str`): the json encoded appointment to be stored as data.
"""
self.create_entry(uuid, appointment, prefix=WATCHER_PREFIX)
logger.info("Adding appointment to Watchers's db", uuid=uuid)
def store_responder_tracker(self, uuid, tracker):
"""
Stores a tracker in the database using the ``RESPONDER_PREFIX`` prefix.
Args:
uuid (:obj:`str`): the identifier of the appointment to be stored.
tracker (:obj: `str`): the json encoded tracker to be stored as data.
"""
self.create_entry(uuid, tracker, prefix=RESPONDER_PREFIX)
logger.info("Adding appointment to Responder's db", uuid=uuid)
def load_locator_map(self, locator):
"""
Loads the ``locator:uuid`` map of a given ``locator`` from the database.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string representing the appointment locator.
Returns:
:obj:`dict` or :obj:`None`: The requested ``locator:uuid`` map if found.
Returns ``None`` otherwise.
"""
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
locator_map = self.db.get(key)
if locator_map is not None:
locator_map = json.loads(locator_map.decode("utf-8"))
else:
logger.info("Locator not found in the db", locator=locator)
return locator_map
def create_append_locator_map(self, locator, uuid):
"""
Creates (or appends to if already exists) a ``locator:uuid`` map.
If the map already exists, the new ``uuid`` is appended to the existing ones (if it is not already there).
Args:
locator (:obj:`str`): a 16-byte hex-encoded string used as the key of the map.
uuid (:obj:`str`): a 16-byte hex-encoded unique id to create (or add to) the map.
"""
locator_map = self.load_locator_map(locator)
if locator_map is not None:
if uuid not in locator_map:
locator_map.append(uuid)
logger.info("Updating locator map", locator=locator, uuid=uuid)
else:
logger.info("UUID already in the map", locator=locator, uuid=uuid)
else:
locator_map = [uuid]
logger.info("Creating new locator map", locator=locator, uuid=uuid)
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
self.db.put(key, json.dumps(locator_map).encode("utf-8"))
def update_locator_map(self, locator, locator_map):
"""
Updates a ``locator:uuid`` map in the database by deleting one of it's uuid. It will only work as long as
the given ``locator_map`` is a subset of the current one and it's not empty.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string used as the key of the map.
locator_map (:obj:`list`): a list of uuids to replace the current one on the db.
"""
current_locator_map = self.load_locator_map(locator)
if set(locator_map).issubset(current_locator_map) and len(locator_map) is not 0:
key = (LOCATOR_MAP_PREFIX + locator).encode("utf-8")
self.db.put(key, json.dumps(locator_map).encode("utf-8"))
else:
logger.error("Trying to update a locator_map with completely different, or empty, data")
def delete_locator_map(self, locator):
"""
Deletes a ``locator:uuid`` map.
Args:
locator (:obj:`str`): a 16-byte hex-encoded string identifying the map to delete.
"""
self.delete_entry(locator, prefix=LOCATOR_MAP_PREFIX)
logger.info("Deleting locator map from db", uuid=locator)
def delete_watcher_appointment(self, uuid):
"""
Deletes an appointment from the database.
Args:
uuid (:obj:`str`): a 16-byte hex-encoded string identifying the appointment to be deleted.
"""
self.delete_entry(uuid, prefix=WATCHER_PREFIX)
logger.info("Deleting appointment from Watcher's db", uuid=uuid)
def batch_delete_watcher_appointments(self, uuids):
"""
Deletes an appointment from the database.
Args:
uuids (:obj:`list`): a list of 16-byte hex-encoded strings identifying the appointments to be deleted.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((WATCHER_PREFIX + uuid).encode("utf-8"))
logger.info("Deleting appointment from Watcher's db", uuid=uuid)
def delete_responder_tracker(self, uuid):
"""
Deletes a tracker from the database.
Args:
uuid (:obj:`str`): a 16-byte hex-encoded string identifying the tracker to be deleted.
"""
self.delete_entry(uuid, prefix=RESPONDER_PREFIX)
logger.info("Deleting appointment from Responder's db", uuid=uuid)
def batch_delete_responder_trackers(self, uuids):
"""
Deletes an appointment from the database.
Args:
uuids (:obj:`list`): a list of 16-byte hex-encoded strings identifying the trackers to be deleted.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((RESPONDER_PREFIX + uuid).encode("utf-8"))
logger.info("Deleting appointment from Responder's db", uuid=uuid)
def load_last_block_hash_watcher(self):
"""
Loads the last known block hash of the :obj:`Watcher <teos.watcher.Watcher>` from the database.
Returns:
:obj:`str` or :obj:`None`: A 32-byte hex-encoded string representing the last known block hash if found.
Returns ``None`` otherwise.
"""
return self.get_last_known_block(WATCHER_LAST_BLOCK_KEY)
def load_last_block_hash_responder(self):
"""
Loads the last known block hash of the :obj:`Responder <teos.responder.Responder>` from the database.
Returns:
:obj:`str` or :obj:`None`: A 32-byte hex-encoded string representing the last known block hash if found.
Returns ``None`` otherwise.
"""
return self.get_last_known_block(RESPONDER_LAST_BLOCK_KEY)
def store_last_block_hash_watcher(self, block_hash):
"""
Stores a block hash as the last known block of the :obj:`Watcher <teos.watcher.Watcher>`.
Args:
block_hash (:obj:`str`): the block hash to be stored (32-byte hex-encoded)
"""
self.create_entry(WATCHER_LAST_BLOCK_KEY, block_hash)
def store_last_block_hash_responder(self, block_hash):
"""
Stores a block hash as the last known block of the :obj:`Responder <teos.responder.Responder>`.
Args:
block_hash (:obj:`str`): the block hash to be stored (32-byte hex-encoded)
"""
self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash)
def create_triggered_appointment_flag(self, uuid):
"""
Creates a flag that signals that an appointment has been triggered.
Args:
uuid (:obj:`str`): the identifier of the flag to be created.
"""
self.db.put((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"), "".encode("utf-8"))
logger.info("Flagging appointment as triggered", uuid=uuid)
def batch_create_triggered_appointment_flag(self, uuids):
"""
Creates a flag that signals that an appointment has been triggered for every appointment in the given list
Args:
uuids (:obj:`list`): a list of identifier for the appointments to flag.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.put((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"), b"")
logger.info("Flagging appointment as triggered", uuid=uuid)
def load_all_triggered_flags(self):
"""
Loads all the appointment triggered flags from the database.
Returns:
:obj:`list`: a list of all the uuids of the triggered appointments.
"""
return [
k.decode()[len(TRIGGERED_APPOINTMENTS_PREFIX) :]
for k, v in self.db.iterator(prefix=TRIGGERED_APPOINTMENTS_PREFIX.encode("utf-8"))
]
def delete_triggered_appointment_flag(self, uuid):
"""
Deletes a flag that signals that an appointment has been triggered.
Args:
uuid (:obj:`str`): the identifier of the flag to be removed.
"""
self.delete_entry(uuid, prefix=TRIGGERED_APPOINTMENTS_PREFIX)
logger.info("Removing triggered flag from appointment appointment", uuid=uuid)
def batch_delete_triggered_appointment_flag(self, uuids):
"""
Deletes a list of flag signaling that some appointment have been triggered.
Args:
uuids (:obj:`list`): the identifier of the flag to be removed.
"""
with self.db.write_batch() as b:
for uuid in uuids:
b.delete((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"))
logger.info("Removing triggered flag from appointment appointment", uuid=uuid)

View File

@@ -1,4 +1,4 @@
# Appointment errors
# Appointment errors [-1, -64]
APPOINTMENT_EMPTY_FIELD = -1
APPOINTMENT_WRONG_FIELD_TYPE = -2
APPOINTMENT_WRONG_FIELD_SIZE = -3
@@ -6,7 +6,11 @@ APPOINTMENT_WRONG_FIELD_FORMAT = -4
APPOINTMENT_FIELD_TOO_SMALL = -5
APPOINTMENT_FIELD_TOO_BIG = -6
APPOINTMENT_WRONG_FIELD = -7
APPOINTMENT_INVALID_SIGNATURE = -8
APPOINTMENT_INVALID_SIGNATURE_OR_INSUFFICIENT_SLOTS = -8
# Registration errors [-65, -128]
REGISTRATION_MISSING_FIELD = -65
REGISTRATION_WRONG_FIELD_FORMAT = -66
# Custom RPC errors
RPC_TX_REORGED_AFTER_BROADCAST = -98

117
teos/gatekeeper.py Normal file
View File

@@ -0,0 +1,117 @@
from common.tools import is_compressed_pk
from common.cryptographer import Cryptographer
class NotEnoughSlots(ValueError):
"""Raised when trying to subtract more slots than a user has available"""
def __init__(self, user_pk, requested_slots):
self.user_pk = user_pk
self.requested_slots = requested_slots
class IdentificationFailure(Exception):
"""
Raised when a user can not be identified. Either the user public key cannot be recovered or the user is
not found within the registered ones.
"""
pass
class Gatekeeper:
"""
The :class:`Gatekeeper` is in charge of managing the access to the tower. Only registered users are allowed to
perform actions.
Attributes:
registered_users (:obj:`dict`): a map of user_pk:appointment_slots.
"""
def __init__(self, user_db, default_slots):
self.default_slots = default_slots
self.user_db = user_db
self.registered_users = user_db.load_all_users()
def add_update_user(self, user_pk):
"""
Adds a new user or updates the subscription of an existing one, by adding additional slots.
Args:
user_pk(:obj:`str`): the public key that identifies the user (33-bytes hex str).
Returns:
:obj:`int`: the number of available slots in the user subscription.
"""
if not is_compressed_pk(user_pk):
raise ValueError("Provided public key does not match expected format (33-byte hex string)")
if user_pk not in self.registered_users:
self.registered_users[user_pk] = {"available_slots": self.default_slots}
else:
self.registered_users[user_pk]["available_slots"] += self.default_slots
self.user_db.store_user(user_pk, self.registered_users[user_pk])
return self.registered_users[user_pk]["available_slots"]
def identify_user(self, message, signature):
"""
Checks if a request comes from a registered user by ec-recovering their public key from a signed message.
Args:
message (:obj:`bytes`): byte representation of the original message from where the signature was generated.
signature (:obj:`str`): the user's signature (hex-encoded).
Returns:
:obj:`str`: a compressed key recovered from the signature and matching a registered user.
Raises:
:obj:`IdentificationFailure`: if the user cannot be identified.
"""
if isinstance(message, bytes) and isinstance(signature, str):
rpk = Cryptographer.recover_pk(message, signature)
compressed_pk = Cryptographer.get_compressed_pk(rpk)
if compressed_pk in self.registered_users:
return compressed_pk
else:
raise IdentificationFailure("User not found.")
else:
raise IdentificationFailure("Wrong message or signature.")
def fill_slots(self, user_pk, n):
"""
Fills a given number os slots of the user subscription.
Args:
user_pk(:obj:`str`): the public key that identifies the user (33-bytes hex str).
n (:obj:`int`): the number of slots to fill.
Raises:
:obj:`NotEnoughSlots`: if the user subscription does not have enough slots.
"""
# DISCUSS: we may want to return a different exception if the user does not exist
if user_pk in self.registered_users and n <= self.registered_users.get(user_pk).get("available_slots"):
self.registered_users[user_pk]["available_slots"] -= n
self.user_db.store_user(user_pk, self.registered_users[user_pk])
else:
raise NotEnoughSlots(user_pk, n)
def free_slots(self, user_pk, n):
"""
Frees some slots of a user subscription.
Args:
user_pk(:obj:`str`): the public key that identifies the user (33-bytes hex str).
n (:obj:`int`): the number of slots to free.
"""
# DISCUSS: if the user does not exist we may want to log or return an exception.
if user_pk in self.registered_users:
self.registered_users[user_pk]["available_slots"] += n
self.user_db.store_user(user_pk, self.registered_users[user_pk])

View File

@@ -3,7 +3,8 @@ def show_usage():
"USAGE: "
"\n\tpython teosd.py [global options]"
"\n\nGLOBAL OPTIONS:"
"\n\t--btcnetwork \t\tNetwork bitcoind is connected to. Either mainnet, testnet or regtest. Defaults to 'mainnet' (modifiable in conf file)."
"\n\t--btcnetwork \t\tNetwork bitcoind is connected to. Either mainnet, testnet or regtest. Defaults to "
"'mainnet' (modifiable in conf file)."
"\n\t--btcrpcuser \t\tbitcoind rpcuser. Defaults to 'user' (modifiable in conf file)."
"\n\t--btcrpcpassword \tbitcoind rpcpassword. Defaults to 'passwd' (modifiable in conf file)."
"\n\t--btcrpcconnect \tbitcoind rpcconnect. Defaults to 'localhost' (modifiable in conf file)."

View File

@@ -1,13 +1,12 @@
import re
from binascii import unhexlify
import common.cryptographer
from common.logger import Logger
from common.tools import is_locator
from common.constants import LOCATOR_LEN_HEX
from common.cryptographer import Cryptographer, PublicKey
from common.appointment import Appointment
from teos import errors, LOG_PREFIX
from common.logger import Logger
from common.appointment import Appointment
logger = Logger(actor="Inspector", log_name_prefix=LOG_PREFIX)
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_PREFIX)
@@ -19,7 +18,14 @@ common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_
BLOCKS_IN_A_MONTH = 4320 # 4320 = roughly a month in blocks
ENCRYPTED_BLOB_MAX_SIZE_HEX = 2 * 2048
class InspectionFailed(Exception):
"""Raise this the inspector finds a problem with any of the appointment fields"""
def __init__(self, erno, reason):
self.erno = erno
self.reason = reason
class Inspector:
@@ -36,97 +42,65 @@ class Inspector:
self.block_processor = block_processor
self.min_to_self_delay = min_to_self_delay
def inspect(self, appointment_data, signature, public_key):
def inspect(self, appointment_data):
"""
Inspects whether the data provided by the user is correct.
Args:
appointment_data (:obj:`dict`): a dictionary containing the appointment data.
signature (:obj:`str`): the appointment signature provided by the user (hex encoded).
public_key (:obj:`str`): the user's public key (hex encoded).
Returns:
:obj:`Appointment <teos.appointment.Appointment>` or :obj:`tuple`: An appointment initialized with the
provided data if it is correct.
:obj:`Appointment <teos.appointment.Appointment>`: An appointment initialized with the provided data.
Returns a tuple ``(return code, message)`` describing the error otherwise.
Errors are defined in :mod:`Errors <teos.errors>`.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
if appointment_data is None:
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty appointment received")
elif not isinstance(appointment_data, dict):
raise InspectionFailed(errors.APPOINTMENT_WRONG_FIELD, "wrong appointment format")
block_height = self.block_processor.get_block_count()
if block_height is None:
raise InspectionFailed(errors.UNKNOWN_JSON_RPC_EXCEPTION, "unexpected error occurred")
if block_height is not None:
rcode, message = self.check_locator(appointment_data.get("locator"))
self.check_locator(appointment_data.get("locator"))
self.check_start_time(appointment_data.get("start_time"), block_height)
self.check_end_time(appointment_data.get("end_time"), appointment_data.get("start_time"), block_height)
self.check_to_self_delay(appointment_data.get("to_self_delay"))
self.check_blob(appointment_data.get("encrypted_blob"))
if rcode == 0:
rcode, message = self.check_start_time(appointment_data.get("start_time"), block_height)
if rcode == 0:
rcode, message = self.check_end_time(
appointment_data.get("end_time"), appointment_data.get("start_time"), block_height
)
if rcode == 0:
rcode, message = self.check_to_self_delay(appointment_data.get("to_self_delay"))
if rcode == 0:
rcode, message = self.check_blob(appointment_data.get("encrypted_blob"))
if rcode == 0:
rcode, message = self.check_appointment_signature(appointment_data, signature, public_key)
if rcode == 0:
r = Appointment.from_dict(appointment_data)
else:
r = (rcode, message)
else:
# In case of an unknown exception, assign a special rcode and reason.
r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred")
return r
return Appointment.from_dict(appointment_data)
@staticmethod
def check_locator(locator):
"""
Checks if the provided ``locator`` is correct.
Locators must be 16-byte hex encoded strings.
Locators must be 16-byte hex-encoded strings.
Args:
locator (:obj:`str`): the locator to be checked.
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``locator`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``,
``APPOINTMENT_WRONG_FIELD_SIZE``, and ``APPOINTMENT_WRONG_FIELD_FORMAT``.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
message = None
rcode = 0
if locator is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty locator received"
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty locator received")
elif type(locator) != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong locator data type ({})".format(type(locator))
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_TYPE, "wrong locator data type ({})".format(type(locator))
)
elif len(locator) != LOCATOR_LEN_HEX:
rcode = errors.APPOINTMENT_WRONG_FIELD_SIZE
message = "wrong locator size ({})".format(len(locator))
# TODO: #12-check-txid-regexp
raise InspectionFailed(errors.APPOINTMENT_WRONG_FIELD_SIZE, "wrong locator size ({})".format(len(locator)))
elif re.search(r"^[0-9A-Fa-f]+$", locator) is None:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong locator format ({})".format(locator)
if message is not None:
logger.error(message)
return rcode, message
elif not is_locator(locator):
raise InspectionFailed(errors.APPOINTMENT_WRONG_FIELD_FORMAT, "wrong locator format ({})".format(locator))
@staticmethod
def check_start_time(start_time, block_height):
@@ -139,50 +113,32 @@ class Inspector:
start_time (:obj:`int`): the block height at which the tower is requested to start watching for breaches.
block_height (:obj:`int`): the chain height.
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``start_time`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
``APPOINTMENT_FIELD_TOO_SMALL``.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
message = None
rcode = 0
# TODO: What's too close to the current height is not properly defined. Right now any appointment that is in the
# future will be accepted (even if it's only one block away).
t = type(start_time)
if start_time is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty start_time received"
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty start_time received")
elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong start_time data type ({})".format(t)
elif type(start_time) != int:
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_TYPE, "wrong start_time data type ({})".format(type(start_time))
)
elif start_time <= block_height:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
if start_time < block_height:
message = "start_time is in the past"
else:
message = (
"start_time is too close to current height. "
"Accepted times are: [current_height+1, current_height+6]"
)
elif start_time < block_height:
raise InspectionFailed(errors.APPOINTMENT_FIELD_TOO_SMALL, "start_time is in the past")
elif start_time == block_height:
raise InspectionFailed(
errors.APPOINTMENT_FIELD_TOO_SMALL,
"start_time is too close to current height. Accepted times are: [current_height+1, current_height+6]",
)
elif start_time > block_height + 6:
rcode = errors.APPOINTMENT_FIELD_TOO_BIG
message = "start_time is too far in the future. Accepted start times are up to 6 blocks in the future"
if message is not None:
logger.error(message)
return rcode, message
raise InspectionFailed(
errors.APPOINTMENT_FIELD_TOO_BIG,
"start_time is too far in the future. Accepted start times are up to 6 blocks in the future",
)
@staticmethod
def check_end_time(end_time, start_time, block_height):
@@ -196,54 +152,36 @@ class Inspector:
start_time (:obj:`int`): the block height at which the tower is requested to start watching for breaches.
block_height (:obj:`int`): the chain height.
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``end_time`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
``APPOINTMENT_FIELD_TOO_SMALL``.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
message = None
rcode = 0
# TODO: What's too close to the current height is not properly defined. Right now any appointment that ends in
# the future will be accepted (even if it's only one block away).
t = type(end_time)
if end_time is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty end_time received"
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty end_time received")
elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong end_time data type ({})".format(t)
elif type(end_time) != int:
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_TYPE, "wrong end_time data type ({})".format(type(end_time))
)
elif end_time > block_height + BLOCKS_IN_A_MONTH: # 4320 = roughly a month in blocks
rcode = errors.APPOINTMENT_FIELD_TOO_BIG
message = "end_time should be within the next month (<= current_height + 4320)"
raise InspectionFailed(
errors.APPOINTMENT_FIELD_TOO_BIG, "end_time should be within the next month (<= current_height + 4320)"
)
elif start_time > end_time:
raise InspectionFailed(errors.APPOINTMENT_FIELD_TOO_SMALL, "end_time is smaller than start_time")
elif start_time >= end_time:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
if start_time > end_time:
message = "end_time is smaller than start_time"
else:
message = "end_time is equal to start_time"
elif start_time == end_time:
raise InspectionFailed(errors.APPOINTMENT_FIELD_TOO_SMALL, "end_time is equal to start_time")
elif block_height >= end_time:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
if block_height > end_time:
message = "end_time is in the past"
else:
message = "end_time is too close to current height"
elif block_height > end_time:
raise InspectionFailed(errors.APPOINTMENT_FIELD_TOO_SMALL, "end_time is in the past")
if message is not None:
logger.error(message)
return rcode, message
elif block_height == end_time:
raise InspectionFailed(errors.APPOINTMENT_FIELD_TOO_SMALL, "end_time is too close to current height")
def check_to_self_delay(self, to_self_delay):
"""
@@ -252,49 +190,35 @@ class Inspector:
To self delays must be greater or equal to ``MIN_TO_SELF_DELAY``.
Args:
to_self_delay (:obj:`int`): The ``to_self_delay`` encoded in the ``csv`` of the ``htlc`` that this
appointment is covering.
to_self_delay (:obj:`int`): The ``to_self_delay`` encoded in the ``csv`` of ``to_remote`` output of the
commitment transaction this appointment is covering.
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``to_self_delay`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
``APPOINTMENT_FIELD_TOO_SMALL``.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
message = None
rcode = 0
t = type(to_self_delay)
if to_self_delay is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty to_self_delay received"
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty to_self_delay received")
elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong to_self_delay data type ({})".format(t)
elif type(to_self_delay) != int:
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_TYPE, "wrong to_self_delay data type ({})".format(type(to_self_delay))
)
elif to_self_delay > pow(2, 32):
rcode = errors.APPOINTMENT_FIELD_TOO_BIG
message = "to_self_delay must fit the transaction nLockTime field ({} > {})".format(
to_self_delay, pow(2, 32)
raise InspectionFailed(
errors.APPOINTMENT_FIELD_TOO_BIG,
"to_self_delay must fit the transaction nLockTime field ({} > {})".format(to_self_delay, pow(2, 32)),
)
elif to_self_delay < self.min_to_self_delay:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = "to_self_delay too small. The to_self_delay should be at least {} (current: {})".format(
self.min_to_self_delay, to_self_delay
raise InspectionFailed(
errors.APPOINTMENT_FIELD_TOO_SMALL,
"to_self_delay too small. The to_self_delay should be at least {} (current: {})".format(
self.min_to_self_delay, to_self_delay
),
)
if message is not None:
logger.error(message)
return rcode, message
# ToDo: #6-define-checks-encrypted-blob
@staticmethod
def check_blob(encrypted_blob):
@@ -302,88 +226,21 @@ class Inspector:
Checks if the provided ``encrypted_blob`` may be correct.
Args:
encrypted_blob (:obj:`str`): the encrypted blob to be checked (hex encoded).
encrypted_blob (:obj:`str`): the encrypted blob to be checked (hex-encoded).
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``encrypted_blob`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
``APPOINTMENT_WRONG_FIELD_FORMAT``.
Raises:
:obj:`InspectionFailed`: if any of the fields is wrong.
"""
message = None
rcode = 0
t = type(encrypted_blob)
if encrypted_blob is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty encrypted_blob received"
raise InspectionFailed(errors.APPOINTMENT_EMPTY_FIELD, "empty encrypted_blob received")
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong encrypted_blob data type ({})".format(t)
elif len(encrypted_blob) > ENCRYPTED_BLOB_MAX_SIZE_HEX:
rcode = errors.APPOINTMENT_FIELD_TOO_BIG
message = "encrypted_blob has to be 2Kib at most (current {})".format(len(encrypted_blob) // 2)
elif type(encrypted_blob) != str:
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_TYPE, "wrong encrypted_blob data type ({})".format(type(encrypted_blob))
)
elif re.search(r"^[0-9A-Fa-f]+$", encrypted_blob) is None:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong encrypted_blob format ({})".format(encrypted_blob)
if message is not None:
logger.error(message)
return rcode, message
@staticmethod
# Verifies that the appointment signature is a valid signature with public key
def check_appointment_signature(appointment_data, signature, pk):
"""
Checks if the provided user signature is correct.
Args:
appointment_data (:obj:`dict`): the appointment that was signed by the user.
signature (:obj:`str`): the user's signature (hex encoded).
pk (:obj:`str`): the user's public key (hex encoded).
Returns:
:obj:`tuple`: A tuple (return code, message) as follows:
- ``(0, None)`` if the ``signature`` is correct.
- ``!= (0, None)`` otherwise.
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
``APPOINTMENT_WRONG_FIELD_FORMAT``.
"""
message = None
rcode = 0
if signature is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty signature received"
elif pk is None:
rcode = errors.APPOINTMENT_EMPTY_FIELD
message = "empty public key received"
elif re.match(r"^[0-9A-Fa-f]{66}$", pk) is None:
rcode = errors.APPOINTMENT_WRONG_FIELD
message = "public key must be a hex encoded 33-byte long value"
else:
appointment = Appointment.from_dict(appointment_data)
rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
pk = PublicKey(unhexlify(pk))
valid_sig = Cryptographer.verify_rpk(pk, rpk)
if not valid_sig:
rcode = errors.APPOINTMENT_INVALID_SIGNATURE
message = "invalid signature"
return rcode, message
raise InspectionFailed(
errors.APPOINTMENT_WRONG_FIELD_FORMAT, "wrong encrypted_blob format ({})".format(encrypted_blob)
)

View File

@@ -1,4 +1,3 @@
import json
from queue import Queue
from threading import Thread
@@ -14,7 +13,7 @@ logger = Logger(actor="Responder", log_name_prefix=LOG_PREFIX)
class TransactionTracker:
"""
A :class:`TransactionTracker` is used to monitor a ``penalty_tx``. Once the dispute is seen by the
A :class:`TransactionTracker` is used to monitor a ``penalty_tx``. Once the dispute is seen by the
:obj:`Watcher <teos.watcher.Watcher>` the penalty transaction is decrypted and the relevant appointment data is
passed along to the :obj:`Responder`.
@@ -54,7 +53,7 @@ class TransactionTracker:
:obj:`TransactionTracker`: A ``TransactionTracker`` instantiated with the provided data.
Raises:
ValueError: if any of the required fields is missing.
:obj:`ValueError`: if any of the required fields is missing.
"""
locator = tx_tracker_data.get("locator")
@@ -73,7 +72,7 @@ class TransactionTracker:
def to_dict(self):
"""
Exports a :obj:`TransactionTracker` as a dictionary.
Encodes a :obj:`TransactionTracker` as a dictionary.
Returns:
:obj:`dict`: A dictionary containing the :obj:`TransactionTracker` data.
@@ -89,26 +88,19 @@ class TransactionTracker:
return tx_tracker
def to_json(self):
"""
Exports a :obj:`TransactionTracker` as a json-encoded dictionary.
Returns:
:obj:`str`: A json-encoded dictionary containing the :obj:`TransactionTracker` data.
"""
return json.dumps(self.to_dict())
class Responder:
"""
The :class:`Responder` is the class in charge of ensuring that channel breaches are dealt with. It does so handling
The :class:`Responder` is in charge of ensuring that channel breaches are dealt with. It does so handling
the decrypted ``penalty_txs`` handed by the :obj:`Watcher <teos.watcher.Watcher>` and ensuring the they make it to
the blockchain.
Args:
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): a ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
carrier (:obj:`Carrier <teos.carrier.Carrier>`): a ``Carrier`` instance to send transactions to bitcoind.
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a ``BlockProcessor`` instance to
get data from bitcoind.
Attributes:
trackers (:obj:`dict`): A dictionary containing the minimum information about the :obj:`TransactionTracker`
@@ -121,13 +113,12 @@ class Responder:
has missed. Used to trigger rebroadcast if needed.
block_queue (:obj:`Queue`): A queue used by the :obj:`Responder` to receive block hashes from ``bitcoind``. It
is populated by the :obj:`ChainMonitor <teos.chain_monitor.ChainMonitor>`.
db_manager (:obj:`DBManager <teos.db_manager.DBManager>`): A ``DBManager`` instance to interact with the
database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
carrier (:obj:`Carrier <teos.carrier.Carrier>`): a ``Carrier`` instance to send transactions to bitcoind.
block_processor (:obj:`DBManager <teos.block_processor.BlockProcessor>`): a ``BlockProcessor`` instance to get
data from bitcoind.
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a ``BlockProcessor`` instance to
get data from bitcoind.
last_known_block (:obj:`str`): the last block known by the ``Responder``.
"""
def __init__(self, db_manager, carrier, block_processor):
@@ -142,6 +133,7 @@ class Responder:
self.last_known_block = db_manager.load_last_block_hash_responder()
def awake(self):
"""Starts a new thread to monitor the blockchain to make sure triggered appointments get enough depth"""
responder_thread = Thread(target=self.do_watch, daemon=True)
responder_thread.start()
@@ -151,7 +143,7 @@ class Responder:
"""
Whether the :obj:`Responder` is on sync with ``bitcoind`` or not. Used when recovering from a crash.
The Watchtower can be instantiated with fresh or with backed up data. In the later, some triggers may have been
The Watchtower can be instantiated with fresh or with backed up data. In the later, some triggers may have been
missed. In order to go back on sync both the :obj:`Watcher <teos.watcher.Watcher>` and the :obj:`Responder`
need to perform the state transitions until they catch up.
@@ -216,9 +208,8 @@ class Responder:
"""
Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``.
A reduction of :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the
``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
database.
A summary of :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid``
added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the database.
Args:
uuid (:obj:`str`): a unique identifier for the appointment.
@@ -251,7 +242,7 @@ class Responder:
if penalty_txid not in self.unconfirmed_txs and confirmations == 0:
self.unconfirmed_txs.append(penalty_txid)
self.db_manager.store_responder_tracker(uuid, tracker.to_json())
self.db_manager.store_responder_tracker(uuid, tracker.to_dict())
logger.info(
"New tracker added", dispute_txid=dispute_txid, penalty_txid=penalty_txid, appointment_end=appointment_end
@@ -259,7 +250,7 @@ class Responder:
def do_watch(self):
"""
Monitors the blockchain whilst there are pending trackers.
Monitors the blockchain for reorgs and appointment ends.
This is the main method of the :obj:`Responder` and triggers tracker cleaning, rebroadcasting, reorg managing,
etc.
@@ -303,7 +294,7 @@ class Responder:
# Clear the receipts issued in this block
self.carrier.issued_receipts = {}
if len(self.trackers) is 0:
if len(self.trackers) != 0:
logger.info("No more pending trackers")
# Register the last processed block for the responder
@@ -395,9 +386,9 @@ class Responder:
def rebroadcast(self, txs_to_rebroadcast):
"""
Rebroadcasts a ``penalty_tx`` that has missed too many confirmations. In the current approach this would loop
forever si the transaction keeps not getting it.
forever if the transaction keeps not getting it.
Potentially the fees could be bumped here if the transaction has some tower dedicated outputs (or allows it
Potentially, the fees could be bumped here if the transaction has some tower dedicated outputs (or allows it
trough ``ANYONECANPAY`` or something similar).
Args:

View File

@@ -3,16 +3,16 @@
# General application defined errors
RPC_MISC_ERROR = -1 # std::exception thrown in command handling
RPC_TYPE_ERROR = -3 # Unexpected type was passed as parameter
RPC_INVALID_ADDRESS_OR_KEY = -5 # Invalid address or key
RPC_OUT_OF_MEMORY = -7 # Ran out of memory during operation
RPC_INVALID_PARAMETER = -8 # Invalid missing or duplicate parameter
RPC_DATABASE_ERROR = -20 # Database error
RPC_DESERIALIZATION_ERROR = -22 # Error parsing or validating structure in raw format
RPC_VERIFY_ERROR = -25 # General error during transaction or block submission
RPC_VERIFY_REJECTED = -26 # Transaction or block was rejected by network rules
RPC_VERIFY_ALREADY_IN_CHAIN = -27 # Transaction already in chain
RPC_IN_WARMUP = -28 # Client still warming up
RPC_METHOD_DEPRECATED = -32 # RPC method is deprecated
RPC_INVALID_ADDRESS_OR_KEY = -5 # Invalid address or key
RPC_OUT_OF_MEMORY = -7 # Ran out of memory during operation
RPC_INVALID_PARAMETER = -8 # Invalid missing or duplicate parameter
RPC_DATABASE_ERROR = -20 # Database error
RPC_DESERIALIZATION_ERROR = -22 # Error parsing or validating structure in raw format
RPC_VERIFY_ERROR = -25 # General error during transaction or block submission
RPC_VERIFY_REJECTED = -26 # Transaction or block was rejected by network rules
RPC_VERIFY_ALREADY_IN_CHAIN = -27 # Transaction already in chain
RPC_IN_WARMUP = -28 # Client still warming up
RPC_METHOD_DEPRECATED = -32 # RPC method is deprecated
# Aliases for backward compatibility
RPC_TRANSACTION_ERROR = RPC_VERIFY_ERROR
@@ -20,25 +20,23 @@ RPC_TRANSACTION_REJECTED = RPC_VERIFY_REJECTED
RPC_TRANSACTION_ALREADY_IN_CHAIN = RPC_VERIFY_ALREADY_IN_CHAIN
# P2P client errors
RPC_CLIENT_NOT_CONNECTED = -9 # Bitcoin is not connected
RPC_CLIENT_IN_INITIAL_DOWNLOAD = -10 # Still downloading initial blocks
RPC_CLIENT_NODE_ALREADY_ADDED = -23 # Node is already added
RPC_CLIENT_NODE_NOT_ADDED = -24 # Node has not been added before
RPC_CLIENT_NODE_NOT_CONNECTED = -29 # Node to disconnect not found in connected nodes
RPC_CLIENT_INVALID_IP_OR_SUBNET = -30 # Invalid IP/Subnet
RPC_CLIENT_P2P_DISABLED = -31 # No valid connection manager instance found
RPC_CLIENT_NOT_CONNECTED = -9 # Bitcoin is not connected
RPC_CLIENT_IN_INITIAL_DOWNLOAD = -10 # Still downloading initial blocks
RPC_CLIENT_NODE_ALREADY_ADDED = -23 # Node is already added
RPC_CLIENT_NODE_NOT_ADDED = -24 # Node has not been added before
RPC_CLIENT_NODE_NOT_CONNECTED = -29 # Node to disconnect not found in connected nodes
RPC_CLIENT_INVALID_IP_OR_SUBNET = -30 # Invalid IP/Subnet
RPC_CLIENT_P2P_DISABLED = -31 # No valid connection manager instance found
# Wallet errors
RPC_WALLET_ERROR = -4 # Unspecified problem with wallet (key not found etc.)
RPC_WALLET_INSUFFICIENT_FUNDS = -6 # Not enough funds in wallet or account
RPC_WALLET_INVALID_LABEL_NAME = -11 # Invalid label name
RPC_WALLET_KEYPOOL_RAN_OUT = -12 # Keypool ran out call keypoolrefill first
RPC_WALLET_UNLOCK_NEEDED = -13 # Enter the wallet passphrase with walletpassphrase first
RPC_WALLET_PASSPHRASE_INCORRECT = -14 # The wallet passphrase entered was incorrect
RPC_WALLET_WRONG_ENC_STATE = (
-15
) # Command given in wrong wallet encryption state (encrypting an encrypted wallet etc.)
RPC_WALLET_ENCRYPTION_FAILED = -16 # Failed to encrypt the wallet
RPC_WALLET_ALREADY_UNLOCKED = -17 # Wallet is already unlocked
RPC_WALLET_NOT_FOUND = -18 # Invalid wallet specified
RPC_WALLET_NOT_SPECIFIED = -19 # No wallet specified (error when there are multiple wallets loaded)
RPC_WALLET_ERROR = -4 # Unspecified problem with wallet (key not found etc.)
RPC_WALLET_INSUFFICIENT_FUNDS = -6 # Not enough funds in wallet or account
RPC_WALLET_INVALID_LABEL_NAME = -11 # Invalid label name
RPC_WALLET_KEYPOOL_RAN_OUT = -12 # Keypool ran out call keypoolrefill first
RPC_WALLET_UNLOCK_NEEDED = -13 # Enter the wallet passphrase with walletpassphrase first
RPC_WALLET_PASSPHRASE_INCORRECT = -14 # The wallet passphrase entered was incorrect
RPC_WALLET_WRONG_ENC_STATE = -15 # Command given in wrong wallet encryption state (encrypting an encrypted wallet etc.)
RPC_WALLET_ENCRYPTION_FAILED = -16 # Failed to encrypt the wallet
RPC_WALLET_ALREADY_UNLOCKED = -17 # Wallet is already unlocked
RPC_WALLET_NOT_FOUND = -18 # Invalid wallet specified
RPC_WALLET_NOT_SPECIFIED = -19 # No wallet specified (error when there are multiple wallets loaded)

View File

@@ -1,6 +1,6 @@
[bitcoind]
btc_rpc_user = user
btc_rpc_passwd = passwd
btc_rpc_password = passwd
btc_rpc_connect = localhost
btc_rpc_port = 8332
btc_network = mainnet
@@ -11,7 +11,8 @@ feed_connect = 127.0.0.1
feed_port = 28332
[teos]
max_appointments = 100
subscription_slots = 100
max_appointments = 1000000
expiry_delta = 6
min_to_self_delay = 20

View File

@@ -14,11 +14,13 @@ from teos.help import show_usage
from teos.watcher import Watcher
from teos.builder import Builder
from teos.carrier import Carrier
from teos.users_dbm import UsersDBM
from teos.inspector import Inspector
from teos.responder import Responder
from teos.db_manager import DBManager
from teos.gatekeeper import Gatekeeper
from teos.chain_monitor import ChainMonitor
from teos.block_processor import BlockProcessor
from teos.appointments_dbm import AppointmentsDBM
from teos.tools import can_connect_to_bitcoind, in_correct_network
from teos import LOG_PREFIX, DATA_DIR, DEFAULT_CONF, CONF_FILE_NAME
@@ -43,13 +45,14 @@ def main(command_line_conf):
signal(SIGQUIT, handle_signals)
# Loads config and sets up the data folder and log file
config_loader = ConfigLoader(DATA_DIR, CONF_FILE_NAME, DEFAULT_CONF, command_line_conf)
data_dir = command_line_conf.get("DATA_DIR") if "DATA_DIR" in command_line_conf else DATA_DIR
config_loader = ConfigLoader(data_dir, CONF_FILE_NAME, DEFAULT_CONF, command_line_conf)
config = config_loader.build_config()
setup_data_folder(DATA_DIR)
setup_data_folder(data_dir)
setup_logging(config.get("LOG_FILE"), LOG_PREFIX)
logger.info("Starting TEOS")
db_manager = DBManager(config.get("DB_PATH"))
db_manager = AppointmentsDBM(config.get("APPOINTMENTS_DB_PATH"))
bitcoind_connect_params = {k: v for k, v in config.items() if k.startswith("BTC")}
bitcoind_feed_params = {k: v for k, v in config.items() if k.startswith("FEED")}
@@ -150,7 +153,8 @@ def main(command_line_conf):
# Fire the API and the ChainMonitor
# FIXME: 92-block-data-during-bootstrap-db
chain_monitor.monitor_chain()
API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher).start()
gatekeeper = Gatekeeper(UsersDBM(config.get("USERS_DB_PATH")), config.get("DEFAULT_SLOTS"))
API(Inspector(block_processor, config.get("MIN_TO_SELF_DELAY")), watcher, gatekeeper).start()
except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e))
exit(1)
@@ -171,7 +175,7 @@ if __name__ == "__main__":
if opt in ["--btcrpcuser"]:
command_line_conf["BTC_RPC_USER"] = arg
if opt in ["--btcrpcpassword"]:
command_line_conf["BTC_RPC_PASSWD"] = arg
command_line_conf["BTC_RPC_PASSWORD"] = arg
if opt in ["--btcrpcconnect"]:
command_line_conf["BTC_RPC_CONNECT"] = arg
if opt in ["--btcrpcport"]:
@@ -180,7 +184,7 @@ if __name__ == "__main__":
except ValueError:
exit("btcrpcport must be an integer")
if opt in ["--datadir"]:
DATA_DIR = os.path.expanduser(arg)
command_line_conf["DATA_DIR"] = os.path.expanduser(arg)
if opt in ["-h", "--help"]:
exit(show_usage())

View File

@@ -15,7 +15,7 @@ def bitcoin_cli(btc_connect_params):
Args:
btc_connect_params (:obj:`dict`): a dictionary with the parameters to connect to bitcoind
(rpc user, rpc passwd, host and port)
(rpc user, rpc password, host and port)
Returns:
:obj:`AuthServiceProxy <teos.utils.auth_proxy.AuthServiceProxy>`: An authenticated service proxy to ``bitcoind``
@@ -26,7 +26,7 @@ def bitcoin_cli(btc_connect_params):
"http://%s:%s@%s:%d"
% (
btc_connect_params.get("BTC_RPC_USER"),
btc_connect_params.get("BTC_RPC_PASSWD"),
btc_connect_params.get("BTC_RPC_PASSWORD"),
btc_connect_params.get("BTC_RPC_CONNECT"),
btc_connect_params.get("BTC_RPC_PORT"),
)
@@ -40,7 +40,7 @@ def can_connect_to_bitcoind(btc_connect_params):
Args:
btc_connect_params (:obj:`dict`): a dictionary with the parameters to connect to bitcoind
(rpc user, rpc passwd, host and port)
(rpc user, rpc password, host and port)
Returns:
:obj:`bool`: ``True`` if the connection can be established. ``False`` otherwise.
"""
@@ -62,7 +62,7 @@ def in_correct_network(btc_connect_params, network):
Args:
btc_connect_params (:obj:`dict`): a dictionary with the parameters to connect to bitcoind
(rpc user, rpc passwd, host and port)
(rpc user, rpc password, host and port)
network (:obj:`str`): the network the tower is connected to.
Returns:

128
teos/users_dbm.py Normal file
View File

@@ -0,0 +1,128 @@
import json
import plyvel
from teos import LOG_PREFIX
from teos.db_manager import DBManager
from common.logger import Logger
from common.tools import is_compressed_pk
logger = Logger(actor="UsersDBM", log_name_prefix=LOG_PREFIX)
class UsersDBM(DBManager):
"""
The :class:`UsersDBM` is in charge of interacting with the users database (``LevelDB``).
Keys and values are stored as bytes in the database but processed as strings by the manager.
Args:
db_path (:obj:`str`): the path (relative or absolute) to the system folder containing the database. A fresh
database will be created if the specified path does not contain one.
Raises:
:obj:`ValueError`: If the provided ``db_path`` is not a string.
:obj:`plyvel.Error`: If the db is currently unavailable (being used by another process).
"""
def __init__(self, db_path):
if not isinstance(db_path, str):
raise ValueError("db_path must be a valid path/name")
try:
super().__init__(db_path)
except plyvel.Error as e:
if "LOCK: Resource temporarily unavailable" in str(e):
logger.info("The db is already being used by another process (LOCK)")
raise e
def store_user(self, user_pk, user_data):
"""
Stores a user record to the database. ``user_pk`` is used as identifier.
Args:
user_pk (:obj:`str`): a 33-byte hex-encoded string identifying the user.
user_data (:obj:`dict`): the user associated data, as a dictionary.
Returns:
:obj:`bool`: True if the user was stored in the database, False otherwise.
"""
if is_compressed_pk(user_pk):
try:
self.create_entry(user_pk, json.dumps(user_data))
logger.info("Adding user to Gatekeeper's db", user_pk=user_pk)
return True
except json.JSONDecodeError:
logger.info("Could't add user to db. Wrong user data format", user_pk=user_pk, user_data=user_data)
return False
except TypeError:
logger.info("Could't add user to db", user_pk=user_pk, user_data=user_data)
return False
else:
logger.info("Could't add user to db. Wrong pk format", user_pk=user_pk, user_data=user_data)
return False
def load_user(self, user_pk):
"""
Loads a user record from the database using the ``user_pk`` as identifier.
Args:
user_pk (:obj:`str`): a 33-byte hex-encoded string identifying the user.
Returns:
:obj:`dict`: A dictionary containing the appointment data if the ``key`` is found.
Returns ``None`` otherwise.
"""
try:
data = self.load_entry(user_pk)
data = json.loads(data)
except (TypeError, json.decoder.JSONDecodeError):
data = None
return data
def delete_user(self, user_pk):
"""
Deletes a user record from the database.
Args:
user_pk (:obj:`str`): a 33-byte hex-encoded string identifying the user.
Returns:
:obj:`bool`: True if the user was deleted from the database or it was non-existent, False otherwise.
"""
try:
self.delete_entry(user_pk)
logger.info("Deleting user from Gatekeeper's db", uuid=user_pk)
return True
except TypeError:
logger.info("Cant delete user from db, user key has wrong type", uuid=user_pk)
return False
def load_all_users(self):
"""
Loads all user records from the database.
Returns:
:obj:`dict`: A dictionary containing all users indexed by ``user_pk``.
Returns an empty dictionary if no data is found.
"""
data = {}
for k, v in self.db.iterator():
# Get uuid and appointment_data from the db
user_pk = k.decode("utf-8")
data[user_pk] = json.loads(v)
return data

View File

@@ -1,4 +1,3 @@
from uuid import uuid4
from queue import Queue
from threading import Thread
@@ -6,7 +5,7 @@ import common.cryptographer
from common.logger import Logger
from common.tools import compute_locator
from common.appointment import Appointment
from common.cryptographer import Cryptographer
from common.cryptographer import Cryptographer, hash_160
from teos import LOG_PREFIX
from teos.cleaner import Cleaner
@@ -17,13 +16,12 @@ common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_
class Watcher:
"""
The :class:`Watcher` is the class in charge to watch for channel breaches for the appointments accepted by the
tower.
The :class:`Watcher` is in charge of watching for channel breaches for the appointments accepted by the tower.
The :class:`Watcher` keeps track of the accepted appointments in ``appointments`` and, for new received block,
checks if any breach has happened by comparing the txids with the appointment locators. If a breach is seen, the
:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` of the corresponding appointment is decrypted and the data
is passed to the :obj:`Responder <teos.responder.Responder>`.
:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` of the corresponding appointment is decrypted and the
data is passed to the :obj:`Responder <teos.responder.Responder>`.
If an appointment reaches its end with no breach, the data is simply deleted.
@@ -31,28 +29,30 @@ class Watcher:
:obj:`ChainMonitor <teos.chain_monitor.ChainMonitor>`.
Args:
db_manager (:obj:`DBManager <teos.db_manager>`): a ``DBManager`` instance to interact with the database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a ``BlockProcessor`` instance to
get block from bitcoind.
responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
sk_der (:obj:`bytes`): a DER encoded private key used to sign appointment receipts (signaling acceptance).
max_appointments (:obj:`int`): the maximum ammount of appointments accepted by the ``Watcher`` at the same time.
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
expiry_delta (:obj:`int`): the additional time the ``Watcher`` will keep an expired appointment around.
Attributes:
appointments (:obj:`dict`): a dictionary containing a simplification of the appointments (:obj:`Appointment
<teos.appointment.Appointment>` instances) accepted by the tower (``locator`` and ``end_time``).
appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`Appointment
<teos.appointment.Appointment>` instances) accepted by the tower (``locator``, ``end_time``, and ``size``).
It's populated trough ``add_appointment``.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
appointments with the same ``locator``.
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
populated by the :obj:`ChainMonitor <teos.chain_monitor.ChainMonitor>`.
db_manager (:obj:`DBManager <teos.db_manager>`): A db manager instance to interact with the database.
db_manager (:obj:`AppointmentsDBM <teos.appointments_dbm.AppointmentsDBM>`): a ``AppointmentsDBM`` instance
to interact with the database.
block_processor (:obj:`BlockProcessor <teos.block_processor.BlockProcessor>`): a ``BlockProcessor`` instance to
get block from bitcoind.
responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
max_appointments (:obj:`int`): the maximum ammount of appointments accepted by the ``Watcher`` at the same time.
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
expiry_delta (:obj:`int`): the additional time the ``Watcher`` will keep an expired appointment around.
Raises:
@@ -72,17 +72,33 @@ class Watcher:
self.signing_key = Cryptographer.load_private_key_der(sk_der)
def awake(self):
"""Starts a new thread to monitor the blockchain for channel breaches"""
watcher_thread = Thread(target=self.do_watch, daemon=True)
watcher_thread.start()
return watcher_thread
def add_appointment(self, appointment):
def get_appointment_summary(self, uuid):
"""
Returns the summary of an appointment. The summary consists of the data kept in memory:
{locator, end_time, and size}
Args:
uuid (:obj:`str`): a 16-byte hex string identifying the appointment.
Returns:
:obj:`dict` or :obj:`None`: a dictionary with the appointment summary, or ``None`` if the appointment is not
found.
"""
return self.appointments.get(uuid)
def add_appointment(self, appointment, user_pk):
"""
Adds a new appointment to the ``appointments`` dictionary if ``max_appointments`` has not been reached.
``add_appointment`` is the entry point of the Watcher. Upon receiving a new appointment it will start monitoring
the blockchain (``do_watch``) until ``appointments`` is empty.
``add_appointment`` is the entry point of the ``Watcher``. Upon receiving a new appointment it will start
monitoring the blockchain (``do_watch``) until ``appointments`` is empty.
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding
:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` and pass the information to the
@@ -96,6 +112,7 @@ class Watcher:
Args:
appointment (:obj:`Appointment <teos.appointment.Appointment>`): the appointment to be added to the
:obj:`Watcher`.
user_pk(:obj:`str`): the public key that identifies the user who sent the appointment (33-bytes hex str).
Returns:
:obj:`tuple`: A tuple signaling if the appointment has been added or not (based on ``max_appointments``).
@@ -103,21 +120,29 @@ class Watcher:
- ``(True, signature)`` if the appointment has been accepted.
- ``(False, None)`` otherwise.
"""
if len(self.appointments) < self.max_appointments:
uuid = uuid4().hex
self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
# The uuids are generated as the RIPMED160(locator||user_pubkey), that way the tower does not need to know
# anything about the user from this point on (no need to store user_pk in the database).
# If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
uuid = hash_160("{}{}".format(appointment.locator, user_pk))
self.appointments[uuid] = {
"locator": appointment.locator,
"end_time": appointment.end_time,
"size": len(appointment.encrypted_blob.data),
}
if appointment.locator in self.locator_uuid_map:
self.locator_uuid_map[appointment.locator].append(uuid)
# If the uuid is already in the map it means this is an update.
if uuid not in self.locator_uuid_map[appointment.locator]:
self.locator_uuid_map[appointment.locator].append(uuid)
else:
self.locator_uuid_map[appointment.locator] = [uuid]
self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
self.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
self.db_manager.create_append_locator_map(appointment.locator, uuid)
appointment_added = True
@@ -135,7 +160,7 @@ class Watcher:
def do_watch(self):
"""
Monitors the blockchain whilst there are pending appointments.
Monitors the blockchain for channel breaches.
This is the main method of the :obj:`Watcher` and the one in charge to pass appointments to the
:obj:`Responder <teos.responder.Responder>` upon detecting a breach.
@@ -198,7 +223,7 @@ class Watcher:
appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager
)
if len(self.appointments) is 0:
if len(self.appointments) != 0:
logger.info("No more pending appointments")
# Register the last processed block for the watcher