mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Integrates encryption/decryption within the Cryptographer. Close #63
Includes unittests. Also reformats test_inspector to avoid using cli functions
This commit is contained in:
@@ -12,29 +12,3 @@ class Blob:
|
|||||||
raise ValueError("Non-Hex character found in txid.")
|
raise ValueError("Non-Hex character found in txid.")
|
||||||
|
|
||||||
self.data = data
|
self.data = data
|
||||||
|
|
||||||
def encrypt(self, tx_id):
|
|
||||||
if len(tx_id) != 64:
|
|
||||||
raise ValueError("txid does not matches the expected size (32-byte / 64 hex chars).")
|
|
||||||
|
|
||||||
elif re.search(r"^[0-9A-Fa-f]+$", tx_id) is None:
|
|
||||||
raise ValueError("Non-Hex character found in txid.")
|
|
||||||
|
|
||||||
# Transaction to be encrypted
|
|
||||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
|
||||||
tx = unhexlify(self.data)
|
|
||||||
|
|
||||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
|
||||||
sk = sha256(unhexlify(tx_id)).digest()
|
|
||||||
nonce = bytearray(12)
|
|
||||||
|
|
||||||
# Encrypt the data
|
|
||||||
cipher = ChaCha20Poly1305(sk)
|
|
||||||
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
|
|
||||||
encrypted_blob = hexlify(encrypted_blob).decode()
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Creating new blob", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), encrypted_blob=encrypted_blob
|
|
||||||
)
|
|
||||||
|
|
||||||
return encrypted_blob
|
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import re
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
@@ -29,6 +28,8 @@ from apps.cli import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from common.constants import LOCATOR_LEN_HEX
|
from common.constants import LOCATOR_LEN_HEX
|
||||||
|
from common.cryptographer import Cryptographer
|
||||||
|
from common.tools import check_sha256_hex_format
|
||||||
|
|
||||||
|
|
||||||
HTTP_OK = 200
|
HTTP_OK = 200
|
||||||
@@ -162,7 +163,7 @@ def add_appointment(args):
|
|||||||
logger.error("The provided JSON is empty.")
|
logger.error("The provided JSON is empty.")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
valid_locator = check_txid_format(appointment_data.get("tx_id"))
|
valid_locator = check_sha256_hex_format(appointment_data.get("tx_id"))
|
||||||
|
|
||||||
if not valid_locator:
|
if not valid_locator:
|
||||||
logger.error("The provided locator is not valid.")
|
logger.error("The provided locator is not valid.")
|
||||||
@@ -288,7 +289,7 @@ def get_appointment(args):
|
|||||||
sys.exit(help_get_appointment())
|
sys.exit(help_get_appointment())
|
||||||
else:
|
else:
|
||||||
locator = arg_opt
|
locator = arg_opt
|
||||||
valid_locator = check_txid_format(locator)
|
valid_locator = check_sha256_hex_format(locator)
|
||||||
|
|
||||||
if not valid_locator:
|
if not valid_locator:
|
||||||
logger.error("The provided locator is not valid: {}".format(locator))
|
logger.error("The provided locator is not valid: {}".format(locator))
|
||||||
@@ -317,7 +318,7 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta):
|
|||||||
|
|
||||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||||
blob = Blob(tx)
|
blob = Blob(tx)
|
||||||
encrypted_blob = blob.encrypt(tx_id)
|
encrypted_blob = Cryptographer.encrypt(blob, tx_id)
|
||||||
|
|
||||||
appointment = {
|
appointment = {
|
||||||
"locator": locator,
|
"locator": locator,
|
||||||
@@ -330,14 +331,6 @@ def build_appointment(tx, tx_id, start_time, end_time, dispute_delta):
|
|||||||
return appointment
|
return appointment
|
||||||
|
|
||||||
|
|
||||||
def check_txid_format(txid):
|
|
||||||
if len(txid) != 64:
|
|
||||||
sys.exit("locator does not matches the expected size (32-byte / 64 hex chars).")
|
|
||||||
|
|
||||||
# TODO: #12-check-txid-regexp
|
|
||||||
return re.search(r"^[0-9A-Fa-f]+$", txid) is not None
|
|
||||||
|
|
||||||
|
|
||||||
def show_usage():
|
def show_usage():
|
||||||
return (
|
return (
|
||||||
"USAGE: "
|
"USAGE: "
|
||||||
|
|||||||
@@ -3,30 +3,68 @@ from binascii import unhexlify, hexlify
|
|||||||
from cryptography.exceptions import InvalidTag
|
from cryptography.exceptions import InvalidTag
|
||||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||||
|
|
||||||
|
from common.tools import check_sha256_hex_format
|
||||||
|
|
||||||
from pisa.logger import Logger
|
from pisa.logger import Logger
|
||||||
|
|
||||||
logger = Logger("Cryptographer")
|
logger = Logger("Cryptographer")
|
||||||
|
|
||||||
|
|
||||||
class Cryptographer:
|
class Cryptographer:
|
||||||
|
@staticmethod
|
||||||
|
def check_data_key_format(data, key):
|
||||||
|
if len(data) % 2:
|
||||||
|
error = "Incorrect (Odd-length) value."
|
||||||
|
logger.error(error, data=data)
|
||||||
|
raise ValueError(error)
|
||||||
|
|
||||||
|
if not check_sha256_hex_format(key):
|
||||||
|
error = "Key must be a 32-byte hex value (64 hex chars)."
|
||||||
|
logger.error(error, key=key)
|
||||||
|
raise ValueError(error)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def encrypt(blob, key, rtype="hex"):
|
||||||
|
if rtype not in ["hex", "bytes"]:
|
||||||
|
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
||||||
|
|
||||||
|
Cryptographer.check_data_key_format(blob.data, key)
|
||||||
|
|
||||||
|
# Transaction to be encrypted
|
||||||
|
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||||
|
tx = unhexlify(blob.data)
|
||||||
|
|
||||||
|
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||||
|
sk = sha256(unhexlify(key)).digest()
|
||||||
|
nonce = bytearray(12)
|
||||||
|
|
||||||
|
logger.info("Encrypting blob.", sk=hexlify(sk).decode(), nonce=hexlify(nonce).decode(), blob=blob.data)
|
||||||
|
|
||||||
|
# Encrypt the data
|
||||||
|
cipher = ChaCha20Poly1305(sk)
|
||||||
|
encrypted_blob = cipher.encrypt(nonce=nonce, data=tx, associated_data=None)
|
||||||
|
|
||||||
|
if rtype == "hex":
|
||||||
|
encrypted_blob = hexlify(encrypted_blob).decode("utf8")
|
||||||
|
|
||||||
|
return encrypted_blob
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
# ToDo: #20-test-tx-decrypting-edge-cases
|
# ToDo: #20-test-tx-decrypting-edge-cases
|
||||||
def decrypt(encrypted_blob, key, rtype="hex"):
|
def decrypt(encrypted_blob, key, rtype="hex"):
|
||||||
if rtype not in ["hex", "bytes"]:
|
if rtype not in ["hex", "bytes"]:
|
||||||
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
raise ValueError("Wrong return type. Return type must be 'hex' or 'bytes'")
|
||||||
|
|
||||||
if len(encrypted_blob.data) % 2:
|
Cryptographer.check_data_key_format(encrypted_blob.data, key)
|
||||||
logger.info(
|
|
||||||
"Incorrect (Odd-length) value to be decrypted.", encrypted_blob=encrypted_blob.data, dispute_txid=key
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||||
sk = sha256(unhexlify(key)).digest()
|
sk = sha256(unhexlify(key)).digest()
|
||||||
nonce = bytearray(12)
|
nonce = bytearray(12)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Creating new blob.",
|
"Decrypting Blob.",
|
||||||
sk=hexlify(sk).decode(),
|
sk=hexlify(sk).decode(),
|
||||||
nonce=hexlify(nonce).decode(),
|
nonce=hexlify(nonce).decode(),
|
||||||
encrypted_blob=encrypted_blob.data,
|
encrypted_blob=encrypted_blob.data,
|
||||||
|
|||||||
@@ -39,8 +39,3 @@ def in_correct_network(network):
|
|||||||
correct_network = True
|
correct_network = True
|
||||||
|
|
||||||
return correct_network
|
return correct_network
|
||||||
|
|
||||||
|
|
||||||
def check_txid_format(txid):
|
|
||||||
# TODO: #12-check-txid-regexp
|
|
||||||
return isinstance(txid, str) and re.search(r"^[0-9A-Fa-f]{64}$", txid) is not None
|
|
||||||
|
|||||||
@@ -180,7 +180,12 @@ class Watcher:
|
|||||||
for locator, dispute_txid in matches.items():
|
for locator, dispute_txid in matches.items():
|
||||||
for uuid in self.locator_uuid_map[locator]:
|
for uuid in self.locator_uuid_map[locator]:
|
||||||
|
|
||||||
justice_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid)
|
try:
|
||||||
|
justice_rawtx = Cryptographer.decrypt(self.appointments[uuid].encrypted_blob, dispute_txid)
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
justice_rawtx = None
|
||||||
|
|
||||||
justice_tx = BlockProcessor.decode_raw_transaction(justice_rawtx)
|
justice_tx = BlockProcessor.decode_raw_transaction(justice_rawtx)
|
||||||
|
|
||||||
if justice_tx is not None:
|
if justice_tx is not None:
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ from test.simulator.transaction import TX
|
|||||||
from test.simulator.bitcoind_sim import run_simulator, HOST, PORT
|
from test.simulator.bitcoind_sim import run_simulator, HOST, PORT
|
||||||
|
|
||||||
from common.constants import LOCATOR_LEN_HEX
|
from common.constants import LOCATOR_LEN_HEX
|
||||||
|
from common.cryptographer import Cryptographer
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
@@ -113,7 +114,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
|
|||||||
locator = Watcher.compute_locator(dispute_txid)
|
locator = Watcher.compute_locator(dispute_txid)
|
||||||
blob = Blob(dummy_appointment_data.get("tx"))
|
blob = Blob(dummy_appointment_data.get("tx"))
|
||||||
|
|
||||||
encrypted_blob = blob.encrypt((dummy_appointment_data.get("tx_id")))
|
encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id"))
|
||||||
|
|
||||||
appointment_data = {
|
appointment_data = {
|
||||||
"locator": locator,
|
"locator": locator,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
|
from apps.cli.blob import Blob
|
||||||
from common.cryptographer import Cryptographer
|
from common.cryptographer import Cryptographer
|
||||||
from pisa.encrypted_blob import EncryptedBlob
|
from pisa.encrypted_blob import EncryptedBlob
|
||||||
from test.unit.conftest import get_random_value_hex
|
from test.unit.conftest import get_random_value_hex
|
||||||
@@ -7,68 +8,139 @@ from test.unit.conftest import get_random_value_hex
|
|||||||
data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777"
|
data = "6097cdf52309b1b2124efeed36bd34f46dc1c25ad23ac86f28380f746254f777"
|
||||||
key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659"
|
key = "b2e984a570f6f49bc38ace178e09147b0aa296cbb7c92eb01412f7e2d07b5659"
|
||||||
encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810"
|
encrypted_data = "8f31028097a8bf12a92e088caab5cf3fcddf0d35ed2b72c24b12269373efcdea04f9d2a820adafe830c20ff132d89810"
|
||||||
encrypted_blob = EncryptedBlob(encrypted_data)
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_wrong_data():
|
def test_check_data_key_format_wrong_data():
|
||||||
random_key = get_random_value_hex(32)
|
data = get_random_value_hex(64)[:-1]
|
||||||
random_encrypted_data = get_random_value_hex(64)
|
key = get_random_value_hex(32)
|
||||||
random_encrypted_blob = EncryptedBlob(random_encrypted_data)
|
|
||||||
|
|
||||||
# Trying to decrypt random data (in AES_GCM-128) should result in an InvalidTag exception. Our decrypt function
|
|
||||||
# returns None
|
|
||||||
hex_tx = Cryptographer.decrypt(random_encrypted_blob, random_key)
|
|
||||||
assert hex_tx is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_odd_length():
|
|
||||||
random_key = get_random_value_hex(32)
|
|
||||||
random_encrypted_data_odd = get_random_value_hex(64)[:-1]
|
|
||||||
random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd)
|
|
||||||
|
|
||||||
assert Cryptographer.decrypt(random_encrypted_blob_odd, random_key) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_hex():
|
|
||||||
# Valid data should run with no InvalidTag and verify
|
|
||||||
assert Cryptographer.decrypt(encrypted_blob, key) == data
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_bytes():
|
|
||||||
# We can also get the decryption in bytes
|
|
||||||
byte_blob = Cryptographer.decrypt(encrypted_blob, key, rtype="bytes")
|
|
||||||
assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(data)
|
|
||||||
|
|
||||||
|
|
||||||
def test_decrypt_wrong_return():
|
|
||||||
# Any other type but "hex" (default) or "bytes" should fail
|
|
||||||
try:
|
try:
|
||||||
Cryptographer.decrypt(encrypted_blob, key, rtype="random_value")
|
Cryptographer.check_data_key_format(data, key)
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError as e:
|
||||||
|
assert "Odd-length" in str(e)
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_data_key_format_wrong_key():
|
||||||
|
data = get_random_value_hex(64)
|
||||||
|
key = get_random_value_hex(33)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Cryptographer.check_data_key_format(data, key)
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError as e:
|
||||||
|
assert "32-byte hex" in str(e)
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_data_key_format():
|
||||||
|
data = get_random_value_hex(64)
|
||||||
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
|
assert Cryptographer.check_data_key_format(data, key) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_encrypt_odd_length_data():
|
||||||
|
blob = Blob(get_random_value_hex(64)[-1])
|
||||||
|
key = get_random_value_hex(32)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Cryptographer.encrypt(blob, key)
|
||||||
assert False
|
assert False
|
||||||
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
assert True
|
assert True
|
||||||
|
|
||||||
|
|
||||||
# def test_encrypt():
|
def test_encrypt_wrong_key_size():
|
||||||
# # Valid data, valid key
|
blob = Blob(get_random_value_hex(64))
|
||||||
# data = get_random_value_hex(64)
|
key = get_random_value_hex(31)
|
||||||
# blob = Blob(data, SUPPORTED_CIPHERS[0], SUPPORTED_HASH_FUNCTIONS[0])
|
|
||||||
# key = get_random_value_hex(32)
|
try:
|
||||||
#
|
Cryptographer.encrypt(blob, key)
|
||||||
# encrypted_blob = blob.encrypt(key)
|
assert False
|
||||||
#
|
|
||||||
# # Invalid key (note that encrypt cannot be called with invalid data since that's checked when the Blob is created)
|
except ValueError:
|
||||||
# invalid_key = unhexlify(get_random_value_hex(32))
|
assert True
|
||||||
#
|
|
||||||
# try:
|
|
||||||
# blob.encrypt(invalid_key)
|
def test_encrypt_hex():
|
||||||
# assert False, "Able to create encrypt with invalid key"
|
blob = Blob(data)
|
||||||
#
|
|
||||||
# except ValueError:
|
assert Cryptographer.encrypt(blob, key) == encrypted_data
|
||||||
# assert True
|
|
||||||
#
|
|
||||||
# # Check that two encryptions of the same data have the same result
|
def test_encrypt_bytes():
|
||||||
# encrypted_blob2 = blob.encrypt(key)
|
blob = Blob(data)
|
||||||
#
|
|
||||||
# assert encrypted_blob == encrypted_blob2 and id(encrypted_blob) != id(encrypted_blob2)
|
byte_blob = Cryptographer.encrypt(blob, key, rtype="bytes")
|
||||||
|
assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(encrypted_data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_encrypt_wrong_return():
|
||||||
|
# Any other type but "hex" (default) or "bytes" should fail
|
||||||
|
try:
|
||||||
|
Cryptographer.encrypt(Blob(data), key, rtype="random_value")
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
assert True
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_invalid_tag():
|
||||||
|
random_key = get_random_value_hex(32)
|
||||||
|
random_encrypted_data = get_random_value_hex(64)
|
||||||
|
random_encrypted_blob = EncryptedBlob(random_encrypted_data)
|
||||||
|
|
||||||
|
# Trying to decrypt random data should result in an InvalidTag exception. Our decrypt function
|
||||||
|
# returns None
|
||||||
|
hex_tx = Cryptographer.decrypt(random_encrypted_blob, random_key)
|
||||||
|
assert hex_tx is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_odd_length_data():
|
||||||
|
random_key = get_random_value_hex(32)
|
||||||
|
random_encrypted_data_odd = get_random_value_hex(64)[:-1]
|
||||||
|
random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
assert True
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_wrong_key_size():
|
||||||
|
random_key = get_random_value_hex(31)
|
||||||
|
random_encrypted_data_odd = get_random_value_hex(64)
|
||||||
|
random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
assert True
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_hex():
|
||||||
|
# Valid data should run with no InvalidTag and verify
|
||||||
|
assert Cryptographer.decrypt(EncryptedBlob(encrypted_data), key) == data
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_bytes():
|
||||||
|
# We can also get the decryption in bytes
|
||||||
|
byte_blob = Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="bytes")
|
||||||
|
assert isinstance(byte_blob, bytes) and byte_blob == binascii.unhexlify(data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decrypt_wrong_return():
|
||||||
|
# Any other type but "hex" (default) or "bytes" should fail
|
||||||
|
try:
|
||||||
|
Cryptographer.decrypt(EncryptedBlob(encrypted_data), key, rtype="random_value")
|
||||||
|
assert False
|
||||||
|
|
||||||
|
except ValueError:
|
||||||
|
assert True
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ from cryptography.hazmat.backends import default_backend
|
|||||||
from cryptography.hazmat.primitives import hashes
|
from cryptography.hazmat.primitives import hashes
|
||||||
from cryptography.hazmat.primitives.asymmetric import ec
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
|
||||||
from apps.cli.pisa_cli import build_appointment
|
|
||||||
|
|
||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
from pisa.errors import *
|
from pisa.errors import *
|
||||||
from pisa.inspector import Inspector
|
from pisa.inspector import Inspector
|
||||||
@@ -14,7 +12,7 @@ from pisa.appointment import Appointment
|
|||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
from pisa.conf import MIN_DISPUTE_DELTA
|
from pisa.conf import MIN_DISPUTE_DELTA
|
||||||
|
|
||||||
from test.unit.conftest import get_random_value_hex
|
from test.unit.conftest import get_random_value_hex, generate_dummy_appointment_data
|
||||||
|
|
||||||
from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX
|
from common.constants import LOCATOR_LEN_BYTES, LOCATOR_LEN_HEX
|
||||||
|
|
||||||
@@ -179,25 +177,17 @@ def test_check_blob():
|
|||||||
def test_check_appointment_signature(generate_keypair):
|
def test_check_appointment_signature(generate_keypair):
|
||||||
client_sk, client_pk = generate_keypair
|
client_sk, client_pk = generate_keypair
|
||||||
|
|
||||||
dummy_appointment_request = {
|
dummy_appointment_data, _ = generate_dummy_appointment_data(real_height=False)
|
||||||
"tx": get_random_value_hex(192),
|
assert Inspector.check_appointment_signature(
|
||||||
"tx_id": get_random_value_hex(32),
|
dummy_appointment_data["appointment"], dummy_appointment_data["signature"], dummy_appointment_data["public_key"]
|
||||||
"start_time": 1500,
|
)
|
||||||
"end_time": 50000,
|
|
||||||
"dispute_delta": 200,
|
|
||||||
}
|
|
||||||
dummy_appointment = build_appointment(**dummy_appointment_request)
|
|
||||||
|
|
||||||
# Verify that an appointment signed by the client is valid
|
|
||||||
signature = sign_appointment(client_sk, dummy_appointment)
|
|
||||||
assert Inspector.check_appointment_signature(dummy_appointment, signature, client_pk)
|
|
||||||
|
|
||||||
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||||
|
|
||||||
# Create a bad signature to make sure inspector rejects it
|
# Create a bad signature to make sure inspector rejects it
|
||||||
bad_signature = sign_appointment(fake_sk, dummy_appointment)
|
bad_signature = sign_appointment(fake_sk, dummy_appointment_data["appointment"])
|
||||||
assert (
|
assert (
|
||||||
Inspector.check_appointment_signature(dummy_appointment, bad_signature, client_pk)[0]
|
Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk)[0]
|
||||||
== APPOINTMENT_INVALID_SIGNATURE
|
== APPOINTMENT_INVALID_SIGNATURE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -11,9 +11,10 @@ from pisa import c_logger
|
|||||||
from pisa.db_manager import DBManager
|
from pisa.db_manager import DBManager
|
||||||
from pisa.responder import Responder, Job
|
from pisa.responder import Responder, Job
|
||||||
from pisa.block_processor import BlockProcessor
|
from pisa.block_processor import BlockProcessor
|
||||||
from pisa.tools import check_txid_format, bitcoin_cli
|
from pisa.tools import bitcoin_cli
|
||||||
|
|
||||||
from common.constants import LOCATOR_LEN_HEX
|
from common.constants import LOCATOR_LEN_HEX
|
||||||
|
from common.tools import check_sha256_hex_format
|
||||||
|
|
||||||
from test.simulator.utils import sha256d
|
from test.simulator.utils import sha256d
|
||||||
from test.simulator.bitcoind_sim import TX
|
from test.simulator.bitcoind_sim import TX
|
||||||
@@ -288,7 +289,7 @@ def test_do_subscribe(responder):
|
|||||||
try:
|
try:
|
||||||
generate_block()
|
generate_block()
|
||||||
block_hash = responder.block_queue.get()
|
block_hash = responder.block_queue.get()
|
||||||
assert check_txid_format(block_hash)
|
assert check_sha256_hex_format(block_hash)
|
||||||
|
|
||||||
except Empty:
|
except Empty:
|
||||||
assert False
|
assert False
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
from pisa.tools import can_connect_to_bitcoind, in_correct_network, bitcoin_cli, check_txid_format
|
from pisa.tools import can_connect_to_bitcoind, in_correct_network, bitcoin_cli
|
||||||
|
|
||||||
|
from common.tools import check_sha256_hex_format
|
||||||
|
|
||||||
c_logger.disabled = True
|
c_logger.disabled = True
|
||||||
|
|
||||||
@@ -30,14 +32,30 @@ def test_bitcoin_cli():
|
|||||||
assert False
|
assert False
|
||||||
|
|
||||||
|
|
||||||
def test_check_txid_format():
|
def test_check_sha256_hex_format():
|
||||||
assert check_txid_format(None) is False
|
assert check_sha256_hex_format(None) is False
|
||||||
assert check_txid_format("") is False
|
assert check_sha256_hex_format("") is False
|
||||||
assert check_txid_format(0x0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF) is False # wrong type
|
assert (
|
||||||
assert check_txid_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True # lowercase
|
check_sha256_hex_format(0x0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF) is False
|
||||||
assert check_txid_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True # uppercase
|
) # wrong type
|
||||||
assert check_txid_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True # mixed case
|
assert (
|
||||||
assert check_txid_format("0123456789012345678901234567890123456789012345678901234567890123") is True # only nums
|
check_sha256_hex_format("abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") is True
|
||||||
assert check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False # too short
|
) # lowercase
|
||||||
assert check_txid_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False # too long
|
assert (
|
||||||
assert check_txid_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False # non-hex
|
check_sha256_hex_format("ABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCDEFABCD") is True
|
||||||
|
) # uppercase
|
||||||
|
assert (
|
||||||
|
check_sha256_hex_format("0123456789abcdef0123456789ABCDEF0123456789abcdef0123456789ABCDEF") is True
|
||||||
|
) # mixed case
|
||||||
|
assert (
|
||||||
|
check_sha256_hex_format("0123456789012345678901234567890123456789012345678901234567890123") is True
|
||||||
|
) # only nums
|
||||||
|
assert (
|
||||||
|
check_sha256_hex_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdf") is False
|
||||||
|
) # too short
|
||||||
|
assert (
|
||||||
|
check_sha256_hex_format("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0") is False
|
||||||
|
) # too long
|
||||||
|
assert (
|
||||||
|
check_sha256_hex_format("g123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef") is False
|
||||||
|
) # non-hex
|
||||||
|
|||||||
@@ -12,10 +12,12 @@ from cryptography.exceptions import InvalidSignature
|
|||||||
from pisa import c_logger
|
from pisa import c_logger
|
||||||
from pisa.watcher import Watcher
|
from pisa.watcher import Watcher
|
||||||
from pisa.responder import Responder
|
from pisa.responder import Responder
|
||||||
from pisa.tools import check_txid_format, bitcoin_cli
|
from pisa.tools import bitcoin_cli
|
||||||
from test.unit.conftest import generate_block, generate_blocks, generate_dummy_appointment, get_random_value_hex
|
from test.unit.conftest import generate_block, generate_blocks, generate_dummy_appointment, get_random_value_hex
|
||||||
from pisa.conf import EXPIRY_DELTA, PISA_SECRET_KEY, MAX_APPOINTMENTS
|
from pisa.conf import EXPIRY_DELTA, PISA_SECRET_KEY, MAX_APPOINTMENTS
|
||||||
|
|
||||||
|
from common.tools import check_sha256_hex_format
|
||||||
|
|
||||||
c_logger.disabled = True
|
c_logger.disabled = True
|
||||||
|
|
||||||
APPOINTMENTS = 5
|
APPOINTMENTS = 5
|
||||||
@@ -152,7 +154,7 @@ def test_do_subscribe(watcher):
|
|||||||
try:
|
try:
|
||||||
generate_block()
|
generate_block()
|
||||||
block_hash = watcher.block_queue.get()
|
block_hash = watcher.block_queue.get()
|
||||||
assert check_txid_format(block_hash)
|
assert check_sha256_hex_format(block_hash)
|
||||||
|
|
||||||
except Empty:
|
except Empty:
|
||||||
assert False
|
assert False
|
||||||
|
|||||||
Reference in New Issue
Block a user