Merge branch 'testing' into 13-appointment-signature

This commit is contained in:
Salvatore Ingala
2019-10-18 08:32:38 +08:00
17 changed files with 598 additions and 206 deletions

6
.coveragerc Normal file
View File

@@ -0,0 +1,6 @@
[run]
omit =
pisa/pisad.py
pisa/sample_conf.py
pisa/time_traveler.py
pisa/utils/auth_proxy.py

View File

@@ -11,8 +11,3 @@ logging.basicConfig(format='%(message)s', level=logging.INFO, handlers=[
logging.FileHandler(conf.SERVER_LOG_FILE), logging.FileHandler(conf.SERVER_LOG_FILE),
logging.StreamHandler() logging.StreamHandler()
]) ])
# Create RPC connection with bitcoind
# TODO: Check if a long lived connection like this may create problems (timeouts)
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
conf.BTC_RPC_PORT))

View File

@@ -1,8 +1,8 @@
import binascii import binascii
from hashlib import sha256 from hashlib import sha256
from pisa import bitcoin_cli
from pisa.logger import Logger from pisa.logger import Logger
from pisa.tools import bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException from pisa.utils.auth_proxy import JSONRPCException
logger = Logger("BlockProcessor") logger = Logger("BlockProcessor")
@@ -13,7 +13,7 @@ class BlockProcessor:
def get_block(block_hash): def get_block(block_hash):
try: try:
block = bitcoin_cli.getblock(block_hash) block = bitcoin_cli().getblock(block_hash)
except JSONRPCException as e: except JSONRPCException as e:
block = None block = None
@@ -25,7 +25,7 @@ class BlockProcessor:
def get_best_block_hash(): def get_best_block_hash():
try: try:
block_hash = bitcoin_cli.getbestblockhash() block_hash = bitcoin_cli().getbestblockhash()
except JSONRPCException as e: except JSONRPCException as e:
block_hash = None block_hash = None
@@ -37,7 +37,7 @@ class BlockProcessor:
def get_block_count(): def get_block_count():
try: try:
block_count = bitcoin_cli.getblockcount() block_count = bitcoin_cli().getblockcount()
except JSONRPCException as e: except JSONRPCException as e:
block_count = None block_count = None
@@ -73,7 +73,7 @@ class BlockProcessor:
try: try:
# ToDo: #20-test-tx-decrypting-edge-cases # ToDo: #20-test-tx-decrypting-edge-cases
justice_rawtx = appointments[uuid].encrypted_blob.decrypt(dispute_txid) justice_rawtx = appointments[uuid].encrypted_blob.decrypt(dispute_txid)
justice_txid = bitcoin_cli.decoderawtransaction(justice_rawtx).get('txid') justice_txid = bitcoin_cli().decoderawtransaction(justice_rawtx).get('txid')
logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid) logger.info("Match found for locator.", locator=locator, uuid=uuid, justice_txid=justice_txid)
except JSONRPCException as e: except JSONRPCException as e:

View File

@@ -1,6 +1,6 @@
from pisa.rpc_errors import * from pisa.rpc_errors import *
from pisa import bitcoin_cli
from pisa.logger import Logger from pisa.logger import Logger
from pisa.tools import bitcoin_cli
from pisa.utils.auth_proxy import JSONRPCException from pisa.utils.auth_proxy import JSONRPCException
from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION from pisa.errors import UNKNOWN_JSON_RPC_EXCEPTION
@@ -20,7 +20,7 @@ class Carrier:
def send_transaction(self, rawtx, txid): def send_transaction(self, rawtx, txid):
try: try:
logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx) logger.info("Pushing transaction to the network", txid=txid, rawtx=rawtx)
bitcoin_cli.sendrawtransaction(rawtx) bitcoin_cli().sendrawtransaction(rawtx)
receipt = Receipt(delivered=True) receipt = Receipt(delivered=True)
@@ -70,7 +70,7 @@ class Carrier:
@staticmethod @staticmethod
def get_transaction(txid): def get_transaction(txid):
try: try:
tx_info = bitcoin_cli.getrawtransaction(txid, 1) tx_info = bitcoin_cli().getrawtransaction(txid, 1)
except JSONRPCException as e: except JSONRPCException as e:
tx_info = None tx_info = None

View File

@@ -2,7 +2,6 @@ import re
from pisa import errors from pisa import errors
import pisa.conf as conf import pisa.conf as conf
from pisa import bitcoin_cli
from pisa.logger import Logger from pisa.logger import Logger
from pisa.appointment import Appointment from pisa.appointment import Appointment
from pisa.block_processor import BlockProcessor from pisa.block_processor import BlockProcessor

View File

@@ -1,6 +1,7 @@
import json
from queue import Queue from queue import Queue
from threading import Thread
from hashlib import sha256 from hashlib import sha256
from threading import Thread
from binascii import unhexlify from binascii import unhexlify
from pisa.logger import Logger from pisa.logger import Logger
@@ -34,6 +35,9 @@ class Job:
return job return job
def to_json(self):
return json.dumps(self.to_dict())
class Responder: class Responder:
def __init__(self): def __init__(self):
@@ -62,6 +66,8 @@ class Responder:
# TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED) # TODO: Add the missing reasons (e.g. RPC_VERIFY_REJECTED)
pass pass
return receipt
def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0, def create_job(self, uuid, dispute_txid, justice_txid, justice_rawtx, appointment_end, confirmations=0,
retry=False): retry=False):
@@ -155,7 +161,7 @@ class Responder:
for uuid, job in self.jobs.items(): for uuid, job in self.jobs.items():
if job.appointment_end <= height: if job.appointment_end <= height:
tx = Carrier.get_transaction(job.dispute_txid) tx = Carrier.get_transaction(job.justice_txid)
# FIXME: Should be improved with the librarian # FIXME: Should be improved with the librarian
confirmations = tx.get('confirmations') confirmations = tx.get('confirmations')

