diff --git a/pisa/pisad.py b/pisa/pisad.py index 3bbf476..307d4b3 100644 --- a/pisa/pisad.py +++ b/pisa/pisad.py @@ -29,13 +29,13 @@ if __name__ == '__main__': # FIXME: Leaving this here for future option/arguments pass - if can_connect_to_bitcoind(): - if in_correct_network(BTC_NETWORK): - # Fire the api - start_api() + if not can_connect_to_bitcoind(): + logger.error("Can't connect to bitcoind. Shutting down") - else: - logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down") + elif not in_correct_network(BTC_NETWORK): + logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down") else: - logger.error("Can't connect to bitcoind. Shutting down") + # Fire the api + start_api() + diff --git a/test/simulator/bitcoin_sim_tests.py b/test/simulator/bitcoin_sim_tests.py index f0eafe5..37717cb 100644 --- a/test/simulator/bitcoin_sim_tests.py +++ b/test/simulator/bitcoin_sim_tests.py @@ -1,15 +1,17 @@ import re -import os import pytest from time import sleep from threading import Thread from test.simulator.transaction import TX +from test.unit.conftest import get_random_value_hex from test.simulator.bitcoind_sim import run_simulator from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT -MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, os.urandom(32).hex()] +MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, get_random_value_hex(32)] + +bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) @pytest.fixture(scope='module') @@ -32,9 +34,6 @@ def check_hash_format(txid): return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None -bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT)) - - def test_help(run_bitcoind): # Help should always return 0 assert(bitcoin_cli.help() == 0) diff --git a/test/unit/conftest.py b/test/unit/conftest.py index e337de9..5fd697c 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -1,4 +1,5 @@ import pytest +import random import requests from time import sleep from threading import Thread @@ -27,6 +28,17 @@ def run_api(): sleep(0.1) +@pytest.fixture(scope='session', autouse=True) +def prng_seed(): + random.seed(0) + + +def get_random_value_hex(nbytes): + pseudo_random_value = random.getrandbits(8*nbytes) + prv_hex = '{:x}'.format(pseudo_random_value) + return prv_hex.zfill(2*nbytes) + + def generate_block(): requests.post(url="http://{}:{}/generate".format(HOST, PORT), timeout=5) sleep(0.5) diff --git a/test/unit/test_api.py b/test/unit/test_api.py index 63b2967..242430e 100644 --- a/test/unit/test_api.py +++ b/test/unit/test_api.py @@ -1,7 +1,6 @@ import json import pytest import requests -from os import urandom from hashlib import sha256 from binascii import unhexlify @@ -9,8 +8,8 @@ from apps.cli.blob import Blob from pisa import HOST, PORT, logging from test.simulator.utils import sha256d from test.simulator.transaction import TX -from test.unit.conftest import generate_block from pisa.utils.auth_proxy import AuthServiceProxy +from test.unit.conftest import generate_block, get_random_value_hex from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS logging.getLogger().disabled = True @@ -101,7 +100,7 @@ def test_request_appointment(new_appointment): def test_request_random_appointment(): - r = requests.get(url=PISA_API + "/get_appointment?locator=" + urandom(32).hex()) + r = requests.get(url=PISA_API + "/get_appointment?locator=" + get_random_value_hex(32)) assert (r.status_code == 200) received_appointments = json.loads(r.content) diff --git a/test/unit/test_appointment.py b/test/unit/test_appointment.py index 0caf459..b6ed6b1 100644 --- a/test/unit/test_appointment.py +++ b/test/unit/test_appointment.py @@ -1,19 +1,19 @@ -from os import urandom from pytest import fixture from pisa.appointment import Appointment from pisa.encrypted_blob import EncryptedBlob +from test.unit.conftest import get_random_value_hex # Not much to test here, adding it for completeness @fixture def appointment_data(): - locator = urandom(32).hex() + locator = get_random_value_hex(32) start_time = 100 end_time = 120 dispute_delta = 20 - encrypted_blob_data = urandom(100).hex() + encrypted_blob_data = get_random_value_hex(100) cipher = "AES-GCM-128" hash_function = "SHA256" diff --git a/test/unit/test_blob.py b/test/unit/test_blob.py index de9403f..9bdd7d3 100644 --- a/test/unit/test_blob.py +++ b/test/unit/test_blob.py @@ -1,14 +1,15 @@ -from os import urandom +from binascii import unhexlify from pisa import logging from apps.cli.blob import Blob +from test.unit.conftest import get_random_value_hex from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS logging.getLogger().disabled = True def test_init_blob(): - data = urandom(64).hex() + data = get_random_value_hex(64) # Fixed (valid) hash function, try different valid ciphers hash_function = SUPPORTED_HASH_FUNCTIONS[0] @@ -29,7 +30,7 @@ def test_init_blob(): assert(blob.data == data and blob.cipher == cipher and blob.hash_function == case) # Invalid data - data = urandom(64) + data = unhexlify(get_random_value_hex(64)) cipher = SUPPORTED_CIPHERS[0] hash_function = SUPPORTED_HASH_FUNCTIONS[0] @@ -41,7 +42,7 @@ def test_init_blob(): assert True # Invalid cipher - data = urandom(64).hex() + data = get_random_value_hex(64) cipher = "A" * 10 hash_function = SUPPORTED_HASH_FUNCTIONS[0] @@ -53,7 +54,7 @@ def test_init_blob(): assert True # Invalid hash function - data = urandom(64).hex() + data = get_random_value_hex(64) cipher = SUPPORTED_CIPHERS[0] hash_function = "A" * 10 @@ -67,14 +68,14 @@ def test_init_blob(): def test_encrypt(): # Valid data, valid key - data = urandom(64).hex() + data = get_random_value_hex(64) blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0]) - key = urandom(32).hex() + key = get_random_value_hex(32) encrypted_blob = blob.encrypt(key) # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created) - invalid_key = urandom(32) + invalid_key = unhexlify(get_random_value_hex(32)) try: blob.encrypt(invalid_key) diff --git a/test/unit/test_block_processor.py b/test/unit/test_block_processor.py index 5183842..d7d8e72 100644 --- a/test/unit/test_block_processor.py +++ b/test/unit/test_block_processor.py @@ -1,11 +1,11 @@ import pytest import logging -from os import urandom from uuid import uuid4 from hashlib import sha256 from binascii import unhexlify from pisa.block_processor import BlockProcessor +from test.unit.conftest import get_random_value_hex logging.getLogger().disabled = True @@ -15,7 +15,7 @@ TEST_SET_SIZE = 200 @pytest.fixture(scope='module') def txids(): - return [urandom(32).hex() for _ in range(APPOINTMENT_COUNT)] + return [get_random_value_hex(32) for _ in range(APPOINTMENT_COUNT)] @pytest.fixture(scope='module') @@ -44,7 +44,7 @@ def test_get_block(best_block_hash): def test_get_random_block(): - block = BlockProcessor.get_block(urandom(32).hex()) + block = BlockProcessor.get_block(get_random_value_hex(32)) assert block is None @@ -62,7 +62,7 @@ def test_potential_matches(txids, locator_uuid_map): def test_potential_matches_random(locator_uuid_map): - txids = [urandom(32).hex() for _ in range(len(locator_uuid_map))] + txids = [get_random_value_hex(32) for _ in range(len(locator_uuid_map))] potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map) @@ -72,7 +72,7 @@ def test_potential_matches_random(locator_uuid_map): def test_potential_matches_random_data(locator_uuid_map): # The likelihood of finding a potential match with random data should be negligible - txids = [urandom(32).hex() for _ in range(TEST_SET_SIZE)] + txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)] potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map) diff --git a/test/unit/test_carrier.py b/test/unit/test_carrier.py index 165d417..595dc0c 100644 --- a/test/unit/test_carrier.py +++ b/test/unit/test_carrier.py @@ -1,12 +1,11 @@ import pytest import logging -from os import urandom - from pisa.carrier import Carrier from test.simulator.utils import sha256d from test.simulator.transaction import TX from test.unit.conftest import generate_blocks +from test.unit.conftest import get_random_value_hex from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR logging.getLogger().disabled = True @@ -72,7 +71,7 @@ def test_get_transaction(): def test_get_non_existing_transaction(): - tx_info = Carrier.get_transaction(urandom(32).hex()) + tx_info = Carrier.get_transaction(get_random_value_hex(32)) assert tx_info is None diff --git a/test/unit/test_cleaner.py b/test/unit/test_cleaner.py index 0fdf5df..92c2a35 100644 --- a/test/unit/test_cleaner.py +++ b/test/unit/test_cleaner.py @@ -6,6 +6,7 @@ from pisa import logging from pisa.responder import Job from pisa.cleaner import Cleaner from pisa.appointment import Appointment +from test.unit.conftest import get_random_value_hex CONFIRMATIONS = 6 ITEMS = 10 @@ -21,7 +22,7 @@ def set_up_appointments(total_appointments): for _ in range(total_appointments): uuid = uuid4().hex - locator = urandom(32).hex() + locator = get_random_value_hex(32) appointments[uuid] = Appointment(locator, None, None, None, None, None, None) locator_uuid_map[locator] = [uuid] @@ -42,7 +43,7 @@ def set_up_jobs(total_jobs): for _ in range(total_jobs): uuid = uuid4().hex - txid = urandom(32).hex() + txid = get_random_value_hex(32) # Assign both justice_txid and dispute_txid the same id (it shouldn't matter) jobs[uuid] = Job(txid, txid, None, None) diff --git a/test/unit/test_encrypted_blob.py b/test/unit/test_encrypted_blob.py index 25dc78f..f05422d 100644 --- a/test/unit/test_encrypted_blob.py +++ b/test/unit/test_encrypted_blob.py @@ -1,21 +1,20 @@ -from os import urandom - from pisa import logging from pisa.encrypted_blob import EncryptedBlob +from test.unit.conftest import get_random_value_hex logging.getLogger().disabled = True def test_init_encrypted_blob(): # No much to test here, basically that the object is properly created - data = urandom(64).hex() + data = get_random_value_hex(64) assert (EncryptedBlob(data).data == data) def test_decrypt(): # TODO: The decryption tests are assuming the cipher is AES-GCM-128, since EncryptedBlob assumes the same. Fix this. - key = urandom(32).hex() - encrypted_data = urandom(64).hex() + key = get_random_value_hex(32) + encrypted_data = get_random_value_hex(64) encrypted_blob = EncryptedBlob(encrypted_data) # Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function diff --git a/test/unit/test_inspector.py b/test/unit/test_inspector.py index bed6a9a..b393614 100644 --- a/test/unit/test_inspector.py +++ b/test/unit/test_inspector.py @@ -1,33 +1,34 @@ -from os import urandom +from binascii import unhexlify from pisa import logging from pisa.errors import * from pisa.inspector import Inspector from pisa.appointment import Appointment from pisa.block_processor import BlockProcessor +from test.unit.conftest import get_random_value_hex from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS inspector = Inspector() APPOINTMENT_OK = (0, None) -NO_HEX_STINGS = ["R" * 64, urandom(31).hex() + "PP", "$"*64, " "*64] -WRONG_TYPES = [[], '', urandom(32).hex(), 3.2, 2.0, (), object, {}, " "*32, object()] -WRONG_TYPES_NO_STR = [[], urandom(32), 3.2, 2.0, (), object, {}, object()] +NO_HEX_STRINGS = ["R" * 64, get_random_value_hex(31) + "PP", "$"*64, " "*64] +WRONG_TYPES = [[], '', get_random_value_hex(32), 3.2, 2.0, (), object, {}, " "*32, object()] +WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()] logging.getLogger().disabled = True def test_check_locator(): # Right appointment type, size and format - locator = urandom(32).hex() + locator = get_random_value_hex(32) assert(Inspector.check_locator(locator) == APPOINTMENT_OK) # Wrong size (too big) - locator = urandom(33).hex() + locator = get_random_value_hex(33) assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE) # Wrong size (too small) - locator = urandom(31).hex() + locator = get_random_value_hex(31) assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE) # Empty @@ -41,7 +42,7 @@ def test_check_locator(): assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_TYPE) # Wrong format (no hex) - locators = NO_HEX_STINGS + locators = NO_HEX_STRINGS for locator in locators: assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_FORMAT) @@ -122,7 +123,7 @@ def test_check_delta(): def test_check_blob(): # Right format and length - encrypted_blob = urandom(120).hex() + encrypted_blob = get_random_value_hex(120) assert(Inspector.check_blob(encrypted_blob) == APPOINTMENT_OK) # # Wrong content @@ -139,7 +140,7 @@ def test_check_blob(): assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_EMPTY_FIELD) # Wrong format (no hex) - encrypted_blobs = NO_HEX_STINGS + encrypted_blobs = NO_HEX_STRINGS for encrypted_blob in encrypted_blobs: assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT) @@ -157,7 +158,7 @@ def test_check_cipher(): assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE) # Wrong value - ciphers = NO_HEX_STINGS + ciphers = NO_HEX_STRINGS for cipher in ciphers: assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED) @@ -179,7 +180,7 @@ def test_check_hash_function(): assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE) # Wrong value - hash_functions = NO_HEX_STINGS + hash_functions = NO_HEX_STRINGS for hash_function in hash_functions: assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED) @@ -198,11 +199,11 @@ def test_inspect(run_bitcoind): assert (type(appointment) == tuple and appointment[0] != 0) # Valid appointment - locator = urandom(32).hex() + locator = get_random_value_hex(32) start_time = BlockProcessor.get_block_count() + 5 end_time = start_time + 20 dispute_delta = MIN_DISPUTE_DELTA - encrypted_blob = urandom(64).hex() + encrypted_blob = get_random_value_hex(64) cipher = SUPPORTED_CIPHERS[0] hash_function = SUPPORTED_HASH_FUNCTIONS[0] diff --git a/test/unit/test_responder.py b/test/unit/test_responder.py index 72e617e..af1fefd 100644 --- a/test/unit/test_responder.py +++ b/test/unit/test_responder.py @@ -1,6 +1,5 @@ import json import pytest -from os import urandom from uuid import uuid4 from threading import Thread from queue import Queue, Empty @@ -9,8 +8,9 @@ from pisa.tools import check_txid_format from test.simulator.utils import sha256d from pisa.responder import Responder, Job from test.simulator.bitcoind_sim import TX -from test.unit.conftest import generate_block, generate_blocks from pisa.utils.auth_proxy import AuthServiceProxy +from test.unit.conftest import get_random_value_hex +from test.unit.conftest import generate_block, generate_blocks from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT @@ -40,7 +40,7 @@ def create_dummy_job_data(random_txid=False, justice_rawtx=None): justice_txid = sha256d(justice_rawtx) if random_txid is True: - justice_txid = urandom(32).hex() + justice_txid = get_random_value_hex(32) appointment_end = bitcoin_cli.getblockcount() + 2 @@ -214,10 +214,10 @@ def test_do_watch(responder): def test_get_txs_to_rebroadcast(responder): # Let's create a few fake txids and assign at least 6 missing confirmations to each - txs_missing_too_many_conf = {urandom(32).hex(): 6+i for i in range(10)} + txs_missing_too_many_conf = {get_random_value_hex(32): 6+i for i in range(10)} # Let's create some other transaction that has missed some confirmations but not that many - txs_missing_some_conf = {urandom(32).hex(): 3 for _ in range(10)} + txs_missing_some_conf = {get_random_value_hex(32): 3 for _ in range(10)} # All the txs in the first dict should be flagged as to_rebroadcast responder.missed_confirmations = txs_missing_too_many_conf