mirror of
https://github.com/aljazceru/python-teos.git
synced 2026-02-19 05:24:37 +01:00
Merge branch '59-uncovered-tests'
This commit is contained in:
41
pisa/api.py
41
pisa/api.py
@@ -79,25 +79,30 @@ def get_appointment():
|
||||
response = []
|
||||
|
||||
# ToDo: #15-add-system-monitor
|
||||
if not isinstance(locator, str) or len(locator) != 64:
|
||||
response.append({"locator": locator, "status": "not_found"})
|
||||
return jsonify(response)
|
||||
|
||||
appointment_in_watcher = watcher.locator_uuid_map.get(locator)
|
||||
locator_map = watcher.db_manager.load_locator_map(locator)
|
||||
|
||||
if appointment_in_watcher:
|
||||
for uuid in appointment_in_watcher:
|
||||
appointment_data = watcher.appointments[uuid].to_dict()
|
||||
appointment_data["status"] = "being_watched"
|
||||
response.append(appointment_data)
|
||||
if locator_map is not None:
|
||||
for uuid in locator_map:
|
||||
appointment_data = watcher.db_manager.load_watcher_appointment(uuid)
|
||||
|
||||
if watcher.responder:
|
||||
responder_jobs = watcher.responder.jobs
|
||||
if appointment_data is not None and appointment_data["triggered"] is False:
|
||||
# Triggered is an internal flag that does not need to be send
|
||||
del appointment_data["triggered"]
|
||||
|
||||
for job in responder_jobs.values():
|
||||
if job.locator == locator:
|
||||
job_data = job.to_dict()
|
||||
appointment_data["status"] = "being_watched"
|
||||
response.append(appointment_data)
|
||||
|
||||
job_data = watcher.db_manager.load_responder_job(uuid)
|
||||
|
||||
if job_data is not None:
|
||||
job_data["status"] = "dispute_responded"
|
||||
response.append(job_data)
|
||||
|
||||
if not response:
|
||||
else:
|
||||
response.append({"locator": locator, "status": "not_found"})
|
||||
|
||||
response = jsonify(response)
|
||||
@@ -107,18 +112,12 @@ def get_appointment():
|
||||
|
||||
@app.route("/get_all_appointments", methods=["GET"])
|
||||
def get_all_appointments():
|
||||
watcher_appointments = {}
|
||||
responder_jobs = {}
|
||||
|
||||
# ToDo: #15-add-system-monitor
|
||||
response = None
|
||||
|
||||
if request.remote_addr in request.host or request.remote_addr == "127.0.0.1":
|
||||
for uuid, appointment in watcher.appointments.items():
|
||||
watcher_appointments[uuid] = appointment.to_dict()
|
||||
|
||||
if watcher.responder:
|
||||
for uuid, job in watcher.responder.jobs.items():
|
||||
responder_jobs[uuid] = job.to_dict()
|
||||
watcher_appointments = watcher.db_manager.load_watcher_appointments()
|
||||
responder_jobs = watcher.db_manager.load_responder_jobs()
|
||||
|
||||
response = jsonify({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs})
|
||||
|
||||
|
||||
@@ -27,7 +27,8 @@ class Appointment:
|
||||
encrypted_blob_data = appointment_data.get("encrypted_blob")
|
||||
cipher = appointment_data.get("cipher")
|
||||
hash_function = appointment_data.get("hash_function")
|
||||
triggered = appointment_data.get("triggered")
|
||||
|
||||
triggered = True if appointment_data.get("triggered") is True else False
|
||||
|
||||
if any(
|
||||
v is None
|
||||
|
||||
@@ -1,6 +1,3 @@
|
||||
import binascii
|
||||
from hashlib import sha256
|
||||
|
||||
from pisa.logger import Logger
|
||||
from pisa.tools import bitcoin_cli
|
||||
from pisa.utils.auth_proxy import JSONRPCException
|
||||
@@ -45,6 +42,18 @@ class BlockProcessor:
|
||||
|
||||
return block_count
|
||||
|
||||
@staticmethod
|
||||
def decode_raw_transaction(raw_tx):
|
||||
|
||||
try:
|
||||
tx = bitcoin_cli().decoderawtransaction(raw_tx)
|
||||
|
||||
except JSONRPCException as e:
|
||||
tx = None
|
||||
logger.error("Can't build transaction from decoded data.", error=e.error)
|
||||
|
||||
return tx
|
||||
|
||||
def get_missed_blocks(self, last_know_block_hash):
|
||||
current_block_hash = self.get_best_block_hash()
|
||||
missed_blocks = []
|
||||
@@ -63,7 +72,7 @@ class BlockProcessor:
|
||||
chain_tip = self.get_best_block_hash()
|
||||
chain_tip_height = self.get_block(chain_tip).get("height")
|
||||
|
||||
target_block = self.get_block(target_block_hash).get("height")
|
||||
target_block = self.get_block(target_block_hash)
|
||||
|
||||
if target_block is not None:
|
||||
target_block_height = target_block.get("height")
|
||||
@@ -71,68 +80,3 @@ class BlockProcessor:
|
||||
distance = chain_tip_height - target_block_height
|
||||
|
||||
return distance
|
||||
|
||||
# FIXME: The following two functions does not seem to belong here. They come from the Watcher, and need to be
|
||||
# separated since they will be reused by the TimeTraveller.
|
||||
# DISCUSS: 36-who-should-check-appointment-trigger
|
||||
@staticmethod
|
||||
def get_potential_matches(txids, locator_uuid_map):
|
||||
potential_locators = {sha256(binascii.unhexlify(txid)).hexdigest(): txid for txid in txids}
|
||||
|
||||
# Check is any of the tx_ids in the received block is an actual match
|
||||
intersection = set(locator_uuid_map.keys()).intersection(potential_locators.keys())
|
||||
potential_matches = {locator: potential_locators[locator] for locator in intersection}
|
||||
|
||||
if len(potential_matches) > 0:
|
||||
logger.info("List of potential matches", potential_matches=potential_matches)
|
||||
|
||||
else:
|
||||
logger.info("No potential matches found")
|
||||
|
||||
return potential_matches
|
||||
|
||||
@staticmethod
|
||||
# NOTCOVERED
|
||||
def get_matches(potential_matches, locator_uuid_map, appointments):
|
||||
matches = []
|
||||
|
||||
for locator, dispute_txid in potential_matches.items():
|
||||
for uuid in locator_uuid_map[locator]:
|
||||
try:
|
||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||
justice_rawtx = appointments[uuid].encrypted_blob.decrypt(dispute_txid)
|
||||
justice_txid = bitcoin_cli().decoderawtransaction(justice_rawtx).get("txid")
|
||||
logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
|
||||
|
||||
except JSONRPCException as e:
|
||||
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
|
||||
# for the POC
|
||||
justice_txid = None
|
||||
justice_rawtx = None
|
||||
logger.error("Can't build transaction from decoded data.", error=e.error)
|
||||
|
||||
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
|
||||
|
||||
return matches
|
||||
|
||||
# DISCUSS: This method comes from the Responder and seems like it could go back there.
|
||||
@staticmethod
|
||||
# NOTCOVERED
|
||||
def check_confirmations(txs, unconfirmed_txs, tx_job_map, missed_confirmations):
|
||||
|
||||
for tx in txs:
|
||||
if tx in tx_job_map and tx in unconfirmed_txs:
|
||||
unconfirmed_txs.remove(tx)
|
||||
|
||||
logger.info("Confirmation received for transaction", tx=tx)
|
||||
|
||||
elif tx in unconfirmed_txs:
|
||||
if tx in missed_confirmations:
|
||||
missed_confirmations[tx] += 1
|
||||
|
||||
else:
|
||||
missed_confirmations[tx] = 1
|
||||
|
||||
logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])
|
||||
|
||||
return unconfirmed_txs, missed_confirmations
|
||||
|
||||
@@ -80,7 +80,7 @@ class Carrier:
|
||||
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
|
||||
# restart the job
|
||||
if e.error.get("code") == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
logger.info("Transaction got reorged before obtaining information", txid=txid)
|
||||
logger.info("Transaction not found in mempool nor blockchain", txid=txid)
|
||||
|
||||
else:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
|
||||
@@ -25,6 +25,24 @@ class Cleaner:
|
||||
# Delete appointment from the db
|
||||
db_manager.delete_watcher_appointment(uuid)
|
||||
|
||||
@staticmethod
|
||||
def delete_completed_appointment(locator, uuid, appointments, locator_uuid_map, db_manager):
|
||||
# Delete the appointment
|
||||
appointment = appointments.pop(uuid)
|
||||
|
||||
# If there was only one appointment that matches the locator we can delete the whole list
|
||||
if len(locator_uuid_map[locator]) == 1:
|
||||
locator_uuid_map.pop(locator)
|
||||
else:
|
||||
# Otherwise we just delete the appointment that matches locator:appointment_pos
|
||||
locator_uuid_map[locator].remove(uuid)
|
||||
|
||||
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
|
||||
# the watcher's and responder's db after fulfilled
|
||||
# Update appointment in the db
|
||||
appointment.triggered = True
|
||||
db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
|
||||
@staticmethod
|
||||
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height, db_manager):
|
||||
for uuid, confirmations in completed_jobs:
|
||||
|
||||
56
pisa/cryptographer.py
Normal file
56
pisa/cryptographer.py
Normal file
@@ -0,0 +1,56 @@
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify, hexlify
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
|
||||
from pisa.logger import Logger
|
||||
|
||||
logger = Logger("Cryptographer")
|
||||
|
||||
|
||||
# FIXME: Cryptographer is assuming AES-128-GCM and SHA256 since they are the only pair accepted by the encrypted blob
|
||||
# and the only pair programmed so far.
|
||||
class Cryptographer:
|
||||
@staticmethod
|
||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||
def decrypt(encrypted_blob, key, rtype="hex"):
|
||||
if rtype not in ["hex", "bytes"]:
|
||||
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
||||
|
||||
if len(encrypted_blob.data) % 2:
|
||||
logger.info(
|
||||
"Incorrect (Odd-length) value to be decrypted.", encrypted_blob=encrypted_blob.data, dispute_txid=key
|
||||
)
|
||||
return None
|
||||
|
||||
# master_key = H(tx_id | tx_id)
|
||||
key = unhexlify(key)
|
||||
master_key = sha256(key + key).digest()
|
||||
|
||||
# The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV.
|
||||
sk = master_key[:16]
|
||||
nonce = master_key[16:]
|
||||
|
||||
logger.info(
|
||||
"Creating new blob.",
|
||||
master_key=hexlify(master_key).decode(),
|
||||
sk=hexlify(sk).decode(),
|
||||
nonce=hexlify(nonce).decode(),
|
||||
encrypted_blob=encrypted_blob.data,
|
||||
)
|
||||
|
||||
# Decrypt
|
||||
cipher = AESGCM(sk)
|
||||
data = unhexlify(encrypted_blob.data.encode())
|
||||
|
||||
try:
|
||||
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
|
||||
|
||||
# Change the blob encoding to hex depending on the rtype (default)
|
||||
if rtype == "hex":
|
||||
blob = hexlify(blob).decode("utf8")
|
||||
|
||||
except InvalidTag:
|
||||
blob = None
|
||||
|
||||
return blob
|
||||
@@ -52,6 +52,11 @@ class DBManager:
|
||||
|
||||
self.db.put(key, value)
|
||||
|
||||
def load_entry(self, key):
|
||||
data = self.db.get(key.encode("utf-8"))
|
||||
data = json.loads(data) if data is not None else data
|
||||
return data
|
||||
|
||||
def delete_entry(self, key, prefix=None):
|
||||
if isinstance(prefix, str):
|
||||
key = prefix + key
|
||||
@@ -60,6 +65,12 @@ class DBManager:
|
||||
|
||||
self.db.delete(key)
|
||||
|
||||
def load_watcher_appointment(self, key):
|
||||
return self.load_entry(WATCHER_PREFIX + key)
|
||||
|
||||
def load_responder_job(self, key):
|
||||
return self.load_entry(RESPONDER_PREFIX + key)
|
||||
|
||||
def load_watcher_appointments(self):
|
||||
all_appointments = self.load_appointments_db(prefix=WATCHER_PREFIX)
|
||||
non_triggered_appointments = {
|
||||
|
||||
@@ -1,48 +1,21 @@
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify, hexlify
|
||||
from cryptography.exceptions import InvalidTag
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
|
||||
from pisa.logger import Logger
|
||||
|
||||
logger = Logger("Watcher")
|
||||
from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
||||
|
||||
|
||||
# FIXME: EncryptedBlob is assuming AES-128-GCM. A cipher field should be part of the object and the decryption should be
|
||||
# performed depending on the cipher.
|
||||
class EncryptedBlob:
|
||||
def __init__(self, data):
|
||||
def __init__(self, data, cipher="AES-GCM-128", hash_function="SHA256"):
|
||||
if cipher in SUPPORTED_CIPHERS:
|
||||
self.cipher = cipher
|
||||
|
||||
else:
|
||||
raise ValueError("Cipher not supported")
|
||||
|
||||
if hash_function in SUPPORTED_HASH_FUNCTIONS:
|
||||
self.hash_function = hash_function
|
||||
|
||||
else:
|
||||
raise ValueError("Hash function not supported")
|
||||
|
||||
self.data = data
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, EncryptedBlob) and self.data == other.data
|
||||
|
||||
def decrypt(self, key):
|
||||
# master_key = H(tx_id | tx_id)
|
||||
key = unhexlify(key)
|
||||
master_key = sha256(key + key).digest()
|
||||
|
||||
# The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV.
|
||||
sk = master_key[:16]
|
||||
nonce = master_key[16:]
|
||||
|
||||
logger.info(
|
||||
"Creating new blob.",
|
||||
master_key=hexlify(master_key).decode(),
|
||||
sk=hexlify(sk).decode(),
|
||||
nonce=hexlify(sk).decode(),
|
||||
encrypted_blob=self.data,
|
||||
)
|
||||
|
||||
# Decrypt
|
||||
aesgcm = AESGCM(sk)
|
||||
data = unhexlify(self.data.encode())
|
||||
|
||||
try:
|
||||
raw_tx = aesgcm.decrypt(nonce=nonce, data=data, associated_data=None)
|
||||
hex_raw_tx = hexlify(raw_tx).decode("utf8")
|
||||
|
||||
except InvalidTag:
|
||||
hex_raw_tx = None
|
||||
|
||||
return hex_raw_tx
|
||||
|
||||
@@ -24,36 +24,28 @@ logger = Logger("Inspector")
|
||||
|
||||
class Inspector:
|
||||
def inspect(self, appt, signature, public_key):
|
||||
locator = appt.get("locator")
|
||||
start_time = appt.get("start_time")
|
||||
end_time = appt.get("end_time")
|
||||
dispute_delta = appt.get("dispute_delta")
|
||||
encrypted_blob = appt.get("encrypted_blob")
|
||||
cipher = appt.get("cipher")
|
||||
hash_function = appt.get("hash_function")
|
||||
|
||||
block_height = BlockProcessor.get_block_count()
|
||||
|
||||
if block_height is not None:
|
||||
rcode, message = self.check_locator(locator)
|
||||
rcode, message = self.check_locator(appt.get("locator"))
|
||||
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_start_time(start_time, block_height)
|
||||
rcode, message = self.check_start_time(appt.get("start_time"), block_height)
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_end_time(end_time, start_time, block_height)
|
||||
rcode, message = self.check_end_time(appt.get("end_time"), appt.get("start_time"), block_height)
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_delta(dispute_delta)
|
||||
rcode, message = self.check_delta(appt.get("dispute_delta"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_blob(encrypted_blob)
|
||||
rcode, message = self.check_blob(appt.get("encrypted_blob"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_cipher(cipher)
|
||||
rcode, message = self.check_cipher(appt.get("cipher"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_hash_function(hash_function)
|
||||
rcode, message = self.check_hash_function(appt.get("hash_function"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_appointment_signature(appt, signature, public_key)
|
||||
|
||||
if rcode == 0:
|
||||
r = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
|
||||
r = Appointment.from_dict(appt)
|
||||
else:
|
||||
r = (rcode, message)
|
||||
|
||||
@@ -274,5 +266,6 @@ class Inspector:
|
||||
|
||||
except InvalidSignature:
|
||||
rcode = errors.APPOINTMENT_INVALID_SIGNATURE
|
||||
message = "invalid signature"
|
||||
|
||||
return rcode, message
|
||||
|
||||
@@ -65,16 +65,16 @@ if __name__ == "__main__":
|
||||
missed_blocks_responder = (
|
||||
missed_blocks_watcher
|
||||
if last_block_watcher == last_block_responder
|
||||
else block_processor.get_missed_blocks(last_block_watcher)
|
||||
else block_processor.get_missed_blocks(last_block_responder)
|
||||
)
|
||||
|
||||
responder = Responder(db_manager)
|
||||
responder.jobs, responder.tx_job_map = Builder.build_jobs(responder_jobs_data)
|
||||
responder.block_queue = Builder.build_block_queue(last_block_responder)
|
||||
responder.block_queue = Builder.build_block_queue(missed_blocks_responder)
|
||||
|
||||
watcher.responder = responder
|
||||
watcher.appointments, watcher.locator_uuid_map = Builder.build_appointments(watcher_appointments_data)
|
||||
watcher.block_queue = Builder.build_block_queue(last_block_responder)
|
||||
watcher.block_queue = Builder.build_block_queue(missed_blocks_watcher)
|
||||
|
||||
# Create an instance of the Watcher and fire the API
|
||||
start_api(watcher)
|
||||
|
||||
@@ -7,7 +7,6 @@ from binascii import unhexlify
|
||||
from pisa.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.carrier import Carrier
|
||||
from pisa.tools import check_tx_in_chain
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
|
||||
@@ -113,7 +112,8 @@ class Responder:
|
||||
else:
|
||||
self.tx_job_map[justice_txid] = [uuid]
|
||||
|
||||
if confirmations == 0:
|
||||
# In the case we receive two jobs with the same justice txid we only add it to the unconfirmed txs list once
|
||||
if justice_txid not in self.unconfirmed_txs and confirmations == 0:
|
||||
self.unconfirmed_txs.append(justice_txid)
|
||||
|
||||
self.db_manager.store_responder_job(uuid, job.to_json())
|
||||
@@ -145,7 +145,6 @@ class Responder:
|
||||
|
||||
if block is not None:
|
||||
txs = block.get("tx")
|
||||
height = block.get("height")
|
||||
|
||||
logger.info(
|
||||
"New block received", block_hash=block_hash, prev_block_hash=block.get("previousblockhash"), txs=txs
|
||||
@@ -153,10 +152,9 @@ class Responder:
|
||||
|
||||
# ToDo: #9-add-data-persistence
|
||||
if prev_block_hash == block.get("previousblockhash"):
|
||||
self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
|
||||
txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations
|
||||
)
|
||||
self.check_confirmations(txs)
|
||||
|
||||
height = block.get("height")
|
||||
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
|
||||
completed_jobs = self.get_completed_jobs(height)
|
||||
|
||||
@@ -172,7 +170,7 @@ class Responder:
|
||||
)
|
||||
|
||||
# ToDo: #24-properly-handle-reorgs
|
||||
self.handle_reorgs()
|
||||
self.handle_reorgs(block_hash)
|
||||
|
||||
# Register the last processed block for the responder
|
||||
self.db_manager.store_last_block_hash_responder(block_hash)
|
||||
@@ -186,6 +184,25 @@ class Responder:
|
||||
|
||||
logger.info("No more pending jobs, going back to sleep")
|
||||
|
||||
def check_confirmations(self, txs):
|
||||
# If a new confirmed tx matches a tx we are watching, then we remove it from the unconfirmed txs map
|
||||
for tx in txs:
|
||||
if tx in self.tx_job_map and tx in self.unconfirmed_txs:
|
||||
self.unconfirmed_txs.remove(tx)
|
||||
|
||||
logger.info("Confirmation received for transaction", tx=tx)
|
||||
|
||||
# We also add a missing confirmation to all those txs waiting to be confirmed that have not been confirmed in
|
||||
# the current block
|
||||
for tx in self.unconfirmed_txs:
|
||||
if tx in self.missed_confirmations:
|
||||
self.missed_confirmations[tx] += 1
|
||||
|
||||
else:
|
||||
self.missed_confirmations[tx] = 1
|
||||
|
||||
logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=self.missed_confirmations[tx])
|
||||
|
||||
def get_txs_to_rebroadcast(self, txs):
|
||||
txs_to_rebroadcast = []
|
||||
|
||||
@@ -244,41 +261,43 @@ class Responder:
|
||||
|
||||
return receipts
|
||||
|
||||
# FIXME: Legacy code, must be checked and updated/fixed
|
||||
# NOTCOVERED
|
||||
def handle_reorgs(self):
|
||||
def handle_reorgs(self, block_hash):
|
||||
carrier = Carrier()
|
||||
|
||||
for uuid, job in self.jobs.items():
|
||||
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
|
||||
# there either, so we'll need to call the reorg manager straight away
|
||||
dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label="Dispute tx")
|
||||
# First we check if the dispute transaction is known (exists either in mempool or blockchain)
|
||||
dispute_tx = carrier.get_transaction(job.dispute_txid)
|
||||
|
||||
# If the dispute is there, we can check the justice tx
|
||||
if dispute_in_chain:
|
||||
justice_in_chain, justice_confirmations = check_tx_in_chain(
|
||||
job.justice_txid, logger=logger, tx_label="Justice tx"
|
||||
)
|
||||
if dispute_tx is not None:
|
||||
# If the dispute is there, we check the justice
|
||||
justice_tx = carrier.get_transaction(job.justice_txid)
|
||||
|
||||
# If both transactions are there, we only need to update the justice tx confirmation count
|
||||
if justice_in_chain:
|
||||
logger.info(
|
||||
"Updating confirmation count for transaction.",
|
||||
justice_txid=job.justice_txid,
|
||||
prev_count=job.confirmations,
|
||||
curr_count=justice_confirmations,
|
||||
)
|
||||
if justice_tx is not None:
|
||||
# If the justice exists we need to check is it's on the blockchain or not so we can update the
|
||||
# unconfirmed transactions list accordingly.
|
||||
if justice_tx.get("confirmations") is None:
|
||||
self.unconfirmed_txs.append(job.justice_txid)
|
||||
|
||||
job.confirmations = justice_confirmations
|
||||
logger.info(
|
||||
"Justice transaction back in mempool. Updating unconfirmed transactions.",
|
||||
justice_txid=job.justice_txid,
|
||||
)
|
||||
|
||||
else:
|
||||
# Otherwise, we will add the job back (implying rebroadcast of the tx) and monitor it again
|
||||
# If the justice transaction is missing, we need to reset the job.
|
||||
# DISCUSS: Adding job back, should we flag it as retried?
|
||||
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
|
||||
# maintained. There is no way of doing so with the current approach. Update if required
|
||||
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
|
||||
self.add_response(
|
||||
uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end, block_hash
|
||||
)
|
||||
|
||||
logger.warning("Justice transaction banished. Resetting the job", justice_tx=job.justice_txid)
|
||||
|
||||
else:
|
||||
# ToDo: #24-properly-handle-reorgs
|
||||
# FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the
|
||||
# reorg manager
|
||||
logger.warning("Dispute and justice transaction missing. Calling the reorg manager")
|
||||
logger.error("Reorg manager not yet implemented")
|
||||
logger.warning("Dispute and justice transaction missing. Calling the reorg manager.")
|
||||
logger.error("Reorg manager not yet implemented.")
|
||||
|
||||
@@ -2,8 +2,6 @@ import re
|
||||
from http.client import HTTPException
|
||||
|
||||
import pisa.conf as conf
|
||||
from pisa.logger import Logger
|
||||
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
|
||||
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
|
||||
|
||||
|
||||
@@ -14,34 +12,6 @@ def bitcoin_cli():
|
||||
)
|
||||
|
||||
|
||||
# TODO: currently only used in the Responder; might move there or in the BlockProcessor
|
||||
# NOTCOVERED
|
||||
def check_tx_in_chain(tx_id, logger=Logger(), tx_label="Transaction"):
|
||||
tx_in_chain = False
|
||||
confirmations = 0
|
||||
|
||||
try:
|
||||
tx_info = bitcoin_cli().getrawtransaction(tx_id, 1)
|
||||
|
||||
if tx_info.get("confirmations"):
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
tx_in_chain = True
|
||||
logger.error("{} found in the blockchain".format(tx_label), txid=tx_id)
|
||||
|
||||
else:
|
||||
logger.error("{} found in mempool".format(tx_label), txid=tx_id)
|
||||
|
||||
except JSONRPCException as e:
|
||||
if e.error.get("code") == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
logger.error("{} not found in mempool nor blockchain".format(tx_label), txid=tx_id)
|
||||
|
||||
else:
|
||||
# ToDO: Unhandled errors, check this properly
|
||||
logger.error("JSONRPCException.", method="tools.check_tx_in_chain", error=e.error)
|
||||
|
||||
return tx_in_chain, confirmations
|
||||
|
||||
|
||||
# NOTCOVERED
|
||||
def can_connect_to_bitcoind():
|
||||
can_connect = True
|
||||
|
||||
101
pisa/watcher.py
101
pisa/watcher.py
@@ -1,24 +1,27 @@
|
||||
from uuid import uuid4
|
||||
from queue import Queue
|
||||
from hashlib import sha256
|
||||
from threading import Thread
|
||||
from binascii import unhexlify
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
|
||||
from pisa.logger import Logger
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS, PISA_SECRET_KEY
|
||||
from pisa.responder import Responder
|
||||
from pisa.cryptographer import Cryptographer
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.utils.zmq_subscriber import ZMQHandler
|
||||
from pisa.conf import EXPIRY_DELTA, MAX_APPOINTMENTS, PISA_SECRET_KEY
|
||||
|
||||
logger = Logger("Watcher")
|
||||
|
||||
|
||||
class Watcher:
|
||||
def __init__(self, db_manager, responder=None, max_appointments=MAX_APPOINTMENTS):
|
||||
def __init__(self, db_manager, pisa_sk_file=PISA_SECRET_KEY, responder=None, max_appointments=MAX_APPOINTMENTS):
|
||||
self.appointments = dict()
|
||||
self.locator_uuid_map = dict()
|
||||
self.asleep = True
|
||||
@@ -30,13 +33,17 @@ class Watcher:
|
||||
if not isinstance(responder, Responder):
|
||||
self.responder = Responder(db_manager)
|
||||
|
||||
if PISA_SECRET_KEY is None:
|
||||
if pisa_sk_file is None:
|
||||
raise ValueError("No signing key provided. Please fix your pisa.conf")
|
||||
else:
|
||||
with open(PISA_SECRET_KEY, "r") as key_file:
|
||||
secret_key_pem = key_file.read().encode("utf-8")
|
||||
self.signing_key = load_pem_private_key(secret_key_pem, password=None, backend=default_backend())
|
||||
|
||||
@staticmethod
|
||||
def compute_locator(tx_id):
|
||||
return sha256(unhexlify(tx_id)).hexdigest()
|
||||
|
||||
def sign_appointment(self, appointment):
|
||||
data = appointment.serialize()
|
||||
return self.signing_key.sign(data, ec.ECDSA(hashes.SHA256()))
|
||||
@@ -115,46 +122,34 @@ class Watcher:
|
||||
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
|
||||
)
|
||||
|
||||
potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map)
|
||||
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
|
||||
filtered_matches = self.filter_valid_matches(self.get_matches(txids))
|
||||
|
||||
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
|
||||
for uuid, filtered_match in filtered_matches.items():
|
||||
# Errors decrypting the Blob will result in a None justice_txid
|
||||
if justice_txid is not None:
|
||||
if filtered_match["valid_match"] is True:
|
||||
logger.info(
|
||||
"Notifying responder and deleting appointment.",
|
||||
justice_txid=justice_txid,
|
||||
locator=locator,
|
||||
justice_txid=filtered_match["justice_txid"],
|
||||
locator=filtered_match["locator"],
|
||||
uuid=uuid,
|
||||
)
|
||||
|
||||
self.responder.add_response(
|
||||
uuid,
|
||||
dispute_txid,
|
||||
justice_txid,
|
||||
justice_rawtx,
|
||||
filtered_match["dispute_txid"],
|
||||
filtered_match["justice_txid"],
|
||||
filtered_match["justice_rawtx"],
|
||||
self.appointments[uuid].end_time,
|
||||
block_hash,
|
||||
)
|
||||
|
||||
# Delete the appointment
|
||||
appointment = self.appointments.pop(uuid)
|
||||
# Delete the appointment and update db
|
||||
Cleaner.delete_completed_appointment(
|
||||
filtered_match["locator"], uuid, self.appointments, self.locator_uuid_map, self.db_manager
|
||||
)
|
||||
|
||||
# If there was only one appointment that matches the locator we can delete the whole list
|
||||
if len(self.locator_uuid_map[locator]) == 1:
|
||||
self.locator_uuid_map.pop(locator)
|
||||
else:
|
||||
# Otherwise we just delete the appointment that matches locator:appointment_pos
|
||||
self.locator_uuid_map[locator].remove(uuid)
|
||||
|
||||
# DISCUSS: instead of deleting the appointment, we will mark it as triggered and delete it from both
|
||||
# the watcher's and responder's db after fulfilled
|
||||
# Update appointment in the db
|
||||
appointment.triggered = True
|
||||
self.db_manager.store_watcher_appointment(uuid, appointment.to_json())
|
||||
|
||||
# Register the last processed block for the watcher
|
||||
self.db_manager.store_last_block_hash_watcher(block_hash)
|
||||
# Register the last processed block for the watcher
|
||||
self.db_manager.store_last_block_hash_watcher(block_hash)
|
||||
|
||||
# Go back to sleep if there are no more appointments
|
||||
self.asleep = True
|
||||
@@ -162,3 +157,47 @@ class Watcher:
|
||||
self.block_queue = Queue()
|
||||
|
||||
logger.info("No more pending appointments, going back to sleep")
|
||||
|
||||
def get_matches(self, txids):
|
||||
potential_locators = {Watcher.compute_locator(txid): txid for txid in txids}
|
||||
|
||||
# Check is any of the tx_ids in the received block is an actual match
|
||||
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
|
||||
matches = {locator: potential_locators[locator] for locator in intersection}
|
||||
|
||||
if len(matches) > 0:
|
||||
logger.info("List of matches", potential_matches=matches)
|
||||
|
||||
else:
|
||||
logger.info("No matches found")
|
||||
|
||||
return matches
|
||||
|
||||
def filter_valid_matches(self, matches):
|
||||
filtered_matches = {}
|
||||
|
||||
for locator, dispute_txid in matches.items():
|
||||
for uuid in self.locator_uuid_map[locator]:
|
||||
|
||||
justice_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid)
|
||||
justice_tx = BlockProcessor.decode_raw_transaction(justice_rawtx)
|
||||
|
||||
if justice_tx is not None:
|
||||
justice_txid = justice_tx.get("txid")
|
||||
valid_match = True
|
||||
|
||||
logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
|
||||
|
||||
else:
|
||||
justice_txid = None
|
||||
valid_match = False
|
||||
|
||||
filtered_matches[uuid] = {
|
||||
"locator": locator,
|
||||
"dispute_txid": dispute_txid,
|
||||
"justice_txid": justice_txid,
|
||||
"justice_rawtx": justice_rawtx,
|
||||
"valid_match": valid_match,
|
||||
}
|
||||
|
||||
return filtered_matches
|
||||
|
||||
Reference in New Issue
Block a user