View File

@@ -1,10 +1,15 @@
import re import re
from http.client import HTTPException from http.client import HTTPException
from pisa import bitcoin_cli import pisa.conf as conf
from pisa.logger import Logger from pisa.logger import Logger
from pisa.utils.auth_proxy import JSONRPCException
from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY from pisa.rpc_errors import RPC_INVALID_ADDRESS_OR_KEY
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
def bitcoin_cli():
return AuthServiceProxy("http://%s:%s@%s:%d" % (conf.BTC_RPC_USER, conf.BTC_RPC_PASSWD, conf.BTC_RPC_HOST,
conf.BTC_RPC_PORT))
# TODO: currently only used in the Responder; might move there or in the BlockProcessor # TODO: currently only used in the Responder; might move there or in the BlockProcessor
@@ -13,7 +18,7 @@ def check_tx_in_chain(tx_id, logger=Logger(), tx_label='Transaction'):
confirmations = 0 confirmations = 0
try: try:
tx_info = bitcoin_cli.getrawtransaction(tx_id, 1) tx_info = bitcoin_cli().getrawtransaction(tx_id, 1)
if tx_info.get("confirmations"): if tx_info.get("confirmations"):
confirmations = int(tx_info.get("confirmations")) confirmations = int(tx_info.get("confirmations"))
@@ -38,7 +43,7 @@ def can_connect_to_bitcoind():
can_connect = True can_connect = True
try: try:
bitcoin_cli.help() bitcoin_cli().help()
except (ConnectionRefusedError, JSONRPCException, HTTPException): except (ConnectionRefusedError, JSONRPCException, HTTPException):
can_connect = False can_connect = False
@@ -50,7 +55,7 @@ def in_correct_network(network):
testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943" testnet3_genesis_block_hash = "000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943"
correct_network = False correct_network = False
genesis_block_hash = bitcoin_cli.getblockhash(0) genesis_block_hash = bitcoin_cli().getblockhash(0)
if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash: if network == 'mainnet' and genesis_block_hash == mainnet_genesis_block_hash:
correct_network = True correct_network = True
@@ -65,3 +70,4 @@ def in_correct_network(network):
def check_txid_format(txid): def check_txid_format(txid):
# TODO: #12-check-txid-regexp # TODO: #12-check-txid-regexp
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None

View File

