mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Merge pull request #75 from sr-gi/68-appointment-encoding
Defines a proper appointment encoding
This commit is contained in:
@@ -21,6 +21,7 @@ from apps.cli import (
|
||||
)
|
||||
|
||||
from common.logger import Logger
|
||||
from common.appointment import Appointment
|
||||
from common.constants import LOCATOR_LEN_HEX
|
||||
from common.cryptographer import Cryptographer
|
||||
from common.tools import check_sha256_hex_format
|
||||
@@ -128,13 +129,7 @@ def add_appointment(args):
|
||||
return False
|
||||
|
||||
add_appointment_endpoint = "http://{}:{}".format(pisa_api_server, pisa_api_port)
|
||||
appointment = build_appointment(
|
||||
appointment_data.get("tx"),
|
||||
appointment_data.get("tx_id"),
|
||||
appointment_data.get("start_time"),
|
||||
appointment_data.get("end_time"),
|
||||
appointment_data.get("to_self_delay"),
|
||||
)
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
|
||||
try:
|
||||
sk_der = load_key_file_data(CLI_PRIVATE_KEY)
|
||||
@@ -152,7 +147,7 @@ def add_appointment(args):
|
||||
logger.error("I/O error", errno=e.errno, error=e.strerror)
|
||||
return False
|
||||
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment), cli_sk)
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
try:
|
||||
cli_pk_der = load_key_file_data(CLI_PUBLIC_KEY)
|
||||
@@ -217,7 +212,7 @@ def add_appointment(args):
|
||||
logger.error("Failed to deserialize the public key. It might be in an unsupported format")
|
||||
return False
|
||||
|
||||
is_sig_valid = Cryptographer.verify(Cryptographer.signature_format(appointment), signature, pisa_pk)
|
||||
is_sig_valid = Cryptographer.verify(appointment.serialize(), signature, pisa_pk)
|
||||
|
||||
except FileNotFoundError:
|
||||
logger.error("Pisa's public key file not found. Please check your settings")
|
||||
@@ -278,24 +273,6 @@ def get_appointment(args):
|
||||
return True
|
||||
|
||||
|
||||
def build_appointment(tx, tx_id, start_time, end_time, to_self_delay):
|
||||
locator = compute_locator(tx_id)
|
||||
|
||||
# FIXME: The blob data should contain more things that just the transaction. Leaving like this for now.
|
||||
blob = Blob(tx)
|
||||
encrypted_blob = Cryptographer.encrypt(blob, tx_id)
|
||||
|
||||
appointment = {
|
||||
"locator": locator,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"to_self_delay": to_self_delay,
|
||||
"encrypted_blob": encrypted_blob,
|
||||
}
|
||||
|
||||
return appointment
|
||||
|
||||
|
||||
def show_usage():
|
||||
return (
|
||||
"USAGE: "
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import json
|
||||
import struct
|
||||
from binascii import unhexlify
|
||||
|
||||
from pisa.encrypted_blob import EncryptedBlob
|
||||
|
||||
@@ -101,3 +103,23 @@ class Appointment:
|
||||
appointment["triggered"] = triggered
|
||||
|
||||
return json.dumps(appointment, sort_keys=True, separators=(",", ":"))
|
||||
|
||||
def serialize(self):
|
||||
"""
|
||||
Serializes an appointment to be signed.
|
||||
|
||||
The serialization follows the same ordering as the fields in the appointment:
|
||||
locator:start_time:end_time:to_self_delay:encrypted_blob
|
||||
|
||||
All values are big endian.
|
||||
|
||||
Returns:
|
||||
:mod:`bytes`: The serialized data to be signed.
|
||||
"""
|
||||
return (
|
||||
unhexlify(self.locator)
|
||||
+ struct.pack(">I", self.start_time)
|
||||
+ struct.pack(">I", self.end_time)
|
||||
+ struct.pack(">I", self.to_self_delay)
|
||||
+ unhexlify(self.encrypted_blob.data)
|
||||
)
|
||||
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify, hexlify
|
||||
|
||||
@@ -146,23 +145,6 @@ class Cryptographer:
|
||||
|
||||
return blob
|
||||
|
||||
# NOTCOVERED
|
||||
@staticmethod
|
||||
def signature_format(data):
|
||||
"""
|
||||
Serializes a given ``data`` in the right format to be signed.
|
||||
|
||||
Args:
|
||||
data(:mod:`str`): the data to be formatted.
|
||||
|
||||
Returns:
|
||||
:mod:`str`: The serialized data to be signed.
|
||||
"""
|
||||
|
||||
# FIXME: This is temporary serialization. A proper one is required. Data need to be unhexlified too (can't atm)
|
||||
return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
# Deserialize public key from der data.
|
||||
@staticmethod
|
||||
def load_public_key_der(pk_der):
|
||||
"""
|
||||
@@ -195,7 +177,6 @@ class Cryptographer:
|
||||
|
||||
return None
|
||||
|
||||
# Deserialize private key from der data.
|
||||
@staticmethod
|
||||
def load_private_key_der(sk_der):
|
||||
"""
|
||||
@@ -231,7 +212,7 @@ class Cryptographer:
|
||||
Signs a given data using a given secret key using ECDSA.
|
||||
|
||||
Args:
|
||||
data(:mod:`str`): the data to be signed.
|
||||
data(:mod:`bytes`): the data to be signed.
|
||||
sk(:mod:`EllipticCurvePrivateKey`): the ECDSA secret key used to signed the data.
|
||||
rtype: the return type for the encrypted value. Can be either ``'str'`` or ``'bytes'``.
|
||||
|
||||
@@ -263,7 +244,7 @@ class Cryptographer:
|
||||
Verifies if a signature is valid for a given public key and message.
|
||||
|
||||
Args:
|
||||
message(:mod:`str`): the message that is supposed have been signed.
|
||||
message(:mod:`bytes`): the message that is supposed have been signed.
|
||||
signature(:mod:`str`): the potential signature of the message.
|
||||
pk(:mod:`EllipticCurvePublicKey`): the public key that is used to try to verify the signature.
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ from flask import Flask, request, abort, jsonify
|
||||
from pisa import HOST, PORT, logging
|
||||
from common.logger import Logger
|
||||
from pisa.inspector import Inspector
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
from pisa.block_processor import BlockProcessor
|
||||
|
||||
from common.constants import HTTP_OK, HTTP_BAD_REQUEST, HTTP_SERVICE_UNAVAILABLE, LOCATOR_LEN_HEX
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from queue import Queue
|
||||
|
||||
from pisa.responder import TransactionTracker
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
|
||||
|
||||
class Builder:
|
||||
|
||||
@@ -7,7 +7,7 @@ from common.cryptographer import Cryptographer
|
||||
from pisa import errors
|
||||
import pisa.conf as conf
|
||||
from common.logger import Logger
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
from pisa.block_processor import BlockProcessor
|
||||
|
||||
logger = Logger("Inspector")
|
||||
@@ -309,12 +309,12 @@ class Inspector:
|
||||
|
||||
@staticmethod
|
||||
# Verifies that the appointment signature is a valid signature with public key
|
||||
def check_appointment_signature(appointment, signature, pk_der):
|
||||
def check_appointment_signature(appointment_data, signature, pk_der):
|
||||
"""
|
||||
Checks if the provided user signature is correct.
|
||||
|
||||
Args:
|
||||
appointment (:obj:`dict`): the appointment that was signed by the user.
|
||||
appointment_data (:obj:`dict`): the appointment that was signed by the user.
|
||||
signature (:obj:`str`): the user's signature (hex encoded).
|
||||
pk_der (:obj:`str`): the user's public key (hex encoded, DER format).
|
||||
|
||||
@@ -336,7 +336,7 @@ class Inspector:
|
||||
message = "empty signature received"
|
||||
|
||||
pk = Cryptographer.load_public_key_der(unhexlify(pk_der))
|
||||
valid_sig = Cryptographer.verify(Cryptographer.signature_format(appointment), signature, pk)
|
||||
valid_sig = Cryptographer.verify(Appointment.from_dict(appointment_data).serialize(), signature, pk)
|
||||
|
||||
if not valid_sig:
|
||||
rcode = errors.APPOINTMENT_INVALID_SIGNATURE
|
||||
|
||||
@@ -141,7 +141,7 @@ class Watcher:
|
||||
|
||||
logger.info("New appointment accepted", locator=appointment.locator)
|
||||
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment.to_dict()), self.signing_key)
|
||||
signature = Cryptographer.sign(appointment.serialize(), self.signing_key)
|
||||
|
||||
else:
|
||||
appointment_added = False
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import json
|
||||
import struct
|
||||
import binascii
|
||||
from pytest import fixture
|
||||
|
||||
from pisa import c_logger
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
from pisa.encrypted_blob import EncryptedBlob
|
||||
|
||||
from test.pisa.unit.conftest import get_random_value_hex
|
||||
@@ -110,3 +112,27 @@ def test_from_dict(appointment_data):
|
||||
except ValueError:
|
||||
appointment_data[key] = prev_val
|
||||
assert True
|
||||
|
||||
|
||||
def test_serialize(appointment_data):
|
||||
# From the tower end, appointments are only created if they pass the inspector tests, so not covering weird formats.
|
||||
# Serialize may fail if, from the user end, the user tries to do it with an weird appointment. Not critical.
|
||||
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
serialized_appointment = appointment.serialize()
|
||||
|
||||
# Size must be 16 + 4 + 4 + 4 + len(encrypted_blob)
|
||||
assert len(serialized_appointment) >= 28
|
||||
assert isinstance(serialized_appointment, bytes)
|
||||
|
||||
locator = serialized_appointment[:16]
|
||||
start_time = serialized_appointment[16:20]
|
||||
end_time = serialized_appointment[20:24]
|
||||
to_self_delay = serialized_appointment[24:28]
|
||||
encrypted_blob = serialized_appointment[28:]
|
||||
|
||||
assert binascii.hexlify(locator).decode() == appointment.locator
|
||||
assert struct.unpack(">I", start_time)[0] == appointment.start_time
|
||||
assert struct.unpack(">I", end_time)[0] == appointment.end_time
|
||||
assert struct.unpack(">I", to_self_delay)[0] == appointment.to_self_delay
|
||||
assert binascii.hexlify(encrypted_blob).decode() == appointment.encrypted_blob.data
|
||||
@@ -211,7 +211,7 @@ def test_sign_wrong_rtype():
|
||||
# Calling sign with an rtype different than 'str' or 'bytes' should fail
|
||||
for wtype in WRONG_TYPES:
|
||||
try:
|
||||
Cryptographer.sign("", "", rtype=wtype)
|
||||
Cryptographer.sign(b"", "", rtype=wtype)
|
||||
assert False
|
||||
|
||||
except ValueError:
|
||||
@@ -221,20 +221,19 @@ def test_sign_wrong_rtype():
|
||||
def test_sign_wrong_sk():
|
||||
# If a sk is not passed, sign will return None
|
||||
for wtype in WRONG_TYPES:
|
||||
assert Cryptographer.sign("", wtype) is None
|
||||
assert Cryptographer.sign(b"", wtype) is None
|
||||
|
||||
|
||||
# FIXME: signature_format is not covered, so we are not covering cases where the message is not in the proper format
|
||||
# at the moment (related to #68, happens in multiple tests from here on)
|
||||
def test_sign():
|
||||
# Otherwise we should get a signature
|
||||
sk, _ = generate_keypair()
|
||||
message = b""
|
||||
|
||||
assert Cryptographer.sign(Cryptographer.signature_format(""), sk) is not None
|
||||
assert Cryptographer.sign(message, sk) is not None
|
||||
|
||||
# Check that the returns work
|
||||
assert isinstance(Cryptographer.sign(Cryptographer.signature_format(""), sk, rtype="str"), str)
|
||||
assert isinstance(Cryptographer.sign(Cryptographer.signature_format(""), sk, rtype="bytes"), bytes)
|
||||
assert isinstance(Cryptographer.sign(message, sk, rtype="str"), str)
|
||||
assert isinstance(Cryptographer.sign(message, sk, rtype="bytes"), bytes)
|
||||
|
||||
|
||||
def test_verify_wrong_pk():
|
||||
@@ -247,10 +246,10 @@ def test_verify_random_values():
|
||||
# Random values shouldn't verify
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
message = get_random_value_hex(32)
|
||||
message = binascii.unhexlify(get_random_value_hex(32))
|
||||
signature = get_random_value_hex(32)
|
||||
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is False
|
||||
assert Cryptographer.verify(message, signature, pk) is False
|
||||
|
||||
|
||||
def test_verify_wrong_pair():
|
||||
@@ -258,16 +257,28 @@ def test_verify_wrong_pair():
|
||||
sk, _ = generate_keypair()
|
||||
_, pk = generate_keypair()
|
||||
|
||||
message = get_random_value_hex(32)
|
||||
message = binascii.unhexlify(get_random_value_hex(32))
|
||||
signature = get_random_value_hex(32)
|
||||
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is False
|
||||
assert Cryptographer.verify(message, signature, pk) is False
|
||||
|
||||
|
||||
def test_verify_wrong_message():
|
||||
# Verifying with a wrong keypair must fail
|
||||
sk, pk = generate_keypair()
|
||||
|
||||
message = binascii.unhexlify(get_random_value_hex(32))
|
||||
signature = Cryptographer.sign(message, sk)
|
||||
|
||||
wrong_message = binascii.unhexlify(get_random_value_hex(32))
|
||||
|
||||
assert Cryptographer.verify(wrong_message, signature, pk) is False
|
||||
|
||||
|
||||
def test_verify():
|
||||
# A properly generated signature should verify
|
||||
sk, pk = generate_keypair()
|
||||
message = get_random_value_hex(32)
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(message), sk)
|
||||
message = binascii.unhexlify(get_random_value_hex(32))
|
||||
signature = Cryptographer.sign(message, sk)
|
||||
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(message), signature, pk) is True
|
||||
assert Cryptographer.verify(message, signature, pk) is True
|
||||
|
||||
@@ -15,7 +15,7 @@ from pisa.responder import TransactionTracker
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.tools import bitcoin_cli
|
||||
from pisa.db_manager import DBManager
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
|
||||
from test.simulator.utils import sha256d
|
||||
from test.simulator.transaction import TX
|
||||
@@ -110,7 +110,7 @@ def generate_dummy_appointment_data(real_height=True, start_time_offset=5, end_t
|
||||
"encrypted_blob": encrypted_blob,
|
||||
}
|
||||
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
||||
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk)
|
||||
pk_hex = hexlify(client_pk_der).decode("utf-8")
|
||||
|
||||
data = {"appointment": appointment_data, "signature": signature, "public_key": pk_hex}
|
||||
|
||||
@@ -4,7 +4,7 @@ from uuid import uuid4
|
||||
from pisa import c_logger
|
||||
from pisa.responder import TransactionTracker
|
||||
from pisa.cleaner import Cleaner
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
from pisa.db_manager import WATCHER_PREFIX
|
||||
|
||||
from test.pisa.unit.conftest import get_random_value_hex
|
||||
|
||||
@@ -7,7 +7,7 @@ from cryptography.hazmat.primitives import serialization
|
||||
from pisa import c_logger
|
||||
from pisa.errors import *
|
||||
from pisa.inspector import Inspector
|
||||
from pisa.appointment import Appointment
|
||||
from common.appointment import Appointment
|
||||
from pisa.block_processor import BlockProcessor
|
||||
from pisa.conf import MIN_TO_SELF_DELAY
|
||||
|
||||
@@ -185,7 +185,9 @@ def test_check_appointment_signature():
|
||||
fake_sk = ec.generate_private_key(ec.SECP256K1, default_backend())
|
||||
|
||||
# Create a bad signature to make sure inspector rejects it
|
||||
bad_signature = Cryptographer.sign(Cryptographer.signature_format(dummy_appointment_data["appointment"]), fake_sk)
|
||||
bad_signature = Cryptographer.sign(
|
||||
Appointment.from_dict(dummy_appointment_data["appointment"]).serialize(), fake_sk
|
||||
)
|
||||
assert (
|
||||
Inspector.check_appointment_signature(dummy_appointment_data["appointment"], bad_signature, client_pk_hex)[0]
|
||||
== APPOINTMENT_INVALID_SIGNATURE
|
||||
@@ -202,12 +204,6 @@ def test_inspect(run_bitcoind):
|
||||
)
|
||||
client_pk_hex = hexlify(client_pk_der).decode("utf-8")
|
||||
|
||||
# Invalid appointment, every field is empty
|
||||
appointment_data = dict()
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
||||
appointment = inspector.inspect(appointment_data, signature, client_pk)
|
||||
assert type(appointment) == tuple and appointment[0] != 0
|
||||
|
||||
# Valid appointment
|
||||
locator = get_random_value_hex(LOCATOR_LEN_BYTES)
|
||||
start_time = BlockProcessor.get_block_count() + 5
|
||||
@@ -223,7 +219,7 @@ def test_inspect(run_bitcoind):
|
||||
"encrypted_blob": encrypted_blob,
|
||||
}
|
||||
|
||||
signature = Cryptographer.sign(Cryptographer.signature_format(appointment_data), client_sk)
|
||||
signature = Cryptographer.sign(Appointment.from_dict(appointment_data).serialize(), client_sk)
|
||||
|
||||
appointment = inspector.inspect(appointment_data, signature, client_pk_hex)
|
||||
|
||||
|
||||
@@ -92,13 +92,13 @@ def test_add_appointment(run_bitcoind, watcher):
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
||||
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||
|
||||
# Check that we can also add an already added appointment (same locator)
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
||||
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||
|
||||
|
||||
def test_add_too_many_appointments(watcher):
|
||||
@@ -112,7 +112,7 @@ def test_add_too_many_appointments(watcher):
|
||||
added_appointment, sig = watcher.add_appointment(appointment)
|
||||
|
||||
assert added_appointment is True
|
||||
assert Cryptographer.verify(Cryptographer.signature_format(appointment.to_dict()), sig, public_key)
|
||||
assert Cryptographer.verify(appointment.serialize(), sig, public_key)
|
||||
|
||||
appointment, dispute_tx = generate_dummy_appointment(
|
||||
start_time_offset=START_TIME_OFFSET, end_time_offset=END_TIME_OFFSET
|
||||
|
||||
Reference in New Issue
Block a user