Add temporary monitoring tools for pisa

Add endpoint and logic so appointment can be queried to pisa. A better implementation based on persistent storage (i.e. DB) should replace it in the future
This commit is contained in:
Sergi Delgado Segura
2019-07-08 17:55:28 +01:00
parent 44d1647b06
commit 74ecf0ab54
4 changed files with 82 additions and 7 deletions

View File

@@ -2,7 +2,7 @@ from pisa import *
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.inspector import Inspector from pisa.inspector import Inspector
from pisa.appointment import Appointment from pisa.appointment import Appointment
from flask import Flask, request, Response from flask import Flask, request, Response, abort
import json import json
app = Flask(__name__) app = Flask(__name__)
@@ -49,6 +49,63 @@ def add_appointment():
return Response(response, status=rcode, mimetype='text/plain') return Response(response, status=rcode, mimetype='text/plain')
# FIXME: THE NEXT TWO API ENDPOINTS ARE FOR TESTING AND SHOULD BE REMOVED / PROPERLY MANAGED BEFORE PRODUCTION!
@app.route('/get_appointment', methods=['GET'])
def get_appointment():
locator = request.args.get('locator')
response = []
job_in_watcher = watcher.appointments.get(locator)
if job_in_watcher:
for job in job_in_watcher:
job_data = job.to_json()
job_data['status'] = "being watched"
response.append(job_data)
if watcher.responder:
responder_jobs = watcher.responder.jobs
for job_id, job in responder_jobs.items():
if job.locator == locator:
job_data = job.to_json()
job_data['status'] = "dispute responded"
job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
response.append(job_data)
if not response:
response.append({"locator": locator, "status": "not found"})
response = json.dumps(response)
return response
@app.route('/get_all_appointments', methods=['GET'])
def get_all_appointments():
watcher_appointments = []
responder_jobs = []
if request.remote_addr in ['localhost', '127.0.0.1']:
for app_id, appointment in watcher.appointments.items():
jobs_data = [job.to_json() for job in appointment]
watcher_appointments.append({app_id: jobs_data})
if watcher.responder:
for job_id, job in watcher.responder.jobs.items():
job_data = job.to_json()
job_data['confirmations'] = watcher.responder.confirmation_counter.get(job_id)
responder_jobs.append({job_id: job_data})
response = json.dumps({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs})
else:
abort(404)
return response
def start_api(d, l): def start_api(d, l):
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment # FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
global debug, logging, watcher, inspector global debug, logging, watcher, inspector

View File

@@ -12,6 +12,13 @@ class Appointment:
self.cipher = cipher self.cipher = cipher
self.hash_function = hash_function self.hash_function = hash_function
def to_json(self):
appointment = {"locator": self.locator, "start_time": self.start_time, "end_time": self.end_time,
"dispute_delta": self.dispute_delta, "encrypted_blob": self.encrypted_blob.data,
"cipher": self.cipher, "hash_function": self.hash_function}
return appointment
# ToDO: We may want to add some additional things to the appointment, like # ToDO: We may want to add some additional things to the appointment, like
# minimum fee # minimum fee
# refund to be payed to the user in case of failing # refund to be payed to the user in case of failing

View File

@@ -1,5 +1,7 @@
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from hashlib import sha256
from binascii import unhexlify
from pisa.zmq_subscriber import ZMQHandler from pisa.zmq_subscriber import ZMQHandler
from pisa.rpc_errors import * from pisa.rpc_errors import *
from pisa.tools import check_tx_in_chain from pisa.tools import check_tx_in_chain
@@ -13,11 +15,20 @@ MIN_CONFIRMATIONS = 6
class Job: class Job:
def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0): def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0):
self.dispute_txid = dispute_txid self.dispute_txid = dispute_txid
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
# can be directly got from DB
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
self.justice_rawtx = justice_rawtx self.justice_rawtx = justice_rawtx
self.appointment_end = appointment_end self.appointment_end = appointment_end
self.missed_confirmations = 0 self.missed_confirmations = 0
self.retry_counter = retry_counter self.retry_counter = retry_counter
def to_json(self):
job = {"locator": self.dispute_txid, "justice_rawtx": self.justice_rawtx,
"appointment_end": self.appointment_end}
return job
class Responder: class Responder:
def __init__(self): def __init__(self):
@@ -215,4 +226,4 @@ class Responder:
# reorg manager # reorg manager
logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager") logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager")
logging.error("[Responder] reorg manager not yet implemented") logging.error("[Responder] reorg manager not yet implemented")
pass pass

View File

@@ -16,6 +16,7 @@ class Watcher:
self.asleep = True self.asleep = True
self.max_appointments = max_appointments self.max_appointments = max_appointments
self.zmq_subscriber = None self.zmq_subscriber = None
self.responder = Responder()
def add_appointment(self, appointment, debug, logging): def add_appointment(self, appointment, debug, logging):
# DISCUSS: about validation of input data # DISCUSS: about validation of input data
@@ -40,8 +41,7 @@ class Watcher:
self.asleep = False self.asleep = False
self.block_queue = Queue() self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging]) zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
responder = Responder() watcher = Thread(target=self.do_watch, args=[debug, logging])
watcher = Thread(target=self.do_watch, args=[responder, debug, logging])
zmq_thread.start() zmq_thread.start()
watcher.start() watcher.start()
@@ -66,7 +66,7 @@ class Watcher:
self.zmq_subscriber = ZMQHandler(parent='Watcher') self.zmq_subscriber = ZMQHandler(parent='Watcher')
self.zmq_subscriber.handle(block_queue, debug, logging) self.zmq_subscriber.handle(block_queue, debug, logging)
def do_watch(self, responder, debug, logging): def do_watch(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT)) BTC_RPC_PORT))
@@ -131,8 +131,8 @@ class Watcher:
logging.info("[Watcher] notifying responder about {} and deleting appointment {}:{}".format( logging.info("[Watcher] notifying responder about {} and deleting appointment {}:{}".format(
justice_txid, locator, appointment_pos)) justice_txid, locator, appointment_pos))
responder.add_response(dispute_txid, justice_txid, justice_rawtx, self.responder.add_response(dispute_txid, justice_txid, justice_rawtx,
self.appointments[locator][appointment_pos].end_time, debug, logging) self.appointments[locator][appointment_pos].end_time, debug, logging)
# If there was only one appointment that matches the locator we can delete the whole list # If there was only one appointment that matches the locator we can delete the whole list
# DISCUSS: We may want to use locks before adding / removing appointment # DISCUSS: We may want to use locks before adding / removing appointment