@@ -1,107 +1,146 @@
import re
import os import os
import pytest
from time import sleep
from threading import Thread
from test.simulator.transaction import TX
from test.simulator.bitcoind_sim import run_simulator
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
from pisa.tools import check_txid_format
MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, os.urandom(32).hex()]
@pytest.fixture(scope='module')
def run_bitcoind():
bitcoind_thread = Thread(target=run_simulator, kwargs={"mode": "event"})
bitcoind_thread.daemon = True
bitcoind_thread.start()
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1)
@pytest.fixture(scope="module")
def genesis_block_hash(run_bitcoind):
return bitcoin_cli.getblockhash(0)
def check_hash_format(txid):
# TODO: #12-check-txid-regexp
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
# Help should always return 0
assert(bitcoin_cli.help() == 0)
# getblockhash should return a blockid (which matches the txid format) def test_help(run_bitcoind):
block_hash = bitcoin_cli.getblockhash(0) # Help should always return 0
assert(check_txid_format(block_hash)) assert(bitcoin_cli.help() == 0)
# Check that the values are within range and of the proper format (all should fail)
values = [-1, 500, None, '', '111', [], 1.1]
print("getblockhash fails ({}):".format(len(values)))
for v in values: # FIXME: Better assert for the exceptions would be nice (check the returned errno is the expected one)
def test_getblockhash(genesis_block_hash):
# First block
assert(check_hash_format(genesis_block_hash))
# Check that the values are within range and of the proper format (all should fail)
for v in MIXED_VALUES:
try:
bitcoin_cli.getblockhash(v)
assert False
except JSONRPCException as e:
assert True
def test_get_block(genesis_block_hash):
# getblock should return a list of transactions and the height
block = bitcoin_cli.getblock(genesis_block_hash)
assert(isinstance(block.get('tx'), list))
assert(len(block.get('tx')) != 0)
assert(isinstance(block.get('height'), int))
# It should fail for wrong data formats and random ids
for v in MIXED_VALUES:
try:
bitcoin_cli.getblock(v)
assert False
except JSONRPCException as e:
assert True
def test_decoderawtransaction(genesis_block_hash):
# decoderawtransaction should only return if the given transaction matches a txid format
block = bitcoin_cli.getblock(genesis_block_hash)
coinbase_txid = block.get('tx')[0]
coinbase_tx = bitcoin_cli.getrawtransaction(coinbase_txid).get("hex")
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_hash_format(tx.get('txid')))
# Therefore should also work for a random transaction hex in our simulation
random_tx = TX.create_dummy_transaction()
tx = bitcoin_cli.decoderawtransaction(random_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_hash_format(tx.get('txid')))
# But it should fail for not proper formatted one
for v in MIXED_VALUES:
try:
bitcoin_cli.decoderawtransaction(v)
assert False
except JSONRPCException as e:
assert True
def test_sendrawtransaction(genesis_block_hash):
# sendrawtransaction should only allow txids that the simulator has not mined yet
bitcoin_cli.sendrawtransaction(TX.create_dummy_transaction())
# Any data not matching the txid format or that matches with an already mined transaction should fail
try: try:
block_hash = bitcoin_cli.getblockhash(v) genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
bitcoin_cli.sendrawtransaction(genesis_tx)
assert False assert False
except JSONRPCException as e: except JSONRPCException as e:
print('\t{}'.format(e)) assert True
# getblock should return a list of transactions and the height for v in MIXED_VALUES:
block = bitcoin_cli.getblock(block_hash) try:
assert(isinstance(block.get('tx'), list)) bitcoin_cli.sendrawtransaction(v)
assert(len(block.get('tx')) != 0) assert False
assert(isinstance(block.get('height'), int)) except JSONRPCException as e:
assert True
# Some fails
values += ["a"*64, os.urandom(32).hex()]
print("\ngetblock fails ({}):".format(len(values)))
for v in values: def test_getrawtransaction(genesis_block_hash):
try: # getrawtransaction should work for existing transactions, and fail for non-existing ones
block = bitcoin_cli.getblock(v) genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
assert False tx = bitcoin_cli.getrawtransaction(genesis_tx)
except JSONRPCException as e:
print('\t{}'.format(e))
# decoderawtransaction should only return if the given transaction matches a txid format assert(isinstance(tx, dict))
coinbase_tx = block.get('tx')[0] assert(isinstance(tx.get('confirmations'), int))
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('txid'), str))
assert(check_txid_format(tx.get('txid')))
# Therefore should also work for a random formatted 32-byte hex in our simulation for v in MIXED_VALUES:
random_tx = os.urandom(32).hex() try:
tx = bitcoin_cli.decoderawtransaction(random_tx) bitcoin_cli.getrawtransaction(v)
assert(isinstance(tx, dict)) assert False
assert(isinstance(tx.get('txid'), str)) except JSONRPCException as e:
assert(check_txid_format(tx.get('txid'))) assert True
# But it should fail for not proper formatted one
values = [1, None, '', "a"*63, "b"*65, [], os.urandom(31).hex()]
print("\ndecoderawtransaction fails ({}):".format(len(values)))
for v in values: def test_getblockcount():
try: # getblockcount should always return a positive integer
block = bitcoin_cli.decoderawtransaction(v) bc = bitcoin_cli.getblockcount()
assert False assert (isinstance(bc, int))
except JSONRPCException as e: assert (bc >= 0)
print('\t{}'.format(e))
# sendrawtransaction should only allow txids that the simulator has not mined yet
bitcoin_cli.sendrawtransaction(os.urandom(32).hex())
# Any data not matching the txid format or that matches with an already mined transaction should fail
values += [coinbase_tx]
print("\nsendrawtransaction fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.sendrawtransaction(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# getrawtransaction should work for existing transactions, and fail for non-existing ones
tx = bitcoin_cli.getrawtransaction(coinbase_tx)
assert(isinstance(tx, dict))
assert(isinstance(tx.get('confirmations'), int))
print("\nsendrawtransaction fails ({}):".format(len(values)))
for v in values:
try:
block = bitcoin_cli.sendrawtransaction(v)
assert False
except JSONRPCException as e:
print('\t{}'.format(e))
# getblockcount should always return a positive integer
bc = bitcoin_cli.getblockcount()
assert (isinstance(bc, int))
assert (bc >= 0)
print("\nAll tests passed!")

View File

@@ -1,26 +1,36 @@
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
from flask import Flask, request, Response, abort
from test.simulator.zmq_publisher import ZMQPublisher
from threading import Thread
from pisa.rpc_errors import *
from pisa.tools import check_txid_format
import logging
import binascii
import json
import os import os
import time import time
import json
import logging
import binascii
from threading import Thread, Event
from flask import Flask, request, Response, abort
from pisa.rpc_errors import *
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.simulator.zmq_publisher import ZMQPublisher
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
app = Flask(__name__) app = Flask(__name__)
HOST = 'localhost' HOST = 'localhost'
PORT = '18443' PORT = '18443'
TIME_BETWEEN_BLOCKS = 10
mempool = []
mined_transactions = {}
blocks = {}
blockchain = [] blockchain = []
blocks = {}
mined_transactions = {}
mempool = []
mine_new_block = Event()
@app.route('/generate', methods=['POST'])
def generate():
global mine_new_block
mine_new_block.set()
return Response(status=200, mimetype='application/json')
@app.route('/', methods=['POST']) @app.route('/', methods=['POST'])
@@ -67,10 +77,12 @@ def process_request():
no_param_err = {"code": RPC_MISC_ERROR, "message": "JSON value is not a {} as expected"} no_param_err = {"code": RPC_MISC_ERROR, "message": "JSON value is not a {} as expected"}
if method == "decoderawtransaction": if method == "decoderawtransaction":
txid = get_param(request_data) rawtx = get_param(request_data)
if isinstance(txid, str): if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
if check_txid_format(txid): txid = sha256d(rawtx)
if TX.deserialize(rawtx) is not None:
response["result"] = {"txid": txid} response["result"] = {"txid": txid}
else: else:
@@ -82,12 +94,15 @@ def process_request():
elif method == "sendrawtransaction": elif method == "sendrawtransaction":
# TODO: A way of rejecting transactions should be added to test edge cases. # TODO: A way of rejecting transactions should be added to test edge cases.
txid = get_param(request_data) rawtx = get_param(request_data)
if isinstance(txid, str): if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
if check_txid_format(txid): txid = sha256d(rawtx)
if TX.deserialize(rawtx) is not None:
if txid not in list(mined_transactions.keys()): if txid not in list(mined_transactions.keys()):
mempool.append(txid) mempool.append(rawtx)
response["result"] = {"txid": txid}
else: else:
response["error"] = {"code": RPC_VERIFY_ALREADY_IN_CHAIN, response["error"] = {"code": RPC_VERIFY_ALREADY_IN_CHAIN,
@@ -104,10 +119,10 @@ def process_request():
txid = get_param(request_data) txid = get_param(request_data)
if isinstance(txid, str): if isinstance(txid, str):
block = blocks.get(mined_transactions.get(txid)) if txid in mined_transactions:
block = blocks.get(mined_transactions[txid]["block"])
if block: rawtx = mined_transactions[txid].get('tx')
response["result"] = {"confirmations": len(blockchain) - block.get('height')} response["result"] = {"hex": rawtx, "confirmations": len(blockchain) - block.get('height')}
elif txid in mempool: elif txid in mempool:
response["result"] = {"confirmations": 0} response["result"] = {"confirmations": 0}
@@ -169,6 +184,7 @@ def get_param(request_data):
param = None param = None
params = request_data.get("params") params = request_data.get("params")
if isinstance(params, list) and len(params) > 0: if isinstance(params, list) and len(params) > 0:
param = params[0] param = params[0]
@@ -179,40 +195,57 @@ def load_data():
pass pass
def simulate_mining(): def simulate_mining(mode, time_between_blocks):
global mempool, mined_transactions, blocks, blockchain global mempool, mined_transactions, blocks, blockchain, mine_new_block
prev_block_hash = None prev_block_hash = None
mining_simulator = ZMQPublisher(topic=b'hashblock', feed_protocol=FEED_PROTOCOL, feed_addr=FEED_ADDR, mining_simulator = ZMQPublisher(topic=b'hashblock', feed_protocol=FEED_PROTOCOL, feed_addr=FEED_ADDR,
feed_port=FEED_PORT) feed_port=FEED_PORT)
while True: # Set the mining event to initialize the blockchain with a block
mine_new_block.set()
while mine_new_block.wait():
block_hash = os.urandom(32).hex() block_hash = os.urandom(32).hex()
coinbase_tx_hash = os.urandom(32).hex() coinbase_tx = TX.create_dummy_transaction()
txs_to_mine = [coinbase_tx_hash] coinbase_tx_hash = sha256d(coinbase_tx)
txs_to_mine = dict({coinbase_tx_hash: coinbase_tx})
if len(mempool) != 0: if len(mempool) != 0:
# We'll mine up to 100 txs per block # We'll mine up to 100 txs per block
txs_to_mine += mempool[:99] for rawtx in mempool[:99]:
txid = sha256d(rawtx)
txs_to_mine[txid] = rawtx
mempool = mempool[99:] mempool = mempool[99:]
# Keep track of the mined transaction (to respond to getrawtransaction) # Keep track of the mined transaction (to respond to getrawtransaction)
for tx in txs_to_mine: for txid, tx in txs_to_mine.items():
mined_transactions[tx] = block_hash mined_transactions[txid] = {"tx": tx, "block": block_hash}
blocks[block_hash] = {"tx": list(txs_to_mine.keys()), "height": len(blockchain),
"previousblockhash": prev_block_hash}
blocks[block_hash] = {"tx": txs_to_mine, "height": len(blockchain), "previousblockhash": prev_block_hash}
mining_simulator.publish_data(binascii.unhexlify(block_hash)) mining_simulator.publish_data(binascii.unhexlify(block_hash))
blockchain.append(block_hash) blockchain.append(block_hash)
prev_block_hash = block_hash prev_block_hash = block_hash
print("New block mined: {}".format(block_hash)) print("New block mined: {}".format(block_hash))
print("\tTransactions: {}".format(txs_to_mine)) print("\tTransactions: {}".format(list(txs_to_mine.keys())))
time.sleep(TIME_BETWEEN_BLOCKS) if mode == 'time':
time.sleep(time_between_blocks)
else:
mine_new_block.clear()
def run_simulator(): def run_simulator(mode='time', time_between_blocks=5):
mining_thread = Thread(target=simulate_mining) if mode not in ["time", 'event']:
raise ValueError("Node must be time or event")
mining_thread = Thread(target=simulate_mining, args=[mode, time_between_blocks])
mining_thread.start() mining_thread.start()
# Setting Flask log to ERROR only so it does not mess with out logging # Setting Flask log to ERROR only so it does not mess with out logging

View File

@@ -0,0 +1,150 @@
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
from os import urandom
from test.simulator.utils import *
class TX:
""" Defines a class TX (transaction) that holds all the modifiable fields of a Bitcoin transaction, such as
version, number of inputs, reference to previous transactions, input and output scripts, value, etc.
"""
def __init__(self):
self.version = None
self.inputs = None
self.outputs = None
self.nLockTime = None
self.prev_tx_id = []
self.prev_out_index = []
self.scriptSig = []
self.scriptSig_len = []
self.nSequence = []
self.value = []
self.scriptPubKey = []
self.scriptPubKey_len = []
self.offset = 0
self.hex = ""
@classmethod
def deserialize(cls, hex_tx):
""" Builds a transaction object from the hexadecimal serialization format of a transaction that
could be obtained, for example, from a blockexplorer.
:param hex_tx: Hexadecimal serialized transaction.
:type hex_tx: hex str
:return: The transaction build using the provided hex serialized transaction.
:rtype: TX
"""
tx = cls()
tx.hex = hex_tx
try:
tx.version = int(change_endianness(parse_element(tx, 4)), 16)
# INPUTS
tx.inputs = int(parse_varint(tx), 16)
for i in range(tx.inputs):
tx.prev_tx_id.append(change_endianness(parse_element(tx, 32)))
tx.prev_out_index.append(int(change_endianness(parse_element(tx, 4)), 16))
# ScriptSig
tx.scriptSig_len.append(int(parse_varint(tx), 16))
tx.scriptSig.append(parse_element(tx, tx.scriptSig_len[i]))
tx.nSequence.append(int(parse_element(tx, 4), 16))
# OUTPUTS
tx.outputs = int(parse_varint(tx), 16)
for i in range(tx.outputs):
tx.value.append(int(change_endianness(parse_element(tx, 8)), 16))
# ScriptPubKey
tx.scriptPubKey_len.append(int(parse_varint(tx), 16))
tx.scriptPubKey.append(parse_element(tx, tx.scriptPubKey_len[i]))
tx.nLockTime = int(parse_element(tx, 4), 16)
if tx.offset != len(tx.hex):
# There is some error in the serialized transaction passed as input. Transaction can't be built
tx = None
else:
tx.offset = 0
except ValueError:
# If a parsing error occurs, the deserialization stops and None is returned
tx = None
return tx
def serialize(self, rtype=hex):
""" Serialize all the transaction fields arranged in the proper order, resulting in a hexadecimal string
ready to be broadcast to the network.
:param self: self
:type self: TX
:param rtype: Whether the serialized transaction is returned as a hex str or a byte array.
:type rtype: hex or bool
:return: Serialized transaction representation (hexadecimal or bin depending on rtype parameter).
:rtype: hex str / bin
"""
if rtype not in [hex, bin]:
raise Exception("Invalid return type (rtype). It should be either hex or bin.")
serialized_tx = change_endianness(int2bytes(self.version, 4)) # 4-byte version number (LE).
# INPUTS
serialized_tx += encode_varint(self.inputs) # Varint number of inputs.
for i in range(self.inputs):
serialized_tx += change_endianness(self.prev_tx_id[i]) # 32-byte hash of the previous transaction (LE).
serialized_tx += change_endianness(int2bytes(self.prev_out_index[i], 4)) # 4-byte output index (LE)
serialized_tx += encode_varint(len(self.scriptSig[i]) // 2) # Varint input script length.
# ScriptSig
serialized_tx += self.scriptSig[i] # Input script.
serialized_tx += int2bytes(self.nSequence[i], 4) # 4-byte sequence number.
# OUTPUTS
serialized_tx += encode_varint(self.outputs) # Varint number of outputs.
if self.outputs != 0:
for i in range(self.outputs):
serialized_tx += change_endianness(int2bytes(self.value[i], 8)) # 8-byte field Satoshi value (LE)
# ScriptPubKey
serialized_tx += encode_varint(len(self.scriptPubKey[i]) // 2) # Varint Output script length.
serialized_tx += self.scriptPubKey[i] # Output script.
serialized_tx += int2bytes(self.nLockTime, 4) # 4-byte lock time field
# If return type has been set to binary, the serialized transaction is converted.
if rtype is bin:
serialized_tx = unhexlify(serialized_tx)
return serialized_tx
@staticmethod
def create_dummy_transaction(prev_tx_id=None, prev_out_index=None):
tx = TX()
if prev_tx_id is None:
prev_tx_id = urandom(32).hex()
if prev_out_index is None:
prev_out_index = 0
tx.version = 1
tx.inputs = 1
tx.outputs = 1
tx.prev_tx_id = [prev_tx_id]
tx.prev_out_index = [prev_out_index]
tx.nLockTime = 0
tx.scriptSig = [
'47304402204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860'
'a4acdd12909d831cc56cbbac4622082221a8768d1d0901']
tx.scriptSig_len = [77]
tx.nSequence = [4294967295]
tx.value = [5000000000]
tx.scriptPubKey = [
'4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c'
'1b7303b8a0626f1baded5c72a704f7e6cd84cac']
tx.scriptPubKey_len = [67]
return tx.serialize()

128
test/simulator/utils.py Normal file
View File

@@ -0,0 +1,128 @@
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
from hashlib import sha256
from binascii import unhexlify
def change_endianness(x):
""" Changes the endianness (from BE to LE and vice versa) of a given value.
:param x: Given value which endianness will be changed.
:type x: hex str
:return: The opposite endianness representation of the given value.
:rtype: hex str
"""
# If there is an odd number of elements, we make it even by adding a 0
if (len(x) % 2) == 1:
x += "0"
y = bytes(x, 'utf-8')
z = y[::-1]
return z.decode('utf-8')
def parse_varint(tx):
""" Parses a given transaction for extracting an encoded varint element.
:param tx: Transaction where the element will be extracted.
:type tx: TX
:return: The b-bytes representation of the given value (a) in hex format.
:rtype: hex str
"""
# First of all, the offset of the hex transaction if moved to the proper position (i.e where the varint should be
# located) and the length and format of the data to be analyzed is checked.
data = tx.hex[tx.offset:]
assert (len(data) > 0)
size = int(data[:2], 16)
assert (size <= 255)
# Then, the integer is encoded as a varint using the proper prefix, if needed.
if size <= 252: # No prefix
storage_length = 1
elif size == 253: # 0xFD
storage_length = 3
elif size == 254: # 0xFE
storage_length = 5
elif size == 255: # 0xFF
storage_length = 9
else:
raise Exception("Wrong input data size")
# Finally, the storage length is used to extract the proper number of bytes from the transaction hex and the
# transaction offset is updated.
varint = data[:storage_length * 2]
tx.offset += storage_length * 2
return varint
def parse_element(tx, size):
""" Parses a given transaction to extract an element of a given size.
:param tx: Transaction where the element will be extracted.
:type tx: TX
:param size: Size of the parameter to be extracted.
:type size: int
:return: The extracted element.
:rtype: hex str
"""
element = tx.hex[tx.offset:tx.offset + size * 2]
tx.offset += size * 2
return element
def encode_varint(value):
""" Encodes a given integer value to a varint. It only used the four varint representation cases used by bitcoin:
1-byte, 2-byte, 4-byte or 8-byte integers.
:param value: The integer value that will be encoded into varint.
:type value: int
:return: The varint representation of the given integer value.
:rtype: str
"""
# The value is checked in order to choose the size of its final representation.
# 0xFD(253), 0xFE(254) and 0xFF(255) are special cases, since are the prefixes defined for 2-byte, 4-byte
# and 8-byte long values respectively.
if value < pow(2, 8) - 3:
size = 1
varint = int2bytes(value, size) # No prefix
else:
if value < pow(2, 16):
size = 2
prefix = 253 # 0xFD
elif value < pow(2, 32):
size = 4
prefix = 254 # 0xFE
elif value < pow(2, 64):
size = 8
prefix = 255 # 0xFF
else:
raise Exception("Wrong input data size")
varint = format(prefix, 'x') + change_endianness(int2bytes(value, size))
return varint
def int2bytes(a, b):
""" Converts a given integer value (a) its b-byte representation, in hex format.
:param a: Value to be converted.
:type a: int
:param b: Byte size to be filled.
:type b: int
:return: The b-bytes representation of the given value (a) in hex format.
:rtype: hex str
"""
m = pow(2, 8*b) - 1
if a > m:
raise Exception(str(a) + " is too big to be represented with " + str(b) + " bytes. Maximum value is "
+ str(m) + ".")
return ('%0' + str(2 * b) + 'x') % a
def sha256d(hex_data):
data = unhexlify(hex_data)
double_sha256 = sha256(sha256(data).digest()).hexdigest()
return change_endianness(double_sha256)

View File

@@ -1,14 +1,15 @@
import pytest import pytest
import requests
from time import sleep from time import sleep
from threading import Thread from threading import Thread
from pisa.api import start_api from pisa.api import start_api
from test.simulator.bitcoind_sim import run_simulator from test.simulator.bitcoind_sim import run_simulator, HOST, PORT
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def run_bitcoind(): def run_bitcoind():
bitcoind_thread = Thread(target=run_simulator) bitcoind_thread = Thread(target=run_simulator, kwargs={"mode": "event"})
bitcoind_thread.daemon = True bitcoind_thread.daemon = True
bitcoind_thread.start() bitcoind_thread.start()
@@ -24,3 +25,10 @@ def run_api():
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail) # It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
sleep(0.1) sleep(0.1)
def generate_block():
requests.post(url="http://{}:{}/generate".format(HOST, PORT), timeout=5)
sleep(0.5)

View File

@@ -1,37 +1,42 @@
import os
import json import json
import pytest import pytest
import time
import requests import requests
from hashlib import sha256 from hashlib import sha256
from binascii import unhexlify from binascii import unhexlify
from apps.cli.blob import Blob from apps.cli.blob import Blob
from pisa import HOST, PORT, logging from pisa import HOST, PORT, logging
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.unit.conftest import generate_block
from pisa.utils.auth_proxy import AuthServiceProxy from pisa.utils.auth_proxy import AuthServiceProxy
from test.simulator.bitcoind_sim import TIME_BETWEEN_BLOCKS
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
logging.getLogger().disabled = True logging.getLogger().disabled = True
PISA_API = "http://{}:{}".format(HOST, PORT) PISA_API = "http://{}:{}".format(HOST, PORT)
MULTIPLE_APPOINTMENTS = 10 MULTIPLE_APPOINTMENTS = 10
appointments = [] appointments = []
locator_dispute_txid_map = {} locator_dispute_tx_map = {}
def generate_dummy_appointment(dispute_txid): def generate_dummy_appointment():
r = requests.get(url=PISA_API + '/get_block_count', timeout=5) r = requests.get(url=PISA_API + '/get_block_count', timeout=5)
current_height = r.json().get("block_count") current_height = r.json().get("block_count")
dummy_appointment_data = {"tx": os.urandom(32).hex(), "tx_id": dispute_txid, "start_time": current_height + 5, dispute_tx = TX.create_dummy_transaction()
dispute_txid = sha256d(dispute_tx)
justice_tx = TX.create_dummy_transaction(dispute_txid)
dummy_appointment_data = {"tx": justice_tx, "tx_id": dispute_txid, "start_time": current_height + 5,
"end_time": current_height + 30, "dispute_delta": 20} "end_time": current_height + 30, "dispute_delta": 20}
cipher = "AES-GCM-128" cipher = "AES-GCM-128"
hash_function = "SHA256" hash_function = "SHA256"
locator = sha256(unhexlify(dummy_appointment_data.get("tx_id"))).hexdigest() locator = sha256(unhexlify(dispute_txid)).hexdigest()
blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function) blob = Blob(dummy_appointment_data.get("tx"), cipher, hash_function)
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id"))) encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")))
@@ -41,22 +46,13 @@ def generate_dummy_appointment(dispute_txid):
"dispute_delta": dummy_appointment_data.get("dispute_delta"), "dispute_delta": dummy_appointment_data.get("dispute_delta"),
"encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function} "encrypted_blob": encrypted_blob, "cipher": cipher, "hash_function": hash_function}
return appointment return appointment, dispute_tx
@pytest.fixture @pytest.fixture
def new_appointment(dispute_txid=None): def new_appointment():
appointment = create_appointment(dispute_txid) appointment, dispute_tx = generate_dummy_appointment()
locator_dispute_tx_map[appointment["locator"]] = dispute_tx
return appointment
def create_appointment(dispute_txid=None):
if dispute_txid is None:
dispute_txid = os.urandom(32).hex()
appointment = generate_dummy_appointment(dispute_txid)
locator_dispute_txid_map[appointment["locator"]] = dispute_txid
return appointment return appointment
@@ -147,12 +143,12 @@ def test_get_all_appointments_responder():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
locators = [appointment["locator"] for appointment in appointments] locators = [appointment["locator"] for appointment in appointments]
for locator, dispute_txid in locator_dispute_txid_map.items(): for locator, dispute_tx in locator_dispute_tx_map.items():
if locator in locators: if locator in locators:
bitcoin_cli.sendrawtransaction(dispute_txid) bitcoin_cli.sendrawtransaction(dispute_tx)
# Wait a bit for them to get confirmed # Wait a bit for them to get confirmed
time.sleep(TIME_BETWEEN_BLOCKS) generate_block()
# Get all appointments # Get all appointments
r = requests.get(url=PISA_API + "/get_all_appointments") r = requests.get(url=PISA_API + "/get_all_appointments")

