diff --git a/pisa/api.py b/pisa/api.py index 76dcdcf..6b0e6e5 100644 --- a/pisa/api.py +++ b/pisa/api.py @@ -20,6 +20,8 @@ HTTP_SERVICE_UNAVAILABLE = 503 logger = Logger("API") +watcher = None + @app.route('/', methods=['POST']) def add_appointment(): @@ -30,6 +32,7 @@ def add_appointment(): # Check content type once if properly defined request_data = json.loads(request.get_json()) + inspector = Inspector() appointment = inspector.inspect(request_data) error = None @@ -125,13 +128,12 @@ def get_block_count(): return jsonify({"block_count": BlockProcessor.get_block_count()}) -def start_api(): +def start_api(w): # FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment - global watcher, inspector + global watcher # ToDo: #18-separate-api-from-watcher - watcher = Watcher() - inspector = Inspector() + watcher = w # Setting Flask log to ERROR only so it does not mess with out logging. Also disabling flask initial messages logging.getLogger('werkzeug').setLevel(logging.ERROR) diff --git a/pisa/cleaner.py b/pisa/cleaner.py index 00076b4..8e18ce9 100644 --- a/pisa/cleaner.py +++ b/pisa/cleaner.py @@ -8,7 +8,7 @@ logger = Logger("Cleaner") class Cleaner: @staticmethod - def delete_expired_appointment(expired_appointments, appointments, locator_uuid_map): + def delete_expired_appointment(expired_appointments, appointments, locator_uuid_map, db_manager): for uuid in expired_appointments: locator = appointments[uuid].locator @@ -22,8 +22,11 @@ class Cleaner: logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid) + # Delete appointment from the db + db_manager.delete_watcher_appointment(uuid) + @staticmethod - def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): + def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height, db_manager): for uuid, confirmations in completed_jobs: logger.info("Job completed. Appointment ended after reaching enough confirmations.", uuid=uuid, height=height, confirmations=confirmations) @@ -39,3 +42,7 @@ class Cleaner: else: tx_job_map[justice_txid].remove(uuid) + + # Delete appointment from the db (both watchers's and responder's) + db_manager.ddelete_watcher_appointment(uuid) + db_manager.delete_responder_job(uuid) diff --git a/pisa/db_manager.py b/pisa/db_manager.py new file mode 100644 index 0000000..dfa3519 --- /dev/null +++ b/pisa/db_manager.py @@ -0,0 +1,81 @@ +import json +import plyvel + +from pisa.logger import Logger +from pisa.conf import WATCHER_PREFIX, RESPONDER_PREFIX, WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY + +logger = Logger("Daemon") + + +class DBManager: + def __init__(self, db_path): + try: + self.db = plyvel.DB(db_path) + + except plyvel.Error as e: + if 'create_if_missing is false' in str(e): + logger.info("No db found. Creating a fresh one") + self.db = plyvel.DB(db_path, create_if_missing=True) + + def load_appointments_db(self, prefix): + data = {} + + for k, v in self.db.iterator(prefix=prefix): + # Get uuid and appointment_data from the db + uuid = k[1:].decode('utf-8') + data[uuid] = json.loads(v) + + return data + + def get_last_known_block(self, prefix): + last_block = self.db.get(prefix) + + if last_block: + last_block = last_block.decode('utf-8') + + return last_block + + def create_entry(self, key, value, prefix=None): + if isinstance(prefix, str): + key = prefix + key + + key = key.encode('utf-8') + value = value.encode('utf-8') + + self.db.put(key, value) + + def delete_entry(self, key, prefix=None): + if isinstance(prefix, str): + key = prefix + key + + key = key.encode('utf-8') + + self.db.delete(key) + + def load_watcher_appointments(self): + return self.load_appointments_db(prefix=WATCHER_PREFIX) + + def load_responder_jobs(self): + return self.load_appointments_db(prefix=RESPONDER_PREFIX) + + def store_watcher_appointment(self, uuid, appointment): + self.create_entry(uuid, appointment, prefix=WATCHER_PREFIX) + logger.info("Adding appointment to Watchers's db", uuid=uuid) + + def store_responder_job(self, uuid, job): + self.create_entry(uuid, job, prefix=RESPONDER_PREFIX) + logger.info("Adding appointment to Responder's db", uuid=uuid) + + def delete_watcher_appointment(self, uuid): + self.delete_entry(uuid, prefix=WATCHER_PREFIX) + logger.info("Deleting appointment from Watcher's db", uuid=uuid) + + def delete_responder_job(self, uuid): + self.delete_entry(uuid, prefix=RESPONDER_PREFIX) + logger.info("Deleting appointment from Responder's db", uuid=uuid) + + def store_last_block_watcher(self, block_hash): + self.create_entry(WATCHER_LAST_BLOCK_KEY, block_hash) + + def store_last_block_responder(self, block_hash): + self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash) diff --git a/pisa/pisad.py b/pisa/pisad.py index 578279b..d7a4b9d 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -2,9 +2,12 @@ from getopt import getopt from sys import argv, exit from signal import signal, SIGINT, SIGQUIT, SIGTERM +from pisa.conf import DB_PATH from pisa.logger import Logger from pisa.api import start_api +from pisa.watcher import Watcher from pisa.conf import BTC_NETWORK +from pisa.db_manager import DBManager from pisa.tools import can_connect_to_bitcoind, in_correct_network logger = Logger("Daemon") @@ -37,8 +40,23 @@ if __name__ == '__main__': else: try: - # Fire the api - start_api() + db_manager = DBManager(DB_PATH) + + watcher_appointments = db_manager.load_watcher_appointments() + responder_jobs = db_manager.load_responder_jobs() + + if len(watcher_appointments) == 0 and len(responder_jobs) == 0: + logger.info("Fresh bootstrap") + + else: + logger.info("Bootstrapping from backed up data") + + watcher = Watcher(db_manager) + + # Create an instance of the Watcher and fire the API + start_api(watcher) + except Exception as e: logger.error("An error occurred: {}. Shutting down".format(e)) exit(1) + diff --git a/pisa/responder.py b/pisa/responder.py index 2309b03..d843295 100644 --- a/pisa/responder.py +++ b/pisa/responder.py @@ -38,7 +38,7 @@ class Job: class Responder: - def __init__(self): + def __init__(self, db_manager): self.jobs = dict() self.tx_job_map = dict() self.unconfirmed_txs = [] @@ -46,6 +46,7 @@ class Responder: self.block_queue = None self.asleep = True self.zmq_subscriber = None + self.db_manager = db_manager def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): if self.asleep: @@ -67,7 +68,8 @@ class Responder: return receipt def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0): - self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end) + job = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end) + self.jobs[uuid] = job if justice_txid in self.tx_job_map: self.tx_job_map[justice_txid].append(uuid) @@ -78,6 +80,8 @@ class Responder: if confirmations == 0: self.unconfirmed_txs.append(justice_txid) + self.db_manager.store_responder_job(uuid.encode, job.to_json()) + logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid, appointment_end=appointment_end) @@ -129,6 +133,9 @@ class Responder: # ToDo: #24-properly-handle-reorgs self.handle_reorgs() + # Register the last processed block for the responder + self.db_manager.store_last_block_responder(block_hash) + prev_block_hash = block.get('hash') # Go back to sleep if there are no more jobs diff --git a/pisa/watcher.py b/pisa/watcher.py index ef80865..0c454a8 100644 --- a/pisa/watcher.py +++ b/pisa/watcher.py @@ -18,14 +18,18 @@ logger = Logger("Watcher") class Watcher: - def __init__(self, max_appointments=MAX_APPOINTMENTS): + def __init__(self, db_manager, responder=None, max_appointments=MAX_APPOINTMENTS): self.appointments = dict() self.locator_uuid_map = dict() self.block_queue = None self.asleep = True self.max_appointments = max_appointments self.zmq_subscriber = None - self.responder = Responder() + + if not isinstance(responder, Responder): + self.responder = Responder(db_manager) + + self.db_manager = db_manager if PISA_SECRET_KEY is None: raise ValueError("No signing key provided. Please fix your pisa.conf") @@ -71,6 +75,8 @@ class Watcher: logger.info("Waking up") + self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) + appointment_added = True logger.info("New appointment accepted.", locator=appointment.locator) @@ -103,7 +109,8 @@ class Watcher: expired_appointments = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time + EXPIRY_DELTA] - Cleaner.delete_expired_appointment(expired_appointments, self.appointments, self.locator_uuid_map) + Cleaner.delete_expired_appointment(expired_appointments, self.appointments, self.locator_uuid_map, + self.db_manager) potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map) matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) @@ -118,17 +125,24 @@ class Watcher: self.appointments[uuid].end_time) # Delete the appointment - self.appointments.pop(uuid) + appointment = self.appointments.pop(uuid) # If there was only one appointment that matches the locator we can delete the whole list if len(self.locator_uuid_map[locator]) == 1: - # ToDo: #9-add-data-persistence self.locator_uuid_map.pop(locator) else: # Otherwise we just delete the appointment that matches locator:appointment_pos - # ToDo: #9-add-data-persistence self.locator_uuid_map[locator].remove(uuid) + # DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both + # the watcher's and responder's db after fulfilled + # Update appointment in the db + appointment["triggered"] = True + self.db_manager.store_watcher_appointment(uuid, appointment.to_json()) + + # Register the last processed block for the watcher + self.db_manager.store_last_block_watcher(block_hash) + # Go back to sleep if there are no more appointments self.asleep = True self.zmq_subscriber.terminate = True