diff --git a/pisa/cleaner.py b/pisa/cleaner.py index a00cc22..6b8d73e 100644 --- a/pisa/cleaner.py +++ b/pisa/cleaner.py @@ -1,4 +1,5 @@ from common.logger import Logger +from common.appointment import Appointment logger = Logger("Cleaner") @@ -27,7 +28,7 @@ class Cleaner: """ for uuid in expired_appointments: - locator = appointments[uuid].locator + locator = appointments[uuid].get("locator") appointments.pop(uuid) @@ -58,20 +59,21 @@ class Cleaner: database. """ + locator = appointments[uuid].get("locator") + # Delete the appointment - appointment = appointments.pop(uuid) + appointments.pop(uuid) # If there was only one appointment that matches the locator we can delete the whole list - if len(locator_uuid_map[appointment.locator]) == 1: - locator_uuid_map.pop(appointment.locator) + if len(locator_uuid_map[locator]) == 1: + locator_uuid_map.pop(locator) else: # Otherwise we just delete the appointment that matches locator:appointment_pos - locator_uuid_map[appointment.locator].remove(uuid) + locator_uuid_map[locator].remove(uuid) # DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both # the watcher's and responder's db after fulfilled - # Update appointment in the db - db_manager.store_watcher_appointment(uuid, appointment.to_json(triggered=True)) + db_manager.create_triggered_appointment_flag(uuid) @staticmethod def delete_completed_trackers(completed_trackers, height, trackers, tx_tracker_map, db_manager): @@ -98,8 +100,8 @@ class Cleaner: confirmations=confirmations, ) - penalty_txid = trackers[uuid].penalty_txid - locator = trackers[uuid].locator + penalty_txid = trackers[uuid].get("penalty_txid") + locator = trackers[uuid].get("locator") trackers.pop(uuid) if len(tx_tracker_map[penalty_txid]) == 1: @@ -110,9 +112,10 @@ class Cleaner: else: tx_tracker_map[penalty_txid].remove(uuid) - # Delete appointment from the db (both watchers's and responder's) + # Delete appointment from the db (from watchers's and responder's db) and remove flag db_manager.delete_watcher_appointment(uuid) db_manager.delete_responder_tracker(uuid) + db_manager.delete_triggered_appointment_flag(uuid) # Update / delete the locator map locator_map = db_manager.load_locator_map(locator) @@ -120,13 +123,10 @@ class Cleaner: if uuid in locator_map: if len(locator_map) == 1: db_manager.delete_locator_map(locator) - else: locator_map.remove(uuid) db_manager.store_update_locator_map(locator, locator_map) - else: logger.error("UUID not found in the db", uuid=uuid) - else: logger.error("Locator not found in the db", uuid=uuid) diff --git a/pisa/responder.py b/pisa/responder.py index 0fce1cb..aa5f672 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -118,8 +118,9 @@ class Responder: database. Attributes: - trackers (:obj:`dict`): A dictionary containing all the :obj:`TransactionTracker` handled by the - :obj:`Responder`. Each entry is identified by a ``uuid``. + trackers (:obj:`dict`): A dictionary containing the minimum information about the :obj:`TransactionTracker` + required by the :obj:`Responder` (``penalty_txid``, ``locator`` and ``end_time``). + Each entry is identified by a ``uuid``. tx_tracker_map (:obj:`dict`): A ``penalty_txid:uuid`` map used to allow the :obj:`Responder` to deal with several trackers triggered by the same ``penalty_txid``. unconfirmed_txs (:obj:`list`): A list that keeps track of all unconfirmed ``penalty_txs``. @@ -221,8 +222,9 @@ class Responder: """ Creates a :obj:`TransactionTracker` after successfully broadcasting a ``penalty_tx``. - The :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the ``penalty_txid`` added to - ``unconfirmed_txs`` if ``confirmations=0``. Finally, the data is also stored in the database. + A reduction of :obj:`TransactionTracker` is stored in ``trackers`` and ``tx_tracker_map`` and the + ``penalty_txid`` added to ``unconfirmed_txs`` if ``confirmations=0``. Finally, all the data is stored in the + database. ``add_tracker`` awakes the :obj:`Responder` and creates a connection with the :obj:`ZMQSubscriber ` if he is asleep. @@ -240,7 +242,13 @@ class Responder: """ tracker = TransactionTracker(locator, dispute_txid, penalty_txid, penalty_rawtx, appointment_end) - self.trackers[uuid] = tracker + + # We only store the penalty_txid, locator and appointment_end in memory. The rest is dumped into the db. + self.trackers[uuid] = { + "penalty_txid": tracker.penalty_txid, + "locator": locator, + "appointment_end": appointment_end, + } if penalty_txid in self.tx_tracker_map: self.tx_tracker_map[penalty_txid].append(uuid) @@ -392,15 +400,16 @@ class Responder: completed_trackers = [] - for uuid, tracker in self.trackers.items(): - if tracker.appointment_end <= height and tracker.penalty_txid not in self.unconfirmed_txs: - tx = Carrier.get_transaction(tracker.penalty_txid) + for uuid, tracker_data in self.trackers.items(): + appointment_end = tracker_data.get("appointment_end") + penalty_txid = tracker_data.get("penalty_txid") + if appointment_end <= height and penalty_txid not in self.unconfirmed_txs: + tx = Carrier.get_transaction(penalty_txid) - # FIXME: Should be improved with the librarian if tx is not None: confirmations = tx.get("confirmations") - if confirmations >= MIN_CONFIRMATIONS: + if confirmations is not None and confirmations >= MIN_CONFIRMATIONS: # The end of the appointment has been reached completed_trackers.append((uuid, confirmations)) @@ -434,7 +443,7 @@ class Responder: # FIXME: This would potentially grab multiple instances of the same transaction and try to send them. # should we do it only once? for uuid in self.tx_tracker_map[txid]: - tracker = self.trackers[uuid] + tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid)) logger.warning( "Transaction has missed many confirmations. Rebroadcasting", penalty_txid=tracker.penalty_txid ) @@ -460,7 +469,9 @@ class Responder: """ carrier = Carrier() - for uuid, tracker in self.trackers.items(): + for uuid in self.trackers.keys(): + tracker = TransactionTracker.from_dict(self.db_manager.load_responder_tracker(uuid)) + # First we check if the dispute transaction is known (exists either in mempool or blockchain) dispute_tx = carrier.get_transaction(tracker.dispute_txid) diff --git a/pisa/watcher.py b/pisa/watcher.py index 9d659db..62ba514 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -4,8 +4,9 @@ from threading import Thread from common.cryptographer import Cryptographer from common.constants import LOCATOR_LEN_HEX - +from common.appointment import Appointment from common.logger import Logger + from pisa.cleaner import Cleaner from pisa.responder import Responder from pisa.block_processor import BlockProcessor @@ -40,8 +41,9 @@ class Watcher: Attributes: - appointments (:obj:`dict`): a dictionary containing all the appointments (:obj:`Appointment - ` instances) accepted by the tower. It's populated trough ``add_appointment``. + appointments (:obj:`dict`): a dictionary containing a simplification of the appointments (:obj:`Appointment + ` instances) accepted by the tower (``locator`` and ``end_time``). + It's populated trough ``add_appointment``. locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several appointments with the same ``locator``. asleep (:obj:`bool`): A flag that signals whether the :obj:`Watcher` is asleep or awake. @@ -116,8 +118,9 @@ class Watcher: """ if len(self.appointments) < self.max_appointments: + # Appointments are stored in disk, we only keep the end_time, locator and locator_uuid map in memory uuid = uuid4().hex - self.appointments[uuid] = appointment + self.appointments[uuid] = {"locator": appointment.locator, "end_time": appointment.end_time} if appointment.locator in self.locator_uuid_map: self.locator_uuid_map[appointment.locator].append(uuid) @@ -138,11 +141,10 @@ class Watcher: self.db_manager.store_update_locator_map(appointment.locator, uuid) appointment_added = True + signature = Cryptographer.sign(appointment.serialize(), self.signing_key) logger.info("New appointment accepted", locator=appointment.locator) - signature = Cryptographer.sign(appointment.serialize(), self.signing_key) - else: appointment_added = False signature = None @@ -181,8 +183,8 @@ class Watcher: expired_appointments = [ uuid - for uuid, appointment in self.appointments.items() - if block["height"] > appointment.end_time + EXPIRY_DELTA + for uuid, appointment_data in self.appointments.items() + if block["height"] > appointment_data.get("end_time") + EXPIRY_DELTA ] Cleaner.delete_expired_appointment( @@ -207,7 +209,7 @@ class Watcher: filtered_breach["dispute_txid"], filtered_breach["penalty_txid"], filtered_breach["penalty_rawtx"], - self.appointments[uuid].end_time, + self.appointments[uuid].get("end_time"), block_hash, ) @@ -274,9 +276,10 @@ class Watcher: for locator, dispute_txid in breaches.items(): for uuid in self.locator_uuid_map[locator]: + appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) try: - penalty_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid) + penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) except ValueError: penalty_rawtx = None