mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 06:34:19 +01:00
Merge pull request #83 from sr-gi/64-data-to-disk
Removes unnecessary data from memory
This commit is contained in:
@@ -81,28 +81,18 @@ class Appointment:
|
|||||||
|
|
||||||
return appointment
|
return appointment
|
||||||
|
|
||||||
def to_json(self, triggered=False):
|
def to_json(self):
|
||||||
"""
|
"""
|
||||||
Exports an appointment as a deterministic json encoded string.
|
Exports an appointment as a deterministic json encoded string.
|
||||||
|
|
||||||
This method ensures that multiple invocations with the same data yield the same value. This is the format used
|
This method ensures that multiple invocations with the same data yield the same value. This is the format used
|
||||||
to store appointments in the database.
|
to store appointments in the database.
|
||||||
|
|
||||||
Args:
|
|
||||||
triggered (:mod:`bool`): Whether the dispute has been triggered or not. When an appointment passes from the
|
|
||||||
:mod:`Watcher <pisa.watcher>` to the :mod:`Responder <pisa.responder>` it is not deleted straightaway.
|
|
||||||
Instead, the appointment is stored in the DB flagged as ``triggered``. This aims to ease handling block
|
|
||||||
reorgs in the future.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
:obj:`str`: A json-encoded str representing the appointment.
|
:obj:`str`: A json-encoded str representing the appointment.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
appointment = self.to_dict()
|
return json.dumps(self.to_dict(), sort_keys=True, separators=(",", ":"))
|
||||||
|
|
||||||
appointment["triggered"] = triggered
|
|
||||||
|
|
||||||
return json.dumps(appointment, sort_keys=True, separators=(",", ":"))
|
|
||||||
|
|
||||||
def serialize(self):
|
def serialize(self):
|
||||||
"""
|
"""
|
||||||
|
|||||||
13
pisa/api.py
13
pisa/api.py
@@ -110,17 +110,16 @@ class API:
|
|||||||
return jsonify(response)
|
return jsonify(response)
|
||||||
|
|
||||||
locator_map = self.watcher.db_manager.load_locator_map(locator)
|
locator_map = self.watcher.db_manager.load_locator_map(locator)
|
||||||
|
triggered_appointments = self.watcher.db_manager.load_all_triggered_flags()
|
||||||
|
|
||||||
if locator_map is not None:
|
if locator_map is not None:
|
||||||
for uuid in locator_map:
|
for uuid in locator_map:
|
||||||
appointment_data = self.watcher.db_manager.load_watcher_appointment(uuid)
|
if uuid not in triggered_appointments:
|
||||||
|
appointment_data = self.watcher.db_manager.load_watcher_appointment(uuid)
|
||||||
|
|
||||||
if appointment_data is not None and appointment_data["triggered"] is False:
|
if appointment_data is not None:
|
||||||
# Triggered is an internal flag
|
appointment_data["status"] = "being_watched"
|
||||||
del appointment_data["triggered"]
|
response.append(appointment_data)
|
||||||
|
|
||||||
appointment_data["status"] = "being_watched"
|
|
||||||
response.append(appointment_data)
|
|
||||||
|
|
||||||
tracker_data = self.watcher.db_manager.load_responder_tracker(uuid)
|
tracker_data = self.watcher.db_manager.load_responder_tracker(uuid)
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from common.logger import Logger
|
from common.logger import Logger
|
||||||
|
from common.appointment import Appointment
|
||||||
|
|
||||||
logger = Logger("Cleaner")
|
logger = Logger("Cleaner")
|
||||||
|
|
||||||
@@ -27,7 +28,7 @@ class Cleaner:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
for uuid in expired_appointments:
|
for uuid in expired_appointments:
|
||||||
locator = appointments[uuid].locator
|
locator = appointments[uuid].get("locator")
|
||||||
|
|
||||||
appointments.pop(uuid)
|
appointments.pop(uuid)
|
||||||
|
|
||||||
@@ -58,20 +59,21 @@ class Cleaner:
|
|||||||
database.
|
database.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
locator = appointments[uuid].get("locator")
|
||||||
|
|
||||||
# Delete the appointment
|
# Delete the appointment
|
||||||
appointment = appointments.pop(uuid)
|
appointments.pop(uuid)
|
||||||
|
|
||||||
# If there was only one appointment that matches the locator we can delete the whole list
|
# If there was only one appointment that matches the locator we can delete the whole list
|
||||||
if len(locator_uuid_map[appointment.locator]) == 1:
|
if len(locator_uuid_map[locator]) == 1:
|
||||||
locator_uuid_map.pop(appointment.locator)
|
locator_uuid_map.pop(locator)
|
||||||
else:
|
else:
|
||||||
# Otherwise we just delete the appointment that matches locator:appointment_pos
|
# Otherwise we just delete the appointment that matches locator:appointment_pos
|
||||||
locator_uuid_map[appointment.locator].remove(uuid)
|
locator_uuid_map[locator].remove(uuid)
|
||||||
|
|
||||||
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
|
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
|
||||||
# the watcher's and responder's db after fulfilled
|
# the watcher's and responder's db after fulfilled
|
||||||
# Update appointment in the db
|
db_manager.create_triggered_appointment_flag(uuid)
|
||||||
db_manager.store_watcher_appointment(uuid, appointment.to_json(triggered=True))
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager):
|
def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager):
|
||||||
@@ -98,8 +100,8 @@ class Cleaner:
|
|||||||
confirmations=confirmations,
|
confirmations=confirmations,
|
||||||
)
|
)
|
||||||
|
|
||||||
penalty_txid = trackers[uuid].penalty_txid
|
penalty_txid = trackers[uuid].get("penalty_txid")
|
||||||
locator = trackers[uuid].locator
|
locator = trackers[uuid].get("locator")
|
||||||
trackers.pop(uuid)
|
trackers.pop(uuid)
|
||||||
|
|
||||||
if len(tx_tracker_map[penalty_txid]) == 1:
|
if len(tx_tracker_map[penalty_txid]) == 1:
|
||||||
@@ -110,9 +112,10 @@ class Cleaner:
|
|||||||
else:
|
else:
|
||||||
tx_tracker_map[penalty_txid].remove(uuid)
|
tx_tracker_map[penalty_txid].remove(uuid)
|
||||||
|
|
||||||
# Delete appointment from the db (both watchers's and responder's)
|
# Delete appointment from the db (from watchers's and responder's db) and remove flag
|
||||||
db_manager.delete_watcher_appointment(uuid)
|
db_manager.delete_watcher_appointment(uuid)
|
||||||
db_manager.delete_responder_tracker(uuid)
|
db_manager.delete_responder_tracker(uuid)
|
||||||
|
db_manager.delete_triggered_appointment_flag(uuid)
|
||||||
|
|
||||||
# Update / delete the locator map
|
# Update / delete the locator map
|
||||||
locator_map = db_manager.load_locator_map(locator)
|
locator_map = db_manager.load_locator_map(locator)
|
||||||
@@ -120,13 +123,10 @@ class Cleaner:
|
|||||||
if uuid in locator_map:
|
if uuid in locator_map:
|
||||||
if len(locator_map) == 1:
|
if len(locator_map) == 1:
|
||||||
db_manager.delete_locator_map(locator)
|
db_manager.delete_locator_map(locator)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
locator_map.remove(uuid)
|
locator_map.remove(uuid)
|
||||||
db_manager.store_update_locator_map(locator, locator_map)
|
db_manager.store_update_locator_map(locator, locator_map)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error("UUID not found in the db", uuid=uuid)
|
logger.error("UUID not found in the db", uuid=uuid)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error("Locator not found in the db", uuid=uuid)
|
logger.error("Locator not found in the db", uuid=uuid)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ WATCHER_LAST_BLOCK_KEY = "bw"
|
|||||||
RESPONDER_PREFIX = "r"
|
RESPONDER_PREFIX = "r"
|
||||||
RESPONDER_LAST_BLOCK_KEY = "br"
|
RESPONDER_LAST_BLOCK_KEY = "br"
|
||||||
LOCATOR_MAP_PREFIX = "m"
|
LOCATOR_MAP_PREFIX = "m"
|
||||||
|
TRIGGERED_APPOINTMENTS_PREFIX = "ta"
|
||||||
|
|
||||||
|
|
||||||
class DBManager:
|
class DBManager:
|
||||||
@@ -17,13 +18,14 @@ class DBManager:
|
|||||||
The :class:`DBManager` is the class in charge of interacting with the appointments database (``LevelDB``).
|
The :class:`DBManager` is the class in charge of interacting with the appointments database (``LevelDB``).
|
||||||
Keys and values are stored as bytes in the database but processed as strings by the manager.
|
Keys and values are stored as bytes in the database but processed as strings by the manager.
|
||||||
|
|
||||||
The database is split in five prefixes:
|
The database is split in six prefixes:
|
||||||
|
|
||||||
- ``WATCHER_PREFIX``, defined as ``b'w``, is used to store :obj:`Watcher <pisa.watcher.Watcher>` appointments.
|
- ``WATCHER_PREFIX``, defined as ``b'w``, is used to store :obj:`Watcher <pisa.watcher.Watcher>` appointments.
|
||||||
- ``RESPONDER_PREFIX``, defines as ``b'r``, is used to store :obj:`Responder <pisa.responder.Responder>` trackers.
|
- ``RESPONDER_PREFIX``, defines as ``b'r``, is used to store :obj:`Responder <pisa.responder.Responder>` trackers.
|
||||||
- ``WATCHER_LAST_BLOCK_KEY``, defined as ``b'bw``, is used to store the last block hash known by the :obj:`Watcher <pisa.watcher.Watcher>`.
|
- ``WATCHER_LAST_BLOCK_KEY``, defined as ``b'bw``, is used to store the last block hash known by the :obj:`Watcher <pisa.watcher.Watcher>`.
|
||||||
- ``RESPONDER_LAST_BLOCK_KEY``, defined as ``b'br``, is used to store the last block hash known by the :obj:`Responder <pisa.responder.Responder>`.
|
- ``RESPONDER_LAST_BLOCK_KEY``, defined as ``b'br``, is used to store the last block hash known by the :obj:`Responder <pisa.responder.Responder>`.
|
||||||
- ``LOCATOR_MAP_PREFIX``, defined as ``b'm``, is used to store the ``locator:uuid`` maps.
|
- ``LOCATOR_MAP_PREFIX``, defined as ``b'm``, is used to store the ``locator:uuid`` maps.
|
||||||
|
- ``TRIGGERED_APPOINTMENTS_PREFIX``, defined as ``b'ta``, is used to stored triggered appointments (appointments that have been handed to the :obj:`Responder <pisa.responder.Responder>`.)
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
db_path (:obj:`str`): the path (relative or absolute) to the system folder containing the database. A fresh
|
db_path (:obj:`str`): the path (relative or absolute) to the system folder containing the database. A fresh
|
||||||
@@ -160,10 +162,9 @@ class DBManager:
|
|||||||
def load_watcher_appointments(self, include_triggered=False):
|
def load_watcher_appointments(self, include_triggered=False):
|
||||||
"""
|
"""
|
||||||
Loads all the appointments from the database (all entries with the ``WATCHER_PREFIX`` prefix).
|
Loads all the appointments from the database (all entries with the ``WATCHER_PREFIX`` prefix).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
include_triggered (:obj:`bool`): Whether to include the appointments flagged as triggered or not. ``False`` by
|
include_triggered (:obj:`bool`): Whether to include the appointments flagged as triggered or not. ``False``
|
||||||
default.
|
by default.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
:obj:`dict`: A dictionary with all the appointments stored in the database. An empty dictionary is there
|
:obj:`dict`: A dictionary with all the appointments stored in the database. An empty dictionary is there
|
||||||
@@ -171,10 +172,11 @@ class DBManager:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
appointments = self.load_appointments_db(prefix=WATCHER_PREFIX)
|
appointments = self.load_appointments_db(prefix=WATCHER_PREFIX)
|
||||||
|
triggered_appointments = self.load_all_triggered_flags()
|
||||||
|
|
||||||
if not include_triggered:
|
if not include_triggered:
|
||||||
appointments = {
|
appointments = {
|
||||||
uuid: appointment for uuid, appointment in appointments.items() if appointment["triggered"] is False
|
uuid: appointment for uuid, appointment in appointments.items() if uuid not in triggered_appointments
|
||||||
}
|
}
|
||||||
|
|
||||||
return appointments
|
return appointments
|
||||||
@@ -332,3 +334,30 @@ class DBManager:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash)
|
self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash)
|
||||||
|
|
||||||
|
def create_triggered_appointment_flag(self, uuid):
|
||||||
|
"""
|
||||||
|
Creates a flag that signals that an appointment has been triggered.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.db.put((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8"), "".encode("utf-8"))
|
||||||
|
|
||||||
|
def load_all_triggered_flags(self):
|
||||||
|
"""
|
||||||
|
Loads all the appointment triggered flags from the database.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
:obj:`list`: a list of all the uuids of the triggered appointments.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return [
|
||||||
|
k.decode()[len(TRIGGERED_APPOINTMENTS_PREFIX) :]
|
||||||
|
for k, v in self.db.iterator(prefix=TRIGGERED_APPOINTMENTS_PREFIX.encode("utf-8"))
|
||||||
|
]
|
||||||
|
|
||||||
|
def delete_triggered_appointment_flag(self, uuid):
|
||||||
|
"""
|
||||||
|
Deletes a flag that signals that an appointment has been triggered.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.delete_entry(uuid, prefix=TRIGGERED_APPOINTMENTS_PREFIX)
|
||||||
|
|||||||
@@ -117,8 +117,9 @@ class Responder:
|
|||||||
database.
|
database.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
trackers (:obj:`dict`): A dictionary containing all the :obj:`TransactionTracker` handled by the
|
trackers (:obj:`dict`): A dictionary containing the minimum information about the :obj:`TransactionTracker`
|
||||||
:obj:`Responder`. Each entry is identified by a ``uuid``.
|
required by the :obj:`Responder` (``penalty_txid``, ``locator`` and ``end_time``).
|
||||||
|
Each entry is identified by a ``uuid``.
|
||||||
tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with
|
tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with
|
||||||
several trackers triggered by the same ``penalty_txid``.
|
several trackers triggered by the same ``penalty_txid``.
|
||||||
unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``.
|
unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``.
|
||||||
@@ -220,8 +221,9 @@ class Responder:
|
|||||||
"""
|
"""
|
||||||
Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``.
|
Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``.
|
||||||
|
|
||||||
The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to
|
A reduction of :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the
|
||||||
``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database.
|
``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
|
||||||
|
database.
|
||||||
|
|
||||||
``add_tracker`` awakes the :obj:`Responder` if it is asleep.
|
``add_tracker`` awakes the :obj:`Responder` if it is asleep.
|
||||||
|
|
||||||
@@ -238,7 +240,13 @@ class Responder:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||||
self.trackers[uuid] = tracker
|
|
||||||
|
# We only store the penalty_txid, locator and appointment_end in memory. The rest is dumped into the db.
|
||||||
|
self.trackers[uuid] = {
|
||||||
|
"penalty_txid": tracker.penalty_txid,
|
||||||
|
"locator": locator,
|
||||||
|
"appointment_end": appointment_end,
|
||||||
|
}
|
||||||
|
|
||||||
if penalty_txid in self.tx_tracker_map:
|
if penalty_txid in self.tx_tracker_map:
|
||||||
self.tx_tracker_map[penalty_txid].append(uuid)
|
self.tx_tracker_map[penalty_txid].append(uuid)
|
||||||
@@ -378,15 +386,16 @@ class Responder:
|
|||||||
|
|
||||||
completed_trackers = []
|
completed_trackers = []
|
||||||
|
|
||||||
for uuid, tracker in self.trackers.items():
|
for uuid, tracker_data in self.trackers.items():
|
||||||
if tracker.appointment_end <= height and tracker.penalty_txid not in self.unconfirmed_txs:
|
appointment_end = tracker_data.get("appointment_end")
|
||||||
tx = Carrier.get_transaction(tracker.penalty_txid)
|
penalty_txid = tracker_data.get("penalty_txid")
|
||||||
|
if appointment_end <= height and penalty_txid not in self.unconfirmed_txs:
|
||||||
|
tx = Carrier.get_transaction(penalty_txid)
|
||||||
|
|
||||||
# FIXME: Should be improved with the librarian
|
|
||||||
if tx is not None:
|
if tx is not None:
|
||||||
confirmations = tx.get("confirmations")
|
confirmations = tx.get("confirmations")
|
||||||
|
|
||||||
if confirmations >= MIN_CONFIRMATIONS:
|
if confirmations is not None and confirmations >= MIN_CONFIRMATIONS:
|
||||||
# The end of the appointment has been reached
|
# The end of the appointment has been reached
|
||||||
completed_trackers.append((uuid, confirmations))
|
completed_trackers.append((uuid, confirmations))
|
||||||
|
|
||||||
@@ -420,7 +429,7 @@ class Responder:
|
|||||||
# FIXME: This would potentially grab multiple instances of the same transaction and try to send them.
|
# FIXME: This would potentially grab multiple instances of the same transaction and try to send them.
|
||||||
# should we do it only once?
|
# should we do it only once?
|
||||||
for uuid in self.tx_tracker_map[txid]:
|
for uuid in self.tx_tracker_map[txid]:
|
||||||
tracker = self.trackers[uuid]
|
tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid))
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid
|
"Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid
|
||||||
)
|
)
|
||||||
@@ -446,7 +455,9 @@ class Responder:
|
|||||||
"""
|
"""
|
||||||
carrier = Carrier()
|
carrier = Carrier()
|
||||||
|
|
||||||
for uuid, tracker in self.trackers.items():
|
for uuid in self.trackers.keys():
|
||||||
|
tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid))
|
||||||
|
|
||||||
# First we check if the dispute transaction is known (exists either in mempool or blockchain)
|
# First we check if the dispute transaction is known (exists either in mempool or blockchain)
|
||||||
dispute_tx = carrier.get_transaction(tracker.dispute_txid)
|
dispute_tx = carrier.get_transaction(tracker.dispute_txid)
|
||||||
|
|
||||||
|
|||||||
@@ -3,9 +3,11 @@ from queue import Queue
|
|||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from common.cryptographer import Cryptographer
|
from common.cryptographer import Cryptographer
|
||||||
|
from common.appointment import Appointment
|
||||||
from common.tools import compute_locator
|
from common.tools import compute_locator
|
||||||
|
|
||||||
from common.logger import Logger
|
from common.logger import Logger
|
||||||
|
|
||||||
from pisa.cleaner import Cleaner
|
from pisa.cleaner import Cleaner
|
||||||
from pisa.responder import Responder
|
from pisa.responder import Responder
|
||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
@@ -40,8 +42,9 @@ class Watcher:
|
|||||||
|
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
appointments (:obj:`dict`): a dictionary containing all the appointments (:obj:`Appointment
|
appointments (:obj:`dict`): a dictionary containing a simplification of the appointments (:obj:`Appointment
|
||||||
<pisa.appointment.Appointment>` instances) accepted by the tower. It's populated trough ``add_appointment``.
|
<pisa.appointment.Appointment>` instances) accepted by the tower (``locator`` and ``end_time``).
|
||||||
|
It's populated trough ``add_appointment``.
|
||||||
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
|
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
|
||||||
appointments with the same ``locator``.
|
appointments with the same ``locator``.
|
||||||
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
|
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
|
||||||
@@ -103,8 +106,9 @@ class Watcher:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
if len(self.appointments) < self.config.get("MAX_APPOINTMENTS"):
|
if len(self.appointments) < self.config.get("MAX_APPOINTMENTS"):
|
||||||
|
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
self.appointments[uuid] = appointment
|
self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
|
||||||
|
|
||||||
if appointment.locator in self.locator_uuid_map:
|
if appointment.locator in self.locator_uuid_map:
|
||||||
self.locator_uuid_map[appointment.locator].append(uuid)
|
self.locator_uuid_map[appointment.locator].append(uuid)
|
||||||
@@ -123,11 +127,10 @@ class Watcher:
|
|||||||
self.db_manager.store_update_locator_map(appointment.locator, uuid)
|
self.db_manager.store_update_locator_map(appointment.locator, uuid)
|
||||||
|
|
||||||
appointment_added = True
|
appointment_added = True
|
||||||
|
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
|
||||||
|
|
||||||
logger.info("New appointment accepted", locator=appointment.locator)
|
logger.info("New appointment accepted", locator=appointment.locator)
|
||||||
|
|
||||||
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
appointment_added = False
|
appointment_added = False
|
||||||
signature = None
|
signature = None
|
||||||
@@ -157,8 +160,8 @@ class Watcher:
|
|||||||
|
|
||||||
expired_appointments = [
|
expired_appointments = [
|
||||||
uuid
|
uuid
|
||||||
for uuid, appointment in self.appointments.items()
|
for uuid, appointment_data in self.appointments.items()
|
||||||
if block["height"] > appointment.end_time + self.config.get("EXPIRY_DELTA")
|
if block["height"] > appointment_data.get("end_time") + self.config.get("EXPIRY_DELTA")
|
||||||
]
|
]
|
||||||
|
|
||||||
Cleaner.delete_expired_appointment(
|
Cleaner.delete_expired_appointment(
|
||||||
@@ -183,7 +186,7 @@ class Watcher:
|
|||||||
filtered_breach["dispute_txid"],
|
filtered_breach["dispute_txid"],
|
||||||
filtered_breach["penalty_txid"],
|
filtered_breach["penalty_txid"],
|
||||||
filtered_breach["penalty_rawtx"],
|
filtered_breach["penalty_rawtx"],
|
||||||
self.appointments[uuid].end_time,
|
self.appointments[uuid].get("end_time"),
|
||||||
block_hash,
|
block_hash,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -249,9 +252,10 @@ class Watcher:
|
|||||||
|
|
||||||
for locator, dispute_txid in breaches.items():
|
for locator, dispute_txid in breaches.items():
|
||||||
for uuid in self.locator_uuid_map[locator]:
|
for uuid in self.locator_uuid_map[locator]:
|
||||||
|
appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
penalty_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid)
|
penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
|
||||||
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
penalty_rawtx = None
|
penalty_rawtx = None
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from uuid import uuid4
|
|||||||
from pisa.responder import TransactionTracker
|
from pisa.responder import TransactionTracker
|
||||||
from pisa.cleaner import Cleaner
|
from pisa.cleaner import Cleaner
|
||||||
from common.appointment import Appointment
|
from common.appointment import Appointment
|
||||||
from pisa.db_manager import WATCHER_PREFIX
|
from pisa.db_manager import WATCHER_PREFIX, TRIGGERED_APPOINTMENTS_PREFIX
|
||||||
|
|
||||||
from test.pisa.unit.conftest import get_random_value_hex
|
from test.pisa.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
@@ -26,7 +26,7 @@ def set_up_appointments(db_manager, total_appointments):
|
|||||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||||
|
|
||||||
appointment = Appointment(locator, None, None, None, None)
|
appointment = Appointment(locator, None, None, None, None)
|
||||||
appointments[uuid] = appointment
|
appointments[uuid] = {"locator": appointment.locator}
|
||||||
locator_uuid_map[locator] = [uuid]
|
locator_uuid_map[locator] = [uuid]
|
||||||
|
|
||||||
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||||
@@ -36,7 +36,7 @@ def set_up_appointments(db_manager, total_appointments):
|
|||||||
if i % 2:
|
if i % 2:
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
|
|
||||||
appointments[uuid] = appointment
|
appointments[uuid] = {"locator": appointment.locator}
|
||||||
locator_uuid_map[locator].append(uuid)
|
locator_uuid_map[locator].append(uuid)
|
||||||
|
|
||||||
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||||
@@ -59,7 +59,7 @@ def set_up_trackers(db_manager, total_trackers):
|
|||||||
|
|
||||||
# Assign both penalty_txid and dispute_txid the same id (it shouldn't matter)
|
# Assign both penalty_txid and dispute_txid the same id (it shouldn't matter)
|
||||||
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, None, None)
|
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, None, None)
|
||||||
trackers[uuid] = tracker
|
trackers[uuid] = {"locator": tracker.locator, "penalty_txid": tracker.penalty_txid}
|
||||||
tx_tracker_map[penalty_txid] = [uuid]
|
tx_tracker_map[penalty_txid] = [uuid]
|
||||||
|
|
||||||
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||||
@@ -69,7 +69,7 @@ def set_up_trackers(db_manager, total_trackers):
|
|||||||
if i % 2:
|
if i % 2:
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
|
|
||||||
trackers[uuid] = tracker
|
trackers[uuid] = {"locator": tracker.locator, "penalty_txid": tracker.penalty_txid}
|
||||||
tx_tracker_map[penalty_txid].append(uuid)
|
tx_tracker_map[penalty_txid].append(uuid)
|
||||||
|
|
||||||
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||||
@@ -99,9 +99,8 @@ def test_delete_completed_appointments(db_manager):
|
|||||||
assert len(appointments) == 0
|
assert len(appointments) == 0
|
||||||
|
|
||||||
# Make sure that all appointments are flagged as triggered in the db
|
# Make sure that all appointments are flagged as triggered in the db
|
||||||
db_appointments = db_manager.load_appointments_db(prefix=WATCHER_PREFIX)
|
|
||||||
for uuid in uuids:
|
for uuid in uuids:
|
||||||
assert db_appointments[uuid]["triggered"] is True
|
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + uuid).encode("utf-8")) is not None
|
||||||
|
|
||||||
|
|
||||||
def test_delete_completed_trackers_db_match(db_manager):
|
def test_delete_completed_trackers_db_match(db_manager):
|
||||||
@@ -128,12 +127,12 @@ def test_delete_completed_trackers_no_db_match(db_manager):
|
|||||||
# Let's change some uuid's by creating new trackers that are not included in the db and share a penalty_txid
|
# Let's change some uuid's by creating new trackers that are not included in the db and share a penalty_txid
|
||||||
# with another tracker that is stored in the db.
|
# with another tracker that is stored in the db.
|
||||||
for uuid in selected_trackers[: ITEMS // 2]:
|
for uuid in selected_trackers[: ITEMS // 2]:
|
||||||
penalty_txid = trackers[uuid].penalty_txid
|
penalty_txid = trackers[uuid].get("penalty_txid")
|
||||||
dispute_txid = get_random_value_hex(32)
|
dispute_txid = get_random_value_hex(32)
|
||||||
locator = dispute_txid[:LOCATOR_LEN_HEX]
|
locator = dispute_txid[:LOCATOR_LEN_HEX]
|
||||||
new_uuid = uuid4().hex
|
new_uuid = uuid4().hex
|
||||||
|
|
||||||
trackers[new_uuid] = TransactionTracker(locator, dispute_txid, penalty_txid, None, None)
|
trackers[new_uuid] = {"locator": locator, "penalty_txid": penalty_txid}
|
||||||
tx_tracker_map[penalty_txid].append(new_uuid)
|
tx_tracker_map[penalty_txid].append(new_uuid)
|
||||||
selected_trackers.append(new_uuid)
|
selected_trackers.append(new_uuid)
|
||||||
|
|
||||||
@@ -144,7 +143,7 @@ def test_delete_completed_trackers_no_db_match(db_manager):
|
|||||||
dispute_txid = get_random_value_hex(32)
|
dispute_txid = get_random_value_hex(32)
|
||||||
locator = dispute_txid[:LOCATOR_LEN_HEX]
|
locator = dispute_txid[:LOCATOR_LEN_HEX]
|
||||||
|
|
||||||
trackers[uuid] = TransactionTracker(locator, dispute_txid, penalty_txid, None, None)
|
trackers[uuid] = {"locator": locator, "penalty_txid": penalty_txid}
|
||||||
tx_tracker_map[penalty_txid] = [uuid]
|
tx_tracker_map[penalty_txid] = [uuid]
|
||||||
selected_trackers.append(uuid)
|
selected_trackers.append(uuid)
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,12 @@ import shutil
|
|||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from pisa.db_manager import DBManager
|
from pisa.db_manager import DBManager
|
||||||
from pisa.db_manager import WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY, LOCATOR_MAP_PREFIX
|
from pisa.db_manager import (
|
||||||
|
WATCHER_LAST_BLOCK_KEY,
|
||||||
|
RESPONDER_LAST_BLOCK_KEY,
|
||||||
|
LOCATOR_MAP_PREFIX,
|
||||||
|
TRIGGERED_APPOINTMENTS_PREFIX,
|
||||||
|
)
|
||||||
|
|
||||||
from common.constants import LOCATOR_LEN_BYTES
|
from common.constants import LOCATOR_LEN_BYTES
|
||||||
|
|
||||||
@@ -221,7 +226,8 @@ def test_store_load_triggered_appointment(db_manager):
|
|||||||
# Create an appointment flagged as triggered
|
# Create an appointment flagged as triggered
|
||||||
triggered_appointment, _ = generate_dummy_appointment(real_height=False)
|
triggered_appointment, _ = generate_dummy_appointment(real_height=False)
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
db_manager.store_watcher_appointment(uuid, triggered_appointment.to_json(triggered=True))
|
db_manager.store_watcher_appointment(uuid, triggered_appointment.to_json())
|
||||||
|
db_manager.create_triggered_appointment_flag(uuid)
|
||||||
|
|
||||||
# The new appointment is grabbed only if we set include_triggered
|
# The new appointment is grabbed only if we set include_triggered
|
||||||
assert db_watcher_appointments == db_manager.load_watcher_appointments()
|
assert db_watcher_appointments == db_manager.load_watcher_appointments()
|
||||||
@@ -282,3 +288,40 @@ def test_store_load_last_block_hash_responder(db_manager):
|
|||||||
db_last_block_hash = db_manager.load_last_block_hash_responder()
|
db_last_block_hash = db_manager.load_last_block_hash_responder()
|
||||||
|
|
||||||
assert local_last_block_hash == db_last_block_hash
|
assert local_last_block_hash == db_last_block_hash
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_triggered_appointment_flag(db_manager):
|
||||||
|
# Test that flags are added
|
||||||
|
key = get_random_value_hex(16)
|
||||||
|
db_manager.create_triggered_appointment_flag(key)
|
||||||
|
|
||||||
|
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + key).encode("utf-8")) is not None
|
||||||
|
|
||||||
|
# Test to get a random one that we haven't added
|
||||||
|
key = get_random_value_hex(16)
|
||||||
|
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + key).encode("utf-8")) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_all_triggered_flags(db_manager):
|
||||||
|
# There should be a some flags in the db from the previous tests. Let's load them
|
||||||
|
flags = db_manager.load_all_triggered_flags()
|
||||||
|
|
||||||
|
# We can add another flag and see that there's two now
|
||||||
|
new_uuid = uuid4().hex
|
||||||
|
db_manager.create_triggered_appointment_flag(new_uuid)
|
||||||
|
flags.append(new_uuid)
|
||||||
|
|
||||||
|
assert set(db_manager.load_all_triggered_flags()) == set(flags)
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_triggered_appointment_flag(db_manager):
|
||||||
|
# Test data is properly deleted.
|
||||||
|
keys = db_manager.load_all_triggered_flags()
|
||||||
|
|
||||||
|
# Delete all entries
|
||||||
|
for k in keys:
|
||||||
|
db_manager.delete_triggered_appointment_flag(k)
|
||||||
|
|
||||||
|
# Try to load them back
|
||||||
|
for k in keys:
|
||||||
|
assert db_manager.db.get((TRIGGERED_APPOINTMENTS_PREFIX + k).encode("utf-8")) is None
|
||||||
|
|||||||
@@ -231,11 +231,9 @@ def test_add_tracker(responder):
|
|||||||
# Check that the rest of tracker data also matches
|
# Check that the rest of tracker data also matches
|
||||||
tracker = responder.trackers[uuid]
|
tracker = responder.trackers[uuid]
|
||||||
assert (
|
assert (
|
||||||
tracker.dispute_txid == dispute_txid
|
tracker.get("penalty_txid") == penalty_txid
|
||||||
and tracker.penalty_txid == penalty_txid
|
and tracker.get("locator") == locator
|
||||||
and tracker.penalty_rawtx == penalty_rawtx
|
and tracker.get("appointment_end") == appointment_end
|
||||||
and tracker.appointment_end == appointment_end
|
|
||||||
and tracker.appointment_end == appointment_end
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -259,11 +257,9 @@ def test_add_tracker_same_penalty_txid(responder):
|
|||||||
for uuid in [uuid_1, uuid_2]:
|
for uuid in [uuid_1, uuid_2]:
|
||||||
tracker = responder.trackers[uuid]
|
tracker = responder.trackers[uuid]
|
||||||
assert (
|
assert (
|
||||||
tracker.dispute_txid == dispute_txid
|
tracker.get("penalty_txid") == penalty_txid
|
||||||
and tracker.penalty_txid == penalty_txid
|
and tracker.get("locator") == locator
|
||||||
and tracker.penalty_rawtx == penalty_rawtx
|
and tracker.get("appointment_end") == appointment_end
|
||||||
and tracker.appointment_end == appointment_end
|
|
||||||
and tracker.appointment_end == appointment_end
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -293,11 +289,19 @@ def test_do_watch(temp_db_manager, chain_monitor):
|
|||||||
for tracker in trackers:
|
for tracker in trackers:
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
|
|
||||||
responder.trackers[uuid] = tracker
|
responder.trackers[uuid] = {
|
||||||
|
"locator": tracker.locator,
|
||||||
|
"penalty_txid": tracker.penalty_txid,
|
||||||
|
"appointment_end": tracker.appointment_end,
|
||||||
|
}
|
||||||
responder.tx_tracker_map[tracker.penalty_txid] = [uuid]
|
responder.tx_tracker_map[tracker.penalty_txid] = [uuid]
|
||||||
responder.missed_confirmations[tracker.penalty_txid] = 0
|
responder.missed_confirmations[tracker.penalty_txid] = 0
|
||||||
responder.unconfirmed_txs.append(tracker.penalty_txid)
|
responder.unconfirmed_txs.append(tracker.penalty_txid)
|
||||||
|
|
||||||
|
# We also need to store the info in the db
|
||||||
|
responder.db_manager.create_triggered_appointment_flag(uuid)
|
||||||
|
responder.db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||||
|
|
||||||
# Let's start to watch
|
# Let's start to watch
|
||||||
Thread(target=responder.do_watch, daemon=True).start()
|
Thread(target=responder.do_watch, daemon=True).start()
|
||||||
|
|
||||||
@@ -413,12 +417,20 @@ def test_get_completed_trackers(db_manager, chain_monitor):
|
|||||||
tracker.appointment_end += 10
|
tracker.appointment_end += 10
|
||||||
trackers_no_end[uuid4().hex] = tracker
|
trackers_no_end[uuid4().hex] = tracker
|
||||||
|
|
||||||
# Let's add all to the responder
|
all_trackers = {}
|
||||||
responder.trackers.update(trackers_end_conf)
|
all_trackers.update(trackers_end_conf)
|
||||||
responder.trackers.update(trackers_end_no_conf)
|
all_trackers.update(trackers_end_no_conf)
|
||||||
responder.trackers.update(trackers_no_end)
|
all_trackers.update(trackers_no_end)
|
||||||
|
|
||||||
for uuid, tracker in responder.trackers.items():
|
# Let's add all to the responder
|
||||||
|
for uuid, tracker in all_trackers.items():
|
||||||
|
responder.trackers[uuid] = {
|
||||||
|
"locator": tracker.locator,
|
||||||
|
"penalty_txid": tracker.penalty_txid,
|
||||||
|
"appointment_end": tracker.appointment_end,
|
||||||
|
}
|
||||||
|
|
||||||
|
for uuid, tracker in all_trackers.items():
|
||||||
bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx)
|
bitcoin_cli().sendrawtransaction(tracker.penalty_rawtx)
|
||||||
|
|
||||||
# The dummy appointments have a end_appointment time of current + 2, but trackers need at least 6 confs by default
|
# The dummy appointments have a end_appointment time of current + 2, but trackers need at least 6 confs by default
|
||||||
@@ -454,9 +466,18 @@ def test_rebroadcast(db_manager, chain_monitor):
|
|||||||
penalty_rawtx=TX.create_dummy_transaction()
|
penalty_rawtx=TX.create_dummy_transaction()
|
||||||
)
|
)
|
||||||
|
|
||||||
responder.trackers[uuid] = TransactionTracker(
|
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
|
||||||
locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end
|
|
||||||
)
|
responder.trackers[uuid] = {
|
||||||
|
"locator": locator,
|
||||||
|
"penalty_txid": penalty_txid,
|
||||||
|
"appointment_end": appointment_end,
|
||||||
|
}
|
||||||
|
|
||||||
|
# We need to add it to the db too
|
||||||
|
responder.db_manager.create_triggered_appointment_flag(uuid)
|
||||||
|
responder.db_manager.store_responder_tracker(uuid, tracker.to_json())
|
||||||
|
|
||||||
responder.tx_tracker_map[penalty_txid] = [uuid]
|
responder.tx_tracker_map[penalty_txid] = [uuid]
|
||||||
responder.unconfirmed_txs.append(penalty_txid)
|
responder.unconfirmed_txs.append(penalty_txid)
|
||||||
|
|
||||||
|
|||||||
@@ -130,9 +130,18 @@ def test_add_too_many_appointments(watcher):
|
|||||||
|
|
||||||
def test_do_watch(watcher):
|
def test_do_watch(watcher):
|
||||||
# We will wipe all the previous data and add 5 appointments
|
# We will wipe all the previous data and add 5 appointments
|
||||||
watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
|
appointments, locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
|
||||||
watcher.chain_monitor.watcher_asleep = False
|
watcher.chain_monitor.watcher_asleep = False
|
||||||
|
|
||||||
|
# Set the data into the Watcher and in the db
|
||||||
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
|
watcher.appointments = {}
|
||||||
|
|
||||||
|
for uuid, appointment in appointments.items():
|
||||||
|
watcher.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
|
||||||
|
watcher.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||||
|
watcher.db_manager.store_update_locator_map(appointment.locator, uuid)
|
||||||
|
|
||||||
Thread(target=watcher.do_watch, daemon=True).start()
|
Thread(target=watcher.do_watch, daemon=True).start()
|
||||||
|
|
||||||
# Broadcast the first two
|
# Broadcast the first two
|
||||||
@@ -179,7 +188,9 @@ def test_filter_valid_breaches_random_data(watcher):
|
|||||||
for i in range(TEST_SET_SIZE):
|
for i in range(TEST_SET_SIZE):
|
||||||
dummy_appointment, _ = generate_dummy_appointment()
|
dummy_appointment, _ = generate_dummy_appointment()
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
appointments[uuid] = dummy_appointment
|
appointments[uuid] = {"locator": dummy_appointment.locator, "end_time": dummy_appointment.end_time}
|
||||||
|
watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_json())
|
||||||
|
watcher.db_manager.store_update_locator_map(dummy_appointment.locator, uuid)
|
||||||
|
|
||||||
locator_uuid_map[dummy_appointment.locator] = [uuid]
|
locator_uuid_map[dummy_appointment.locator] = [uuid]
|
||||||
|
|
||||||
@@ -215,7 +226,11 @@ def test_filter_valid_breaches(watcher):
|
|||||||
locator_uuid_map = {dummy_appointment.locator: [uuid]}
|
locator_uuid_map = {dummy_appointment.locator: [uuid]}
|
||||||
breaches = {dummy_appointment.locator: dispute_txid}
|
breaches = {dummy_appointment.locator: dispute_txid}
|
||||||
|
|
||||||
watcher.appointments = appointments
|
for uuid, appointment in appointments.items():
|
||||||
|
watcher.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
|
||||||
|
watcher.db_manager.store_watcher_appointment(uuid, dummy_appointment.to_json())
|
||||||
|
watcher.db_manager.store_update_locator_map(dummy_appointment.locator, uuid)
|
||||||
|
|
||||||
watcher.locator_uuid_map = locator_uuid_map
|
watcher.locator_uuid_map = locator_uuid_map
|
||||||
|
|
||||||
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
filtered_valid_breaches = watcher.filter_valid_breaches(breaches)
|
||||||
|
|||||||
Reference in New Issue
Block a user