View File

@@ -1,11 +1,13 @@
import pytest import pytest
import logging import logging
from os import urandom from os import urandom
from time import sleep
from pisa.carrier import Carrier from pisa.carrier import Carrier
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.unit.conftest import generate_block
from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR
from test.simulator.bitcoind_sim import TIME_BETWEEN_BLOCKS
logging.getLogger().disabled = True logging.getLogger().disabled = True
@@ -24,23 +26,28 @@ def carrier():
def test_send_transaction(run_bitcoind, carrier): def test_send_transaction(run_bitcoind, carrier):
# We are mocking bitcoind and in our simulator txid == tx # We are mocking bitcoind and in our simulator txid == tx
tx = urandom(32).hex() tx = TX.create_dummy_transaction()
receipt = carrier.send_transaction(tx, tx) txid = sha256d(tx)
receipt = carrier.send_transaction(tx, txid)
assert(receipt.delivered is True) assert(receipt.delivered is True)
def test_send_double_spending_transaction(carrier): def test_send_double_spending_transaction(carrier):
# We can test what happens if the same transaction is sent twice # We can test what happens if the same transaction is sent twice
tx = urandom(32).hex() tx = TX.create_dummy_transaction()
receipt = carrier.send_transaction(tx, tx) txid = sha256d(tx)
sent_txs.append(tx)
receipt = carrier.send_transaction(tx, txid)
sent_txs.append(txid)
# Wait for a block to be mined # Wait for a block to be mined
sleep(2*TIME_BETWEEN_BLOCKS) for _ in range(2):
generate_block()
# Try to send it again # Try to send it again
receipt2 = carrier.send_transaction(tx, tx) receipt2 = carrier.send_transaction(tx, txid)
# The carrier should report delivered True for both, but in the second case the transaction was already delivered # The carrier should report delivered True for both, but in the second case the transaction was already delivered
# (either by himself or someone else) # (either by himself or someone else)
@@ -51,8 +58,9 @@ def test_send_double_spending_transaction(carrier):
def test_send_transaction_invalid_format(carrier): def test_send_transaction_invalid_format(carrier):
# Test sending a transaction that does not fits the format # Test sending a transaction that does not fits the format
tx = urandom(31).hex() tx = TX.create_dummy_transaction()
receipt = carrier.send_transaction(tx, tx) txid = sha256d(tx)
receipt = carrier.send_transaction(txid, txid)
assert (receipt.delivered is False and receipt.reason == RPC_DESERIALIZATION_ERROR) assert (receipt.delivered is False and receipt.reason == RPC_DESERIALIZATION_ERROR)

