Merge pull request #41 from sr-gi/testing

Unit tests for most of the pisa components
This commit is contained in:
Sergi Delgado Segura
2019-10-23 13:42:01 +01:00
committed by GitHub
44 changed files with 2598 additions and 877 deletions

7
.coveragerc Normal file
View File

@@ -0,0 +1,7 @@
[run]
omit =
pisa/pisad.py
pisa/logger.py
pisa/sample_conf.py
pisa/time_traveler.py
pisa/utils/auth_proxy.py

2
.gitignore vendored
View File

@@ -10,3 +10,5 @@ bitcoin.conf*
apps/cli/*.json
appointments/
test.py
*.pyc
.cache

View File

@@ -1,3 +1,5 @@
import logging
# PISA-SERVER
DEFAULT_PISA_API_SERVER = 'btc.pisa.watch'
DEFAULT_PISA_API_PORT = 9814
@@ -8,3 +10,9 @@ CLIENT_LOG_FILE = 'pisa.log'
# CRYPTO
SUPPORTED_HASH_FUNCTIONS = ["SHA256"]
SUPPORTED_CIPHERS = ["AES-GCM-128"]
# Configure logging
logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
logging.FileHandler(CLIENT_LOG_FILE),
logging.StreamHandler()
])

View File

@@ -1,26 +1,40 @@
from binascii import hexlify, unhexlify
import re
from hashlib import sha256
from binascii import hexlify, unhexlify
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from apps.cli import SUPPORTED_HASH_FUNCTIONS, SUPPORTED_CIPHERS
from pisa.logger import Logger
logger = Logger("Client")
class Blob:
def __init__(self, data, cipher, hash_function):
if type(data) is not str or re.search(r'^[0-9A-Fa-f]+$', data) is None:
raise ValueError("Non-Hex character found in txid.")
self.data = data
self.cipher = cipher
self.hash_function = hash_function
# FIXME: We only support SHA256 for now
if self.hash_function.upper() not in SUPPORTED_HASH_FUNCTIONS:
raise Exception("Hash function not supported ({}). Supported Hash functions: {}"
.format(self.hash_function, SUPPORTED_HASH_FUNCTIONS))
raise ValueError("Hash function not supported ({}). Supported Hash functions: {}"
.format(self.hash_function, SUPPORTED_HASH_FUNCTIONS))
# FIXME: We only support AES-GCM-128 for now
if self.cipher.upper() not in SUPPORTED_CIPHERS:
raise Exception("Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function,
SUPPORTED_CIPHERS))
raise ValueError("Cipher not supported ({}). Supported ciphers: {}".format(self.hash_function,
SUPPORTED_CIPHERS))
def encrypt(self, tx_id):
if len(tx_id) != 64:
raise ValueError("txid does not matches the expected size (32-byte / 64 hex chars).")
elif re.search(r'^[0-9A-Fa-f]+$', tx_id) is None:
raise ValueError("Non-Hex character found in txid.")
def encrypt(self, tx_id, debug, logging):
# Transaction to be encrypted
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
tx = unhexlify(self.data)
@@ -38,11 +52,10 @@ class Blob:
encrypted_blob = aesgcm.encrypt(nonce=nonce, data=tx, associated_data=None)
encrypted_blob = hexlify(encrypted_blob).decode()
if debug:
logging.info("[Client] creating new blob")
logging.info("[Client] master key: {}".format(hexlify(master_key).decode()))
logging.info("[Client] sk: {}".format(hexlify(sk).decode()))
logging.info("[Client] nonce: {}".format(hexlify(nonce).decode()))
logging.info("[Client] encrypted_blob: {}".format(encrypted_blob))
logger.info("Creating new blob",
master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(),
nonce=hexlify(nonce).decode(),
encrypted_blob=encrypted_blob)
return encrypted_blob

View File

@@ -2,23 +2,20 @@ import re
import os
import sys
import json
import logging
import requests
from sys import argv
from getopt import getopt, GetoptError
from hashlib import sha256
from binascii import hexlify, unhexlify
from binascii import unhexlify
from getopt import getopt, GetoptError
from requests import ConnectTimeout, ConnectionError
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT, CLIENT_LOG_FILE
from pisa.logger import Logger
from apps.cli.blob import Blob
from apps.cli.help import help_add_appointment, help_get_appointment
from apps.cli import DEFAULT_PISA_API_SERVER, DEFAULT_PISA_API_PORT
def show_message(message, debug, logging):
if debug:
logging.error('[Client] ' + message[0].lower() + message[1:])
else:
sys.exit(message)
logger = Logger("Client")
# FIXME: TESTING ENDPOINT, WON'T BE THERE IN PRODUCTION
@@ -28,9 +25,8 @@ def generate_dummy_appointment():
current_height = r.json().get("block_count")
dummy_appointment_data = {"tx": hexlify(os.urandom(192)).decode('utf-8'),
"tx_id": hexlify(os.urandom(32)).decode('utf-8'), "start_time": current_height + 5,
"end_time": current_height + 10, "dispute_delta": 20}
dummy_appointment_data = {"tx": os.urandom(192).hex(), "tx_id": os.urandom(32).hex(),
"start_time": current_height + 5, "end_time": current_height + 10, "dispute_delta": 20}
print('Generating dummy appointment data:''\n\n' + json.dumps(dummy_appointment_data, indent=4, sort_keys=True))
@@ -39,7 +35,7 @@ def generate_dummy_appointment():
print('\nData stored in dummy_appointment_data.json')
def add_appointment(args, debug, logging):
def add_appointment(args):
appointment_data = None
use_help = "Use 'help add_appointment' for help of how to use the command."
@@ -56,14 +52,14 @@ def add_appointment(args, debug, logging):
if os.path.isfile(fin):
appointment_data = json.load(open(fin))
else:
show_message("Can't find file " + fin, debug, logging)
logger.error("Can't find file " + fin)
else:
show_message("No file provided as appointment. " + use_help, debug, logging)
logger.error("No file provided as appointment. " + use_help)
else:
appointment_data = json.loads(arg_opt)
except json.JSONDecodeError:
show_message("Non-JSON encoded data provided as appointment. " + use_help, debug, logging)
logger.error("Non-JSON encoded data provided as appointment. " + use_help)
if appointment_data:
valid_locator = check_txid_format(appointment_data.get('tx_id'))
@@ -72,28 +68,27 @@ def add_appointment(args, debug, logging):
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
appointment = build_appointment(appointment_data.get('tx'), appointment_data.get('tx_id'),
appointment_data.get('start_time'), appointment_data.get('end_time'),
appointment_data.get('dispute_delta'), debug, logging)
appointment_data.get('dispute_delta'))
if debug:
logging.info("[Client] sending appointment to PISA")
logger.info("Sending appointment to PISA")
try:
r = requests.post(url=add_appointment_endpoint, json=json.dumps(appointment), timeout=5)
show_message("{} (code: {}).".format(r.text, r.status_code), debug, logging)
logger.info("{} (code: {}).".format(r.text, r.status_code))
except ConnectTimeout:
show_message("Can't connect to pisa API. Connection timeout.", debug, logging)
logger.error("Can't connect to pisa API. Connection timeout.")
except ConnectionError:
show_message("Can't connect to pisa API. Server cannot be reached.", debug, logging)
logger.error("Can't connect to pisa API. Server cannot be reached.")
else:
show_message("The provided locator is not valid.", debug, logging)
logger.error("The provided locator is not valid.")
else:
show_message("No appointment data provided. " + use_help, debug, logging)
logger.error("No appointment data provided. " + use_help)
def get_appointment(args, debug, logging):
def get_appointment(args):
if args:
arg_opt = args.pop(0)
@@ -112,18 +107,19 @@ def get_appointment(args, debug, logging):
print(json.dumps(r.json(), indent=4, sort_keys=True))
except ConnectTimeout:
show_message("Can't connect to pisa API. Connection timeout.", debug, logging)
logger.error("Can't connect to pisa API. Connection timeout.")
except ConnectionError:
show_message("Can't connect to pisa API. Server cannot be reached.", debug, logging)
logger.error("Can't connect to pisa API. Server cannot be reached.")
else:
show_message("The provided locator is not valid.", debug, logging)
logger.error("The provided locator is not valid.")
else:
show_message("The provided locator is not valid.", debug, logging)
logger.error("The provided locator is not valid.")
def build_appointment(tx, tx_id, start_block, end_block, dispute_delta, debug, logging):
def build_appointment(tx, tx_id, start_block, end_block, dispute_delta):
locator = sha256(unhexlify(tx_id)).hexdigest()
cipher = "AES-GCM-128"
@@ -131,7 +127,7 @@ def build_appointment(tx, tx_id, start_block, end_block, dispute_delta, debug, l
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
blob = Blob(tx, cipher, hash_function)
encrypted_blob = blob.encrypt(tx_id, debug, logging)
encrypted_blob = blob.encrypt(tx_id)
appointment = {"locator": locator, "start_time": start_block, "end_time": end_block,
"dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function":
@@ -165,14 +161,13 @@ def show_usage():
if __name__ == '__main__':
debug = False
pisa_api_server = DEFAULT_PISA_API_SERVER
pisa_api_port = DEFAULT_PISA_API_PORT
commands = ['add_appointment', 'get_appointment', 'help']
testing_commands = ['generate_dummy_appointment']
try:
opts, args = getopt(argv[1:], 's:p:dh', ['server', 'port', 'debug', 'help'])
opts, args = getopt(argv[1:], 's:p:h', ['server', 'port', 'help'])
for opt, arg in opts:
if opt in ['-s', 'server']:
@@ -183,15 +178,6 @@ if __name__ == '__main__':
if arg:
pisa_api_port = int(arg)
if opt in ['-d', '--debug']:
debug = True
# Configure logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(CLIENT_LOG_FILE),
logging.StreamHandler()
])
if opt in ['-h', '--help']:
sys.exit(show_usage())
@@ -200,10 +186,10 @@ if __name__ == '__main__':
if command in commands:
if command == 'add_appointment':
add_appointment(args, debug, logging)
add_appointment(args)
elif command == 'get_appointment':
get_appointment(args, debug, logging)
get_appointment(args)
elif command == 'help':
if args:
@@ -216,8 +202,8 @@ if __name__ == '__main__':
sys.exit(help_get_appointment())
else:
show_message("Unknown command. Use help to check the list of available commands.", debug,
logging)
logger.error("Unknown command. Use help to check the list of available commands")
else:
sys.exit(show_usage())
@@ -227,11 +213,14 @@ if __name__ == '__main__':
generate_dummy_appointment()
else:
show_message("Unknown command. Use help to check the list of available commands.", debug, logging)
logger.error("Unknown command. Use help to check the list of available commands")
else:
show_message("No command provided. Use help to check the list of available commands.", debug, logging)
logger.error("No command provided. Use help to check the list of available commands.")
except GetoptError as e:
show_message(e, debug, logging)
logger.error("{}".format(e))
except json.JSONDecodeError as e:
show_message('Non-JSON encoded appointment passed as parameter.', debug, logging)
logger.error("Non-JSON encoded appointment passed as parameter.")

View File

@@ -1,2 +1,13 @@
import logging
from pisa.utils.auth_proxy import AuthServiceProxy
import pisa.conf as conf
HOST = 'localhost'
PORT = 9814
PORT = 9814
# Configure logging
logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
logging.FileHandler(conf.SERVER_LOG_FILE),
logging.StreamHandler()
])

View File

@@ -1,36 +1,38 @@
from pisa import *
import os
import json
from flask import Flask, request, Response, abort, jsonify
from pisa import HOST, PORT, logging
from pisa.logger import Logger
from pisa.watcher import Watcher
from pisa.inspector import Inspector
from pisa.appointment import Appointment
from flask import Flask, request, Response, abort, jsonify
import json
from pisa.block_processor import BlockProcessor
# FIXME: HERE FOR TESTING (get_block_count). REMOVE WHEN REMOVING THE FUNCTION
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
# ToDo: #5-add-async-to-api
app = Flask(__name__)
HTTP_OK = 200
HTTP_BAD_REQUEST = 400
HTTP_SERVICE_UNAVAILABLE = 503
logger = Logger("API")
@app.route('/', methods=['POST'])
def add_appointment():
remote_addr = request.environ.get('REMOTE_ADDR')
remote_port = request.environ.get('REMOTE_PORT')
if debug:
logging.info('[API] connection accepted from {}:{}'.format(remote_addr, remote_port))
logger.info('Connection accepted', from_addr_port='{}:{}'.format(remote_addr, remote_port))
# Check content type once if properly defined
request_data = json.loads(request.get_json())
appointment = inspector.inspect(request_data)
if type(appointment) == Appointment:
appointment_added = watcher.add_appointment(appointment, debug, logging)
appointment_added = watcher.add_appointment(appointment)
# ToDo: #13-create-server-side-signature-receipt
if appointment_added:
@@ -49,9 +51,8 @@ def add_appointment():
rcode = HTTP_BAD_REQUEST
response = "appointment rejected. Request does not match the standard"
if debug:
logging.info('[API] sending response and disconnecting: {} --> {}:{}'.format(response, remote_addr,
remote_port))
logger.info('Sending response and disconnecting',
from_addr_port='{}:{}'.format(remote_addr, remote_port), response=response)
return Response(response, status=rcode, mimetype='text/plain')
@@ -83,7 +84,7 @@ def get_appointment():
response.append(job_data)
if not response:
response.append({"locator": locator, "status": "not found"})
response.append({"locator": locator, "status": "not_found"})
response = jsonify(response)
@@ -103,7 +104,7 @@ def get_all_appointments():
if watcher.responder:
for uuid, job in watcher.responder.jobs.items():
responder_jobs[uuid] = job.to_json()
responder_jobs[uuid] = job.to_dict()
response = jsonify({"watcher_appointments": watcher_appointments, "responder_jobs": responder_jobs})
@@ -115,23 +116,19 @@ def get_all_appointments():
@app.route('/get_block_count', methods=['GET'])
def get_block_count():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
return jsonify({"block_count": bitcoin_cli.getblockcount()})
return jsonify({"block_count": BlockProcessor.get_block_count()})
def start_api(d, l):
def start_api():
# FIXME: Pretty ugly but I haven't found a proper way to pass it to add_appointment
global debug, logging, watcher, inspector
debug = d
logging = l
global watcher, inspector
# ToDo: #18-separate-api-from-watcher
watcher = Watcher()
inspector = Inspector(debug, logging)
inspector = Inspector()
# Setting Flask log t ERROR only so it does not mess with out logging
# Setting Flask log to ERROR only so it does not mess with out logging. Also disabling flask initial messages
logging.getLogger('werkzeug').setLevel(logging.ERROR)
os.environ['WERKZEUG_RUN_MAIN'] = 'true'
app.run(host=HOST, port=PORT)

View File

@@ -3,6 +3,7 @@ from pisa.encrypted_blob import EncryptedBlob
# Basic appointment structure
class Appointment:
# DISCUSS: 35-appointment-checks
def __init__(self, locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function):
self.locator = locator
self.start_time = start_time # ToDo: #4-standardize-appointment-fields
@@ -19,7 +20,5 @@ class Appointment:
return appointment
# ToDO: #3-improve-appointment-strcuture
# ToDO: #3-improve-appointment-structure

111
pisa/block_processor.py Normal file
View File

@@ -0,0 +1,111 @@
import binascii
from hashlib import sha256
from pisa.logger import Logger
from pisa.tools import bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException
logger = Logger("BlockProcessor")
class BlockProcessor:
@staticmethod
def get_block(block_hash):
try:
block = bitcoin_cli().getblock(block_hash)
except JSONRPCException as e:
block = None
logger.error("Couldn't get block from bitcoind.", error=e.error)
return block
@staticmethod
def get_best_block_hash():
try:
block_hash = bitcoin_cli().getbestblockhash()
except JSONRPCException as e:
block_hash = None
logger.error("Couldn't get block hash.", error=e.error)
return block_hash
@staticmethod
def get_block_count():
try:
block_count = bitcoin_cli().getblockcount()
except JSONRPCException as e:
block_count = None
logger.error("Couldn't get block count", error=e.error)
return block_count
# FIXME: The following two functions does not seem to belong here. They come from the Watcher, and need to be
# separated since they will be reused by the TimeTraveller.
# DISCUSS: 36-who-should-check-appointment-trigger
@staticmethod
def get_potential_matches(txids, locator_uuid_map):
potential_locators = {sha256(binascii.unhexlify(txid)).hexdigest(): txid for txid in txids}
# Check is any of the tx_ids in the received block is an actual match
intersection = set(locator_uuid_map.keys()).intersection(potential_locators.keys())
potential_matches = {locator: potential_locators[locator] for locator in intersection}
if len(potential_matches) > 0:
logger.info("List of potential matches", potential_matches=potential_matches)
else:
logger.info("No potential matches found")
return potential_matches
@staticmethod
# NOTCOVERED
def get_matches(potential_matches, locator_uuid_map, appointments):
matches = []
for locator, dispute_txid in potential_matches.items():
for uuid in locator_uuid_map[locator]:
try:
# ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = appointments[uuid].encrypted_blob.decrypt(dispute_txid)
justice_txid = bitcoin_cli().decoderawtransaction(justice_rawtx).get('txid')
logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
justice_txid = None
justice_rawtx = None
logger.error("Can't build transaction from decoded data.", error=e.error)
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
return matches
# DISCUSS: This method comes from the Responder and seems like it could go back there.
@staticmethod
# NOTCOVERED
def check_confirmations(txs, unconfirmed_txs, tx_job_map, missed_confirmations):
for tx in txs:
if tx in tx_job_map and tx in unconfirmed_txs:
unconfirmed_txs.remove(tx)
logger.info("Confirmation received for transaction", tx=tx)
elif tx in unconfirmed_txs:
if tx in missed_confirmations:
missed_confirmations[tx] += 1
else:
missed_confirmations[tx] = 1
logger.info("Transaction missed a confirmation", tx=tx, missed_confirmations=missed_confirmations[tx])
return unconfirmed_txs, missed_confirmations

89
pisa/carrier.py Normal file
View File

@@ -0,0 +1,89 @@
from pisa.rpc_errors import *
from pisa.logger import Logger
from pisa.tools import bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException
from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION, RPC_TX_REORGED_AFTER_BROADCAST
logger = Logger("Carrier")
# FIXME: This class is not fully covered by unit tests
class Receipt:
def __init__(self, delivered, confirmations=0, reason=None):
self.delivered = delivered
self.confirmations = confirmations
self.reason = reason
class Carrier:
# NOTCOVERED
def send_transaction(self, rawtx, txid):
try:
logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx)
bitcoin_cli().sendrawtransaction(rawtx)
receipt = Receipt(delivered=True)
except JSONRPCException as e:
errno = e.error.get('code')
# Since we're pushing a raw transaction to the network we can face several rejections
if errno == RPC_VERIFY_REJECTED:
# DISCUSS: 37-transaction-rejection
# TODO: UNKNOWN_JSON_RPC_EXCEPTION is not the proper exception here. This is long due.
receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
elif errno == RPC_VERIFY_ERROR:
# DISCUSS: 37-transaction-rejection
receipt = Receipt(delivered=False, reason=RPC_VERIFY_ERROR)
logger.error("Transaction couldn't be broadcast", error=e.error)
elif errno == RPC_VERIFY_ALREADY_IN_CHAIN:
logger.info("Transaction is already in the blockchain. Getting confirmation count", txid=txid)
# If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment
tx_info = self.get_transaction(txid)
if tx_info is not None:
confirmations = int(tx_info.get("confirmations"))
receipt = Receipt(delivered=True, confirmations=confirmations, reason=RPC_VERIFY_ALREADY_IN_CHAIN)
else:
# There's a really unlikely edge case where a transaction can be reorged between receiving the
# notification and querying the data. Notice that this implies the tx being also kicked off the
# mempool, which again is really unlikely.
receipt = Receipt(delivered=False, reason=RPC_TX_REORGED_AFTER_BROADCAST)
elif errno == RPC_DESERIALIZATION_ERROR:
# Adding this here just for completeness. We should never end up here. The Carrier only sends txs
# handed by the Responder, who receives them from the Watcher, who checks that the tx can be properly
# deserialized
logger.info("Transaction cannot be deserialized".format(txid))
receipt = Receipt(delivered=False, reason=RPC_DESERIALIZATION_ERROR)
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logger.error("JSONRPCException.", method='Carrier.send_transaction', error=e.error)
receipt = Receipt(delivered=False, reason=UNKNOWN_JSON_RPC_EXCEPTION)
return receipt
@staticmethod
def get_transaction(txid):
try:
tx_info = bitcoin_cli().getrawtransaction(txid, 1)
except JSONRPCException as e:
tx_info = None
# While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
logger.info("Transaction got reorged before obtaining information", txid=txid)
else:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logger.error("JSONRPCException.", method='Carrier.get_transaction', error=e.error)
return tx_info

41
pisa/cleaner.py Normal file
View File

@@ -0,0 +1,41 @@
from pisa.logger import Logger
logger = Logger("Cleaner")
# Dictionaries in Python are "passed-by-reference", so no return is needed for the Cleaner"
# https://docs.python.org/3/faq/programming.html#how-do-i-write-a-function-with-output-parameters-call-by-reference
class Cleaner:
@staticmethod
def delete_expired_appointment(expired_appointments, appointments, locator_uuid_map):
for uuid in expired_appointments:
locator = appointments[uuid].locator
appointments.pop(uuid)
if len(locator_uuid_map[locator]) == 1:
locator_uuid_map.pop(locator)
else:
locator_uuid_map[locator].remove(uuid)
logger.info("End time reached with no match. Deleting appointment.", locator=locator, uuid=uuid)
@staticmethod
def delete_completed_jobs(jobs, tx_job_map, completed_jobs, height):
for uuid, confirmations in completed_jobs:
logger.info("Job completed. Appointment ended after reaching enough confirmations.",
uuid=uuid, height=height, confirmations=confirmations)
# ToDo: #9-add-data-persistence
justice_txid = jobs[uuid].justice_txid
jobs.pop(uuid)
if len(tx_job_map[justice_txid]) == 1:
tx_job_map.pop(justice_txid)
logger.info("No more jobs for justice transaction.", justice_txid=justice_txid)
else:
tx_job_map[justice_txid].remove(uuid)

View File

@@ -1,30 +1,46 @@
from binascii import unhexlify, hexlify
from hashlib import sha256
from binascii import unhexlify, hexlify
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from pisa.logger import Logger
logger = Logger("Watcher")
# FIXME: EncryptedBlob is assuming AES-128-GCM. A cipher field should be part of the object and the decryption should be
# performed depending on the cipher.
class EncryptedBlob:
def __init__(self, data):
self.data = data
def decrypt(self, key, debug, logging):
def __eq__(self, other):
return isinstance(other, EncryptedBlob) and self.data == other.data
def decrypt(self, key):
# master_key = H(tx_id | tx_id)
key = unhexlify(key)
master_key = sha256(key + key).digest()
# The 16 MSB of the master key will serve as the AES GCM 128 secret key. The 16 LSB will serve as the IV.
sk = master_key[:16]
nonce = master_key[16:]
if debug:
logging.info("[Watcher] creating new blob")
logging.info("[Watcher] master key: {}".format(hexlify(master_key).decode()))
logging.info("[Watcher] sk: {}".format(hexlify(sk).decode()))
logging.info("[Watcher] nonce: {}".format(hexlify(nonce).decode()))
logging.info("[Watcher] encrypted_blob: {}".format(self.data))
logger.info("Creating new blob.",
master_key=hexlify(master_key).decode(),
sk=hexlify(sk).decode(),
nonce=hexlify(sk).decode(),
encrypted_blob=self.data)
# Decrypt
aesgcm = AESGCM(sk)
data = unhexlify(self.data.encode())
raw_tx = aesgcm.decrypt(nonce=nonce, data=data, associated_data=None)
return raw_tx
try:
raw_tx = aesgcm.decrypt(nonce=nonce, data=data, associated_data=None)
hex_raw_tx = hexlify(raw_tx).decode('utf8')
except InvalidTag:
hex_raw_tx = None
return hex_raw_tx

View File

@@ -9,6 +9,9 @@ APPOINTMENT_WRONG_FIELD = -7
APPOINTMENT_CIPHER_NOT_SUPPORTED = -8
APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED = -9
# Custom RPC errors
RPC_TX_REORGED_AFTER_BROADCAST = -98
# UNHANDLED
UNKNOWN_JSON_RPC_EXCEPTION = -99

View File

@@ -1,16 +1,20 @@
import re
from pisa.appointment import Appointment
from pisa import errors
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MIN_DISPUTE_DELTA, \
SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
import pisa.conf as conf
from pisa.logger import Logger
from pisa.appointment import Appointment
from pisa.block_processor import BlockProcessor
logger = Logger("Inspector")
# FIXME: The inspector logs the wrong messages sent form the users. A possible attack surface would be to send a really
# long field that, even if not accepted by PISA, would be stored in the logs. This is a possible DoS surface
# since pisa would store any kind of message (no matter the length). Solution: truncate the length of the fields
# stored + blacklist if multiple wrong requests are received.
class Inspector:
def __init__(self, debug=False, logging=None):
self.debug = debug
self.logging = logging
def inspect(self, data):
locator = data.get('locator')
start_time = data.get('start_time')
@@ -20,12 +24,11 @@ class Inspector:
cipher = data.get('cipher')
hash_function = data.get('hash_function')
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
try:
block_height = bitcoin_cli.getblockcount()
block_height = BlockProcessor.get_block_count()
if block_height is not None:
rcode, message = self.check_locator(locator)
if rcode == 0:
rcode, message = self.check_start_time(start_time, block_height)
if rcode == 0:
@@ -44,16 +47,14 @@ class Inspector:
else:
r = (rcode, message)
except JSONRPCException as e:
if self.debug:
self.logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
else:
# In case of an unknown exception, assign a special rcode and reason.
r = (errors.UNKNOWN_JSON_RPC_EXCEPTION, "Unexpected error occurred")
return r
def check_locator(self, locator):
@staticmethod
def check_locator(locator):
message = None
rcode = 0
@@ -71,15 +72,19 @@ class Inspector:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong locator format ({})".format(locator)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message
def check_start_time(self, start_time, block_height):
@staticmethod
def check_start_time(start_time, block_height):
message = None
rcode = 0
# TODO: What's too close to the current height is not properly defined. Right now any appointment that is in the
# future will be accepted (even if it's only one block away).
t = type(start_time)
if start_time is None:
@@ -93,17 +98,21 @@ class Inspector:
if start_time < block_height:
message = "start_time is in the past"
else:
message = "start_time too close to current height"
message = "start_time is too close to current height"
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message
def check_end_time(self, end_time, start_time, block_height):
@staticmethod
def check_end_time(end_time, start_time, block_height):
message = None
rcode = 0
# TODO: What's too close to the current height is not properly defined. Right now any appointment that ends in
# the future will be accepted (even if it's only one block away).
t = type(end_time)
if end_time is None:
@@ -118,16 +127,20 @@ class Inspector:
message = "end_time is smaller than start_time"
else:
message = "end_time is equal to start_time"
elif block_height > end_time:
elif block_height >= end_time:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = 'end_time is in the past'
if block_height > end_time:
message = 'end_time is in the past'
else:
message = 'end_time is too close to current height'
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message
def check_delta(self, dispute_delta):
@staticmethod
def check_delta(dispute_delta):
message = None
rcode = 0
@@ -139,18 +152,19 @@ class Inspector:
elif t != int:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong dispute_delta data type ({})".format(t)
elif dispute_delta < MIN_DISPUTE_DELTA:
elif dispute_delta < conf.MIN_DISPUTE_DELTA:
rcode = errors.APPOINTMENT_FIELD_TOO_SMALL
message = "dispute delta too small. The dispute delta should be at least {} (current: {})".format(
MIN_DISPUTE_DELTA, dispute_delta)
conf.MIN_DISPUTE_DELTA, dispute_delta)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message
# ToDo: #6-define-checks-encrypted-blob
def check_blob(self, encrypted_blob):
@staticmethod
def check_blob(encrypted_blob):
message = None
rcode = 0
@@ -162,16 +176,17 @@ class Inspector:
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong encrypted_blob data type ({})".format(t)
elif encrypted_blob == '':
# ToDo: #6 We may want to define this to be at least as long as one block of the cipher we are using
rcode = errors.APPOINTMENT_WRONG_FIELD
message = "wrong encrypted_blob"
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
elif re.search(r'^[0-9A-Fa-f]+$', encrypted_blob) is None:
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
message = "wrong encrypted_blob format ({})".format(encrypted_blob)
if message is not None:
logger.error(message)
return rcode, message
def check_cipher(self, cipher):
@staticmethod
def check_cipher(cipher):
message = None
rcode = 0
@@ -183,16 +198,17 @@ class Inspector:
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong cipher data type ({})".format(t)
elif cipher not in SUPPORTED_CIPHERS:
elif cipher.upper() not in conf.SUPPORTED_CIPHERS:
rcode = errors.APPOINTMENT_CIPHER_NOT_SUPPORTED
message = "cipher not supported: {}".format(cipher)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message
def check_hash_function(self, hash_function):
@staticmethod
def check_hash_function(hash_function):
message = None
rcode = 0
@@ -204,11 +220,11 @@ class Inspector:
elif t != str:
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
message = "wrong hash_function data type ({})".format(t)
elif hash_function not in SUPPORTED_HASH_FUNCTIONS:
elif hash_function.upper() not in conf.SUPPORTED_HASH_FUNCTIONS:
rcode = errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED
message = "hash_function not supported {}".format(hash_function)
if self.debug and message:
self.logging.error("[Inspector] {}".format(message))
if message is not None:
logger.error(message)
return rcode, message

33
pisa/logger.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
import time
import json
class StructuredMessage(object):
def __init__(self, message, **kwargs):
self.message = message
self.time = time.asctime()
self.kwargs = kwargs
def __str__(self):
return json.dumps({**self.kwargs, "message": self.message, "time": self.time})
class Logger(object):
def __init__(self, actor=None):
self.actor = actor
def _add_prefix(self, msg):
return msg if self.actor is None else "[{}] {}".format(self.actor, msg)
def info(self, msg, **kwargs):
logging.info(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
def debug(self, msg, **kwargs):
logging.debug(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
def error(self, msg, **kwargs):
logging.error(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))
def warning(self, msg, **kwargs):
logging.warning(StructuredMessage(self._add_prefix(msg), actor=self.actor, **kwargs))

View File

@@ -1,36 +1,41 @@
import logging
from sys import argv
from getopt import getopt
from threading import Thread
from sys import argv, exit
from signal import signal, SIGINT, SIGQUIT, SIGTERM
from pisa.logger import Logger
from pisa.api import start_api
from pisa.conf import BTC_NETWORK
from pisa.tools import can_connect_to_bitcoind, in_correct_network
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, BTC_NETWORK, SERVER_LOG_FILE
logger = Logger("Daemon")
def handle_signals(signal_received, frame):
logger.info("Shutting down PISA")
# TODO: #11-add-graceful-shutdown: add code to close the db, free any resources, etc.
exit(0)
if __name__ == '__main__':
debug = False
opts, _ = getopt(argv[1:], 'd', ['debug'])
logger.info("Starting PISA")
signal(SIGINT, handle_signals)
signal(SIGTERM, handle_signals)
signal(SIGQUIT, handle_signals)
opts, _ = getopt(argv[1:], '', [''])
for opt, arg in opts:
if opt in ['-d', '--debug']:
debug = True
# FIXME: Leaving this here for future option/arguments
pass
# Configure logging
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(SERVER_LOG_FILE),
logging.StreamHandler()
])
if not can_connect_to_bitcoind():
logger.error("Can't connect to bitcoind. Shutting down")
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
elif not in_correct_network(BTC_NETWORK):
logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down")
if can_connect_to_bitcoind(bitcoin_cli):
if in_correct_network(bitcoin_cli, BTC_NETWORK):
# ToDo: This may not have to be a thead. The main thread only creates this and terminates.
api_thread = Thread(target=start_api, args=[debug, logging])
api_thread.start()
else:
logging.error("[Pisad] bitcoind is running on a different network, check conf.py and bitcoin.conf. "
"Shutting down")
else:
logging.error("[Pisad] can't connect to bitcoind. Shutting down")
# Fire the api
start_api()

View File

@@ -1,255 +1,209 @@
import json
from queue import Queue
from threading import Thread
from hashlib import sha256
from threading import Thread
from binascii import unhexlify
from pisa.zmq_subscriber import ZMQHandler
from pisa.rpc_errors import *
from pisa.logger import Logger
from pisa.cleaner import Cleaner
from pisa.carrier import Carrier
from pisa.tools import check_tx_in_chain
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQHandler
CONFIRMATIONS_BEFORE_RETRY = 6
MIN_CONFIRMATIONS = 6
logger = Logger("Responder")
class Job:
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, retry_counter=0):
def __init__(self, dispute_txid, justice_txid, justice_rawtx, appointment_end):
self.dispute_txid = dispute_txid
self.justice_txid = justice_txid
self.justice_rawtx = justice_rawtx
self.appointment_end = appointment_end
self.confirmations = confirmations
self.missed_confirmations = 0
self.retry_counter = retry_counter
# FIXME: locator is here so we can give info about jobs for now. It can be either passed from watcher or info
# can be directly got from DB
self.locator = sha256(unhexlify(dispute_txid)).hexdigest()
def to_json(self):
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "confirmations": self.confirmations,
"appointment_end": self.appointment_end}
def to_dict(self):
job = {"locator": self.locator, "justice_rawtx": self.justice_rawtx, "appointment_end": self.appointment_end}
return job
def to_json(self):
return json.dumps(self.to_dict())
class Responder:
def __init__(self):
self.jobs = dict()
self.tx_job_map = dict()
self.unconfirmed_txs = []
self.missed_confirmations = dict()
self.block_queue = None
self.asleep = True
self.zmq_subscriber = None
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=False):
def add_response(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, retry=False):
if self.asleep:
logger.info("Waking up")
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
carrier = Carrier()
receipt = carrier.send_transaction(justice_rawtx, justice_txid)
try:
if debug:
if self.asleep:
logging.info("[Responder] waking up!")
logging.info("[Responder] pushing transaction to the network (txid: {})".format(justice_txid))
# do_watch can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds that information. If retry is true the job already exists
if receipt.delivered:
if not retry:
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, receipt.confirmations)
bitcoin_cli.sendrawtransaction(justice_rawtx)
# handle_responses can call add_response recursively if a broadcast transaction does not get confirmations
# retry holds such information.
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry)
except JSONRPCException as e:
self.handle_send_failures(e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
debug, logging, retry)
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
confirmations=0, retry=False):
# ToDo: #23-define-behaviour-approaching-end
if retry:
self.jobs[uuid].retry_counter += 1
self.jobs[uuid].missed_confirmations = 0
else:
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
pass
if justice_txid in self.tx_job_map:
self.tx_job_map[justice_txid].append(uuid)
return receipt
else:
self.tx_job_map[justice_txid] = [uuid]
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0):
self.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
if debug:
logging.info('[Responder] new job added (dispute txid = {}, justice txid = {}, appointment end = {})'.
format(dispute_txid, justice_txid, appointment_end))
if justice_txid in self.tx_job_map:
self.tx_job_map[justice_txid].append(uuid)
else:
self.tx_job_map[justice_txid] = [uuid]
if confirmations == 0:
self.unconfirmed_txs.append(justice_txid)
logger.info("New job added.", dispute_txid=dispute_txid, justice_txid=justice_txid,
appointment_end=appointment_end)
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
responder = Thread(target=self.handle_responses, args=[debug, logging])
zmq_thread = Thread(target=self.do_subscribe)
responder = Thread(target=self.do_watch)
zmq_thread.start()
responder.start()
def do_subscribe(self, block_queue, debug, logging):
def do_subscribe(self):
self.zmq_subscriber = ZMQHandler(parent='Responder')
self.zmq_subscriber.handle(block_queue, debug, logging)
self.zmq_subscriber.handle(self.block_queue)
def do_watch(self):
# ToDo: #9-add-data-persistence
# change prev_block_hash to the last known tip when bootstrapping
prev_block_hash = BlockProcessor.get_best_block_hash()
def handle_responses(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
prev_block_hash = 0
while len(self.jobs) > 0:
# We get notified for every new received block
block_hash = self.block_queue.get()
block = BlockProcessor.get_block(block_hash)
try:
block = bitcoin_cli.getblock(block_hash)
if block is not None:
txs = block.get('tx')
height = block.get('height')
if debug:
logging.info("[Responder] new block received {}".format(block_hash))
logging.info("[Responder] prev. block hash {}".format(block.get('previousblockhash')))
logging.info("[Responder] list of transactions: {}".format(txs))
logger.info("New block received",
block_hash=block_hash, prev_block_hash=block.get('previousblockhash'), txs=txs)
except JSONRPCException as e:
if debug:
logging.error("[Responder] couldn't get block from bitcoind. Error code {}".format(e))
# ToDo: #9-add-data-persistence
if prev_block_hash == block.get('previousblockhash'):
self.unconfirmed_txs, self.missed_confirmations = BlockProcessor.check_confirmations(
txs, self.unconfirmed_txs, self.tx_job_map, self.missed_confirmations)
continue
txs_to_rebroadcast = self.get_txs_to_rebroadcast(txs)
completed_jobs = self.get_completed_jobs(height)
completed_jobs = []
if prev_block_hash == block.get('previousblockhash') or prev_block_hash == 0:
# Keep count of the confirmations each tx gets
for justice_txid, jobs in self.tx_job_map.items():
for uuid in jobs:
if justice_txid in txs or self.jobs[uuid].confirmations > 0:
self.jobs[uuid].confirmations += 1
Cleaner.delete_completed_jobs(self.jobs, self.tx_job_map, completed_jobs, height)
self.rebroadcast(txs_to_rebroadcast)
if debug:
logging.info("[Responder] new confirmation received for job = {}, txid = {}".format(
uuid, justice_txid))
# NOTCOVERED
else:
logger.warning("Reorg found", local_prev_block_hash=prev_block_hash,
remote_prev_block_hash=block.get('previousblockhash'))
elif self.jobs[uuid].missed_confirmations >= CONFIRMATIONS_BEFORE_RETRY:
# If a transactions has missed too many confirmations for a while we'll try to rebroadcast
# ToDO: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end
self.add_response(uuid, self.jobs[uuid].dispute_txid, justice_txid,
self.jobs[uuid].justice_rawtx, self.jobs[uuid].appointment_end, debug,
logging, retry=True)
if debug:
logging.warning("[Responder] txid = {} has missed {} confirmations. Rebroadcasting"
.format(justice_txid, CONFIRMATIONS_BEFORE_RETRY))
# ToDo: #24-properly-handle-reorgs
self.handle_reorgs()
else:
# Otherwise we increase the number of missed confirmations
self.jobs[uuid].missed_confirmations += 1
if self.jobs[uuid].appointment_end <= height and self.jobs[uuid].confirmations >= \
MIN_CONFIRMATIONS:
# The end of the appointment has been reached
completed_jobs.append(uuid)
self.remove_completed_jobs(completed_jobs, height, debug, logging)
else:
if debug:
logging.warning("[Responder] reorg found! local prev. block id = {}, remote prev. block id = {}"
.format(prev_block_hash, block.get('previousblockhash')))
self.handle_reorgs(bitcoin_cli, debug, logging)
prev_block_hash = block.get('hash')
prev_block_hash = block.get('hash')
# Go back to sleep if there are no more jobs
self.asleep = True
self.zmq_subscriber.terminate = True
if debug:
logging.info("[Responder] no more pending jobs, going back to sleep")
logger.info("No more pending jobs, going back to sleep")
def handle_send_failures(self, e, bitcoin_cli, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end,
debug, logging, retry):
# Since we're pushing a raw transaction to the network we can get two kind of rejections:
# RPC_VERIFY_REJECTED and RPC_VERIFY_ALREADY_IN_CHAIN. The former implies that the transaction is rejected
# due to network rules, whereas the later implies that the transaction is already in the blockchain.
if e.error.get('code') == RPC_VERIFY_REJECTED:
# DISCUSS: what to do in this case
# DISCUSS: invalid transactions (properly formatted but invalid, like unsigned) fit here too.
# DISCUSS: RPC_VERIFY_ERROR could also be a possible case.
# DISCUSS: check errors -9 and -10
pass
def get_txs_to_rebroadcast(self, txs):
txs_to_rebroadcast = []
elif e.error.get('code') == RPC_VERIFY_ALREADY_IN_CHAIN:
try:
if debug:
logging.info("[Responder] {} is already in the blockchain. Getting the confirmation count and "
"start monitoring the transaction".format(justice_txid))
for tx in txs:
if tx in self.missed_confirmations and self.missed_confirmations[tx] >= CONFIRMATIONS_BEFORE_RETRY:
# If a transactions has missed too many confirmations we add it to the rebroadcast list
txs_to_rebroadcast.append(tx)
# If the transaction is already in the chain, we get the number of confirmations and watch the job
# until the end of the appointment
tx_info = bitcoin_cli.getrawtransaction(justice_txid, 1)
confirmations = int(tx_info.get("confirmations"))
self.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug, logging,
retry=retry, confirmations=confirmations)
return txs_to_rebroadcast
except JSONRPCException as e:
# While it's quite unlikely, the transaction that was already in the blockchain could have been
# reorged while we were querying bitcoind to get the confirmation count. In such a case we just
# restart the job
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
self.add_response(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, debug,
logging, retry=retry)
elif debug:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
def get_completed_jobs(self, height):
completed_jobs = []
elif debug:
# If something else happens (unlikely but possible) log it so we can treat it in future releases
logging.error("[Responder] JSONRPCException. Error {}".format(e))
for uuid, job in self.jobs.items():
if job.appointment_end <= height and job.justice_txid not in self.unconfirmed_txs:
tx = Carrier.get_transaction(job.justice_txid)
def remove_completed_jobs(self, completed_jobs, height, debug, logging):
for uuid in completed_jobs:
if debug:
logging.info("[Responder] job completed (uuid = {}, justice_txid = {}). Appointment ended at "
"block {} after {} confirmations".format(uuid, self.jobs[uuid].justice_txid, height,
self.jobs[uuid].confirmations))
# FIXME: Should be improved with the librarian
if tx is not None:
confirmations = tx.get('confirmations')
# ToDo: #9-add-data-persistency
justice_txid = self.jobs[uuid].justice_txid
self.jobs.pop(uuid)
if confirmations >= MIN_CONFIRMATIONS:
# The end of the appointment has been reached
completed_jobs.append((uuid, confirmations))
if len(self.tx_job_map[justice_txid]) == 1:
self.tx_job_map.pop(justice_txid)
return completed_jobs
if debug:
logging.info("[Responder] no more jobs for justice_txid {}".format(justice_txid))
def rebroadcast(self, txs_to_rebroadcast):
# DISCUSS: #22-discuss-confirmations-before-retry
# ToDo: #23-define-behaviour-approaching-end
else:
self.tx_job_map[justice_txid].remove(uuid)
receipts = []
def handle_reorgs(self, bitcoin_cli, debug, logging):
for txid in txs_to_rebroadcast:
self.missed_confirmations[txid] = 0
for uuid in self.tx_job_map[txid]:
job = self.jobs[uuid]
receipt = self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
job.appointment_end, retry=True)
logger.warning("Transaction has missed many confirmations. Rebroadcasting.",
justice_txid=job.justice_txid, confirmations_missed=CONFIRMATIONS_BEFORE_RETRY)
receipts.append((txid, receipt))
return receipts
# FIXME: Legacy code, must be checked and updated/fixed
# NOTCOVERED
def handle_reorgs(self):
for uuid, job in self.jobs.items():
# First we check if the dispute transaction is still in the blockchain. If not, the justice can not be
# there either, so we'll need to call the reorg manager straight away
dispute_in_chain, _ = check_tx_in_chain(bitcoin_cli, job.dispute_txid, debug, logging,
parent='Responder',
tx_label='dispute tx')
dispute_in_chain, _ = check_tx_in_chain(job.dispute_txid, logger=logger, tx_label='Dispute tx')
# If the dispute is there, we can check the justice tx
if dispute_in_chain:
justice_in_chain, justice_confirmations = check_tx_in_chain(bitcoin_cli, job.justice_txid, debug,
logging, parent='Responder',
tx_label='justice tx')
justice_in_chain, justice_confirmations = check_tx_in_chain(job.justice_txid, logger=logger,
tx_label='Justice tx')
# If both transactions are there, we only need to update the justice tx confirmation count
if justice_in_chain:
if debug:
logging.info("[Responder] updating confirmation count for {}: prev. {}, current {}".format(
job.justice_txid, job.confirmations, justice_confirmations))
logger.info("Updating confirmation count for transaction.",
justice_txid=job.justice_txid,
prev_count=job.confirmations,
curr_count=justice_confirmations)
job.confirmations = justice_confirmations
@@ -258,14 +212,11 @@ class Responder:
# DISCUSS: Adding job back, should we flag it as retried?
# FIXME: Whether we decide to increase the retried counter or not, the current counter should be
# maintained. There is no way of doing so with the current approach. Update if required
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx,
job.appointment_end,
debug, logging)
self.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
else:
# ToDo: #24-properly-handle-reorgs
# FIXME: if the dispute is not on chain (either in mempool or not there al all), we need to call the
# FIXME: if the dispute is not on chain (either in mempool or not there at all), we need to call the
# reorg manager
logging.warning("[Responder] dispute and justice transaction missing. Calling the reorg manager")
logging.error("[Responder] reorg manager not yet implemented")
pass
logger.warning("Dispute and justice transaction missing. Calling the reorg manager")
logger.error("Reorg manager not yet implemented")

View File

@@ -1,51 +1,64 @@
import re
from pisa.utils.authproxy import JSONRPCException
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
from http.client import HTTPException
import pisa.conf as conf
from pisa.logger import Logger
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
def check_tx_in_chain(bitcoin_cli, tx_id, debug, logging, parent='', tx_label='transaction'):
# NOTCOVERED
def bitcoin_cli():
return AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
conf.BTC_RPC_PORT))
# TODO: currently only used in the Responder; might move there or in the BlockProcessor
# NOTCOVERED
def check_tx_in_chain(tx_id, logger=Logger(), tx_label='Transaction'):
tx_in_chain = False
confirmations = 0
try:
tx_info = bitcoin_cli.getrawtransaction(tx_id, 1)
tx_info = bitcoin_cli().getrawtransaction(tx_id, 1)
if tx_info.get("confirmations"):
confirmations = int(tx_info.get("confirmations"))
tx_in_chain = True
if debug:
logging.error("[{}] {} found in the blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug:
logging.error("[{}] {} found in mempool (txid: {}) ".format(parent, tx_label, tx_id))
logger.error("{} found in the blockchain".format(tx_label), txid=tx_id)
else:
logger.error("{} found in mempool".format(tx_label), txid=tx_id)
except JSONRPCException as e:
if e.error.get('code') == RPC_INVALID_ADDRESS_OR_KEY:
if debug:
logging.error("[{}] {} not found in mempool nor blockchain (txid: {}) ".format(parent, tx_label, tx_id))
elif debug:
logger.error("{} not found in mempool nor blockchain".format(tx_label), txid=tx_id)
else:
# ToDO: Unhandled errors, check this properly
logging.error("[{}] JSONRPCException. Error code {}".format(parent, e))
logger.error("JSONRPCException.", method='tools.check_tx_in_chain', error=e.error)
return tx_in_chain, confirmations
def can_connect_to_bitcoind(bitcoin_cli):
# NOTCOVERED
def can_connect_to_bitcoind():
can_connect = True
try:
bitcoin_cli.help()
bitcoin_cli().help()
except (ConnectionRefusedError, JSONRPCException, HTTPException):
can_connect = False
return can_connect
def in_correct_network(bitcoin_cli, network):
def in_correct_network(network):
mainnet_genesis_block_hash = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"
testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
correct_network = False
genesis_block_hash = bitcoin_cli.getblockhash(0)
genesis_block_hash = bitcoin_cli().getblockhash(0)
if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
correct_network = True
@@ -60,3 +73,4 @@ def in_correct_network(bitcoin_cli, network):
def check_txid_format(txid):
# TODO: #12-check-txid-regexp
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None

View File

@@ -1,7 +1,9 @@
import zmq
import binascii
from pisa.logger import Logger
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
# ToDo: #7-add-async-back-to-zmq
class ZMQHandler:
""" Adapted from https://github.com/bitcoin/bitcoin/blob/master/contrib/zmq/zmq_sub.py"""
@@ -11,10 +13,11 @@ class ZMQHandler:
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
self.parent = parent
self.logger = Logger("ZMQHandler-{}".format(parent))
self.terminate = False
def handle(self, block_queue, debug, logging):
def handle(self, block_queue):
while not self.terminate:
msg = self.zmqSubSocket.recv_multipart()
@@ -27,5 +30,4 @@ class ZMQHandler:
block_hash = binascii.hexlify(body).decode('UTF-8')
block_queue.put(block_hash)
if debug:
logging.info("[ZMQHandler-{}] new block received via ZMQ".format(self.parent, block_hash))
self.logger.info("New block received via ZMQ", block_hash=block_hash)

View File

@@ -1,12 +1,16 @@
from binascii import hexlify, unhexlify
from uuid import uuid4
from queue import Queue
from threading import Thread
from pisa.logger import Logger
from pisa.cleaner import Cleaner
from pisa.conf import EXPIRY_DELTA
from pisa.responder import Responder
from pisa.zmq_subscriber import ZMQHandler
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from hashlib import sha256
from uuid import uuid4
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS, EXPIRY_DELTA
from pisa.conf import MAX_APPOINTMENTS
from pisa.block_processor import BlockProcessor
from pisa.utils.zmq_subscriber import ZMQHandler
logger = Logger("Watcher")
class Watcher:
@@ -19,7 +23,7 @@ class Watcher:
self.zmq_subscriber = None
self.responder = Responder()
def add_appointment(self, appointment, debug, logging):
def add_appointment(self, appointment):
# Rationale:
# The Watcher will analyze every received block looking for appointment matches. If there is no work
# to do the watcher can go sleep (if appointments = {} then asleep = True) otherwise for every received block
@@ -45,135 +49,71 @@ class Watcher:
if self.asleep:
self.asleep = False
self.block_queue = Queue()
zmq_thread = Thread(target=self.do_subscribe, args=[self.block_queue, debug, logging])
watcher = Thread(target=self.do_watch, args=[debug, logging])
zmq_thread = Thread(target=self.do_subscribe)
watcher = Thread(target=self.do_watch)
zmq_thread.start()
watcher.start()
if debug:
logging.info("[Watcher] waking up!")
logger.info("Waking up")
appointment_added = True
if debug:
logging.info('[Watcher] new appointment accepted (locator = {})'.format(appointment.locator))
logger.info("New appointment accepted.", locator=appointment.locator)
else:
appointment_added = False
if debug:
logging.info('[Watcher] maximum appointments reached, appointment rejected (locator = {})'
.format(appointment.locator))
logger.info("Maximum appointments reached, appointment rejected.", locator=appointment.locator)
return appointment_added
def do_subscribe(self, block_queue, debug, logging):
self.zmq_subscriber = ZMQHandler(parent='Watcher')
self.zmq_subscriber.handle(block_queue, debug, logging)
def do_watch(self, debug, logging):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
def do_subscribe(self):
self.zmq_subscriber = ZMQHandler(parent="Watcher")
self.zmq_subscriber.handle(self.block_queue)
def do_watch(self):
while len(self.appointments) > 0:
block_hash = self.block_queue.get()
logger.info("New block received", block_hash=block_hash)
try:
block = bitcoin_cli.getblock(block_hash)
block = BlockProcessor.get_block(block_hash)
if block is not None:
txids = block.get('tx')
if debug:
logging.info("[Watcher] new block received {}".format(block_hash))
logging.info("[Watcher] list of transactions: {}".format(txids))
logger.info("List of transactions.", txids=txids)
self.delete_expired_appointment(block, debug, logging)
expired_appointments = [uuid for uuid, appointment in self.appointments.items()
if block["height"] > appointment.end_time + EXPIRY_DELTA]
potential_locators = {sha256(unhexlify(txid)).hexdigest(): txid for txid in txids}
Cleaner.delete_expired_appointment(expired_appointments, self.appointments, self.locator_uuid_map)
# Check is any of the tx_ids in the received block is an actual match
# Get the locators that are both in the map and in the potential locators dict.
intersection = set(self.locator_uuid_map.keys()).intersection(potential_locators.keys())
potential_matches = {locator: potential_locators[locator] for locator in intersection}
if debug:
if len(potential_matches) > 0:
logging.info("[Watcher] list of potential matches: {}".format(potential_matches))
else:
logging.info("[Watcher] no potential matches found")
matches = self.check_potential_matches(potential_matches, bitcoin_cli, debug, logging)
potential_matches = BlockProcessor.get_potential_matches(txids, self.locator_uuid_map)
matches = BlockProcessor.get_matches(potential_matches, self.locator_uuid_map, self.appointments)
for locator, uuid, dispute_txid, justice_txid, justice_rawtx in matches:
if debug:
logging.info("[Watcher] notifying responder about {} and deleting appointment {} (uuid: {})"
.format(justice_txid, locator, uuid))
# Errors decrypting the Blob will result in a None justice_txid
if justice_txid is not None:
logger.info("Notifying responder and deleting appointment.", justice_txid=justice_txid,
locator=locator, uuid=uuid)
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[uuid].end_time, debug, logging)
self.responder.add_response(uuid, dispute_txid, justice_txid, justice_rawtx,
self.appointments[uuid].end_time)
# Delete the appointment
self.appointments.pop(uuid)
# If there was only one appointment that matches the locator we can delete the whole list
if len(self.locator_uuid_map[locator]) == 1:
# ToDo: #9-add-data-persistency
# ToDo: #9-add-data-persistence
self.locator_uuid_map.pop(locator)
else:
# Otherwise we just delete the appointment that matches locator:appointment_pos
# ToDo: #9-add-data-persistency
# ToDo: #9-add-data-persistence
self.locator_uuid_map[locator].remove(uuid)
except JSONRPCException as e:
if debug:
logging.error("[Watcher] couldn't get block from bitcoind. Error code {}".format(e))
# Go back to sleep if there are no more appointments
self.asleep = True
self.zmq_subscriber.terminate = True
if debug:
logging.error("[Watcher] no more pending appointments, going back to sleep")
def delete_expired_appointment(self, block, debug, logging):
to_delete = [uuid for uuid, appointment in self.appointments.items() if block["height"] > appointment.end_time
+ EXPIRY_DELTA]
for uuid in to_delete:
# ToDo: #9-add-data-persistency
locator = self.appointments[uuid].locator
self.appointments.pop(uuid)
if len(self.locator_uuid_map[locator]) == 1:
self.locator_uuid_map.pop(locator)
else:
self.locator_uuid_map[locator].remove(uuid)
if debug:
logging.info("[Watcher] end time reached with no match! Deleting appointment {} (uuid: {})"
.format(locator, uuid))
def check_potential_matches(self, potential_matches, bitcoin_cli, debug, logging):
matches = []
for locator, dispute_txid in potential_matches.items():
for uuid in self.locator_uuid_map[locator]:
try:
# ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = self.appointments[uuid].encrypted_blob.decrypt(unhexlify(dispute_txid), debug,
logging)
justice_rawtx = hexlify(justice_rawtx).decode()
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid')
matches.append((locator, uuid, dispute_txid, justice_txid, justice_rawtx))
if debug:
logging.info("[Watcher] match found for locator {} (uuid: {}): {}".format(locator, uuid,
justice_txid))
except JSONRPCException as e:
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
# for the POC
if debug:
logging.error("[Watcher] can't build transaction from decoded data. Error code {}".format(e))
return matches
logger.error("No more pending appointments, going back to sleep")

View File

@@ -0,0 +1,146 @@
import re
import pytest
from time import sleep
from threading import Thread
from test.simulator.transaction import TX
from test.unit.conftest import get_random_value_hex
from test.simulator.bitcoind_sim import run_simulator
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, get_random_value_hex(32)]
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
@pytest.fixture(scope='module')
def run_bitcoind():
bitcoind_thread = Thread(target=run_simulator, kwargs={"mode": "event"})
bitcoind_thread.daemon = True
bitcoind_thread.start()
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1)
@pytest.fixture(scope="module")
def genesis_block_hash(run_bitcoind):
return bitcoin_cli.getblockhash(0)
def check_hash_format(txid):
# TODO: #12-check-txid-regexp
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None
def test_help(run_bitcoind):
# Help should always return 0
assert(bitcoin_cli.help() == 0)
# FIXME: Better assert for the exceptions would be nice (check the returned errno is the expected one)
def test_getblockhash(genesis_block_hash):
# First block
assert(check_hash_format(genesis_block_hash))
# Check that the values are within range and of the proper format (all should fail)
for v in MIXED_VALUES:
try:
bitcoin_cli.getblockhash(v)
assert False
except JSONRPCException as e:
assert True
def test_get_block(genesis_block_hash):
# getblock should return a list of transactions and the height
block = bitcoin_cli.getblock(genesis_block_hash)
assert(isinstance(block.get('tx'), list))
assert(len(block.get('tx')) != 0)
assert(isinstance(block.get('height'), int))
# It should fail for wrong data formats and random ids
for v in MIXED_VALUES:
try:
bitcoin_cli.getblock(v)
assert False
except JSONRPCException as e:
assert True
def test_decoderawtransaction(genesis_block_hash):
# decoderawtransaction should only return if the given transaction matches a txid format
block = bitcoin_cli.getblock(genesis_block_hash)
coinbase_txid = block.get('tx')[0]
coinbase_tx = bitcoin_cli.getrawtransaction(coinbase_txid).get("hex")
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_hash_format(tx.get('txid')))
# Therefore should also work for a random transaction hex in our simulation
random_tx = TX.create_dummy_transaction()
tx = bitcoin_cli.decoderawtransaction(random_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_hash_format(tx.get('txid')))
# But it should fail for not proper formatted one
for v in MIXED_VALUES:
try:
bitcoin_cli.decoderawtransaction(v)
assert False
except JSONRPCException as e:
assert True
def test_sendrawtransaction(genesis_block_hash):
# sendrawtransaction should only allow txids that the simulator has not mined yet
bitcoin_cli.sendrawtransaction(TX.create_dummy_transaction())
# Any data not matching the txid format or that matches with an already mined transaction should fail
try:
genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
bitcoin_cli.sendrawtransaction(genesis_tx)
assert False
except JSONRPCException as e:
assert True
for v in MIXED_VALUES:
try:
bitcoin_cli.sendrawtransaction(v)
assert False
except JSONRPCException as e:
assert True
def test_getrawtransaction(genesis_block_hash):
# getrawtransaction should work for existing transactions, and fail for non-existing ones
genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
tx = bitcoin_cli.getrawtransaction(genesis_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('confirmations'), int))
for v in MIXED_VALUES:
try:
bitcoin_cli.getrawtransaction(v)
assert False
except JSONRPCException as e:
assert True
def test_getblockcount():
# getblockcount should always return a positive integer
bc = bitcoin_cli.getblockcount()
assert (isinstance(bc, int))
assert (bc >= 0)

View File

@@ -1,20 +1,37 @@
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
from flask import Flask, request, Response, abort
from tests.simulator.zmq_publisher import ZMQPublisher
from threading import Thread
from pisa.rpc_errors import *
from pisa.tools import check_txid_format
import logging
import binascii
import json
import os
import time
import json
import logging
import binascii
from threading import Thread, Event
from flask import Flask, request, Response, abort
from pisa.rpc_errors import *
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.simulator.zmq_publisher import ZMQPublisher
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
app = Flask(__name__)
HOST = 'localhost'
PORT = '18443'
blockchain = []
blocks = {}
mined_transactions = {}
mempool = []
mine_new_block = Event()
@app.route('/generate', methods=['POST'])
def generate():
global mine_new_block
mine_new_block.set()
return Response(status=200, mimetype='application/json')
@app.route('/', methods=['POST'])
def process_request():
@@ -46,6 +63,8 @@ def process_request():
getblockhash: a block hash is only queried by pisad on bootstrapping to check the network bitcoind is
running on.
getbestblockhash: returns the hash of the block in the tip of the chain
help: help is only used as a sample command to test if bitcoind is running when bootstrapping
pisad. It will return a 200/OK with no data.
"""
@@ -58,10 +77,12 @@ def process_request():
no_param_err = {"code": RPC_MISC_ERROR, "message": "JSON value is not a {} as expected"}
if method == "decoderawtransaction":
txid = get_param(request_data)
rawtx = get_param(request_data)
if isinstance(txid, str):
if check_txid_format(txid):
if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
txid = sha256d(rawtx)
if TX.deserialize(rawtx) is not None:
response["result"] = {"txid": txid}
else:
@@ -73,12 +94,15 @@ def process_request():
elif method == "sendrawtransaction":
# TODO: A way of rejecting transactions should be added to test edge cases.
txid = get_param(request_data)
rawtx = get_param(request_data)
if isinstance(txid, str):
if check_txid_format(txid):
if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
txid = sha256d(rawtx)
if TX.deserialize(rawtx) is not None:
if txid not in list(mined_transactions.keys()):
mempool.append(txid)
mempool.append(rawtx)
response["result"] = {"txid": txid}
else:
response["error"] = {"code": RPC_VERIFY_ALREADY_IN_CHAIN,
@@ -95,10 +119,10 @@ def process_request():
txid = get_param(request_data)
if isinstance(txid, str):
block = blocks.get(mined_transactions.get(txid))
if block:
response["result"] = {"confirmations": len(blockchain) - block.get('height')}
if txid in mined_transactions:
block = blocks.get(mined_transactions[txid]["block"])
rawtx = mined_transactions[txid].get('tx')
response["result"] = {"hex": rawtx, "confirmations": len(blockchain) - block.get('height')}
elif txid in mempool:
response["result"] = {"confirmations": 0}
@@ -144,6 +168,9 @@ def process_request():
response["error"] = no_param_err
response["error"]["message"] = response["error"]["message"].format("integer")
elif method == "getbestblockhash":
response["result"] = blockchain[-1]
elif method == "help":
pass
@@ -157,6 +184,7 @@ def get_param(request_data):
param = None
params = request_data.get("params")
if isinstance(params, list) and len(params) > 0:
param = params[0]
@@ -167,45 +195,57 @@ def load_data():
pass
def simulate_mining():
global mempool, mined_transactions, blocks, blockchain
def simulate_mining(mode, time_between_blocks):
global mempool, mined_transactions, blocks, blockchain, mine_new_block
prev_block_hash = None
while True:
block_hash = binascii.hexlify(os.urandom(32)).decode('utf-8')
coinbase_tx_hash = binascii.hexlify(os.urandom(32)).decode('utf-8')
txs_to_mine = [coinbase_tx_hash]
mining_simulator = ZMQPublisher(topic=b'hashblock', feed_protocol=FEED_PROTOCOL, feed_addr=FEED_ADDR,
feed_port=FEED_PORT)
# Set the mining event to initialize the blockchain with a block
mine_new_block.set()
while mine_new_block.wait():
block_hash = os.urandom(32).hex()
coinbase_tx = TX.create_dummy_transaction()
coinbase_tx_hash = sha256d(coinbase_tx)
txs_to_mine = dict({coinbase_tx_hash: coinbase_tx})
if len(mempool) != 0:
# We'll mine up to 100 txs per block
txs_to_mine += mempool[:99]
for rawtx in mempool[:99]:
txid = sha256d(rawtx)
txs_to_mine[txid] = rawtx
mempool = mempool[99:]
# Keep track of the mined transaction (to respond to getrawtransaction)
for tx in txs_to_mine:
mined_transactions[tx] = block_hash
for txid, tx in txs_to_mine.items():
mined_transactions[txid] = {"tx": tx, "block": block_hash}
blocks[block_hash] = {"tx": list(txs_to_mine.keys()), "height": len(blockchain),
"previousblockhash": prev_block_hash}
blocks[block_hash] = {"tx": txs_to_mine, "height": len(blockchain), "previousblockhash": prev_block_hash}
mining_simulator.publish_data(binascii.unhexlify(block_hash))
blockchain.append(block_hash)
prev_block_hash = block_hash
print("New block mined: {}".format(block_hash))
print("\tTransactions: {}".format(txs_to_mine))
print("\tTransactions: {}".format(list(txs_to_mine.keys())))
time.sleep(10)
if mode == 'time':
time.sleep(time_between_blocks)
else:
mine_new_block.clear()
if __name__ == '__main__':
mining_simulator = ZMQPublisher(topic=b'hashblock', feed_protocol=FEED_PROTOCOL, feed_addr=FEED_ADDR,
feed_port=FEED_PORT)
def run_simulator(mode='time', time_between_blocks=5):
if mode not in ["time", 'event']:
raise ValueError("Node must be time or event")
mempool = []
mined_transactions = {}
blocks = {}
blockchain = []
mining_thread = Thread(target=simulate_mining)
mining_thread = Thread(target=simulate_mining, args=[mode, time_between_blocks])
mining_thread.start()
# Setting Flask log to ERROR only so it does not mess with out logging

View File

@@ -0,0 +1,150 @@
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
from os import urandom
from test.simulator.utils import *
class TX:
""" Defines a class TX (transaction) that holds all the modifiable fields of a Bitcoin transaction, such as
version, number of inputs, reference to previous transactions, input and output scripts, value, etc.
"""
def __init__(self):
self.version = None
self.inputs = None
self.outputs = None
self.nLockTime = None
self.prev_tx_id = []
self.prev_out_index = []
self.scriptSig = []
self.scriptSig_len = []
self.nSequence = []
self.value = []
self.scriptPubKey = []
self.scriptPubKey_len = []
self.offset = 0
self.hex = ""
@classmethod
def deserialize(cls, hex_tx):
""" Builds a transaction object from the hexadecimal serialization format of a transaction that
could be obtained, for example, from a blockexplorer.
:param hex_tx: Hexadecimal serialized transaction.
:type hex_tx: hex str
:return: The transaction build using the provided hex serialized transaction.
:rtype: TX
"""
tx = cls()
tx.hex = hex_tx
try:
tx.version = int(change_endianness(parse_element(tx, 4)), 16)
# INPUTS
tx.inputs = int(parse_varint(tx), 16)
for i in range(tx.inputs):
tx.prev_tx_id.append(change_endianness(parse_element(tx, 32)))
tx.prev_out_index.append(int(change_endianness(parse_element(tx, 4)), 16))
# ScriptSig
tx.scriptSig_len.append(int(parse_varint(tx), 16))
tx.scriptSig.append(parse_element(tx, tx.scriptSig_len[i]))
tx.nSequence.append(int(parse_element(tx, 4), 16))
# OUTPUTS
tx.outputs = int(parse_varint(tx), 16)
for i in range(tx.outputs):
tx.value.append(int(change_endianness(parse_element(tx, 8)), 16))
# ScriptPubKey
tx.scriptPubKey_len.append(int(parse_varint(tx), 16))
tx.scriptPubKey.append(parse_element(tx, tx.scriptPubKey_len[i]))
tx.nLockTime = int(parse_element(tx, 4), 16)
if tx.offset != len(tx.hex):
# There is some error in the serialized transaction passed as input. Transaction can't be built
tx = None
else:
tx.offset = 0
except ValueError:
# If a parsing error occurs, the deserialization stops and None is returned
tx = None
return tx
def serialize(self, rtype=hex):
""" Serialize all the transaction fields arranged in the proper order, resulting in a hexadecimal string
ready to be broadcast to the network.
:param self: self
:type self: TX
:param rtype: Whether the serialized transaction is returned as a hex str or a byte array.
:type rtype: hex or bool
:return: Serialized transaction representation (hexadecimal or bin depending on rtype parameter).
:rtype: hex str / bin
"""
if rtype not in [hex, bin]:
raise Exception("Invalid return type (rtype). It should be either hex or bin.")
serialized_tx = change_endianness(int2bytes(self.version, 4)) # 4-byte version number (LE).
# INPUTS
serialized_tx += encode_varint(self.inputs) # Varint number of inputs.
for i in range(self.inputs):
serialized_tx += change_endianness(self.prev_tx_id[i]) # 32-byte hash of the previous transaction (LE).
serialized_tx += change_endianness(int2bytes(self.prev_out_index[i], 4)) # 4-byte output index (LE)
serialized_tx += encode_varint(len(self.scriptSig[i]) // 2) # Varint input script length.
# ScriptSig
serialized_tx += self.scriptSig[i] # Input script.
serialized_tx += int2bytes(self.nSequence[i], 4) # 4-byte sequence number.
# OUTPUTS
serialized_tx += encode_varint(self.outputs) # Varint number of outputs.
if self.outputs != 0:
for i in range(self.outputs):
serialized_tx += change_endianness(int2bytes(self.value[i], 8)) # 8-byte field Satoshi value (LE)
# ScriptPubKey
serialized_tx += encode_varint(len(self.scriptPubKey[i]) // 2) # Varint Output script length.
serialized_tx += self.scriptPubKey[i] # Output script.
serialized_tx += int2bytes(self.nLockTime, 4) # 4-byte lock time field
# If return type has been set to binary, the serialized transaction is converted.
if rtype is bin:
serialized_tx = unhexlify(serialized_tx)
return serialized_tx
@staticmethod
def create_dummy_transaction(prev_tx_id=None, prev_out_index=None):
tx = TX()
if prev_tx_id is None:
prev_tx_id = urandom(32).hex()
if prev_out_index is None:
prev_out_index = 0
tx.version = 1
tx.inputs = 1
tx.outputs = 1
tx.prev_tx_id = [prev_tx_id]
tx.prev_out_index = [prev_out_index]
tx.nLockTime = 0
tx.scriptSig = [
'47304402204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860'
'a4acdd12909d831cc56cbbac4622082221a8768d1d0901']
tx.scriptSig_len = [77]
tx.nSequence = [4294967295]
tx.value = [5000000000]
tx.scriptPubKey = [
'4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c'
'1b7303b8a0626f1baded5c72a704f7e6cd84cac']
tx.scriptPubKey_len = [67]
return tx.serialize()

128
test/simulator/utils.py Normal file
View File

@@ -0,0 +1,128 @@
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
from hashlib import sha256
from binascii import unhexlify, hexlify
def change_endianness(x):
""" Changes the endianness (from BE to LE and vice versa) of a given value.
:param x: Given value which endianness will be changed.
:type x: hex str
:return: The opposite endianness representation of the given value.
:rtype: hex str
"""
# If there is an odd number of elements, we make it even by adding a 0
if (len(x) % 2) == 1:
x += "0"
y = unhexlify(x)
z = y[::-1]
return hexlify(z).decode('utf-8')
def parse_varint(tx):
""" Parses a given transaction for extracting an encoded varint element.
:param tx: Transaction where the element will be extracted.
:type tx: TX
:return: The b-bytes representation of the given value (a) in hex format.
:rtype: hex str
"""
# First of all, the offset of the hex transaction if moved to the proper position (i.e where the varint should be
# located) and the length and format of the data to be analyzed is checked.
data = tx.hex[tx.offset:]
assert (len(data) > 0)
size = int(data[:2], 16)
assert (size <= 255)
# Then, the integer is encoded as a varint using the proper prefix, if needed.
if size <= 252: # No prefix
storage_length = 1
elif size == 253: # 0xFD
storage_length = 3
elif size == 254: # 0xFE
storage_length = 5
elif size == 255: # 0xFF
storage_length = 9
else:
raise Exception("Wrong input data size")
# Finally, the storage length is used to extract the proper number of bytes from the transaction hex and the
# transaction offset is updated.
varint = data[:storage_length * 2]
tx.offset += storage_length * 2
return varint
def parse_element(tx, size):
""" Parses a given transaction to extract an element of a given size.
:param tx: Transaction where the element will be extracted.
:type tx: TX
:param size: Size of the parameter to be extracted.
:type size: int
:return: The extracted element.
:rtype: hex str
"""
element = tx.hex[tx.offset:tx.offset + size * 2]
tx.offset += size * 2
return element
def encode_varint(value):
""" Encodes a given integer value to a varint. It only used the four varint representation cases used by bitcoin:
1-byte, 2-byte, 4-byte or 8-byte integers.
:param value: The integer value that will be encoded into varint.
:type value: int
:return: The varint representation of the given integer value.
:rtype: str
"""
# The value is checked in order to choose the size of its final representation.
# 0xFD(253), 0xFE(254) and 0xFF(255) are special cases, since are the prefixes defined for 2-byte, 4-byte
# and 8-byte long values respectively.
if value < pow(2, 8) - 3:
size = 1
varint = int2bytes(value, size) # No prefix
else:
if value < pow(2, 16):
size = 2
prefix = 253 # 0xFD
elif value < pow(2, 32):
size = 4
prefix = 254 # 0xFE
elif value < pow(2, 64):
size = 8
prefix = 255 # 0xFF
else:
raise Exception("Wrong input data size")
varint = format(prefix, 'x') + change_endianness(int2bytes(value, size))
return varint
def int2bytes(a, b):
""" Converts a given integer value (a) its b-byte representation, in hex format.
:param a: Value to be converted.
:type a: int
:param b: Byte size to be filled.
:type b: int
:return: The b-bytes representation of the given value (a) in hex format.
:rtype: hex str
"""
m = pow(2, 8*b) - 1
if a > m:
raise Exception(str(a) + " is too big to be represented with " + str(b) + " bytes. Maximum value is "
+ str(m) + ".")
return ('%0' + str(2 * b) + 'x') % a
def sha256d(hex_data):
data = unhexlify(hex_data)
double_sha256 = sha256(sha256(data).digest()).hexdigest()
return change_endianness(double_sha256)

0
test/unit/__init__.py Normal file
View File

51
test/unit/conftest.py Normal file
View File

@@ -0,0 +1,51 @@
import pytest
import random
import requests
from time import sleep
from threading import Thread
from pisa.api import start_api
from test.simulator.bitcoind_sim import run_simulator, HOST, PORT
@pytest.fixture(scope='session')
def run_bitcoind():
bitcoind_thread = Thread(target=run_simulator, kwargs={"mode": "event"})
bitcoind_thread.daemon = True
bitcoind_thread.start()
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1)
@pytest.fixture(scope='session')
def run_api():
api_thread = Thread(target=start_api)
api_thread.daemon = True
api_thread.start()
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1)
@pytest.fixture(scope='session', autouse=True)
def prng_seed():
random.seed(0)
def get_random_value_hex(nbytes):
pseudo_random_value = random.getrandbits(8*nbytes)
prv_hex = '{:x}'.format(pseudo_random_value)
return prv_hex.zfill(2*nbytes)
def generate_block():
requests.post(url="http://{}:{}/generate".format(HOST, PORT), timeout=5)
sleep(0.5)
def generate_blocks(n):
for _ in range(n):
generate_block()

172
test/unit/test_api.py Normal file
View File

@@ -0,0 +1,172 @@
import json
import pytest
import requests
from hashlib import sha256
from binascii import unhexlify
from apps.cli.blob import Blob
from pisa import HOST, PORT, logging
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from pisa.utils.auth_proxy import AuthServiceProxy
from test.unit.conftest import generate_block, get_random_value_hex
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
logging.getLogger().disabled = True
PISA_API = "http://{}:{}".format(HOST, PORT)
MULTIPLE_APPOINTMENTS = 10
appointments = []
locator_dispute_tx_map = {}
def generate_dummy_appointment():
r = requests.get(url=PISA_API + '/get_block_count', timeout=5)
current_height = r.json().get("block_count")
dispute_tx = TX.create_dummy_transaction()
dispute_txid = sha256d(dispute_tx)
justice_tx = TX.create_dummy_transaction(dispute_txid)
dummy_appointment_data = {"tx": justice_tx, "tx_id": dispute_txid, "start_time": current_height + 5,
"end_time": current_height + 30, "dispute_delta": 20}
cipher = "AES-GCM-128"
hash_function = "SHA256"
locator = sha256(unhexlify(dispute_txid)).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")))
appointment = {"locator": locator, "start_time": dummy_appointment_data.get("start_time"),
"end_time": dummy_appointment_data.get("end_time"),
"dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function}
return appointment, dispute_tx
@pytest.fixture
def new_appointment():
appointment, dispute_tx = generate_dummy_appointment()
locator_dispute_tx_map[appointment["locator"]] = dispute_tx
return appointment
def add_appointment(appointment):
r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
if r.status_code == 200:
appointments.append(appointment)
return r
def test_add_appointment(run_api, run_bitcoind, new_appointment):
# Properly formatted appointment
r = add_appointment(new_appointment)
assert (r.status_code == 200)
# Incorrect appointment
new_appointment["dispute_delta"] = 0
r = add_appointment(new_appointment)
assert (r.status_code == 400)
def test_request_appointment(new_appointment):
# First we need to add an appointment
r = add_appointment(new_appointment)
assert (r.status_code == 200)
# Next we can request it
r = requests.get(url=PISA_API + "/get_appointment?locator=" + new_appointment["locator"])
assert (r.status_code == 200)
# Each locator may point to multiple appointments, check them all
received_appointments = json.loads(r.content)
# Take the status out and leave the received appointments ready to compare
appointment_status = [appointment.pop("status") for appointment in received_appointments]
# Check that the appointment is within the received appoints
assert (new_appointment in received_appointments)
# Check that all the appointments are being watched
assert (all([status == "being_watched" for status in appointment_status]))
def test_request_random_appointment():
r = requests.get(url=PISA_API + "/get_appointment?locator=" + get_random_value_hex(32))
assert (r.status_code == 200)
received_appointments = json.loads(r.content)
appointment_status = [appointment.pop("status") for appointment in received_appointments]
assert (all([status == "not_found" for status in appointment_status]))
def test_add_appointment_multiple_times(new_appointment, n=MULTIPLE_APPOINTMENTS):
# Multiple appointments with the same locator should be valid
# DISCUSS: #34-store-identical-appointments
for _ in range(n):
r = add_appointment(new_appointment)
assert (r.status_code == 200)
def test_request_multiple_appointments_same_locator(new_appointment, n=MULTIPLE_APPOINTMENTS):
for _ in range(n):
r = add_appointment(new_appointment)
assert (r.status_code == 200)
test_request_appointment(new_appointment)
def test_add_too_many_appointment(new_appointment):
for _ in range(MAX_APPOINTMENTS-len(appointments)):
r = add_appointment(new_appointment)
assert (r.status_code == 200)
r = add_appointment(new_appointment)
assert (r.status_code == 503)
def test_get_all_appointments_watcher():
r = requests.get(url=PISA_API + "/get_all_appointments")
assert (r.status_code == 200 and r.reason == 'OK')
received_appointments = json.loads(r.content)
# Make sure there all the locators re in the watcher
watcher_locators = [v["locator"] for k, v in received_appointments["watcher_appointments"].items()]
local_locators = [appointment["locator"] for appointment in appointments]
assert(set(watcher_locators) == set(local_locators))
assert(len(received_appointments["responder_jobs"]) == 0)
def test_get_all_appointments_responder():
# Trigger all disputes
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
locators = [appointment["locator"] for appointment in appointments]
for locator, dispute_tx in locator_dispute_tx_map.items():
if locator in locators:
bitcoin_cli.sendrawtransaction(dispute_tx)
# Wait a bit for them to get confirmed
generate_block()
# Get all appointments
r = requests.get(url=PISA_API + "/get_all_appointments")
received_appointments = json.loads(r.content)
# Make sure there is not pending locator in the watcher
responder_jobs = [v["locator"] for k, v in received_appointments["responder_jobs"].items()]
local_locators = [appointment["locator"] for appointment in appointments]
assert (set(responder_jobs) == set(local_locators))
assert (len(received_appointments["watcher_appointments"]) == 0)

View File

@@ -0,0 +1,47 @@
from pytest import fixture
from pisa.appointment import Appointment
from pisa.encrypted_blob import EncryptedBlob
from test.unit.conftest import get_random_value_hex
# Not much to test here, adding it for completeness
@fixture
def appointment_data():
locator = get_random_value_hex(32)
start_time = 100
end_time = 120
dispute_delta = 20
encrypted_blob_data = get_random_value_hex(100)
cipher = "AES-GCM-128"
hash_function = "SHA256"
return locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function
def test_init_appointment(appointment_data):
# The appointment has no checks whatsoever, since the inspector is the one taking care or that, and the only one
# creating appointments.
# DISCUSS: whether this makes sense by design or checks should be ported from the inspector to the appointment
# 35-appointment-checks
locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function = appointment_data
appointment = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function)
assert (locator == appointment.locator and start_time == appointment.start_time and end_time == appointment.end_time
and EncryptedBlob(encrypted_blob_data) == appointment.encrypted_blob and cipher == appointment.cipher
and dispute_delta == appointment.dispute_delta and hash_function == appointment.hash_function)
def test_to_json(appointment_data):
locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function = appointment_data
appointment = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function)
json_appointment = appointment.to_json()
assert (locator == json_appointment.get("locator") and start_time == json_appointment.get("start_time")
and end_time == json_appointment.get("end_time") and dispute_delta == json_appointment.get("dispute_delta")
and cipher == json_appointment.get("cipher") and hash_function == json_appointment.get("hash_function")
and encrypted_blob_data == json_appointment.get("encrypted_blob"))

90
test/unit/test_blob.py Normal file
View File

@@ -0,0 +1,90 @@
from binascii import unhexlify
from pisa import logging
from apps.cli.blob import Blob
from test.unit.conftest import get_random_value_hex
from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
logging.getLogger().disabled = True
def test_init_blob():
data = get_random_value_hex(64)
# Fixed (valid) hash function, try different valid ciphers
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
for cipher in SUPPORTED_CIPHERS:
cipher_cases = [cipher, cipher.lower(), cipher.capitalize()]
for case in cipher_cases:
blob = Blob(data, case, hash_function)
assert(blob.data == data and blob.cipher == case and blob.hash_function == hash_function)
# Fixed (valid) cipher, try different valid hash functions
cipher = SUPPORTED_CIPHERS[0]
for hash_function in SUPPORTED_HASH_FUNCTIONS:
hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()]
for case in hash_function_cases:
blob = Blob(data, cipher, case)
assert(blob.data == data and blob.cipher == cipher and blob.hash_function == case)
# Invalid data
data = unhexlify(get_random_value_hex(64))
cipher = SUPPORTED_CIPHERS[0]
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
try:
Blob(data, cipher, hash_function)
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
# Invalid cipher
data = get_random_value_hex(64)
cipher = "A" * 10
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
try:
Blob(data, cipher, hash_function)
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
# Invalid hash function
data = get_random_value_hex(64)
cipher = SUPPORTED_CIPHERS[0]
hash_function = "A" * 10
try:
Blob(data, cipher, hash_function)
assert False, "Able to create blob with wrong data"
except ValueError:
assert True
def test_encrypt():
# Valid data, valid key
data = get_random_value_hex(64)
blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
key = get_random_value_hex(32)
encrypted_blob = blob.encrypt(key)
# Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
invalid_key = unhexlify(get_random_value_hex(32))
try:
blob.encrypt(invalid_key)
assert False, "Able to create encrypt with invalid key"
except ValueError:
assert True
# Check that two encryptions of the same data have the same result
encrypted_blob2 = blob.encrypt(key)
assert(encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2))

View File

@@ -0,0 +1,80 @@
import pytest
import logging
from uuid import uuid4
from hashlib import sha256
from binascii import unhexlify
from pisa.block_processor import BlockProcessor
from test.unit.conftest import get_random_value_hex
logging.getLogger().disabled = True
APPOINTMENT_COUNT = 100
TEST_SET_SIZE = 200
@pytest.fixture(scope='module')
def txids():
return [get_random_value_hex(32) for _ in range(APPOINTMENT_COUNT)]
@pytest.fixture(scope='module')
def locator_uuid_map(txids):
return {sha256(unhexlify(txid)).hexdigest(): uuid4().hex for txid in txids}
@pytest.fixture
def best_block_hash():
return BlockProcessor.get_best_block_hash()
def test_get_best_block_hash(run_bitcoind, best_block_hash):
# As long as bitcoind is running (or mocked in this case) we should always a block hash
assert best_block_hash is not None and isinstance(best_block_hash, str)
def test_get_block(best_block_hash):
# Getting a block from a block hash we are aware of should return data
block = BlockProcessor.get_block(best_block_hash)
# Checking that the received block has at least the fields we need
# FIXME: We could be more strict here, but we'll need to add those restrictions to bitcoind_sim too
assert isinstance(block, dict)
assert block.get('hash') == best_block_hash and 'height' in block and 'previousblockhash' in block and 'tx' in block
def test_get_random_block():
block = BlockProcessor.get_block(get_random_value_hex(32))
assert block is None
def test_get_block_count():
block_count = BlockProcessor.get_block_count()
assert isinstance(block_count, int) and block_count >= 0
def test_potential_matches(txids, locator_uuid_map):
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
# All the txids must match
assert locator_uuid_map.keys() == potential_matches.keys()
def test_potential_matches_random(locator_uuid_map):
txids = [get_random_value_hex(32) for _ in range(len(locator_uuid_map))]
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
# None of the ids should match
assert len(potential_matches) == 0
def test_potential_matches_random_data(locator_uuid_map):
# The likelihood of finding a potential match with random data should be negligible
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
# None of the txids should match
assert len(potential_matches) == 0

78
test/unit/test_carrier.py Normal file
View File

@@ -0,0 +1,78 @@
import pytest
import logging
from pisa.carrier import Carrier
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.unit.conftest import generate_blocks
from test.unit.conftest import get_random_value_hex
from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR
logging.getLogger().disabled = True
# FIXME: This test do not fully cover the carrier since the simulator does not support every single error bitcoind may
# return for RPC_VERIFY_REJECTED and RPC_VERIFY_ERROR. Further development of the simulator / mocks or simulation
# with bitcoind is required
sent_txs = []
@pytest.fixture(scope='module')
def carrier():
return Carrier()
def test_send_transaction(run_bitcoind, carrier):
tx = TX.create_dummy_transaction()
txid = sha256d(tx)
receipt = carrier.send_transaction(tx, txid)
assert(receipt.delivered is True)
def test_send_double_spending_transaction(carrier):
# We can test what happens if the same transaction is sent twice
tx = TX.create_dummy_transaction()
txid = sha256d(tx)
receipt = carrier.send_transaction(tx, txid)
sent_txs.append(txid)
# Wait for a block to be mined
generate_blocks(2)
# Try to send it again
receipt2 = carrier.send_transaction(tx, txid)
# The carrier should report delivered True for both, but in the second case the transaction was already delivered
# (either by himself or someone else)
assert(receipt.delivered is True)
assert (receipt2.delivered is True and receipt2.confirmations >= 1
and receipt2.reason == RPC_VERIFY_ALREADY_IN_CHAIN)
def test_send_transaction_invalid_format(carrier):
# Test sending a transaction that does not fits the format
tx = TX.create_dummy_transaction()
txid = sha256d(tx)
receipt = carrier.send_transaction(txid, txid)
assert (receipt.delivered is False and receipt.reason == RPC_DESERIALIZATION_ERROR)
def test_get_transaction():
# We should be able to get back every transaction we've sent
for tx in sent_txs:
tx_info = Carrier.get_transaction(tx)
assert tx_info is not None
def test_get_non_existing_transaction():
tx_info = Carrier.get_transaction(get_random_value_hex(32))
assert tx_info is None

81
test/unit/test_cleaner.py Normal file
View File

@@ -0,0 +1,81 @@
import random
from os import urandom
from uuid import uuid4
from pisa import logging
from pisa.responder import Job
from pisa.cleaner import Cleaner
from pisa.appointment import Appointment
from test.unit.conftest import get_random_value_hex
CONFIRMATIONS = 6
ITEMS = 10
MAX_ITEMS = 100
ITERATIONS = 1000
logging.getLogger().disabled = True
def set_up_appointments(total_appointments):
appointments = dict()
locator_uuid_map = dict()
for _ in range(total_appointments):
uuid = uuid4().hex
locator = get_random_value_hex(32)
appointments[uuid] = Appointment(locator, None, None, None, None, None, None)
locator_uuid_map[locator] = [uuid]
# Each locator can have more than one uuid assigned to it. Do a coin toss to add multiple ones
while random.randint(0, 1):
uuid = uuid4().hex
appointments[uuid] = Appointment(locator, None, None, None, None, None, None)
locator_uuid_map[locator].append(uuid)
return appointments, locator_uuid_map
def set_up_jobs(total_jobs):
jobs = dict()
tx_job_map = dict()
for _ in range(total_jobs):
uuid = uuid4().hex
txid = get_random_value_hex(32)
# Assign both justice_txid and dispute_txid the same id (it shouldn't matter)
jobs[uuid] = Job(txid, txid, None, None)
tx_job_map[txid] = [uuid]
# Each justice_txid can have more than one uuid assigned to it. Do a coin toss to add multiple ones
while random.randint(0, 1):
uuid = uuid4().hex
jobs[uuid] = Job(txid, txid, None, None)
tx_job_map[txid].append(uuid)
return jobs, tx_job_map
def test_delete_expired_appointment():
for _ in range(ITERATIONS):
appointments, locator_uuid_map = set_up_appointments(MAX_ITEMS)
expired_appointments = random.sample(list(appointments.keys()), k=ITEMS)
Cleaner.delete_expired_appointment(expired_appointments, appointments, locator_uuid_map)
assert not set(expired_appointments).issubset(appointments.keys())
def test_delete_completed_jobs():
for _ in range(ITERATIONS):
jobs, tx_job_map = set_up_jobs(MAX_ITEMS)
selected_jobs = random.sample(list(jobs.keys()), k=ITEMS)
completed_jobs = [(job, 6) for job in selected_jobs]
Cleaner.delete_completed_jobs(jobs, tx_job_map, completed_jobs, 0)
assert not set(completed_jobs).issubset(jobs.keys())

View File

@@ -0,0 +1,31 @@
from pisa import logging
from pisa.encrypted_blob import EncryptedBlob
from test.unit.conftest import get_random_value_hex
logging.getLogger().disabled = True
def test_init_encrypted_blob():
# No much to test here, basically that the object is properly created
data = get_random_value_hex(64)
assert (EncryptedBlob(data).data == data)
def test_decrypt():
# TODO: The decryption tests are assuming the cipher is AES-GCM-128, since EncryptedBlob assumes the same. Fix this.
key = get_random_value_hex(32)
encrypted_data = get_random_value_hex(64)
encrypted_blob = EncryptedBlob(encrypted_data)
# Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function
# returns None
hex_tx = encrypted_blob.decrypt(key)
assert hex_tx is None
# Valid data should run with no InvalidTag and verify
data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777"
key = 'b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659'
encrypted_data = "092e93d4a34aac4367075506f2c050ddfa1a201ee6669b65058572904dcea642aeb01ea4b57293618e8c46809dfadadc"
encrypted_blob = EncryptedBlob(encrypted_data)
assert(encrypted_blob.decrypt(key) == data)

219
test/unit/test_inspector.py Normal file
View File

@@ -0,0 +1,219 @@
from binascii import unhexlify
from pisa import logging
from pisa.errors import *
from pisa.inspector import Inspector
from pisa.appointment import Appointment
from pisa.block_processor import BlockProcessor
from test.unit.conftest import get_random_value_hex
from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
inspector = Inspector()
APPOINTMENT_OK = (0, None)
NO_HEX_STRINGS = ["R" * 64, get_random_value_hex(31) + "PP", "$"*64, " "*64]
WRONG_TYPES = [[], '', get_random_value_hex(32), 3.2, 2.0, (), object, {}, " "*32, object()]
WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()]
logging.getLogger().disabled = True
def test_check_locator():
# Right appointment type, size and format
locator = get_random_value_hex(32)
assert(Inspector.check_locator(locator) == APPOINTMENT_OK)
# Wrong size (too big)
locator = get_random_value_hex(33)
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
# Wrong size (too small)
locator = get_random_value_hex(31)
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
# Empty
locator = None
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_EMPTY_FIELD)
# Wrong type (several types tested, it should do for anything that is not a string)
locators = [[], -1, 3.2, 0, 4, (), object, {}, object()]
for locator in locators:
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
# Wrong format (no hex)
locators = NO_HEX_STRINGS
for locator in locators:
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
def test_check_start_time():
# Time is defined in block height
current_time = 100
# Right format and right value (start time in the future)
start_time = 101
assert (Inspector.check_start_time(start_time, current_time) == APPOINTMENT_OK)
# Start time too small (either same block or block in the past)
start_times = [100, 99, 98, -1]
for start_time in start_times:
assert (Inspector.check_start_time(start_time, current_time)[0] == APPOINTMENT_FIELD_TOO_SMALL)
# Empty field
start_time = None
assert (Inspector.check_start_time(start_time, current_time)[0] == APPOINTMENT_EMPTY_FIELD)
# Wrong data type
start_times = WRONG_TYPES
for start_time in start_times:
assert (Inspector.check_start_time(start_time, current_time)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
def test_check_end_time():
# Time is defined in block height
current_time = 100
start_time = 120
# Right format and right value (start time before end and end in the future)
end_time = 121
assert (Inspector.check_end_time(end_time, start_time, current_time) == APPOINTMENT_OK)
# End time too small (start time after end time)
end_times = [120, 119, 118, -1]
for end_time in end_times:
assert (Inspector.check_end_time(end_time, start_time, current_time)[0] == APPOINTMENT_FIELD_TOO_SMALL)
# End time too small (either same height as current block or in the past)
current_time = 130
end_times = [130, 129, 128, -1]
for end_time in end_times:
assert (Inspector.check_end_time(end_time, start_time, current_time)[0] == APPOINTMENT_FIELD_TOO_SMALL)
# Empty field
end_time = None
assert (Inspector.check_end_time(end_time, start_time, current_time)[0] == APPOINTMENT_EMPTY_FIELD)
# Wrong data type
end_times = WRONG_TYPES
for end_time in end_times:
assert (Inspector.check_end_time(end_time, start_time, current_time)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
def test_check_delta():
# Right value, right format
deltas = [MIN_DISPUTE_DELTA, MIN_DISPUTE_DELTA+1, MIN_DISPUTE_DELTA+1000]
for delta in deltas:
assert (Inspector.check_delta(delta) == APPOINTMENT_OK)
# Delta too small
deltas = [MIN_DISPUTE_DELTA-1, MIN_DISPUTE_DELTA-2, 0, -1, -1000]
for delta in deltas:
assert (Inspector.check_delta(delta)[0] == APPOINTMENT_FIELD_TOO_SMALL)
# Empty field
delta = None
assert(Inspector.check_delta(delta)[0] == APPOINTMENT_EMPTY_FIELD)
# Wrong data type
deltas = WRONG_TYPES
for delta in deltas:
assert (Inspector.check_delta(delta)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
def test_check_blob():
# Right format and length
encrypted_blob = get_random_value_hex(120)
assert(Inspector.check_blob(encrypted_blob) == APPOINTMENT_OK)
# # Wrong content
# # FIXME: There is not proper defined format for this yet. It should be restricted by size at least, and check it
# # is multiple of the block size defined by the encryption function.
# Wrong type
encrypted_blobs = WRONG_TYPES_NO_STR
for encrypted_blob in encrypted_blobs:
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
# Empty field
encrypted_blob = None
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_EMPTY_FIELD)
# Wrong format (no hex)
encrypted_blobs = NO_HEX_STRINGS
for encrypted_blob in encrypted_blobs:
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
def test_check_cipher():
# Right format and content (any case combination should be accepted)
for cipher in SUPPORTED_CIPHERS:
cipher_cases = [cipher, cipher.lower(), cipher.capitalize()]
for case in cipher_cases:
assert(Inspector.check_cipher(case) == APPOINTMENT_OK)
# Wrong type
ciphers = WRONG_TYPES_NO_STR
for cipher in ciphers:
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
# Wrong value
ciphers = NO_HEX_STRINGS
for cipher in ciphers:
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED)
# Empty field
cipher = None
assert (Inspector.check_cipher(cipher)[0] == APPOINTMENT_EMPTY_FIELD)
def test_check_hash_function():
# Right format and content (any case combination should be accepted)
for hash_function in SUPPORTED_HASH_FUNCTIONS:
hash_function_cases = [hash_function, hash_function.lower(), hash_function.capitalize()]
for case in hash_function_cases:
assert (Inspector.check_hash_function(case) == APPOINTMENT_OK)
# Wrong type
hash_functions = WRONG_TYPES_NO_STR
for hash_function in hash_functions:
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
# Wrong value
hash_functions = NO_HEX_STRINGS
for hash_function in hash_functions:
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED)
# Empty field
hash_function = None
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_EMPTY_FIELD)
def test_inspect(run_bitcoind):
# At this point every single check function has been already tested, let's test inspect with an invalid and a valid
# appointments.
# Invalid appointment, every field is empty
appointment_data = dict()
appointment = inspector.inspect(appointment_data)
assert (type(appointment) == tuple and appointment[0] != 0)
# Valid appointment
locator = get_random_value_hex(32)
start_time = BlockProcessor.get_block_count() + 5
end_time = start_time + 20
dispute_delta = MIN_DISPUTE_DELTA
encrypted_blob = get_random_value_hex(64)
cipher = SUPPORTED_CIPHERS[0]
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
appointment_data = {"locator": locator, "start_time": start_time, "end_time": end_time,
"dispute_delta": dispute_delta, "encrypted_blob": encrypted_blob, "cipher": cipher,
"hash_function": hash_function}
appointment = inspector.inspect(appointment_data)
assert(type(appointment) == Appointment and appointment.locator == locator and appointment.start_time == start_time
and appointment.end_time == end_time and appointment.dispute_delta == dispute_delta and
appointment.encrypted_blob.data == encrypted_blob and appointment.cipher == cipher and
appointment.hash_function == hash_function)

333
test/unit/test_responder.py Normal file
View File

@@ -0,0 +1,333 @@
import json
import pytest
from uuid import uuid4
from threading import Thread
from queue import Queue, Empty
from pisa.tools import check_txid_format
from test.simulator.utils import sha256d
from pisa.responder import Responder, Job
from test.simulator.bitcoind_sim import TX
from pisa.utils.auth_proxy import AuthServiceProxy
from test.unit.conftest import get_random_value_hex
from test.unit.conftest import generate_block, generate_blocks
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
@pytest.fixture(scope="module")
def responder():
return Responder()
def create_dummy_job_data(random_txid=False, justice_rawtx=None):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
# The following transaction data corresponds to a valid transaction. For some test it may be interesting to have
# some valid data, but for others we may need multiple different justice_txids.
dispute_txid = "0437cd7f8525ceed2324359c2d0ba26006d92d856a9c20fa0241106ee5a597c9"
justice_txid = "f4184fc596403b9d638783cf57adfe4c75c605f6356fbc91338530e9831e9e16"
if justice_rawtx is None:
justice_rawtx = "0100000001c997a5e56e104102fa209c6a852dd90660a20b2d9c352423edce25857fcd3704000000004847304402" \
"204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860a4" \
"acdd12909d831cc56cbbac4622082221a8768d1d0901ffffffff0200ca9a3b00000000434104ae1a62fe09c5f51b" \
"13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c1b7303b8a0626f1ba" \
"ded5c72a704f7e6cd84cac00286bee0000000043410411db93e1dcdb8a016b49840f8c53bc1eb68a382e97b1482e" \
"cad7b148a6909a5cb2e0eaddfb84ccf9744464f82e160bfa9b8b64f9d4c03f999b8643f656b412a3ac00000000"
else:
justice_txid = sha256d(justice_rawtx)
if random_txid is True:
justice_txid = get_random_value_hex(32)
appointment_end = bitcoin_cli.getblockcount() + 2
return dispute_txid, justice_txid, justice_rawtx, appointment_end
def create_dummy_job(random_txid=False, justice_rawtx=None):
dispute_txid, justice_txid, justice_rawtx, appointment_end = create_dummy_job_data(random_txid, justice_rawtx)
return Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
def test_job_init(run_bitcoind):
dispute_txid, justice_txid, justice_rawtx, appointment_end = create_dummy_job_data()
job = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
assert job.dispute_txid == dispute_txid and job.justice_txid == justice_txid \
and job.justice_rawtx == justice_rawtx and job.appointment_end == appointment_end
def test_job_to_dict():
job = create_dummy_job()
job_dict = job.to_dict()
assert job.locator == job_dict["locator"] and job.justice_rawtx == job_dict["justice_rawtx"] \
and job.appointment_end == job_dict["appointment_end"]
def test_job_to_json():
job = create_dummy_job()
job_dict = json.loads(job.to_json())
assert job.locator == job_dict["locator"] and job.justice_rawtx == job_dict["justice_rawtx"] \
and job.appointment_end == job_dict["appointment_end"]
def test_init_responder(responder):
assert type(responder.jobs) is dict and len(responder.jobs) == 0
assert type(responder.tx_job_map) is dict and len(responder.tx_job_map) == 0
assert type(responder.unconfirmed_txs) is list and len(responder.unconfirmed_txs) == 0
assert type(responder.missed_confirmations) is dict and len(responder.missed_confirmations) == 0
assert responder.block_queue is None
assert responder.asleep is True
assert responder.zmq_subscriber is None
def test_add_response(responder):
uuid = uuid4().hex
job = create_dummy_job()
# The responder automatically fires create_job on adding a job if it is asleep (initial state). Avoid this by
# setting the state to awake.
responder.asleep = False
receipt = responder.add_response(uuid, job.dispute_txid, job.justice_txid, job.justice_rawtx, job.appointment_end)
assert receipt.delivered is True
def test_create_job(responder):
responder.asleep = False
for _ in range(20):
uuid = uuid4().hex
confirmations = 0
dispute_txid, justice_txid, justice_rawtx, appointment_end = create_dummy_job_data(random_txid=True)
# Check the job is not within the responder jobs before adding it
assert uuid not in responder.jobs
assert justice_txid not in responder.tx_job_map
assert justice_txid not in responder.unconfirmed_txs
# And that it is afterwards
responder.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
assert uuid in responder.jobs
assert justice_txid in responder.tx_job_map
assert justice_txid in responder.unconfirmed_txs
# Check that the rest of job data also matches
job = responder.jobs[uuid]
assert job.dispute_txid == dispute_txid and job.justice_txid == justice_txid \
and job.justice_rawtx == justice_rawtx and job.appointment_end == appointment_end \
and job.appointment_end == appointment_end
def test_create_job_already_confirmed(responder):
responder.asleep = False
for i in range(20):
uuid = uuid4().hex
confirmations = i+1
dispute_txid, justice_txid, justice_rawtx, appointment_end = create_dummy_job_data(
justice_rawtx=TX.create_dummy_transaction())
responder.create_job(uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations)
assert justice_txid not in responder.unconfirmed_txs
def test_do_subscribe(responder):
responder.block_queue = Queue()
zmq_thread = Thread(target=responder.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
try:
generate_block()
block_hash = responder.block_queue.get()
assert check_txid_format(block_hash)
except Empty:
assert False
def test_do_watch(responder):
# Reinitializing responder (but keeping the subscriber)
responder.jobs = dict()
responder.tx_job_map = dict()
responder.unconfirmed_txs = []
responder.missed_confirmations = dict()
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
jobs = [create_dummy_job(justice_rawtx=TX.create_dummy_transaction()) for _ in range(20)]
# Let's set up the jobs first
for job in jobs:
uuid = uuid4().hex
responder.jobs[uuid] = job
responder.tx_job_map[job.justice_txid] = [uuid]
responder.missed_confirmations[job.justice_txid] = 0
responder.unconfirmed_txs.append(job.justice_txid)
# Let's start to watch
watch_thread = Thread(target=responder.do_watch)
watch_thread.daemon = True
watch_thread.start()
# And broadcast some of the transactions
broadcast_txs = []
for job in jobs[:5]:
bitcoin_cli.sendrawtransaction(job.justice_rawtx)
broadcast_txs.append(job.justice_txid)
# Mine a block
generate_block()
# The transactions we sent shouldn't be in the unconfirmed transaction list anymore
assert not set(broadcast_txs).issubset(responder.unconfirmed_txs)
# TODO: test that reorgs can be detected once data persistence is merged (new version of the simulator)
# Generating 5 additional blocks should complete the 5 jobs
generate_blocks(5)
assert not set(broadcast_txs).issubset(responder.tx_job_map)
# Do the rest
broadcast_txs = []
for job in jobs[5:]:
bitcoin_cli.sendrawtransaction(job.justice_rawtx)
broadcast_txs.append(job.justice_txid)
# Mine a block
generate_blocks(6)
assert len(responder.tx_job_map) == 0
assert responder.asleep is True
def test_get_txs_to_rebroadcast(responder):
# Let's create a few fake txids and assign at least 6 missing confirmations to each
txs_missing_too_many_conf = {get_random_value_hex(32): 6+i for i in range(10)}
# Let's create some other transaction that has missed some confirmations but not that many
txs_missing_some_conf = {get_random_value_hex(32): 3 for _ in range(10)}
# All the txs in the first dict should be flagged as to_rebroadcast
responder.missed_confirmations = txs_missing_too_many_conf
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_too_many_conf)
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
# Non of the txs in the second dict should be flagged
responder.missed_confirmations = txs_missing_some_conf
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf)
assert txs_to_rebroadcast == []
# Let's check that it also works with a mixed dict
responder.missed_confirmations.update(txs_missing_too_many_conf)
txs_to_rebroadcast = responder.get_txs_to_rebroadcast(txs_missing_some_conf)
assert txs_to_rebroadcast == list(txs_missing_too_many_conf.keys())
def test_get_completed_jobs():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
initial_height = bitcoin_cli.getblockcount()
# Let's use a fresh responder for this to make it easier to compare the results
responder = Responder()
# A complete job is a job that has reached the appointment end with enough confirmations (> MIN_CONFIRMATIONS)
# We'll create three type of transactions: end reached + enough conf, end reached + no enough conf, end not reached
jobs_end_conf = {uuid4().hex: create_dummy_job(justice_rawtx=TX.create_dummy_transaction()) for _ in range(10)}
jobs_end_no_conf = {}
for _ in range(10):
job = create_dummy_job(justice_rawtx=TX.create_dummy_transaction())
responder.unconfirmed_txs.append(job.justice_txid)
jobs_end_no_conf[uuid4().hex] = job
jobs_no_end = {}
for _ in range(10):
job = create_dummy_job(justice_rawtx=TX.create_dummy_transaction())
job.appointment_end += 10
jobs_no_end[uuid4().hex] = job
# Let's add all to the responder
responder.jobs.update(jobs_end_conf)
responder.jobs.update(jobs_end_no_conf)
responder.jobs.update(jobs_no_end)
for uuid, job in responder.jobs.items():
bitcoin_cli.sendrawtransaction(job.justice_rawtx)
# The dummy appointments have a end_appointment time of current + 2, but jobs need at least 6 confs by default
generate_blocks(6)
# And now let's check
completed_jobs = responder.get_completed_jobs(initial_height + 6)
completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs]
ended_jobs_keys = list(jobs_end_conf.keys())
assert set(completed_jobs_ids) == set(ended_jobs_keys)
# Generating 6 additional blocks should also confirm jobs_no_end
generate_blocks(6)
completed_jobs = responder.get_completed_jobs(initial_height + 12)
completed_jobs_ids = [job_id for job_id, confirmations in completed_jobs]
ended_jobs_keys.extend(list(jobs_no_end.keys()))
assert set(completed_jobs_ids) == set(ended_jobs_keys)
def test_rebroadcast():
responder = Responder()
responder.asleep = False
txs_to_rebroadcast = []
# Rebroadcast calls add_response with retry=True. The job data is already in jobs.
for i in range(20):
uuid = uuid4().hex
dispute_txid, justice_txid, justice_rawtx, appointment_end = create_dummy_job_data(
justice_rawtx=TX.create_dummy_transaction())
responder.jobs[uuid] = Job(dispute_txid, justice_txid, justice_rawtx, appointment_end)
responder.tx_job_map[justice_txid] = [uuid]
responder.unconfirmed_txs.append(justice_txid)
# Let's add some of the txs in the rebroadcast list
if (i % 2) == 0:
txs_to_rebroadcast.append(justice_txid)
receipts = responder.rebroadcast(txs_to_rebroadcast)
# All txs should have been delivered and the missed confirmation reset
for txid, receipt in receipts:
# Sanity check
assert txid in txs_to_rebroadcast
assert receipt.delivered is True
assert responder.missed_confirmations[txid] == 0

44
test/unit/test_tools.py Normal file
View File

@@ -0,0 +1,44 @@
from pisa import logging
from pisa.tools import check_txid_format
from pisa.tools import can_connect_to_bitcoind, in_correct_network, bitcoin_cli
logging.getLogger().disabled = True
def test_in_correct_network(run_bitcoind):
# The simulator runs as if it was regtest, so every other network should fail
assert in_correct_network('mainnet') is False
assert in_correct_network('testnet') is False
assert in_correct_network('regtest') is True
def test_can_connect_to_bitcoind():
assert can_connect_to_bitcoind() is True
# def test_can_connect_to_bitcoind_bitcoin_not_running():
# # Kill the simulator thread and test the check fails
# bitcoind_process.kill()
# assert can_connect_to_bitcoind() is False
def test_bitcoin_cli():
try:
bitcoin_cli().help()
assert True
except Exception:
assert False
def test_check_txid_format():
assert(check_txid_format(None) is False)
assert(check_txid_format("") is False)
assert(check_txid_format(0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef) is False) # wrong type
assert(check_txid_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True) # lowercase
assert(check_txid_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True) # uppercase
assert(check_txid_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True) # mixed case
assert(check_txid_format("0123456789012345678901234567890123456789012345678901234567890123") is True) # only nums
assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False) # too short
assert(check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False) # too long
assert(check_txid_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False) # non-hex

153
test/unit/test_watcher.py Normal file
View File

@@ -0,0 +1,153 @@
import pytest
import logging
from uuid import uuid4
from hashlib import sha256
from threading import Thread
from binascii import unhexlify
from queue import Queue, Empty
from apps.cli.blob import Blob
from pisa.watcher import Watcher
from pisa.responder import Responder
from pisa.conf import MAX_APPOINTMENTS
from pisa.appointment import Appointment
from pisa.tools import check_txid_format
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from pisa.utils.auth_proxy import AuthServiceProxy
from test.unit.conftest import generate_block, generate_blocks
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
logging.getLogger().disabled = True
APPOINTMENTS = 5
START_TIME_OFFSET = 1
END_TIME_OFFSET = 1
@pytest.fixture(scope="module")
def watcher():
return Watcher()
def generate_dummy_appointment():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
dispute_tx = TX.create_dummy_transaction()
dispute_txid = sha256d(dispute_tx)
justice_tx = TX.create_dummy_transaction(dispute_txid)
start_time = bitcoin_cli.getblockcount() + 1
end_time = start_time + 1
dispute_delta = 20
cipher = "AES-GCM-128"
hash_function = "SHA256"
locator = sha256(unhexlify(dispute_txid)).hexdigest()
blob = Blob(justice_tx, cipher, hash_function)
encrypted_blob = blob.encrypt(dispute_txid)
appointment = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
return appointment, dispute_tx
def create_appointments(n):
locator_uuid_map = dict()
appointments = dict()
dispute_txs = []
for i in range(n):
appointment, dispute_tx = generate_dummy_appointment()
uuid = uuid4().hex
appointments[uuid] = appointment
locator_uuid_map[appointment.locator] = [uuid]
dispute_txs.append(dispute_tx)
return appointments, locator_uuid_map, dispute_txs
def test_init(watcher):
assert type(watcher.appointments) is dict and len(watcher.appointments) == 0
assert type(watcher.locator_uuid_map) is dict and len(watcher.locator_uuid_map) == 0
assert watcher.block_queue is None
assert watcher.asleep is True
assert watcher.max_appointments == MAX_APPOINTMENTS
assert watcher.zmq_subscriber is None
assert type(watcher.responder) is Responder
def test_add_appointment(run_bitcoind, watcher):
# The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
# Avoid this by setting the state to awake.
watcher.asleep = False
# We should be able to add appointments up to the limit
for _ in range(10):
appointment, dispute_tx = generate_dummy_appointment()
added_appointment = watcher.add_appointment(appointment)
assert added_appointment is True
def test_add_too_many_appointments(watcher):
# Any appointment on top of those should fail
watcher.appointments = dict()
for _ in range(MAX_APPOINTMENTS):
appointment, dispute_tx = generate_dummy_appointment()
added_appointment = watcher.add_appointment(appointment)
assert added_appointment is True
appointment, dispute_tx = generate_dummy_appointment()
added_appointment = watcher.add_appointment(appointment)
assert added_appointment is False
def test_do_subscribe(watcher):
watcher.block_queue = Queue()
zmq_thread = Thread(target=watcher.do_subscribe)
zmq_thread.daemon = True
zmq_thread.start()
try:
generate_block()
block_hash = watcher.block_queue.get()
assert check_txid_format(block_hash)
except Empty:
assert False
def test_do_watch(watcher):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
# We will wipe all the previous data and add 5 appointments
watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
watch_thread = Thread(target=watcher.do_watch)
watch_thread.daemon = True
watch_thread.start()
# Broadcast the first two
for dispute_tx in dispute_txs[:2]:
bitcoin_cli.sendrawtransaction(dispute_tx)
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced
# by two
generate_blocks(START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == APPOINTMENTS - 2
# The rest of appointments will timeout after the end (2) + EXPIRY_DELTA
# Wait for an additional block to be safe
generate_blocks(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET)
assert len(watcher.appointments) == 0
assert watcher.asleep is True

View File

@@ -1,121 +0,0 @@
import os
import json
import requests
import time
from copy import deepcopy
from hashlib import sha256
from binascii import hexlify, unhexlify
from apps.cli.blob import Blob
from pisa import HOST, PORT
from pisa.utils.authproxy import AuthServiceProxy
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
PISA_API = "http://{}:{}".format(HOST, PORT)
def generate_dummy_appointment(dispute_txid):
r = requests.get(url=PISA_API + '/get_block_count', timeout=5)
current_height = r.json().get("block_count")
dummy_appointment_data = {"tx": hexlify(os.urandom(32)).decode('utf-8'),
"tx_id": dispute_txid, "start_time": current_height + 5,
"end_time": current_height + 10, "dispute_delta": 20}
cipher = "AES-GCM-128"
hash_function = "SHA256"
locator = sha256(unhexlify(dummy_appointment_data.get("tx_id"))).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")), debug=False, logging=False)
appointment = {"locator": locator, "start_time": dummy_appointment_data.get("start_time"),
"end_time": dummy_appointment_data.get("end_time"),
"dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function}
return appointment
def test_add_appointment(appointment=None):
if not appointment:
dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
appointment = generate_dummy_appointment(dispute_txid)
print("Sending appointment (locator: {}) to PISA".format(appointment.get("locator")))
r = requests.post(url=PISA_API, json=json.dumps(appointment), timeout=5)
assert (r.status_code == 200 and r.reason == 'OK')
print(r.content.decode())
print("Requesting it back from PISA")
r = requests.get(url=PISA_API + "/get_appointment?locator=" + appointment["locator"])
assert (r.status_code == 200 and r.reason == 'OK')
received_appointments = json.loads(r.content)
# Take the status out and leave the received appointments ready to compare
appointment_status = [appointment.pop("status") for appointment in received_appointments]
# Check that the appointment is within the received appoints
assert (appointment in received_appointments)
# Check that all the appointments are being watched
assert (all([status == "being_watched" for status in appointment_status]))
def test_same_locator_multiple_appointments():
dispute_txid = hexlify(os.urandom(32)).decode('utf-8')
appointment = generate_dummy_appointment(dispute_txid)
# Send it once
test_add_appointment(appointment)
time.sleep(0.5)
# Try again with the same data
print("Sending it again")
test_add_appointment(appointment)
time.sleep(0.5)
# Try again with the same data but increasing the end time
print("Sending once more")
dup_appointment = deepcopy(appointment)
dup_appointment["end_time"] += 1
test_add_appointment(dup_appointment)
print("Sleeping 5 sec")
time.sleep(5)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
print("Triggering PISA with dispute tx")
bitcoin_cli.sendrawtransaction(dispute_txid)
print("Sleeping 10 sec (waiting for a new block)")
time.sleep(10)
print("Getting all appointments")
r = requests.get(url=PISA_API + "/get_all_appointments")
assert (r.status_code == 200 and r.reason == 'OK')
received_appointments = json.loads(r.content)
# Make sure there is not pending instance of the locator in the watcher
watcher_locators = [appointment["locator"] for appointment in received_appointments["watcher_appointments"]]
assert(appointment["locator"] not in watcher_locators)
# Make sure all the appointments went trough
target_jobs = [v for k, v in received_appointments["responder_jobs"].items() if v["locator"] ==
appointment["locator"]]
assert (len(target_jobs) == 3)
if __name__ == '__main__':
test_same_locator_multiple_appointments()
print("All good!")

View File

@@ -1,139 +0,0 @@
import logging
from pisa.inspector import Inspector
from pisa.appointment import Appointment
from pisa import errors
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, SUPPORTED_HASH_FUNCTIONS, \
SUPPORTED_CIPHERS, TEST_LOG_FILE
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, handlers=[
logging.FileHandler(TEST_LOG_FILE)
])
appointment = {"locator": None, "start_time": None, "end_time": None, "dispute_delta": None,
"encrypted_blob": None, "cipher": None, "hash_function": None}
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
BTC_RPC_PORT))
try:
block_height = bitcoin_cli.getblockcount()
except JSONRPCException as e:
logging.error("[Inspector] JSONRPCException. Error code {}".format(e))
locators = [None, 0, 'A' * 31, "A" * 63 + "_"]
start_times = [None, 0, '', 15.0, block_height - 10]
end_times = [None, 0, '', 26.123, block_height - 11]
dispute_deltas = [None, 0, '', 1.2, -3, 30]
encrypted_blobs = [None, 0, '']
ciphers = [None, 0, '', 'foo']
hash_functions = [None, 0, '', 'foo']
locators_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_WRONG_FIELD_SIZE, errors.APPOINTMENT_WRONG_FIELD_FORMAT]
start_time_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_FIELD_TOO_SMALL,
errors.APPOINTMENT_WRONG_FIELD_TYPE, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_FIELD_TOO_SMALL]
end_time_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_FIELD_TOO_SMALL,
errors.APPOINTMENT_WRONG_FIELD_TYPE, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_FIELD_TOO_SMALL]
dispute_delta_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_FIELD_TOO_SMALL,
errors.APPOINTMENT_WRONG_FIELD_TYPE, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_FIELD_TOO_SMALL]
encrypted_blob_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_WRONG_FIELD]
cipher_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_CIPHER_NOT_SUPPORTED, errors.APPOINTMENT_CIPHER_NOT_SUPPORTED]
hash_function_rets = [errors.APPOINTMENT_EMPTY_FIELD, errors.APPOINTMENT_WRONG_FIELD_TYPE,
errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED, errors.APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED]
inspector = Inspector(debug=True, logging=logging)
print("Locator tests\n")
for locator, ret in zip(locators, locators_rets):
appointment["locator"] = locator
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Set locator to a 'valid' one
appointment['locator'] = 'A' * 64
print("\nStart time tests\n")
for start_time, ret in zip(start_times, start_time_rets):
appointment["start_time"] = start_time
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the start time to some time in the future
appointment['start_time'] = block_height + 10
print("\nEnd time tests\n")
for end_time, ret in zip(end_times, end_time_rets):
appointment["end_time"] = end_time
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the end time to something consistent with start time
appointment['end_time'] = block_height + 30
print("\nDelta tests\n")
for dispute_delta, ret in zip(dispute_deltas, dispute_delta_rets):
appointment["dispute_delta"] = dispute_delta
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the a proper dispute delta
appointment['dispute_delta'] = appointment['end_time'] - appointment['start_time']
print("\nEncrypted blob tests\n")
for encrypted_blob, ret in zip(encrypted_blobs, encrypted_blob_rets):
appointment["encrypted_blob"] = encrypted_blob
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the encrypted blob to something that may pass
appointment['encrypted_blob'] = 'A' * 32
print("\nCipher tests\n")
for cipher, ret in zip(ciphers, cipher_rets):
appointment["cipher"] = cipher
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the cipher to the only supported one for now
appointment['cipher'] = SUPPORTED_CIPHERS[0]
print("\nHash function tests\n")
for hash_function, ret in zip(hash_functions, hash_function_rets):
appointment["hash_function"] = hash_function
r = inspector.inspect(appointment)
assert r[0] == ret
print(r)
# Setting the cipher to the only supported one for now
appointment['hash_function'] = SUPPORTED_HASH_FUNCTIONS[0]
r = inspector.inspect(appointment)
assert type(r) == Appointment
print("\nAll tests passed!")

View File

@@ -1,109 +0,0 @@
import os
import binascii
from pisa.utils.authproxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
from pisa.tools import check_txid_format
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
# Help should always return 0
assert(bitcoin_cli.help() == 0)
# getblockhash should return a blockid (which matches the txid format)
block_hash = bitcoin_cli.getblockhash(0)
assert(check_txid_format(block_hash))
# Check that the values are within range and of the proper format (all should fail)
values = [-1, 500, None, '', '111', [], 1.1]
print("getblockhash fails ({}):".format(len(values)))
for v in values:
try:
block_hash = bitcoin_cli.getblockhash(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# getblock should return a list of transactions and the height
block = bitcoin_cli.getblock(block_hash)
assert(isinstance(block.get('tx'), list))
assert(len(block.get('tx')) != 0)
assert(isinstance(block.get('height'), int))
# Some fails
values += ["a"*64, binascii.hexlify(os.urandom(32)).decode()]
print("\ngetblock fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.getblock(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# decoderawtransaction should only return if the given transaction matches a txid format
coinbase_tx = block.get('tx')[0]
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_txid_format(tx.get('txid')))
# Therefore should also work for a random formatted 32-byte hex in our simulation
random_tx = binascii.hexlify(os.urandom(32)).decode()
tx = bitcoin_cli.decoderawtransaction(random_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_txid_format(tx.get('txid')))
# But it should fail for not proper formatted one
values = [1, None, '', "a"*63, "b"*65, [], binascii.hexlify(os.urandom(31)).hex()]
print("\ndecoderawtransaction fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.decoderawtransaction(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# sendrawtransaction should only allow txids that the simulator has not mined yet
bitcoin_cli.sendrawtransaction(binascii.hexlify(os.urandom(32)).decode())
# Any data not matching the txid format or that matches with an already mined transaction should fail
values += [coinbase_tx]
print("\nsendrawtransaction fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.sendrawtransaction(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# getrawtransaction should work for existing transactions, and fail for non-existing ones
tx = bitcoin_cli.getrawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('confirmations'), int))
print("\nsendrawtransaction fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.sendrawtransaction(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# getblockcount should always return a positive integer
bc = bitcoin_cli.getblockcount()
assert (isinstance(bc, int))
assert (bc >= 0)
print("\nAll tests passed!")