mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Apply requested changes
- Adds a PRG based on a hardcoded seed to make the tests reproducible (get_random_value_hex) - Updates all the tests replacing urandom for get_random_value_hex - Properly places misplaced bitcoin_cli in bitcoin_sim_tests - Typos
This commit is contained in:
@@ -29,13 +29,13 @@ if __name__ == '__main__':
|
|||||||
# FIXME: Leaving this here for future option/arguments
|
# FIXME: Leaving this here for future option/arguments
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if can_connect_to_bitcoind():
|
if not can_connect_to_bitcoind():
|
||||||
if in_correct_network(BTC_NETWORK):
|
logger.error("Can't connect to bitcoind. Shutting down")
|
||||||
# Fire the api
|
|
||||||
start_api()
|
|
||||||
|
|
||||||
else:
|
elif not in_correct_network(BTC_NETWORK):
|
||||||
logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down")
|
logger.error("bitcoind is running on a different network, check conf.py and bitcoin.conf. Shutting down")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.error("Can't connect to bitcoind. Shutting down")
|
# Fire the api
|
||||||
|
start_api()
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,17 @@
|
|||||||
import re
|
import re
|
||||||
import os
|
|
||||||
import pytest
|
import pytest
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from test.simulator.transaction import TX
|
from test.simulator.transaction import TX
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
from test.simulator.bitcoind_sim import run_simulator
|
from test.simulator.bitcoind_sim import run_simulator
|
||||||
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
|
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
|
||||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
||||||
|
|
||||||
MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, os.urandom(32).hex()]
|
MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, get_random_value_hex(32)]
|
||||||
|
|
||||||
|
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
@@ -32,9 +34,6 @@ def check_hash_format(txid):
|
|||||||
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None
|
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None
|
||||||
|
|
||||||
|
|
||||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
|
|
||||||
|
|
||||||
|
|
||||||
def test_help(run_bitcoind):
|
def test_help(run_bitcoind):
|
||||||
# Help should always return 0
|
# Help should always return 0
|
||||||
assert(bitcoin_cli.help() == 0)
|
assert(bitcoin_cli.help() == 0)
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
import random
|
||||||
import requests
|
import requests
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
@@ -27,6 +28,17 @@ def run_api():
|
|||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='session', autouse=True)
|
||||||
|
def prng_seed():
|
||||||
|
random.seed(0)
|
||||||
|
|
||||||
|
|
||||||
|
def get_random_value_hex(nbytes):
|
||||||
|
pseudo_random_value = random.getrandbits(8*nbytes)
|
||||||
|
prv_hex = '{:x}'.format(pseudo_random_value)
|
||||||
|
return prv_hex.zfill(2*nbytes)
|
||||||
|
|
||||||
|
|
||||||
def generate_block():
|
def generate_block():
|
||||||
requests.post(url="http://{}:{}/generate".format(HOST, PORT), timeout=5)
|
requests.post(url="http://{}:{}/generate".format(HOST, PORT), timeout=5)
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
from os import urandom
|
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
|
|
||||||
@@ -9,8 +8,8 @@ from apps.cli.blob import Blob
|
|||||||
from pisa import HOST, PORT, logging
|
from pisa import HOST, PORT, logging
|
||||||
from test.simulator.utils import sha256d
|
from test.simulator.utils import sha256d
|
||||||
from test.simulator.transaction import TX
|
from test.simulator.transaction import TX
|
||||||
from test.unit.conftest import generate_block
|
|
||||||
from pisa.utils.auth_proxy import AuthServiceProxy
|
from pisa.utils.auth_proxy import AuthServiceProxy
|
||||||
|
from test.unit.conftest import generate_block, get_random_value_hex
|
||||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
|
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, MAX_APPOINTMENTS
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
@@ -101,7 +100,7 @@ def test_request_appointment(new_appointment):
|
|||||||
|
|
||||||
|
|
||||||
def test_request_random_appointment():
|
def test_request_random_appointment():
|
||||||
r = requests.get(url=PISA_API + "/get_appointment?locator=" + urandom(32).hex())
|
r = requests.get(url=PISA_API + "/get_appointment?locator=" + get_random_value_hex(32))
|
||||||
assert (r.status_code == 200)
|
assert (r.status_code == 200)
|
||||||
|
|
||||||
received_appointments = json.loads(r.content)
|
received_appointments = json.loads(r.content)
|
||||||
|
|||||||
@@ -1,19 +1,19 @@
|
|||||||
from os import urandom
|
|
||||||
from pytest import fixture
|
from pytest import fixture
|
||||||
|
|
||||||
from pisa.appointment import Appointment
|
from pisa.appointment import Appointment
|
||||||
from pisa.encrypted_blob import EncryptedBlob
|
from pisa.encrypted_blob import EncryptedBlob
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
|
|
||||||
# Not much to test here, adding it for completeness
|
# Not much to test here, adding it for completeness
|
||||||
|
|
||||||
@fixture
|
@fixture
|
||||||
def appointment_data():
|
def appointment_data():
|
||||||
locator = urandom(32).hex()
|
locator = get_random_value_hex(32)
|
||||||
start_time = 100
|
start_time = 100
|
||||||
end_time = 120
|
end_time = 120
|
||||||
dispute_delta = 20
|
dispute_delta = 20
|
||||||
encrypted_blob_data = urandom(100).hex()
|
encrypted_blob_data = get_random_value_hex(100)
|
||||||
cipher = "AES-GCM-128"
|
cipher = "AES-GCM-128"
|
||||||
hash_function = "SHA256"
|
hash_function = "SHA256"
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,15 @@
|
|||||||
from os import urandom
|
from binascii import unhexlify
|
||||||
|
|
||||||
from pisa import logging
|
from pisa import logging
|
||||||
from apps.cli.blob import Blob
|
from apps.cli.blob import Blob
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
from pisa.conf import SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
|
|
||||||
|
|
||||||
def test_init_blob():
|
def test_init_blob():
|
||||||
data = urandom(64).hex()
|
data = get_random_value_hex(64)
|
||||||
|
|
||||||
# Fixed (valid) hash function, try different valid ciphers
|
# Fixed (valid) hash function, try different valid ciphers
|
||||||
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
||||||
@@ -29,7 +30,7 @@ def test_init_blob():
|
|||||||
assert(blob.data == data and blob.cipher == cipher and blob.hash_function == case)
|
assert(blob.data == data and blob.cipher == cipher and blob.hash_function == case)
|
||||||
|
|
||||||
# Invalid data
|
# Invalid data
|
||||||
data = urandom(64)
|
data = unhexlify(get_random_value_hex(64))
|
||||||
cipher = SUPPORTED_CIPHERS[0]
|
cipher = SUPPORTED_CIPHERS[0]
|
||||||
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
||||||
|
|
||||||
@@ -41,7 +42,7 @@ def test_init_blob():
|
|||||||
assert True
|
assert True
|
||||||
|
|
||||||
# Invalid cipher
|
# Invalid cipher
|
||||||
data = urandom(64).hex()
|
data = get_random_value_hex(64)
|
||||||
cipher = "A" * 10
|
cipher = "A" * 10
|
||||||
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
||||||
|
|
||||||
@@ -53,7 +54,7 @@ def test_init_blob():
|
|||||||
assert True
|
assert True
|
||||||
|
|
||||||
# Invalid hash function
|
# Invalid hash function
|
||||||
data = urandom(64).hex()
|
data = get_random_value_hex(64)
|
||||||
cipher = SUPPORTED_CIPHERS[0]
|
cipher = SUPPORTED_CIPHERS[0]
|
||||||
hash_function = "A" * 10
|
hash_function = "A" * 10
|
||||||
|
|
||||||
@@ -67,14 +68,14 @@ def test_init_blob():
|
|||||||
|
|
||||||
def test_encrypt():
|
def test_encrypt():
|
||||||
# Valid data, valid key
|
# Valid data, valid key
|
||||||
data = urandom(64).hex()
|
data = get_random_value_hex(64)
|
||||||
blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
|
blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
|
||||||
key = urandom(32).hex()
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
encrypted_blob = blob.encrypt(key)
|
encrypted_blob = blob.encrypt(key)
|
||||||
|
|
||||||
# Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
|
# Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
|
||||||
invalid_key = urandom(32)
|
invalid_key = unhexlify(get_random_value_hex(32))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
blob.encrypt(invalid_key)
|
blob.encrypt(invalid_key)
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import logging
|
import logging
|
||||||
from os import urandom
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from binascii import unhexlify
|
from binascii import unhexlify
|
||||||
|
|
||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
|
|
||||||
@@ -15,7 +15,7 @@ TEST_SET_SIZE = 200
|
|||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
def txids():
|
def txids():
|
||||||
return [urandom(32).hex() for _ in range(APPOINTMENT_COUNT)]
|
return [get_random_value_hex(32) for _ in range(APPOINTMENT_COUNT)]
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
@@ -44,7 +44,7 @@ def test_get_block(best_block_hash):
|
|||||||
|
|
||||||
|
|
||||||
def test_get_random_block():
|
def test_get_random_block():
|
||||||
block = BlockProcessor.get_block(urandom(32).hex())
|
block = BlockProcessor.get_block(get_random_value_hex(32))
|
||||||
|
|
||||||
assert block is None
|
assert block is None
|
||||||
|
|
||||||
@@ -62,7 +62,7 @@ def test_potential_matches(txids, locator_uuid_map):
|
|||||||
|
|
||||||
|
|
||||||
def test_potential_matches_random(locator_uuid_map):
|
def test_potential_matches_random(locator_uuid_map):
|
||||||
txids = [urandom(32).hex() for _ in range(len(locator_uuid_map))]
|
txids = [get_random_value_hex(32) for _ in range(len(locator_uuid_map))]
|
||||||
|
|
||||||
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
|
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
|
||||||
|
|
||||||
@@ -72,7 +72,7 @@ def test_potential_matches_random(locator_uuid_map):
|
|||||||
|
|
||||||
def test_potential_matches_random_data(locator_uuid_map):
|
def test_potential_matches_random_data(locator_uuid_map):
|
||||||
# The likelihood of finding a potential match with random data should be negligible
|
# The likelihood of finding a potential match with random data should be negligible
|
||||||
txids = [urandom(32).hex() for _ in range(TEST_SET_SIZE)]
|
txids = [get_random_value_hex(32) for _ in range(TEST_SET_SIZE)]
|
||||||
|
|
||||||
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
|
potential_matches = BlockProcessor.get_potential_matches(txids, locator_uuid_map)
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,11 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import logging
|
import logging
|
||||||
from os import urandom
|
|
||||||
|
|
||||||
|
|
||||||
from pisa.carrier import Carrier
|
from pisa.carrier import Carrier
|
||||||
from test.simulator.utils import sha256d
|
from test.simulator.utils import sha256d
|
||||||
from test.simulator.transaction import TX
|
from test.simulator.transaction import TX
|
||||||
from test.unit.conftest import generate_blocks
|
from test.unit.conftest import generate_blocks
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR
|
from pisa.rpc_errors import RPC_VERIFY_ALREADY_IN_CHAIN, RPC_DESERIALIZATION_ERROR
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
@@ -72,7 +71,7 @@ def test_get_transaction():
|
|||||||
|
|
||||||
|
|
||||||
def test_get_non_existing_transaction():
|
def test_get_non_existing_transaction():
|
||||||
tx_info = Carrier.get_transaction(urandom(32).hex())
|
tx_info = Carrier.get_transaction(get_random_value_hex(32))
|
||||||
|
|
||||||
assert tx_info is None
|
assert tx_info is None
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from pisa import logging
|
|||||||
from pisa.responder import Job
|
from pisa.responder import Job
|
||||||
from pisa.cleaner import Cleaner
|
from pisa.cleaner import Cleaner
|
||||||
from pisa.appointment import Appointment
|
from pisa.appointment import Appointment
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
CONFIRMATIONS = 6
|
CONFIRMATIONS = 6
|
||||||
ITEMS = 10
|
ITEMS = 10
|
||||||
@@ -21,7 +22,7 @@ def set_up_appointments(total_appointments):
|
|||||||
|
|
||||||
for _ in range(total_appointments):
|
for _ in range(total_appointments):
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
locator = urandom(32).hex()
|
locator = get_random_value_hex(32)
|
||||||
|
|
||||||
appointments[uuid] = Appointment(locator, None, None, None, None, None, None)
|
appointments[uuid] = Appointment(locator, None, None, None, None, None, None)
|
||||||
locator_uuid_map[locator] = [uuid]
|
locator_uuid_map[locator] = [uuid]
|
||||||
@@ -42,7 +43,7 @@ def set_up_jobs(total_jobs):
|
|||||||
|
|
||||||
for _ in range(total_jobs):
|
for _ in range(total_jobs):
|
||||||
uuid = uuid4().hex
|
uuid = uuid4().hex
|
||||||
txid = urandom(32).hex()
|
txid = get_random_value_hex(32)
|
||||||
|
|
||||||
# Assign both justice_txid and dispute_txid the same id (it shouldn't matter)
|
# Assign both justice_txid and dispute_txid the same id (it shouldn't matter)
|
||||||
jobs[uuid] = Job(txid, txid, None, None)
|
jobs[uuid] = Job(txid, txid, None, None)
|
||||||
|
|||||||
@@ -1,21 +1,20 @@
|
|||||||
from os import urandom
|
|
||||||
|
|
||||||
from pisa import logging
|
from pisa import logging
|
||||||
from pisa.encrypted_blob import EncryptedBlob
|
from pisa.encrypted_blob import EncryptedBlob
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
|
|
||||||
|
|
||||||
def test_init_encrypted_blob():
|
def test_init_encrypted_blob():
|
||||||
# No much to test here, basically that the object is properly created
|
# No much to test here, basically that the object is properly created
|
||||||
data = urandom(64).hex()
|
data = get_random_value_hex(64)
|
||||||
assert (EncryptedBlob(data).data == data)
|
assert (EncryptedBlob(data).data == data)
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt():
|
def test_decrypt():
|
||||||
# TODO: The decryption tests are assuming the cipher is AES-GCM-128, since EncryptedBlob assumes the same. Fix this.
|
# TODO: The decryption tests are assuming the cipher is AES-GCM-128, since EncryptedBlob assumes the same. Fix this.
|
||||||
key = urandom(32).hex()
|
key = get_random_value_hex(32)
|
||||||
encrypted_data = urandom(64).hex()
|
encrypted_data = get_random_value_hex(64)
|
||||||
encrypted_blob = EncryptedBlob(encrypted_data)
|
encrypted_blob = EncryptedBlob(encrypted_data)
|
||||||
|
|
||||||
# Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function
|
# Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function
|
||||||
|
|||||||
@@ -1,33 +1,34 @@
|
|||||||
from os import urandom
|
from binascii import unhexlify
|
||||||
|
|
||||||
from pisa import logging
|
from pisa import logging
|
||||||
from pisa.errors import *
|
from pisa.errors import *
|
||||||
from pisa.inspector import Inspector
|
from pisa.inspector import Inspector
|
||||||
from pisa.appointment import Appointment
|
from pisa.appointment import Appointment
|
||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
from pisa.conf import MIN_DISPUTE_DELTA, SUPPORTED_CIPHERS, SUPPORTED_HASH_FUNCTIONS
|
||||||
|
|
||||||
inspector = Inspector()
|
inspector = Inspector()
|
||||||
APPOINTMENT_OK = (0, None)
|
APPOINTMENT_OK = (0, None)
|
||||||
|
|
||||||
NO_HEX_STINGS = ["R" * 64, urandom(31).hex() + "PP", "$"*64, " "*64]
|
NO_HEX_STRINGS = ["R" * 64, get_random_value_hex(31) + "PP", "$"*64, " "*64]
|
||||||
WRONG_TYPES = [[], '', urandom(32).hex(), 3.2, 2.0, (), object, {}, " "*32, object()]
|
WRONG_TYPES = [[], '', get_random_value_hex(32), 3.2, 2.0, (), object, {}, " "*32, object()]
|
||||||
WRONG_TYPES_NO_STR = [[], urandom(32), 3.2, 2.0, (), object, {}, object()]
|
WRONG_TYPES_NO_STR = [[], unhexlify(get_random_value_hex(32)), 3.2, 2.0, (), object, {}, object()]
|
||||||
|
|
||||||
logging.getLogger().disabled = True
|
logging.getLogger().disabled = True
|
||||||
|
|
||||||
|
|
||||||
def test_check_locator():
|
def test_check_locator():
|
||||||
# Right appointment type, size and format
|
# Right appointment type, size and format
|
||||||
locator = urandom(32).hex()
|
locator = get_random_value_hex(32)
|
||||||
assert(Inspector.check_locator(locator) == APPOINTMENT_OK)
|
assert(Inspector.check_locator(locator) == APPOINTMENT_OK)
|
||||||
|
|
||||||
# Wrong size (too big)
|
# Wrong size (too big)
|
||||||
locator = urandom(33).hex()
|
locator = get_random_value_hex(33)
|
||||||
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
|
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
|
||||||
|
|
||||||
# Wrong size (too small)
|
# Wrong size (too small)
|
||||||
locator = urandom(31).hex()
|
locator = get_random_value_hex(31)
|
||||||
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
|
assert(Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_SIZE)
|
||||||
|
|
||||||
# Empty
|
# Empty
|
||||||
@@ -41,7 +42,7 @@ def test_check_locator():
|
|||||||
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
||||||
|
|
||||||
# Wrong format (no hex)
|
# Wrong format (no hex)
|
||||||
locators = NO_HEX_STINGS
|
locators = NO_HEX_STRINGS
|
||||||
for locator in locators:
|
for locator in locators:
|
||||||
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
|
assert (Inspector.check_locator(locator)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
|
||||||
|
|
||||||
@@ -122,7 +123,7 @@ def test_check_delta():
|
|||||||
|
|
||||||
def test_check_blob():
|
def test_check_blob():
|
||||||
# Right format and length
|
# Right format and length
|
||||||
encrypted_blob = urandom(120).hex()
|
encrypted_blob = get_random_value_hex(120)
|
||||||
assert(Inspector.check_blob(encrypted_blob) == APPOINTMENT_OK)
|
assert(Inspector.check_blob(encrypted_blob) == APPOINTMENT_OK)
|
||||||
|
|
||||||
# # Wrong content
|
# # Wrong content
|
||||||
@@ -139,7 +140,7 @@ def test_check_blob():
|
|||||||
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_EMPTY_FIELD)
|
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_EMPTY_FIELD)
|
||||||
|
|
||||||
# Wrong format (no hex)
|
# Wrong format (no hex)
|
||||||
encrypted_blobs = NO_HEX_STINGS
|
encrypted_blobs = NO_HEX_STRINGS
|
||||||
for encrypted_blob in encrypted_blobs:
|
for encrypted_blob in encrypted_blobs:
|
||||||
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
|
assert (Inspector.check_blob(encrypted_blob)[0] == APPOINTMENT_WRONG_FIELD_FORMAT)
|
||||||
|
|
||||||
@@ -157,7 +158,7 @@ def test_check_cipher():
|
|||||||
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
||||||
|
|
||||||
# Wrong value
|
# Wrong value
|
||||||
ciphers = NO_HEX_STINGS
|
ciphers = NO_HEX_STRINGS
|
||||||
for cipher in ciphers:
|
for cipher in ciphers:
|
||||||
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED)
|
assert(Inspector.check_cipher(cipher)[0] == APPOINTMENT_CIPHER_NOT_SUPPORTED)
|
||||||
|
|
||||||
@@ -179,7 +180,7 @@ def test_check_hash_function():
|
|||||||
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_WRONG_FIELD_TYPE)
|
||||||
|
|
||||||
# Wrong value
|
# Wrong value
|
||||||
hash_functions = NO_HEX_STINGS
|
hash_functions = NO_HEX_STRINGS
|
||||||
for hash_function in hash_functions:
|
for hash_function in hash_functions:
|
||||||
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED)
|
assert (Inspector.check_hash_function(hash_function)[0] == APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED)
|
||||||
|
|
||||||
@@ -198,11 +199,11 @@ def test_inspect(run_bitcoind):
|
|||||||
assert (type(appointment) == tuple and appointment[0] != 0)
|
assert (type(appointment) == tuple and appointment[0] != 0)
|
||||||
|
|
||||||
# Valid appointment
|
# Valid appointment
|
||||||
locator = urandom(32).hex()
|
locator = get_random_value_hex(32)
|
||||||
start_time = BlockProcessor.get_block_count() + 5
|
start_time = BlockProcessor.get_block_count() + 5
|
||||||
end_time = start_time + 20
|
end_time = start_time + 20
|
||||||
dispute_delta = MIN_DISPUTE_DELTA
|
dispute_delta = MIN_DISPUTE_DELTA
|
||||||
encrypted_blob = urandom(64).hex()
|
encrypted_blob = get_random_value_hex(64)
|
||||||
cipher = SUPPORTED_CIPHERS[0]
|
cipher = SUPPORTED_CIPHERS[0]
|
||||||
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
hash_function = SUPPORTED_HASH_FUNCTIONS[0]
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
import pytest
|
import pytest
|
||||||
from os import urandom
|
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from queue import Queue, Empty
|
from queue import Queue, Empty
|
||||||
@@ -9,8 +8,9 @@ from pisa.tools import check_txid_format
|
|||||||
from test.simulator.utils import sha256d
|
from test.simulator.utils import sha256d
|
||||||
from pisa.responder import Responder, Job
|
from pisa.responder import Responder, Job
|
||||||
from test.simulator.bitcoind_sim import TX
|
from test.simulator.bitcoind_sim import TX
|
||||||
from test.unit.conftest import generate_block, generate_blocks
|
|
||||||
from pisa.utils.auth_proxy import AuthServiceProxy
|
from pisa.utils.auth_proxy import AuthServiceProxy
|
||||||
|
from test.unit.conftest import get_random_value_hex
|
||||||
|
from test.unit.conftest import generate_block, generate_blocks
|
||||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
||||||
|
|
||||||
|
|
||||||
@@ -40,7 +40,7 @@ def create_dummy_job_data(random_txid=False, justice_rawtx=None):
|
|||||||
justice_txid = sha256d(justice_rawtx)
|
justice_txid = sha256d(justice_rawtx)
|
||||||
|
|
||||||
if random_txid is True:
|
if random_txid is True:
|
||||||
justice_txid = urandom(32).hex()
|
justice_txid = get_random_value_hex(32)
|
||||||
|
|
||||||
appointment_end = bitcoin_cli.getblockcount() + 2
|
appointment_end = bitcoin_cli.getblockcount() + 2
|
||||||
|
|
||||||
@@ -214,10 +214,10 @@ def test_do_watch(responder):
|
|||||||
|
|
||||||
def test_get_txs_to_rebroadcast(responder):
|
def test_get_txs_to_rebroadcast(responder):
|
||||||
# Let's create a few fake txids and assign at least 6 missing confirmations to each
|
# Let's create a few fake txids and assign at least 6 missing confirmations to each
|
||||||
txs_missing_too_many_conf = {urandom(32).hex(): 6+i for i in range(10)}
|
txs_missing_too_many_conf = {get_random_value_hex(32): 6+i for i in range(10)}
|
||||||
|
|
||||||
# Let's create some other transaction that has missed some confirmations but not that many
|
# Let's create some other transaction that has missed some confirmations but not that many
|
||||||
txs_missing_some_conf = {urandom(32).hex(): 3 for _ in range(10)}
|
txs_missing_some_conf = {get_random_value_hex(32): 3 for _ in range(10)}
|
||||||
|
|
||||||
# All the txs in the first dict should be flagged as to_rebroadcast
|
# All the txs in the first dict should be flagged as to_rebroadcast
|
||||||
responder.missed_confirmations = txs_missing_too_many_conf
|
responder.missed_confirmations = txs_missing_too_many_conf
|
||||||
|
|||||||
Reference in New Issue
Block a user