View File

@@ -1,5 +1,4 @@
from os import urandom from os import urandom
from cryptography.exceptions import InvalidTag
from pisa import logging from pisa import logging
from pisa.encrypted_blob import EncryptedBlob from pisa.encrypted_blob import EncryptedBlob

View File

@@ -1,4 +1,4 @@
from pisa import logging, bitcoin_cli from pisa import logging
from pisa.tools import check_txid_format from pisa.tools import check_txid_format
from pisa.tools import can_connect_to_bitcoind, in_correct_network from pisa.tools import can_connect_to_bitcoind, in_correct_network

View File

@@ -1,23 +1,28 @@
import pytest import pytest
import logging import logging
from os import urandom
from time import sleep
from uuid import uuid4 from uuid import uuid4
from hashlib import sha256 from hashlib import sha256
from threading import Thread from threading import Thread
from binascii import unhexlify
from queue import Queue, Empty from queue import Queue, Empty
from apps.cli.blob import Blob
from pisa.watcher import Watcher from pisa.watcher import Watcher
from pisa.responder import Responder from pisa.responder import Responder
from pisa.conf import MAX_APPOINTMENTS from pisa.conf import MAX_APPOINTMENTS
from pisa.appointment import Appointment from pisa.appointment import Appointment
from pisa.tools import check_txid_format from pisa.tools import check_txid_format
from test.simulator.utils import sha256d
from test.simulator.transaction import TX
from test.unit.conftest import generate_block
from pisa.utils.auth_proxy import AuthServiceProxy from pisa.utils.auth_proxy import AuthServiceProxy
from test.simulator.bitcoind_sim import TIME_BETWEEN_BLOCKS
from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT from pisa.conf import EXPIRY_DELTA, BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
logging.getLogger().disabled = True logging.getLogger().disabled = True
APPOINTMENTS = 5 APPOINTMENTS = 5
START_TIME_OFFSET = 1
END_TIME_OFFSET = 1
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
@@ -25,37 +30,44 @@ def watcher():
return Watcher() return Watcher()
def create_appointment(locator=None): def generate_dummy_appointment():
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
if locator is None: dispute_tx = TX.create_dummy_transaction()
locator = urandom(32).hex() dispute_txid = sha256d(dispute_tx)
justice_tx = TX.create_dummy_transaction(dispute_txid)
start_time = bitcoin_cli.getblockcount() + 1 start_time = bitcoin_cli.getblockcount() + 1
end_time = start_time + 1 end_time = start_time + 1
dispute_delta = 20 dispute_delta = 20
encrypted_blob_data = urandom(100).hex()
cipher = "AES-GCM-128" cipher = "AES-GCM-128"
hash_function = "SHA256" hash_function = "SHA256"
return Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob_data, cipher, hash_function) locator = sha256(unhexlify(dispute_txid)).hexdigest()
blob = Blob(justice_tx, cipher, hash_function)
encrypted_blob = blob.encrypt(dispute_txid)
appointment = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
return appointment, dispute_tx
def create_appointments(n): def create_appointments(n):
locator_uuid_map = dict() locator_uuid_map = dict()
appointments = dict() appointments = dict()
txids = [] dispute_txs = []
for i in range(n): for i in range(n):
txid = urandom(32) appointment, dispute_tx = generate_dummy_appointment()
uuid = uuid4().hex uuid = uuid4().hex
locator = sha256(txid).hexdigest()
appointments[uuid] = create_appointment(locator) appointments[uuid] = appointment
locator_uuid_map[locator] = [uuid] locator_uuid_map[appointment.locator] = [uuid]
txids.append(txid.hex()) dispute_txs.append(dispute_tx)
return appointments, locator_uuid_map, txids return appointments, locator_uuid_map, dispute_txs
def test_init(watcher): def test_init(watcher):
@@ -69,13 +81,14 @@ def test_init(watcher):
def test_add_appointment(run_bitcoind, watcher): def test_add_appointment(run_bitcoind, watcher):
# The watcher automatically fire do_watch and do_subscribe on adding an appointment if it is asleep (initial state). # The watcher automatically fires do_watch and do_subscribe on adding an appointment if it is asleep (initial state)
# Avoid this by setting the state to awake. # Avoid this by setting the state to awake.
watcher.asleep = False watcher.asleep = False
# We should be able to add appointments up to the limit # We should be able to add appointments up to the limit
for _ in range(10): for _ in range(10):
added_appointment, sig = watcher.add_appointment(create_appointment()) appointment, dispute_tx = generate_dummy_appointment()
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True assert added_appointment is True
@@ -85,11 +98,13 @@ def test_add_too_many_appointments(watcher):
watcher.appointments = dict() watcher.appointments = dict()
for _ in range(MAX_APPOINTMENTS): for _ in range(MAX_APPOINTMENTS):
added_appointment, sig = watcher.add_appointment(create_appointment()) appointment, dispute_tx = generate_dummy_appointment()
added_appointment, sig = watcher.add_appointment(appointment)
assert added_appointment is True assert added_appointment is True
added_appointment, sig = watcher.add_appointment(create_appointment()) appointment, dispute_tx = generate_dummy_appointment()
added_appointment = watcher.add_appointment(appointment)
assert added_appointment is False assert added_appointment is False
@@ -102,7 +117,8 @@ def test_do_subscribe(watcher):
zmq_thread.start() zmq_thread.start()
try: try:
block_hash = watcher.block_queue.get(timeout=MAX_APPOINTMENTS) generate_block()
block_hash = watcher.block_queue.get()
assert check_txid_format(block_hash) assert check_txid_format(block_hash)
except Empty: except Empty:
@@ -113,25 +129,28 @@ def test_do_watch(watcher):
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
# We will wipe all the previous data and add 5 appointments # We will wipe all the previous data and add 5 appointments
watcher.appointments, watcher.locator_uuid_map, txids = create_appointments(APPOINTMENTS) watcher.appointments, watcher.locator_uuid_map, dispute_txs = create_appointments(APPOINTMENTS)
watch_thread = Thread(target=watcher.do_watch) watch_thread = Thread(target=watcher.do_watch)
watch_thread.daemon = True watch_thread.daemon = True
watch_thread.start() watch_thread.start()
# Broadcast the first two # Broadcast the first two
for txid in txids[:2]: for dispute_tx in dispute_txs[:2]:
bitcoin_cli.sendrawtransaction(txid) r = bitcoin_cli.sendrawtransaction(dispute_tx)
# After leaving some time for the block to be mined and processed, the number of appointments should have reduced # After leaving some time for the block to be mined and processed, the number of appointments should have reduced
# by two # by two
sleep(TIME_BETWEEN_BLOCKS*2) for _ in range(START_TIME_OFFSET + END_TIME_OFFSET):
generate_block()
assert len(watcher.appointments) == APPOINTMENTS - 2 assert len(watcher.appointments) == APPOINTMENTS - 2
# The rest of appointments will timeout after the end (2) + EXPIRY_DELTA # The rest of appointments will timeout after the end (2) + EXPIRY_DELTA
# Wait for an additional block to be safe # Wait for an additional block to be safe
sleep((EXPIRY_DELTA + 2 + 1) * TIME_BETWEEN_BLOCKS) for _ in range(EXPIRY_DELTA + START_TIME_OFFSET + END_TIME_OFFSET):
generate_block()
assert len(watcher.appointments) == 0 assert len(watcher.appointments) == 0
assert watcher.asleep is True assert watcher.asleep is True