Starts data persistence integration

The difference between master and the old data-persistence branch was huge, seems more effective to manually cherry pick the changes and integrate them
This commit is contained in:
Sergi Delgado Segura
2019-10-23 18:30:21 +01:00
parent 3502422ee5
commit a0538739d2
6 changed files with 145 additions and 16 deletions

View File

@@ -20,6 +20,8 @@ HTTP_SERVICE_UNAVAILABLE = 503
logger = Logger("API") logger = Logger("API")
watcher = None
@app.route('/', methods=['POST']) @app.route('/', methods=['POST'])
def add_appointment(): def add_appointment():
@@ -30,6 +32,7 @@ def add_appointment():
# Check content type once if properly defined # Check content type once if properly defined
request_data = json.loads(request.get_json()) request_data = json.loads(request.get_json())
inspector = Inspector()
appointment = inspector.inspect(request_data) appointment = inspector.inspect(request_data)
error = None error = None
@@ -125,13 +128,12 @@ def get_block_count():
return jsonify({"block_count": BlockProcessor.get_block_count()}) return jsonify({"block_count": BlockProcessor.get_block_count()})
def start_api(): def start_api(w):
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment # FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
global watcher, inspector global watcher
# ToDo: #18-separate-api-from-watcher # ToDo: #18-separate-api-from-watcher
watcher = Watcher() watcher = w
inspector = Inspector()
# Setting Flask log to ERROR only so it does not mess with out logging. Also disabling flask initial messages # Setting Flask log to ERROR only so it does not mess with out logging. Also disabling flask initial messages
logging.getLogger('werkzeug').setLevel(logging.ERROR) logging.getLogger('werkzeug').setLevel(logging.ERROR)

View File

@@ -8,7 +8,7 @@ logger = Logger("Cleaner")
class Cleaner: class Cleaner:
@staticmethod @staticmethod
def delete_expired_appointment(expired_appointments, appointments, locator_uuid_map): def delete_expired_appointment(expired_appointments, appointments, locator_uuid_map, db_manager):
for uuid in expired_appointments: for uuid in expired_appointments:
locator = appointments[uuid].locator locator = appointments[uuid].locator
@@ -22,8 +22,11 @@ class Cleaner:
logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid) logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid)
# Delete appointment from the db
db_manager.delete_watcher_appointment(uuid)
@staticmethod @staticmethod
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height): def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height, db_manager):
for uuid, confirmations in completed_jobs: for uuid, confirmations in completed_jobs:
logger.info("Job completed. Appointment ended after reaching enough confirmations.", logger.info("Job completed. Appointment ended after reaching enough confirmations.",
uuid=uuid, height=height, confirmations=confirmations) uuid=uuid, height=height, confirmations=confirmations)
@@ -39,3 +42,7 @@ class Cleaner:
else: else:
tx_job_map[justice_txid].remove(uuid) tx_job_map[justice_txid].remove(uuid)
# Delete appointment from the db (both watchers's and responder's)
db_manager.ddelete_watcher_appointment(uuid)
db_manager.delete_responder_job(uuid)

81
pisa/db_manager.py Normal file
View File

