Removes most appointment data from memory

Data used to be stored both in memory and disk (db). This commits modifies the Watcher, Responder and Cleaner so they only keep the needed maps and load information from disk when necessary.
This commit is contained in:
Sergi Delgado Segura
2020-01-09 18:18:26 +01:00
parent 34636ab8e0
commit 5196f5df29
3 changed files with 49 additions and 35 deletions

View File

@@ -1,4 +1,5 @@
from common.logger import Logger from common.logger import Logger
from common.appointment import Appointment
logger = Logger("Cleaner") logger = Logger("Cleaner")
@@ -27,7 +28,7 @@ class Cleaner:
""" """
for uuid in expired_appointments: for uuid in expired_appointments:
locator = appointments[uuid].locator locator = appointments[uuid].get("locator")
appointments.pop(uuid) appointments.pop(uuid)
@@ -58,20 +59,21 @@ class Cleaner:
database. database.
""" """
locator = appointments[uuid].get("locator")
# Delete the appointment # Delete the appointment
appointment = appointments.pop(uuid) appointments.pop(uuid)
# If there was only one appointment that matches the locator we can delete the whole list # If there was only one appointment that matches the locator we can delete the whole list
if len(locator_uuid_map[appointment.locator]) == 1: if len(locator_uuid_map[locator]) == 1:
locator_uuid_map.pop(appointment.locator) locator_uuid_map.pop(locator)
else: else:
# Otherwise we just delete the appointment that matches locator:appointment_pos # Otherwise we just delete the appointment that matches locator:appointment_pos
locator_uuid_map[appointment.locator].remove(uuid) locator_uuid_map[locator].remove(uuid)
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both # DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
# the watcher's and responder's db after fulfilled # the watcher's and responder's db after fulfilled
# Update appointment in the db db_manager.create_triggered_appointment_flag(uuid)
db_manager.store_watcher_appointment(uuid, appointment.to_json(triggered=True))
@staticmethod @staticmethod
def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager): def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager):
@@ -98,8 +100,8 @@ class Cleaner:
confirmations=confirmations, confirmations=confirmations,
) )
penalty_txid = trackers[uuid].penalty_txid penalty_txid = trackers[uuid].get("penalty_txid")
locator = trackers[uuid].locator locator = trackers[uuid].get("locator")
trackers.pop(uuid) trackers.pop(uuid)
if len(tx_tracker_map[penalty_txid]) == 1: if len(tx_tracker_map[penalty_txid]) == 1:
@@ -110,9 +112,10 @@ class Cleaner:
else: else:
tx_tracker_map[penalty_txid].remove(uuid) tx_tracker_map[penalty_txid].remove(uuid)
# Delete appointment from the db (both watchers's and responder's) # Delete appointment from the db (from watchers's and responder's db) and remove flag
db_manager.delete_watcher_appointment(uuid) db_manager.delete_watcher_appointment(uuid)
db_manager.delete_responder_tracker(uuid) db_manager.delete_responder_tracker(uuid)
db_manager.delete_triggered_appointment_flag(uuid)
# Update / delete the locator map # Update / delete the locator map
locator_map = db_manager.load_locator_map(locator) locator_map = db_manager.load_locator_map(locator)
@@ -120,13 +123,10 @@ class Cleaner:
if uuid in locator_map: if uuid in locator_map:
if len(locator_map) == 1: if len(locator_map) == 1:
db_manager.delete_locator_map(locator) db_manager.delete_locator_map(locator)
else: else:
locator_map.remove(uuid) locator_map.remove(uuid)
db_manager.store_update_locator_map(locator, locator_map) db_manager.store_update_locator_map(locator, locator_map)
else: else:
logger.error("UUID not found in the db", uuid=uuid) logger.error("UUID not found in the db", uuid=uuid)
else: else:
logger.error("Locator not found in the db", uuid=uuid) logger.error("Locator not found in the db", uuid=uuid)

View File

