mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Adapts pisa to work with the new key/encryption. Also fixes some discovered bugs.
The number of confirmations stopped counting from 6+ due to a legacy part of the code. The JSONRPC error format changed with the new version and was not updated.
This commit is contained in:
@@ -6,6 +6,8 @@ from getopt import getopt
|
||||
from sys import argv
|
||||
import logging
|
||||
from conf import CLIENT_LOG_FILE
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
|
||||
from apps.blob import Blob
|
||||
from apps import PISA_API_SERVER, PISA_API_PORT
|
||||
@@ -15,7 +17,7 @@ commands = ['add_appointment']
|
||||
|
||||
|
||||
def build_appointment(tx, tx_id, start_block, end_block, dispute_delta, debug, logging):
|
||||
locator = tx_id[:32]
|
||||
locator = sha256(unhexlify(tx_id)).hexdigest()
|
||||
|
||||
cipher = "AES-GCM-128"
|
||||
hash_function = "SHA256"
|
||||
|
||||
@@ -12,9 +12,6 @@ class Inspector:
|
||||
self.logging = logging
|
||||
|
||||
def inspect(self, data):
|
||||
message = None
|
||||
rcode = 0
|
||||
|
||||
locator = data.get('locator')
|
||||
start_time = data.get('start_time')
|
||||
end_time = data.get('end_time')
|
||||
@@ -52,7 +49,7 @@ class Inspector:
|
||||
if self.debug:
|
||||
self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
|
||||
|
||||
return rcode, message
|
||||
return r
|
||||
|
||||
def check_locator(self, locator):
|
||||
message = None
|
||||
@@ -64,7 +61,7 @@ class Inspector:
|
||||
elif type(locator) != str:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong locator data type ({})".format(type(locator))
|
||||
elif len(locator) != 32:
|
||||
elif len(locator) != 64:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_SIZE
|
||||
message = "wrong locator size ({})".format(len(locator))
|
||||
elif re.search(r'^[0-9A-Fa-f]+$', locator) is None:
|
||||
@@ -169,7 +166,7 @@ class Inspector:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong encrypted_blob data type ({})".format(t)
|
||||
elif encrypted_blob == '':
|
||||
# ToDo: We may want to define this to be at least as long as one block of the cypher we are using
|
||||
# ToDo: We may want to define this to be at least as long as one block of the cipher we are using
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD
|
||||
message = "wrong encrypted_blob".format(t)
|
||||
if self.debug and message:
|
||||
|
||||
@@ -10,11 +10,10 @@ MIN_CONFIRMATIONS = 6
|
||||
|
||||
|
||||
class Job:
|
||||
def __init__(self, dispute_txid, rawtx, appointment_end, retry_counter=0):
|
||||
def __init__(self, dispute_txid, justice_rawtx, appointment_end, retry_counter=0):
|
||||
self.dispute_txid = dispute_txid
|
||||
self.rawtx = rawtx
|
||||
self.justice_rawtx = justice_rawtx
|
||||
self.appointment_end = appointment_end
|
||||
self.in_block_height = None
|
||||
self.missed_confirmations = 0
|
||||
self.retry_counter = retry_counter
|
||||
|
||||
@@ -27,17 +26,19 @@ class Responder:
|
||||
self.asleep = True
|
||||
self.zmq_subscriber = None
|
||||
|
||||
def create_job(self, dispute_txid, txid, rawtx, appointment_end, debug, logging, conf_counter=0, retry=False):
|
||||
def create_job(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, conf_counter=0,
|
||||
retry=False):
|
||||
# DISCUSS: Check what to do if the retry counter gets too big
|
||||
if retry:
|
||||
self.jobs[txid].retry_counter += 1
|
||||
self.jobs[justice_txid].retry_counter += 1
|
||||
self.jobs[justice_txid].missed_confirmations = 0
|
||||
else:
|
||||
self.confirmation_counter[txid] = conf_counter
|
||||
self.jobs[txid] = Job(dispute_txid, rawtx, appointment_end)
|
||||
self.confirmation_counter[justice_txid] = conf_counter
|
||||
self.jobs[justice_txid] = Job(dispute_txid, justice_rawtx, appointment_end)
|
||||
|
||||
if debug:
|
||||
logging.info('[Responder] new job added (dispute txid = {}, txid = {}, appointment end = {})'.format(
|
||||
dispute_txid, txid, appointment_end))
|
||||
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
|
||||
format(dispute_txid, justice_txid, appointment_end))
|
||||
|
||||
if self.asleep:
|
||||
self.asleep = False
|
||||
@@ -47,46 +48,48 @@ class Responder:
|
||||
zmq_thread.start()
|
||||
responder.start()
|
||||
|
||||
def add_response(self, dispute_txid, txid, rawtx, appointment_end, debug, logging, retry=False):
|
||||
def add_response(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=False):
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
try:
|
||||
if debug:
|
||||
if self.asleep:
|
||||
logging.info("[Responder] waking up!")
|
||||
logging.info("[Responder] pushing transaction to the network (txid: {})".format(txid))
|
||||
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
|
||||
|
||||
bitcoin_cli.sendrawtransaction(rawtx)
|
||||
bitcoin_cli.sendrawtransaction(justice_rawtx)
|
||||
|
||||
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
|
||||
# retry holds such information.
|
||||
self.create_job(dispute_txid, txid, rawtx, appointment_end, debug, logging, conf_counter=0, retry=retry)
|
||||
self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging, retry=retry)
|
||||
|
||||
except JSONRPCException as e:
|
||||
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
|
||||
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
|
||||
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
|
||||
if e.code == RPC_VERIFY_REJECTED:
|
||||
if e.error.get('code') == RPC_VERIFY_REJECTED:
|
||||
# DISCUSS: what to do in this case
|
||||
pass
|
||||
elif e.code == RPC_VERIFY_ALREADY_IN_CHAIN:
|
||||
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
|
||||
try:
|
||||
if debug:
|
||||
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count"
|
||||
"and start monitoring the transaction".format(txid))
|
||||
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
|
||||
"start monitoring the transaction".format(justice_txid))
|
||||
|
||||
# If the transaction is already in the chain, we get the number of confirmations and watch the job
|
||||
# until the end of the appointment
|
||||
tx_info = bitcoin_cli.gettransaction(txid)
|
||||
tx_info = bitcoin_cli.gettransaction(justice_txid)
|
||||
confirmations = int(tx_info.get("confirmations"))
|
||||
self.create_job(dispute_txid, txid, rawtx, appointment_end, debug, logging, retry=retry,
|
||||
conf_counter=confirmations)
|
||||
self.create_job(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
retry=retry, conf_counter=confirmations)
|
||||
|
||||
except JSONRPCException as e:
|
||||
# While it's quite unlikely, the transaction that was already in the blockchain could have been
|
||||
# reorged while we were querying bitcoind to get the confirmation count. in such a case we just
|
||||
# restart the job
|
||||
if e.code == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
self.add_response(dispute_txid, txid, rawtx, appointment_end, debug, logging, retry=retry)
|
||||
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
|
||||
self.add_response(dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
|
||||
retry=retry)
|
||||
elif debug:
|
||||
# If something else happens (unlikely but possible) log it so we can treat it in future releases
|
||||
logging.error("[Responder] JSONRPCException. Error code {}".format(e))
|
||||
@@ -120,11 +123,10 @@ class Responder:
|
||||
|
||||
jobs_to_delete = []
|
||||
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
|
||||
# Handling new jobs (aka jobs with not enough confirmations), when a job receives MIN_CONFIRMATIONS
|
||||
# it will be passed to jobs and we will simply check for chain forks.
|
||||
# Keep count of the confirmations each tx gets
|
||||
for job_id, confirmations in self.confirmation_counter.items():
|
||||
# If we see the transaction for the first time, or MIN_CONFIRMATIONS hasn't been reached
|
||||
if job_id in txs or (0 < confirmations < MIN_CONFIRMATIONS):
|
||||
if job_id in txs or (0 < confirmations):
|
||||
self.confirmation_counter[job_id] += 1
|
||||
|
||||
if debug:
|
||||
@@ -135,7 +137,7 @@ class Responder:
|
||||
# DISCUSS: How many confirmations before retry
|
||||
# DISCUSS: recursion vs setting confirmations to 0 and rebroadcast here
|
||||
# DISCUSS: how many max retries and what to do if the cap is reached
|
||||
self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].tx,
|
||||
self.add_response(self.jobs[job_id].dispute_txid, job_id, self.jobs[job_id].justice_rawtx,
|
||||
self.jobs[job_id].appointment_end, debug, logging, retry=True)
|
||||
if debug:
|
||||
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
|
||||
@@ -145,7 +147,7 @@ class Responder:
|
||||
self.jobs[job_id].missed_confirmations += 1
|
||||
|
||||
for job_id, job in self.jobs.items():
|
||||
if job.appointment_end <= height:
|
||||
if job.appointment_end <= height and self.confirmation_counter[job_id] >= MIN_CONFIRMATIONS:
|
||||
# The end of the appointment has been reached
|
||||
jobs_to_delete.append(job_id)
|
||||
|
||||
@@ -191,7 +193,7 @@ class Responder:
|
||||
try:
|
||||
bitcoin_cli.gettransaction(job.dispute_txid)
|
||||
# DISCUSS: Add job back, should we flag it as retried?
|
||||
self.add_response(job.dispute_txid, job_id, job.rawtx, job.appointment_end, debug, logging)
|
||||
self.add_response(job.dispute_txid, job_id, job.justice_rawtx, job.appointment_end, debug, logging)
|
||||
except JSONRPCException as e:
|
||||
# FIXME: It should be safe but check Exception code anyway
|
||||
if debug:
|
||||
|
||||
@@ -5,6 +5,7 @@ from conf import EXPIRY_DELTA
|
||||
from pisa.responder import Responder
|
||||
from pisa.zmq_subscriber import ZMQHandler
|
||||
from utils.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from hashlib import sha256
|
||||
from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
|
||||
|
||||
|
||||
@@ -27,9 +28,9 @@ class Watcher:
|
||||
# max_appointments is reached.
|
||||
|
||||
if len(self.appointments) < self.max_appointments:
|
||||
# Appointments are identified by the locator: the most significant 16 bytes of the commitment txid.
|
||||
# While 16-byte hash collisions are not likely, they are possible, so we will store appointments in lists
|
||||
# even if we only have one (so the code logic is simplified from this point on).
|
||||
# Appointments are identified by the locator: the sha256 of commitment txid (H(tx_id)).
|
||||
# Two different nodes may ask for appointments using the same commitment txid, what will result in a
|
||||
# collision in our appointments structure (and may be an attack surface), we use lists to avoid that.
|
||||
if not self.appointments.get(appointment.locator):
|
||||
self.appointments[appointment.locator] = []
|
||||
|
||||
@@ -76,6 +77,8 @@ class Watcher:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
txids = block.get('tx')
|
||||
|
||||
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] new block received {}".format(block_hash))
|
||||
logging.info("[Watcher] list of transactions: {}".format(txids))
|
||||
@@ -108,10 +111,12 @@ class Watcher:
|
||||
|
||||
del self.appointments[locator][i]
|
||||
|
||||
potential_matches = []
|
||||
|
||||
# Check is any of the tx_ids in the received block is an actual match
|
||||
potential_matches = {}
|
||||
for locator in self.appointments.keys():
|
||||
potential_matches += [(locator, txid[32:]) for txid in txids if txid.startswith(locator)]
|
||||
if locator in potential_locators:
|
||||
# This is locator:txid
|
||||
potential_matches[locator] = potential_locators[locator]
|
||||
|
||||
if debug:
|
||||
if len(potential_matches) > 0:
|
||||
@@ -121,12 +126,12 @@ class Watcher:
|
||||
|
||||
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
|
||||
|
||||
for locator, appointment_pos, dispute_txid, txid, raw_tx in matches:
|
||||
for locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx in matches:
|
||||
if debug:
|
||||
logging.info("[Watcher] notifying responder about {}:{} and deleting appointment".format(
|
||||
locator, appointment_pos))
|
||||
logging.info("[Watcher] notifying responder about {} and deleting appointment {}:{}".format(
|
||||
justice_txid, locator, appointment_pos))
|
||||
|
||||
responder.add_response(dispute_txid, txid, raw_tx,
|
||||
responder.add_response(dispute_txid, justice_txid, justice_rawtx,
|
||||
self.appointments[locator][appointment_pos].end_time, debug, logging)
|
||||
|
||||
# If there was only one appointment that matches the locator we can delete the whole list
|
||||
@@ -152,18 +157,17 @@ class Watcher:
|
||||
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
|
||||
matches = []
|
||||
|
||||
for locator, k in potential_matches:
|
||||
for locator, dispute_txid in potential_matches.items():
|
||||
for appointment_pos, appointment in enumerate(self.appointments.get(locator)):
|
||||
try:
|
||||
dispute_txid = locator + k
|
||||
raw_tx = appointment.encrypted_blob.decrypt(unhexlify(k), debug, logging)
|
||||
raw_tx = hexlify(raw_tx).decode()
|
||||
txid = bitcoin_cli.decoderawtransaction(raw_tx).get('txid')
|
||||
matches.append((locator, appointment_pos, dispute_txid, txid, raw_tx))
|
||||
justice_rawtx = appointment.encrypted_blob.decrypt(unhexlify(dispute_txid), debug, logging)
|
||||
justice_rawtx = hexlify(justice_rawtx).decode()
|
||||
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
|
||||
matches.append((locator, appointment_pos, dispute_txid, justice_txid, justice_rawtx))
|
||||
|
||||
if debug:
|
||||
logging.info("[Watcher] match found for {}:{}! {}".format(locator, appointment_pos,
|
||||
dispute_txid))
|
||||
justice_txid))
|
||||
except JSONRPCException as e:
|
||||
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
|
||||
# for the POC
|
||||
|
||||
Reference in New Issue
Block a user