@@ -0,0 +1,81 @@
import json
import plyvel
from pisa.logger import Logger
from pisa.conf import WATCHER_PREFIX, RESPONDER_PREFIX, WATCHER_LAST_BLOCK_KEY, RESPONDER_LAST_BLOCK_KEY
logger = Logger("Daemon")
class DBManager:
def __init__(self, db_path):
try:
self.db = plyvel.DB(db_path)
except plyvel.Error as e:
if 'create_if_missing is false' in str(e):
logger.info("No db found. Creating a fresh one")
self.db = plyvel.DB(db_path, create_if_missing=True)
def load_appointments_db(self, prefix):
data = {}
for k, v in self.db.iterator(prefix=prefix):
# Get uuid and appointment_data from the db
uuid = k[1:].decode('utf-8')
data[uuid] = json.loads(v)
return data
def get_last_known_block(self, prefix):
last_block = self.db.get(prefix)
if last_block:
last_block = last_block.decode('utf-8')
return last_block
def create_entry(self, key, value, prefix=None):
if isinstance(prefix, str):
key = prefix + key
key = key.encode('utf-8')
value = value.encode('utf-8')
self.db.put(key, value)
def delete_entry(self, key, prefix=None):
if isinstance(prefix, str):
key = prefix + key
key = key.encode('utf-8')
self.db.delete(key)
def load_watcher_appointments(self):
return self.load_appointments_db(prefix=WATCHER_PREFIX)
def load_responder_jobs(self):
return self.load_appointments_db(prefix=RESPONDER_PREFIX)
def store_watcher_appointment(self, uuid, appointment):
self.create_entry(uuid, appointment, prefix=WATCHER_PREFIX)
logger.info("Adding appointment to Watchers's db", uuid=uuid)
def store_responder_job(self, uuid, job):
self.create_entry(uuid, job, prefix=RESPONDER_PREFIX)
logger.info("Adding appointment to Responder's db", uuid=uuid)
def delete_watcher_appointment(self, uuid):
self.delete_entry(uuid, prefix=WATCHER_PREFIX)
logger.info("Deleting appointment from Watcher's db", uuid=uuid)
def delete_responder_job(self, uuid):
self.delete_entry(uuid, prefix=RESPONDER_PREFIX)
logger.info("Deleting appointment from Responder's db", uuid=uuid)
def store_last_block_watcher(self, block_hash):
self.create_entry(WATCHER_LAST_BLOCK_KEY, block_hash)
def store_last_block_responder(self, block_hash):
self.create_entry(RESPONDER_LAST_BLOCK_KEY, block_hash)

View File

@@ -2,9 +2,12 @@ from getopt import getopt
from sys import argv, exit from sys import argv, exit
from signal import signal, SIGINT, SIGQUIT, SIGTERM from signal import signal, SIGINT, SIGQUIT, SIGTERM
from pisa.conf import DB_PATH
from pisa.logger import Logger from pisa.logger import Logger
from pisa.api import start_api from pisa.api import start_api
from pisa.watcher import Watcher
from pisa.conf import BTC_NETWORK from pisa.conf import BTC_NETWORK
from pisa.db_manager import DBManager
from pisa.tools import can_connect_to_bitcoind, in_correct_network from pisa.tools import can_connect_to_bitcoind, in_correct_network
logger = Logger("Daemon") logger = Logger("Daemon")
@@ -37,8 +40,23 @@ if __name__ == '__main__':
else: else:
try: try:
# Fire the api db_manager = DBManager(DB_PATH)
start_api()
watcher_appointments = db_manager.load_watcher_appointments()
responder_jobs = db_manager.load_responder_jobs()
if len(watcher_appointments) == 0 and len(responder_jobs) == 0:
logger.info("Fresh bootstrap")
else:
logger.info("Bootstrapping from backed up data")
watcher = Watcher(db_manager)
# Create an instance of the Watcher and fire the API
start_api(watcher)
except Exception as e: except Exception as e:
logger.error("An error occurred: {}. Shutting down".format(e)) logger.error("An error occurred: {}. Shutting down".format(e))
exit(1) exit(1)

View File