@@ -118,8 +118,9 @@ class Responder:
database. database.
Attributes: Attributes:
trackers (:obj:`dict`): A dictionary containing all the :obj:`TransactionTracker` handled by the trackers (:obj:`dict`): A dictionary containing the minimum information about the :obj:`TransactionTracker`
:obj:`Responder`. Each entry is identified by a ``uuid``. required by the :obj:`Responder` (``penalty_txid``, ``locator`` and ``end_time``).
Each entry is identified by a ``uuid``.
tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with
several trackers triggered by the same ``penalty_txid``. several trackers triggered by the same ``penalty_txid``.
unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``.
@@ -221,8 +222,9 @@ class Responder:
""" """
Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``. Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``.
The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to A reduction of :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the
``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database. ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the
database.
``add_tracker`` awakes the :obj:`Responder` and creates a connection with the ``add_tracker`` awakes the :obj:`Responder` and creates a connection with the
:obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` if he is asleep. :obj:`ZMQSubscriber <pisa.utils.zmq_subscriber.ZMQSubscriber>` if he is asleep.
@@ -240,7 +242,13 @@ class Responder:
""" """
tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end)
self.trackers[uuid] = tracker
# We only store the penalty_txid, locator and appointment_end in memory. The rest is dumped into the db.
self.trackers[uuid] = {
"penalty_txid": tracker.penalty_txid,
"locator": locator,
"appointment_end": appointment_end,
}
if penalty_txid in self.tx_tracker_map: if penalty_txid in self.tx_tracker_map:
self.tx_tracker_map[penalty_txid].append(uuid) self.tx_tracker_map[penalty_txid].append(uuid)
@@ -392,15 +400,16 @@ class Responder:
completed_trackers = [] completed_trackers = []
for uuid, tracker in self.trackers.items(): for uuid, tracker_data in self.trackers.items():
if tracker.appointment_end <= height and tracker.penalty_txid not in self.unconfirmed_txs: appointment_end = tracker_data.get("appointment_end")
tx = Carrier.get_transaction(tracker.penalty_txid) penalty_txid = tracker_data.get("penalty_txid")
if appointment_end <= height and penalty_txid not in self.unconfirmed_txs:
tx = Carrier.get_transaction(penalty_txid)
# FIXME: Should be improved with the librarian
if tx is not None: if tx is not None:
confirmations = tx.get("confirmations") confirmations = tx.get("confirmations")
if confirmations >= MIN_CONFIRMATIONS: if confirmations is not None and confirmations >= MIN_CONFIRMATIONS:
# The end of the appointment has been reached # The end of the appointment has been reached
completed_trackers.append((uuid, confirmations)) completed_trackers.append((uuid, confirmations))
@@ -434,7 +443,7 @@ class Responder:
# FIXME: This would potentially grab multiple instances of the same transaction and try to send them. # FIXME: This would potentially grab multiple instances of the same transaction and try to send them.
# should we do it only once? # should we do it only once?
for uuid in self.tx_tracker_map[txid]: for uuid in self.tx_tracker_map[txid]:
tracker = self.trackers[uuid] tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid))
logger.warning( logger.warning(
"Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid "Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid
) )
@@ -460,7 +469,9 @@ class Responder:
""" """
carrier = Carrier() carrier = Carrier()
for uuid, tracker in self.trackers.items(): for uuid in self.trackers.keys():
tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid))
# First we check if the dispute transaction is known (exists either in mempool or blockchain) # First we check if the dispute transaction is known (exists either in mempool or blockchain)
dispute_tx = carrier.get_transaction(tracker.dispute_txid) dispute_tx = carrier.get_transaction(tracker.dispute_txid)

View File

@@ -4,8 +4,9 @@ from threading import Thread
from common.cryptographer import Cryptographer from common.cryptographer import Cryptographer
from common.constants import LOCATOR_LEN_HEX from common.constants import LOCATOR_LEN_HEX
from common.appointment import Appointment
from common.logger import Logger from common.logger import Logger
from pisa.cleaner import Cleaner from pisa.cleaner import Cleaner
from pisa.responder import Responder from pisa.responder import Responder
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor
@@ -40,8 +41,9 @@ class Watcher:
Attributes: Attributes:
appointments (:obj:`dict`): a dictionary containing all the appointments (:obj:`Appointment appointments (:obj:`dict`): a dictionary containing a simplification of the appointments (:obj:`Appointment
<pisa.appointment.Appointment>` instances) accepted by the tower. It's populated trough ``add_appointment``. <pisa.appointment.Appointment>` instances) accepted by the tower (``locator`` and ``end_time``).
It's populated trough ``add_appointment``.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
appointments with the same ``locator``. appointments with the same ``locator``.
asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake.
@@ -116,8 +118,9 @@ class Watcher:
""" """
if len(self.appointments) < self.max_appointments: if len(self.appointments) < self.max_appointments:
# Appointments are stored in disk, we only keep the end_time, locator and locator_uuid map in memory
uuid = uuid4().hex uuid = uuid4().hex
self.appointments[uuid] = appointment self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time}
if appointment.locator in self.locator_uuid_map: if appointment.locator in self.locator_uuid_map:
self.locator_uuid_map[appointment.locator].append(uuid) self.locator_uuid_map[appointment.locator].append(uuid)
@@ -138,11 +141,10 @@ class Watcher:
self.db_manager.store_update_locator_map(appointment.locator, uuid) self.db_manager.store_update_locator_map(appointment.locator, uuid)
appointment_added = True appointment_added = True
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
logger.info("New appointment accepted", locator=appointment.locator) logger.info("New appointment accepted", locator=appointment.locator)
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
else: else:
appointment_added = False appointment_added = False
signature = None signature = None
@@ -181,8 +183,8 @@ class Watcher:
expired_appointments = [ expired_appointments = [
uuid uuid
for uuid, appointment in self.appointments.items() for uuid, appointment_data in self.appointments.items()
if block["height"] > appointment.end_time + EXPIRY_DELTA if block["height"] > appointment_data.get("end_time") + EXPIRY_DELTA
] ]
Cleaner.delete_expired_appointment( Cleaner.delete_expired_appointment(
@@ -207,7 +209,7 @@ class Watcher:
filtered_breach["dispute_txid"], filtered_breach["dispute_txid"],
filtered_breach["penalty_txid"], filtered_breach["penalty_txid"],
filtered_breach["penalty_rawtx"], filtered_breach["penalty_rawtx"],
self.appointments[uuid].end_time, self.appointments[uuid].get("end_time"),
block_hash, block_hash,
) )
@@ -274,9 +276,10 @@ class Watcher:
for locator, dispute_txid in breaches.items(): for locator, dispute_txid in breaches.items():
for uuid in self.locator_uuid_map[locator]: for uuid in self.locator_uuid_map[locator]:
appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid))
try: try:
penalty_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid) penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid)
except ValueError: except ValueError:
penalty_rawtx = None penalty_rawtx = None