From 02bc88ed84a5e35b14d719cfb3024956a159f8bb Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 31 Jan 2020 13:00:55 +0100 Subject: [PATCH] Updates Watcher to use db batch updates and avoid multiple decryptions of the same data --- pisa/watcher.py | 44 ++++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/pisa/watcher.py b/pisa/watcher.py index 7dbc68b..dc35aec 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -79,10 +79,12 @@ class Watcher: def awake(self): self.asleep = False self.chain_monitor.watcher_asleep = False - Thread(target=self.do_watch).start() + watcher_thread = Thread(target=self.do_watch, daemon=True).start() logger.info("Waking up") + return watcher_thread + def sleep(self): self.asleep = True self.chain_monitor.watcher_asleep = True @@ -180,6 +182,9 @@ class Watcher: valid_breaches, invalid_breaches = self.filter_valid_breaches(self.get_breaches(txids)) + triggered_flags = [] + appointments_to_delete = [] + for uuid, breach in valid_breaches.items(): logger.info( "Notifying responder and deleting appointment", @@ -198,24 +203,27 @@ class Watcher: block_hash, ) - Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) - - # Appointments are only flagged as triggered if they are delivered, otherwise they are just deleted. # FIXME: This is only necessary because of the triggered appointment approach. Fix if it changes. - if receipt.delivered: - self.db_manager.create_triggered_appointment_flag(uuid) + if receipt.delivered: + Cleaner.delete_appointment_from_memory(uuid, self.appointments, self.locator_uuid_map) + triggered_flags.append(uuid) else: - self.db_manager.delete_watcher_appointment(uuid) - Cleaner.update_delete_db_locator_map(uuid, breach["locator"], self.db_manager) + appointments_to_delete.append(uuid) + + # Appointments are only flagged as triggered if they are delivered, otherwise they are just deleted. + appointments_to_delete.extend(invalid_breaches) + self.db_manager.batch_create_triggered_appointment_flag(triggered_flags) Cleaner.delete_completed_appointments( - invalid_breaches, self.appointments, self.locator_uuid_map, self.db_manager + appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager ) # Register the last processed block for the watcher self.db_manager.store_last_block_hash_watcher(block_hash) + self.block_queue.task_done() + # Go back to sleep if there are no more appointments self.sleep() @@ -266,17 +274,25 @@ class Watcher: valid_breaches = {} invalid_breaches = [] + # A cache of the already decrypted blobs so replicate decryption can be avoided + decrypted_blobs = {} + for locator, dispute_txid in breaches.items(): for uuid in self.locator_uuid_map[locator]: appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid)) - try: - penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) + if appointment.encrypted_blob.data in decrypted_blobs: + penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob.data] - except ValueError: - penalty_rawtx = None + else: + try: + penalty_rawtx = Cryptographer.decrypt(appointment.encrypted_blob, dispute_txid) - penalty_tx = BlockProcessor.decode_raw_transaction(penalty_rawtx) + except ValueError: + penalty_rawtx = None + + penalty_tx = BlockProcessor.decode_raw_transaction(penalty_rawtx) + decrypted_blobs[appointment.encrypted_blob.data] = (penalty_tx, penalty_rawtx) if penalty_tx is not None: valid_breaches[uuid] = {