@@ -38,7 +38,7 @@ class Job:
class Responder: class Responder:
def __init__(self): def __init__(self, db_manager):
self.jobs = dict() self.jobs = dict()
self.tx_job_map = dict() self.tx_job_map = dict()
self.unconfirmed_txs = [] self.unconfirmed_txs = []
@@ -46,6 +46,7 @@ class Responder:
self.block_queue = None self.block_queue = None
self.asleep = True self.asleep = True
self.zmq_subscriber = None self.zmq_subscriber = None
self.db_manager = db_manager
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False): def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
if self.asleep: if self.asleep:
@@ -67,7 +68,8 @@ class Responder:
return receipt return receipt
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0): def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0):
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end) job = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
self.jobs[uuid] = job
if justice_txid in self.tx_job_map: if justice_txid in self.tx_job_map:
self.tx_job_map[justice_txid].append(uuid) self.tx_job_map[justice_txid].append(uuid)
@@ -78,6 +80,8 @@ class Responder:
if confirmations == 0: if confirmations == 0:
self.unconfirmed_txs.append(justice_txid) self.unconfirmed_txs.append(justice_txid)
self.db_manager.store_responder_job(uuid.encode, job.to_json())
logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid, logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid,
appointment_end=appointment_end) appointment_end=appointment_end)
@@ -129,6 +133,9 @@ class Responder:
# ToDo: #24-properly-handle-reorgs # ToDo: #24-properly-handle-reorgs
self.handle_reorgs() self.handle_reorgs()
# Register the last processed block for the responder
self.db_manager.store_last_block_responder(block_hash)
prev_block_hash = block.get('hash') prev_block_hash = block.get('hash')
# Go back to sleep if there are no more jobs # Go back to sleep if there are no more jobs

View File

@@ -18,14 +18,18 @@ logger = Logger("Watcher")
class Watcher: class Watcher:
def __init__(self, max_appointments=MAX_APPOINTMENTS): def __init__(self, db_manager, responder=None, max_appointments=MAX_APPOINTMENTS):
self.appointments = dict() self.appointments = dict()
self.locator_uuid_map = dict() self.locator_uuid_map = dict()
self.block_queue = None self.block_queue = None
self.asleep = True self.asleep = True
self.max_appointments = max_appointments self.max_appointments = max_appointments
self.zmq_subscriber = None self.zmq_subscriber = None
self.responder = Responder()
if not isinstance(responder, Responder):
self.responder = Responder(db_manager)
self.db_manager = db_manager
if PISA_SECRET_KEY is None: if PISA_SECRET_KEY is None:
raise ValueError("No signing key provided. Please fix your pisa.conf") raise ValueError("No signing key provided. Please fix your pisa.conf")
@@ -71,6 +75,8 @@ class Watcher:
logger.info("Waking up") logger.info("Waking up")
self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
appointment_added = True appointment_added = True
logger.info("New appointment accepted.", locator=appointment.locator) logger.info("New appointment accepted.", locator=appointment.locator)
@@ -103,7 +109,8 @@ class Watcher:
expired_appointments = [uuid for uuid, appointment in self.appointments.items() expired_appointments = [uuid for uuid, appointment in self.appointments.items()
if block["height"] > appointment.end_time + EXPIRY_DELTA] if block["height"] > appointment.end_time + EXPIRY_DELTA]
Cleaner.delete_expired_appointment(expired_appointments, self.appointments, self.locator_uuid_map) Cleaner.delete_expired_appointment(expired_appointments, self.appointments, self.locator_uuid_map,
self.db_manager)
potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map) potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map)
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments) matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
@@ -118,17 +125,24 @@ class Watcher:
self.appointments[uuid].end_time) self.appointments[uuid].end_time)
# Delete the appointment # Delete the appointment
self.appointments.pop(uuid) appointment = self.appointments.pop(uuid)
# If there was only one appointment that matches the locator we can delete the whole list # If there was only one appointment that matches the locator we can delete the whole list
if len(self.locator_uuid_map[locator]) == 1: if len(self.locator_uuid_map[locator]) == 1:
# ToDo: #9-add-data-persistence
self.locator_uuid_map.pop(locator) self.locator_uuid_map.pop(locator)
else: else:
# Otherwise we just delete the appointment that matches locator:appointment_pos # Otherwise we just delete the appointment that matches locator:appointment_pos
# ToDo: #9-add-data-persistence
self.locator_uuid_map[locator].remove(uuid) self.locator_uuid_map[locator].remove(uuid)
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
# the watcher's and responder's db after fulfilled
# Update appointment in the db
appointment["triggered"] = True
self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
# Register the last processed block for the watcher
self.db_manager.store_last_block_watcher(block_hash)
# Go back to sleep if there are no more appointments # Go back to sleep if there are no more appointments
self.asleep = True self.asleep = True
self.zmq_subscriber.terminate = True self.zmq_subscriber.terminate = True