mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Merge branch 'cleanup' into testing
This commit is contained in:
@@ -1,2 +1,20 @@
|
||||
import logging
|
||||
|
||||
from pisa.utils.auth_proxy import AuthServiceProxy
|
||||
import pisa.conf as conf
|
||||
|
||||
|
||||
HOST = 'localhost'
|
||||
PORT = 9814
|
||||
PORT = 9814
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
|
||||
logging.FileHandler(conf.SERVER_LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
|
||||
# Create RPC connection with bitcoind
|
||||
# TODO: Check if a long lived connection like this may create problems (timeouts)
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
|
||||
conf.BTC_RPC_PORT))
|
||||
|
||||
|
||||
31
pisa/api.py
31
pisa/api.py
@@ -1,15 +1,12 @@
|
||||
from pisa import *
|
||||
import json
|
||||
from flask import Flask, request, Response, abort, jsonify
|
||||
|
||||
from pisa import HOST, PORT, logging, bitcoin_cli
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.inspector import Inspector
|
||||
from pisa.appointment import Appointment
|
||||
from flask import Flask, request, Response, abort, jsonify
|
||||
import json
|
||||
|
||||
|
||||
# FIXME: HERE FOR TESTING (get_block_count). REMOVE WHEN REMOVING THE FUNCTION
|
||||
from pisa.utils.authproxy import AuthServiceProxy
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
||||
|
||||
# ToDo: #5-add-async-to-api
|
||||
app = Flask(__name__)
|
||||
HTTP_OK = 200
|
||||
@@ -22,15 +19,14 @@ def add_appointment():
|
||||
remote_addr = request.environ.get('REMOTE_ADDR')
|
||||
remote_port = request.environ.get('REMOTE_PORT')
|
||||
|
||||
if debug:
|
||||
logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
|
||||
logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
|
||||
|
||||
# Check content type once if properly defined
|
||||
request_data = json.loads(request.get_json())
|
||||
appointment = inspector.inspect(request_data)
|
||||
|
||||
if type(appointment) == Appointment:
|
||||
appointment_added = watcher.add_appointment(appointment, debug, logging)
|
||||
appointment_added = watcher.add_appointment(appointment)
|
||||
|
||||
# ToDo: #13-create-server-side-signature-receipt
|
||||
if appointment_added:
|
||||
@@ -49,9 +45,7 @@ def add_appointment():
|
||||
rcode = HTTP_BAD_REQUEST
|
||||
response = "appointment rejected. Request does not match the standard"
|
||||
|
||||
if debug:
|
||||
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr,
|
||||
remote_port))
|
||||
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr, remote_port))
|
||||
|
||||
return Response(response, status=rcode, mimetype='text/plain')
|
||||
|
||||
@@ -115,21 +109,16 @@ def get_all_appointments():
|
||||
|
||||
@app.route('/get_block_count', methods=['GET'])
|
||||
def get_block_count():
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
|
||||
return jsonify({"block_count": bitcoin_cli.getblockcount()})
|
||||
|
||||
|
||||
def start_api(d, l):
|
||||
def start_api():
|
||||
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
|
||||
global debug, logging, watcher, inspector
|
||||
debug = d
|
||||
logging = l
|
||||
global watcher, inspector
|
||||
|
||||
# ToDo: #18-separate-api-from-watcher
|
||||
watcher = Watcher()
|
||||
inspector = Inspector(debug, logging)
|
||||
inspector = Inspector()
|
||||
|
||||
# Setting Flask log t ERROR only so it does not mess with out logging
|
||||
logging.getLogger('werkzeug').setLevel(logging.ERROR)
|
||||
|
||||
@@ -19,7 +19,5 @@ class Appointment:
|
||||
|
||||
return appointment
|
||||
|
||||
# ToDO: #3-improve-appointment-strcuture
|
||||
|
||||
|
||||
# ToDO: #3-improve-appointment-structure
|
||||
|
||||
|
||||
90
pisa/block_processor.py
Normal file
90
pisa/block_processor.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import binascii
|
||||
from hashlib import sha256
|
||||
|
||||
from pisa import logging, bitcoin_cli
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
|
||||
|
||||
class BlockProcessor:
|
||||
@staticmethod
|
||||
def get_block(block_hash):
|
||||
block = None
|
||||
|
||||
try:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
|
||||
except JSONRPCException as e:
|
||||
logging.error("[BlockProcessor] couldn't get block from bitcoind. Error code {}".format(e))
|
||||
|
||||
return block
|
||||
|
||||
@staticmethod
|
||||
def get_best_block_hash():
|
||||
block_hash = None
|
||||
|
||||
try:
|
||||
block_hash = bitcoin_cli.getbestblockhash()
|
||||
|
||||
except JSONRPCException as e:
|
||||
logging.error("[BlockProcessor] couldn't get block hash. Error code {}".format(e))
|
||||
|
||||
return block_hash
|
||||
|
||||
@staticmethod
|
||||
def get_potential_matches(txids, locator_uuid_map):
|
||||
potential_locators = {sha256(binascii.unhexlify(txid)).hexdigest(): txid for txid in txids}
|
||||
|
||||
# Check is any of the tx_ids in the received block is an actual match
|
||||
intersection = set(locator_uuid_map.keys()).intersection(potential_locators.keys())
|
||||
potential_matches = {locator: potential_locators[locator] for locator in intersection}
|
||||
|
||||
if len(potential_matches) > 0:
|
||||
logging.info("[BlockProcessor] list of potential matches: {}".format(potential_matches))
|
||||
|
||||
else:
|
||||
logging.info("[BlockProcessor] no potential matches found")
|
||||
|
||||
@staticmethod
|
||||
def get_matches(potential_matches, locator_uuid_map, appointments):
|
||||
matches = []
|
||||
|
||||
for locator, dispute_txid in potential_matches.items():
|
||||
for uuid in locator_uuid_map[locator]:
|
||||
try:
|
||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||
justice_rawtx = appointments[uuid].encrypted_blob.decrypt(binascii.unhexlify(dispute_txid))
|
||||
justice_rawtx = binascii.hexlify(justice_rawtx).decode()
|
||||
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
|
||||
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
|
||||
|
||||
logging.info("[BlockProcessor] match found for locator {} (uuid: {}): {}".format(
|
||||
locator, uuid, justice_txid))
|
||||
|
||||
except JSONRPCException as e:
|
||||
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
|
||||
# for the POC
|
||||
logging.error("[BlockProcessor] can't build transaction from decoded data. Error code {}".format(e))
|
||||
|
||||
return matches
|
||||
|
||||
@staticmethod
|
||||
def check_confirmations(txs, unconfirmed_txs, tx_job_map, missed_confirmations):
|
||||
|
||||
for tx in txs:
|
||||
if tx in tx_job_map and tx in unconfirmed_txs:
|
||||
unconfirmed_txs.remove(tx)
|
||||
|
||||
logging.info("[Responder] confirmation received for tx {}".format(tx))
|
||||
|
||||
elif tx in unconfirmed_txs:
|
||||
if tx in missed_confirmations:
|
||||
missed_confirmations[tx] += 1
|
||||
|
||||
else:
|
||||
missed_confirmations[tx] = 1
|
||||
|
||||
logging.info("[Responder] tx {} missed a confirmation (total missed: {})"
|
||||
.format(tx, missed_confirmations[tx]))
|
||||
|
||||
return unconfirmed_txs, missed_confirmations
|
||||
|
||||
77
pisa/carrier.py
Normal file
77
pisa/carrier.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION
|
||||
from pisa import logging, bitcoin_cli
|
||||
from pisa.rpc_errors import *
|
||||
|
||||
|
||||
class Carrier:
|
||||
class Receipt:
|
||||
def __init__(self, delivered, confirmations=0, reason=None):
|
||||
self.delivered = delivered
|
||||
self.confirmations = confirmations
|
||||
self.reason = reason
|
||||
|
||||
def send_transaction(self, rawtx, txid):
|
||||
try:
|
||||
logging.info("[Carrier] pushing transaction to the network (txid: {})".format(rawtx))
|
||||
bitcoin_cli.sendrawtransaction(rawtx)
|
||||
|
||||
receipt = self.Receipt(delivered=True)
|
||||
|
||||
except JSONRPCException as e:
|
||||
errno = e.error.get('code')
|
||||
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
|
||||
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
|
||||
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
|
||||
if errno == RPC_VERIFY_REJECTED:
|
||||
# DISCUSS: what to do in this case
|
||||
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
|
||||
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
|
||||
# DISCUSS: check errors -9 and -10
|
||||
# TODO: UNKNOWN_JSON_RPC_EXCEPTION is not the proper exception here. This is long due.
|
||||
receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
|
||||
|
||||
elif errno == RPC_VERIFY_ALREADY_IN_CHAIN:
|
||||
logging.info("[Carrier] {} is already in the blockchain. Getting confirmation count".format(txid))
|
||||
|
||||
# If the transaction is already in the chain, we get the number of confirmations and watch the job
|
||||
# until the end of the appointment
|
||||
tx_info = self.get_transaction(txid)
|
||||
|
||||
if tx_info is not None:
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
receipt = self.Receipt(delivered=True, confirmations=confirmations)
|
||||
|
||||
else:
|
||||
# There's a really unlike edge case where a transaction can be reorged between receiving the
|
||||
# notification and querying the data. In such a case we just resend
|
||||
self.send_transaction(rawtx, txid)
|
||||
|
||||
else:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
receipt = self.Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
|
||||
|
||||
return receipt
|
||||
|
||||
@staticmethod
|
||||
def get_transaction(txid):
|
||||
tx_info = None
|
||||
|
||||
try:
|
||||
tx_info = bitcoin_cli.getrawtransaction(txid, 1)
|
||||
|
||||
except JSONRPCException as e:
|
||||
# While it's quite unlikely, the transaction that was already in the blockchain could have been
|
||||
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
|
||||
# restart the job
|
||||
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
logging.info("[Carrier] transaction {} got reorged before obtaining information".format(txid))
|
||||
|
||||
# TODO: Check RPC methods to see possible returns and avoid general else
|
||||
# else:
|
||||
# # If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
# logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
|
||||
return tx_info
|
||||
|
||||
45
pisa/cleaner.py
Normal file
45
pisa/cleaner.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import pisa.conf as conf
|
||||
from pisa import logging
|
||||
|
||||
|
||||
class Cleaner:
|
||||
@staticmethod
|
||||
def delete_expired_appointment(block, appointments, locator_uuid_map):
|
||||
to_delete = [uuid for uuid, appointment in appointments.items()
|
||||
if block["height"] > appointment.end_time + conf.EXPIRY_DELTA]
|
||||
|
||||
for uuid in to_delete:
|
||||
locator = appointments[uuid].locator
|
||||
|
||||
appointments.pop(uuid)
|
||||
|
||||
if len(locator_uuid_map[locator]) == 1:
|
||||
locator_uuid_map.pop(locator)
|
||||
|
||||
else:
|
||||
locator_uuid_map[locator].remove(uuid)
|
||||
|
||||
logging.info("[Cleaner] end time reached with no match! Deleting appointment {} (uuid: {})".format(locator,
|
||||
uuid))
|
||||
|
||||
return appointments, locator_uuid_map
|
||||
|
||||
@staticmethod
|
||||
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height):
|
||||
for uuid in completed_jobs:
|
||||
logging.info("[Cleaner] job completed (uuid = {}). Appointment ended at block {} after {} confirmations"
|
||||
.format(uuid, jobs[uuid].justice_txid, height, jobs[uuid].confirmations))
|
||||
|
||||
# ToDo: #9-add-data-persistence
|
||||
justice_txid = jobs[uuid].justice_txid
|
||||
jobs.pop(uuid)
|
||||
|
||||
if len(tx_job_map[justice_txid]) == 1:
|
||||
tx_job_map.pop(justice_txid)
|
||||
|
||||
logging.info("[Cleaner] no more jobs for justice_txid {}".format(justice_txid))
|
||||
|
||||
else:
|
||||
tx_job_map[justice_txid].remove(uuid)
|
||||
|
||||
return jobs, tx_job_map
|
||||
@@ -1,5 +1,5 @@
|
||||
from binascii import unhexlify, hexlify
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify, hexlify
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
|
||||
|
||||
|
||||
@@ -1,16 +1,13 @@
|
||||
import re
|
||||
from pisa.appointment import Appointment
|
||||
|
||||
from pisa import errors
|
||||
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MIN_DISPUTE_DELTA, \
|
||||
SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
||||
import pisa.conf as conf
|
||||
from pisa import logging, bitcoin_cli
|
||||
from pisa.appointment import Appointment
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
|
||||
|
||||
class Inspector:
|
||||
def __init__(self, debug=False, logging=None):
|
||||
self.debug = debug
|
||||
self.logging = logging
|
||||
|
||||
def inspect(self, data):
|
||||
locator = data.get('locator')
|
||||
start_time = data.get('start_time')
|
||||
@@ -20,8 +17,6 @@ class Inspector:
|
||||
cipher = data.get('cipher')
|
||||
hash_function = data.get('hash_function')
|
||||
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
try:
|
||||
block_height = bitcoin_cli.getblockcount()
|
||||
|
||||
@@ -45,8 +40,7 @@ class Inspector:
|
||||
r = (rcode, message)
|
||||
|
||||
except JSONRPCException as e:
|
||||
if self.debug:
|
||||
self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
|
||||
logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
|
||||
|
||||
# In case of an unknown exception, assign a special rcode and reason.
|
||||
r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred")
|
||||
@@ -71,8 +65,7 @@ class Inspector:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
|
||||
message = "wrong locator format ({})".format(locator)
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -95,8 +88,7 @@ class Inspector:
|
||||
else:
|
||||
message = "start_time too close to current height"
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -122,8 +114,7 @@ class Inspector:
|
||||
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
|
||||
message = 'end_time is in the past'
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -139,13 +130,12 @@ class Inspector:
|
||||
elif t != int:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong dispute_delta data type ({})".format(t)
|
||||
elif dispute_delta < MIN_DISPUTE_DELTA:
|
||||
elif dispute_delta < conf.MIN_DISPUTE_DELTA:
|
||||
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
|
||||
message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
|
||||
MIN_DISPUTE_DELTA, dispute_delta)
|
||||
conf.MIN_DISPUTE_DELTA, dispute_delta)
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -166,8 +156,8 @@ class Inspector:
|
||||
# ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD
|
||||
message = "wrong encrypted_blob"
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -183,12 +173,11 @@ class Inspector:
|
||||
elif t != str:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong cipher data type ({})".format(t)
|
||||
elif cipher not in SUPPORTED_CIPHERS:
|
||||
elif cipher not in conf.SUPPORTED_CIPHERS:
|
||||
rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
|
||||
message = "cipher not supported: {}".format(cipher)
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -204,11 +193,10 @@ class Inspector:
|
||||
elif t != str:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong hash_function data type ({})".format(t)
|
||||
elif hash_function not in SUPPORTED_HASH_FUNCTIONS:
|
||||
elif hash_function not in conf.SUPPORTED_HASH_FUNCTIONS:
|
||||
rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
|
||||
message = "hash_function not supported {}".format(hash_function)
|
||||
|
||||
if self.debug and message:
|
||||
self.logging.error("[Inspector] {}".format(message))
|
||||
logging.error("[Inspector] {}".format(message))
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -1,36 +1,26 @@
|
||||
import logging
|
||||
from sys import argv
|
||||
from getopt import getopt
|
||||
from threading import Thread
|
||||
|
||||
from pisa import logging
|
||||
from pisa.api import start_api
|
||||
from pisa.tools import can_connect_to_bitcoind, in_correct_network
|
||||
from pisa.utils.authproxy import AuthServiceProxy
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, BTC_NETWORK, SERVER_LOG_FILE
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
debug = False
|
||||
opts, _ = getopt(argv[1:], 'd', ['debug'])
|
||||
for opt, arg in opts:
|
||||
if opt in ['-d', '--debug']:
|
||||
debug = True
|
||||
# FIXME: Leaving this here for future option/arguments
|
||||
pass
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
|
||||
logging.FileHandler(SERVER_LOG_FILE),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
if can_connect_to_bitcoind():
|
||||
if in_correct_network():
|
||||
# Fire the api
|
||||
start_api()
|
||||
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
|
||||
if can_connect_to_bitcoind(bitcoin_cli):
|
||||
if in_correct_network(bitcoin_cli, BTC_NETWORK):
|
||||
# ToDo: This may not have to be a thead. The main thread only creates this and terminates.
|
||||
api_thread = Thread(target=start_api, args=[debug, logging])
|
||||
api_thread.start()
|
||||
else:
|
||||
logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
|
||||
"Shutting down")
|
||||
|
||||
else:
|
||||
logging.error("[Pisad] can't connect to bitcoind. Shutting down")
|
||||
|
||||
@@ -2,25 +2,25 @@ from queue import Queue
|
||||
from threading import Thread
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
from pisa.zmq_subscriber import ZMQHandler
|
||||
from pisa.rpc_errors import *
|
||||
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.carrier import Carrier
|
||||
from pisa import logging
|
||||
from pisa.tools import check_tx_in_chain
|
||||
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
|
||||
CONFIRMATIONS_BEFORE_RETRY = 6
|
||||
MIN_CONFIRMATIONS = 6
|
||||
|
||||
|
||||
class Job:
|
||||
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0):
|
||||
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry_counter=0):
|
||||
self.dispute_txid = dispute_txid
|
||||
self.justice_txid = justice_txid
|
||||
self.justice_rawtx = justice_rawtx
|
||||
self.appointment_end = appointment_end
|
||||
self.confirmations = confirmations
|
||||
|
||||
self.missed_confirmations = 0
|
||||
self.retry_counter = retry_counter
|
||||
|
||||
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
|
||||
@@ -28,8 +28,7 @@ class Job:
|
||||
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
|
||||
|
||||
def to_json(self):
|
||||
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations,
|
||||
"appointment_end": self.appointment_end}
|
||||
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "appointment_end": self.appointment_end}
|
||||
|
||||
return job
|
||||
|
||||
@@ -38,40 +37,37 @@ class Responder:
|
||||
def __init__(self):
|
||||
self.jobs = dict()
|
||||
self.tx_job_map = dict()
|
||||
self.unconfirmed_txs = []
|
||||
self.missed_confirmations = dict()
|
||||
self.block_queue = None
|
||||
self.asleep = True
|
||||
self.zmq_subscriber = None
|
||||
|
||||
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
retry=False):
|
||||
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
|
||||
if self.asleep:
|
||||
logging.info("[Responder] waking up!")
|
||||
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
carrier = Carrier()
|
||||
receipt = carrier.send_transaction(justice_rawtx, justice_txid)
|
||||
|
||||
try:
|
||||
if debug:
|
||||
if self.asleep:
|
||||
logging.info("[Responder] waking up!")
|
||||
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
|
||||
|
||||
bitcoin_cli.sendrawtransaction(justice_rawtx)
|
||||
|
||||
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
if receipt.delivered:
|
||||
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
# retry holds such information.
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
retry=retry)
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=retry,
|
||||
confirmations=receipt.confirmations)
|
||||
|
||||
except JSONRPCException as e:
|
||||
self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
|
||||
debug, logging, retry)
|
||||
else:
|
||||
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
|
||||
pass
|
||||
|
||||
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
confirmations=0, retry=False):
|
||||
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
|
||||
retry=False):
|
||||
|
||||
# ToDo: #23-define-behaviour-approaching-end
|
||||
if retry:
|
||||
self.jobs[uuid].retry_counter += 1
|
||||
self.jobs[uuid].missed_confirmations = 0
|
||||
|
||||
else:
|
||||
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
|
||||
|
||||
@@ -81,174 +77,119 @@ class Responder:
|
||||
else:
|
||||
self.tx_job_map[justice_txid] = [uuid]
|
||||
|
||||
if debug:
|
||||
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
|
||||
format(dispute_txid, justice_txid, appointment_end))
|
||||
if confirmations == 0:
|
||||
self.unconfirmed_txs.append(justice_txid)
|
||||
|
||||
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'
|
||||
.format(dispute_txid, justice_txid, appointment_end))
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
self.block_queue = Queue()
|
||||
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
|
||||
responder = Thread(target=self.handle_responses, args=[debug, logging])
|
||||
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
|
||||
responder = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
responder.start()
|
||||
|
||||
def do_subscribe(self, block_queue, debug, logging):
|
||||
def do_subscribe(self, block_queue):
|
||||
self.zmq_subscriber = ZMQHandler(parent='Responder')
|
||||
self.zmq_subscriber.handle(block_queue, debug, logging)
|
||||
self.zmq_subscriber.handle(block_queue)
|
||||
|
||||
def handle_responses(self, debug, logging):
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
def do_watch(self):
|
||||
# ToDo: #9-add-data-persistence
|
||||
# change prev_block_hash to the last known tip when bootstrapping
|
||||
prev_block_hash = 0
|
||||
|
||||
while len(self.jobs) > 0:
|
||||
# We get notified for every new received block
|
||||
block_hash = self.block_queue.get()
|
||||
block = BlockProcessor.get_block(block_hash)
|
||||
|
||||
try:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
if block is not None:
|
||||
txs = block.get('tx')
|
||||
height = block.get('height')
|
||||
|
||||
if debug:
|
||||
logging.info("[Responder] new block received {}".format(block_hash))
|
||||
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
|
||||
logging.info("[Responder] list of transactions: {}".format(txs))
|
||||
logging.info("[Responder] new block received {}".format(block_hash))
|
||||
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
|
||||
logging.info("[Responder] list of transactions: {}".format(txs))
|
||||
|
||||
except JSONRPCException as e:
|
||||
if debug:
|
||||
logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
|
||||
# ToDo: #9-add-data-persistence
|
||||
# change prev_block_hash condition
|
||||
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
|
||||
self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
|
||||
txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations)
|
||||
|
||||
continue
|
||||
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
|
||||
self.jobs, self.tx_job_map = Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map,
|
||||
self.get_completed_jobs(height), height)
|
||||
|
||||
completed_jobs = []
|
||||
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
|
||||
# Keep count of the confirmations each tx gets
|
||||
for justice_txid, jobs in self.tx_job_map.items():
|
||||
for uuid in jobs:
|
||||
if justice_txid in txs or self.jobs[uuid].confirmations > 0:
|
||||
self.jobs[uuid].confirmations += 1
|
||||
self.rebroadcast(txs_to_rebroadcast)
|
||||
|
||||
if debug:
|
||||
logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
|
||||
uuid, justice_txid))
|
||||
|
||||
elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
# If a transactions has missed too many confirmations for a while we'll try to rebroadcast
|
||||
# ToDO: #22-discuss-confirmations-before-retry
|
||||
# ToDo: #23-define-behaviour-approaching-end
|
||||
self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
|
||||
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug,
|
||||
logging, retry=True)
|
||||
if debug:
|
||||
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
|
||||
.format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
|
||||
|
||||
else:
|
||||
# Otherwise we increase the number of missed confirmations
|
||||
self.jobs[uuid].missed_confirmations += 1
|
||||
|
||||
if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \
|
||||
MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
completed_jobs.append(uuid)
|
||||
|
||||
self.remove_completed_jobs(completed_jobs, height, debug, logging)
|
||||
|
||||
else:
|
||||
if debug:
|
||||
else:
|
||||
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
|
||||
.format(prev_block_hash, block.get('previousblockhash')))
|
||||
|
||||
self.handle_reorgs(bitcoin_cli, debug, logging)
|
||||
self.handle_reorgs()
|
||||
|
||||
prev_block_hash = block.get('hash')
|
||||
prev_block_hash = block.get('hash')
|
||||
|
||||
# Go back to sleep if there are no more jobs
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
|
||||
if debug:
|
||||
logging.info("[Responder] no more pending jobs, going back to sleep")
|
||||
logging.info("[Responder] no more pending jobs, going back to sleep")
|
||||
|
||||
def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
|
||||
debug, logging, retry):
|
||||
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
|
||||
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
|
||||
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
|
||||
if e.error.get('code') == RPC_VERIFY_REJECTED:
|
||||
# DISCUSS: what to do in this case
|
||||
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
|
||||
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
|
||||
# DISCUSS: check errors -9 and -10
|
||||
pass
|
||||
def get_txs_to_rebroadcast(self, txs):
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
|
||||
try:
|
||||
if debug:
|
||||
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
|
||||
"start monitoring the transaction".format(justice_txid))
|
||||
for tx in txs:
|
||||
if self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY:
|
||||
# If a transactions has missed too many confirmations we add it to the rebroadcast list
|
||||
txs_to_rebroadcast.append(tx)
|
||||
|
||||
# If the transaction is already in the chain, we get the number of confirmations and watch the job
|
||||
# until the end of the appointment
|
||||
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
retry=retry, confirmations=confirmations)
|
||||
return txs_to_rebroadcast
|
||||
|
||||
except JSONRPCException as e:
|
||||
# While it's quite unlikely, the transaction that was already in the blockchain could have been
|
||||
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
|
||||
# restart the job
|
||||
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug,
|
||||
logging, retry=retry)
|
||||
elif debug:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
def get_completed_jobs(self, height):
|
||||
completed_jobs = []
|
||||
|
||||
elif debug:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error {}".format(e))
|
||||
for uuid, job in self.jobs:
|
||||
if job.appointment_end <= height:
|
||||
tx = Carrier.get_transaction(job.dispute_txid)
|
||||
|
||||
def remove_completed_jobs(self, completed_jobs, height, debug, logging):
|
||||
for uuid in completed_jobs:
|
||||
if debug:
|
||||
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
|
||||
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
|
||||
self.jobs[uuid].confirmations))
|
||||
# FIXME: Should be improved with the librarian
|
||||
if tx is not None and tx.get('confirmations') > MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
completed_jobs.append(uuid)
|
||||
|
||||
# ToDo: #9-add-data-persistency
|
||||
justice_txid = self.jobs[uuid].justice_txid
|
||||
self.jobs.pop(uuid)
|
||||
return completed_jobs
|
||||
|
||||
if len(self.tx_job_map[justice_txid]) == 1:
|
||||
self.tx_job_map.pop(justice_txid)
|
||||
def rebroadcast(self, jobs_to_rebroadcast):
|
||||
# ToDO: #22-discuss-confirmations-before-retry
|
||||
# ToDo: #23-define-behaviour-approaching-end
|
||||
|
||||
if debug:
|
||||
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
|
||||
for tx in jobs_to_rebroadcast:
|
||||
for uuid in self.tx_job_map[tx]:
|
||||
self.add_response(uuid, self.jobs[uuid].dispute_txid, self.jobs[uuid].justice_txid,
|
||||
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, retry=True)
|
||||
|
||||
else:
|
||||
self.tx_job_map[justice_txid].remove(uuid)
|
||||
logging.warning("[Responder] tx {} has missed {} confirmations. Rebroadcasting"
|
||||
.format(self.jobs[uuid].justice_txid, CONFIRMATIONS_BEFORE_RETRY))
|
||||
|
||||
def handle_reorgs(self, bitcoin_cli, debug, logging):
|
||||
# FIXME: Legacy code, must be checked and updated/fixed
|
||||
def handle_reorgs(self):
|
||||
for uuid, job in self.jobs.items():
|
||||
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
|
||||
# there either, so we'll need to call the reorg manager straight away
|
||||
dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging,
|
||||
parent='Responder',
|
||||
tx_label='dispute tx')
|
||||
dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, parent='Responder', tx_label='dispute tx')
|
||||
|
||||
# If the dispute is there, we can check the justice tx
|
||||
if dispute_in_chain:
|
||||
justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug,
|
||||
logging, parent='Responder',
|
||||
justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, parent='Responder',
|
||||
tx_label='justice tx')
|
||||
|
||||
# If both transactions are there, we only need to update the justice tx confirmation count
|
||||
if justice_in_chain:
|
||||
if debug:
|
||||
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
|
||||
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
|
||||
job.justice_txid, job.confirmations, justice_confirmations))
|
||||
|
||||
job.confirmations = justice_confirmations
|
||||
@@ -258,9 +199,7 @@ class Responder:
|
||||
# DISCUSS: Adding job back, should we flag it as retried?
|
||||
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
|
||||
# maintained. There is no way of doing so with the current approach. Update if required
|
||||
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
|
||||
job.appointment_end,
|
||||
debug, logging)
|
||||
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
|
||||
|
||||
else:
|
||||
# ToDo: #24-properly-handle-reorgs
|
||||
@@ -268,4 +207,3 @@ class Responder:
|
||||
# reorg manager
|
||||
logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager")
|
||||
logging.error("[Responder] reorg manager not yet implemented")
|
||||
pass
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
import re
|
||||
from pisa.utils.authproxy import JSONRPCException
|
||||
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
|
||||
from http.client import HTTPException
|
||||
|
||||
import pisa.conf as conf
|
||||
from pisa import logging, bitcoin_cli
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
|
||||
|
||||
def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='transaction'):
|
||||
|
||||
def check_tx_in_chain(tx_id, parent='', tx_label='transaction'):
|
||||
tx_in_chain = False
|
||||
confirmations = 0
|
||||
|
||||
@@ -14,22 +17,23 @@ def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='t
|
||||
if tx_info.get("confirmations"):
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
tx_in_chain = True
|
||||
if debug:
|
||||
logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
|
||||
elif debug:
|
||||
logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
|
||||
|
||||
else:
|
||||
logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id))
|
||||
|
||||
except JSONRPCException as e:
|
||||
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
if debug:
|
||||
logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
|
||||
elif debug:
|
||||
logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
|
||||
|
||||
else:
|
||||
# ToDO: Unhandled errors, check this properly
|
||||
logging.error("[{}] JSONRPCException. Error code {}".format(parent, e))
|
||||
|
||||
return tx_in_chain, confirmations
|
||||
|
||||
|
||||
def can_connect_to_bitcoind(bitcoin_cli):
|
||||
def can_connect_to_bitcoind():
|
||||
can_connect = True
|
||||
|
||||
try:
|
||||
@@ -40,18 +44,18 @@ def can_connect_to_bitcoind(bitcoin_cli):
|
||||
return can_connect
|
||||
|
||||
|
||||
def in_correct_network(bitcoin_cli, network):
|
||||
def in_correct_network():
|
||||
mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
|
||||
testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
|
||||
correct_network = False
|
||||
|
||||
genesis_block_hash = bitcoin_cli.getblockhash(0)
|
||||
|
||||
if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
|
||||
if conf.BTC_NETWORK == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
|
||||
correct_network = True
|
||||
elif network == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash:
|
||||
elif conf.BTC_NETWORK == 'testnet' and genesis_block_hash == testnet3_genesis_block_hash:
|
||||
correct_network = True
|
||||
elif network == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]:
|
||||
elif conf.BTC_NETWORK == 'regtest' and genesis_block_hash not in [mainnet_genesis_block_hash, testnet3_genesis_block_hash]:
|
||||
correct_network = True
|
||||
|
||||
return correct_network
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import zmq
|
||||
import binascii
|
||||
from pisa import logging
|
||||
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||
|
||||
|
||||
# ToDo: #7-add-async-back-to-zmq
|
||||
class ZMQHandler:
|
||||
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
|
||||
@@ -14,7 +16,7 @@ class ZMQHandler:
|
||||
self.parent = parent
|
||||
self.terminate = False
|
||||
|
||||
def handle(self, block_queue, debug, logging):
|
||||
def handle(self, block_queue):
|
||||
while not self.terminate:
|
||||
msg = self.zmqSubSocket.recv_multipart()
|
||||
|
||||
@@ -27,5 +29,4 @@ class ZMQHandler:
|
||||
block_hash = binascii.hexlify(body).decode('UTF-8')
|
||||
block_queue.put(block_hash)
|
||||
|
||||
if debug:
|
||||
logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))
|
||||
logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))
|
||||
134
pisa/watcher.py
134
pisa/watcher.py
@@ -1,12 +1,17 @@
|
||||
from binascii import hexlify, unhexlify
|
||||
from uuid import uuid4
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
|
||||
from pisa import logging
|
||||
from pisa.responder import Responder
|
||||
from pisa.zmq_subscriber import ZMQHandler
|
||||
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from hashlib import sha256
|
||||
from uuid import uuid4
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA
|
||||
from pisa.conf import MAX_APPOINTMENTS
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
|
||||
|
||||
# WIP: MOVED BLOCKCHAIN RELATED TASKS TO BLOCK PROCESSOR IN AN AIM TO MAKE THE CODE MORE MODULAR. THIS SHOULD HELP
|
||||
# WITH CODE REUSE WHEN MERGING THE DATA PERSISTENCE PART.
|
||||
|
||||
|
||||
class Watcher:
|
||||
@@ -19,7 +24,7 @@ class Watcher:
|
||||
self.zmq_subscriber = None
|
||||
self.responder = Responder()
|
||||
|
||||
def add_appointment(self, appointment, debug, logging):
|
||||
def add_appointment(self, appointment):
|
||||
# Rationale:
|
||||
# The Watcher will analyze every received block looking for appointment matches. If there is no work
|
||||
# to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block
|
||||
@@ -45,135 +50,68 @@ class Watcher:
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
self.block_queue = Queue()
|
||||
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
|
||||
watcher = Thread(target=self.do_watch, args=[debug, logging])
|
||||
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue])
|
||||
watcher = Thread(target=self.do_watch)
|
||||
zmq_thread.start()
|
||||
watcher.start()
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] waking up!")
|
||||
logging.info("[Watcher] waking up!")
|
||||
|
||||
appointment_added = True
|
||||
|
||||
if debug:
|
||||
logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
|
||||
logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
|
||||
|
||||
else:
|
||||
appointment_added = False
|
||||
|
||||
if debug:
|
||||
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'
|
||||
.format(appointment.locator))
|
||||
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'.format(
|
||||
appointment.locator))
|
||||
|
||||
return appointment_added
|
||||
|
||||
def do_subscribe(self, block_queue, debug, logging):
|
||||
def do_subscribe(self, block_queue):
|
||||
self.zmq_subscriber = ZMQHandler(parent='Watcher')
|
||||
self.zmq_subscriber.handle(block_queue, debug, logging)
|
||||
|
||||
def do_watch(self, debug, logging):
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
self.zmq_subscriber.handle(block_queue)
|
||||
|
||||
def do_watch(self):
|
||||
while len(self.appointments) > 0:
|
||||
block_hash = self.block_queue.get()
|
||||
logging.info("[Watcher] new block received {}".format(block_hash))
|
||||
|
||||
try:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
block = BlockProcessor.getblock(block_hash)
|
||||
|
||||
if block is not None:
|
||||
txids = block.get('tx')
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] new block received {}".format(block_hash))
|
||||
logging.info("[Watcher] list of transactions: {}".format(txids))
|
||||
logging.info("[Watcher] list of transactions: {}".format(txids))
|
||||
|
||||
self.delete_expired_appointment(block, debug, logging)
|
||||
self.appointments, self.locator_uuid_map = Cleaner.delete_expired_appointment(
|
||||
block, self.appointments, self.locator_uuid_map)
|
||||
|
||||
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
|
||||
|
||||
# Check is any of the tx_ids in the received block is an actual match
|
||||
# Get the locators that are both in the map and in the potential locators dict.
|
||||
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
|
||||
potential_matches = {locator: potential_locators[locator] for locator in intersection}
|
||||
|
||||
if debug:
|
||||
if len(potential_matches) > 0:
|
||||
logging.info("[Watcher] list of potential matches: {}".format(potential_matches))
|
||||
else:
|
||||
logging.info("[Watcher] no potential matches found")
|
||||
|
||||
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
|
||||
potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map)
|
||||
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
|
||||
|
||||
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
|
||||
if debug:
|
||||
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
|
||||
.format(justice_txid, locator, uuid))
|
||||
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
|
||||
.format(justice_txid, locator, uuid))
|
||||
|
||||
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
|
||||
self.appointments[uuid].end_time, debug, logging)
|
||||
self.appointments[uuid].end_time)
|
||||
|
||||
# Delete the appointment
|
||||
self.appointments.pop(uuid)
|
||||
|
||||
# If there was only one appointment that matches the locator we can delete the whole list
|
||||
if len(self.locator_uuid_map[locator]) == 1:
|
||||
# ToDo: #9-add-data-persistency
|
||||
# ToDo: #9-add-data-persistence
|
||||
self.locator_uuid_map.pop(locator)
|
||||
else:
|
||||
# Otherwise we just delete the appointment that matches locator:appointment_pos
|
||||
# ToDo: #9-add-data-persistency
|
||||
# ToDo: #9-add-data-persistence
|
||||
self.locator_uuid_map[locator].remove(uuid)
|
||||
|
||||
except JSONRPCException as e:
|
||||
if debug:
|
||||
logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
|
||||
|
||||
# Go back to sleep if there are no more appointments
|
||||
self.asleep = True
|
||||
self.zmq_subscriber.terminate = True
|
||||
|
||||
if debug:
|
||||
logging.error("[Watcher] no more pending appointments, going back to sleep")
|
||||
|
||||
def delete_expired_appointment(self, block, debug, logging):
|
||||
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
|
||||
+ EXPIRY_DELTA]
|
||||
|
||||
for uuid in to_delete:
|
||||
# ToDo: #9-add-data-persistency
|
||||
locator = self.appointments[uuid].locator
|
||||
|
||||
self.appointments.pop(uuid)
|
||||
|
||||
if len(self.locator_uuid_map[locator]) == 1:
|
||||
self.locator_uuid_map.pop(locator)
|
||||
|
||||
else:
|
||||
self.locator_uuid_map[locator].remove(uuid)
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
|
||||
.format(locator, uuid))
|
||||
|
||||
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
|
||||
matches = []
|
||||
|
||||
for locator, dispute_txid in potential_matches.items():
|
||||
for uuid in self.locator_uuid_map[locator]:
|
||||
try:
|
||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug,
|
||||
logging)
|
||||
justice_rawtx = hexlify(justice_rawtx).decode()
|
||||
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
|
||||
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
|
||||
justice_txid))
|
||||
except JSONRPCException as e:
|
||||
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
|
||||
# for the POC
|
||||
if debug:
|
||||
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
|
||||
|
||||
return matches
|
||||
logging.error("[Watcher] no more pending appointments, going back to sleep")
|
||||
|
||||
Reference in New Issue
Block a user