mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 14:44:21 +01:00
Server validates appointment signature upon reception
This commit is contained in:
@@ -32,7 +32,9 @@ def add_appointment():
|
|||||||
# Check content type once if properly defined
|
# Check content type once if properly defined
|
||||||
request_data = json.loads(request.get_json())
|
request_data = json.loads(request.get_json())
|
||||||
inspector = Inspector()
|
inspector = Inspector()
|
||||||
appointment = inspector.inspect(request_data)
|
appointment = inspector.inspect(
|
||||||
|
request_data.get("appointment"), request_data.get("signature"), request_data.get("public_key")
|
||||||
|
)
|
||||||
|
|
||||||
error = None
|
error = None
|
||||||
response = None
|
response = None
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ APPOINTMENT_FIELD_TOO_BIG = -6
|
|||||||
APPOINTMENT_WRONG_FIELD = -7
|
APPOINTMENT_WRONG_FIELD = -7
|
||||||
APPOINTMENT_CIPHER_NOT_SUPPORTED = -8
|
APPOINTMENT_CIPHER_NOT_SUPPORTED = -8
|
||||||
APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED = -9
|
APPOINTMENT_HASH_FUNCTION_NOT_SUPPORTED = -9
|
||||||
|
APPOINTMENT_INVALID_SIGNATURE = -10
|
||||||
|
|
||||||
# Custom RPC errors
|
# Custom RPC errors
|
||||||
RPC_TX_REORGED_AFTER_BROADCAST = -98
|
RPC_TX_REORGED_AFTER_BROADCAST = -98
|
||||||
|
|||||||
@@ -1,4 +1,12 @@
|
|||||||
|
import json
|
||||||
import re
|
import re
|
||||||
|
from binascii import unhexlify
|
||||||
|
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
||||||
|
from cryptography.exceptions import InvalidSignature
|
||||||
|
|
||||||
from pisa import errors
|
from pisa import errors
|
||||||
import pisa.conf as conf
|
import pisa.conf as conf
|
||||||
@@ -15,14 +23,14 @@ logger = Logger("Inspector")
|
|||||||
|
|
||||||
|
|
||||||
class Inspector:
|
class Inspector:
|
||||||
def inspect(self, data):
|
def inspect(self, appt, signature, public_key):
|
||||||
locator = data.get("locator")
|
locator = appt.get("locator")
|
||||||
start_time = data.get("start_time")
|
start_time = appt.get("start_time")
|
||||||
end_time = data.get("end_time")
|
end_time = appt.get("end_time")
|
||||||
dispute_delta = data.get("dispute_delta")
|
dispute_delta = appt.get("dispute_delta")
|
||||||
encrypted_blob = data.get("encrypted_blob")
|
encrypted_blob = appt.get("encrypted_blob")
|
||||||
cipher = data.get("cipher")
|
cipher = appt.get("cipher")
|
||||||
hash_function = data.get("hash_function")
|
hash_function = appt.get("hash_function")
|
||||||
|
|
||||||
block_height = BlockProcessor.get_block_count()
|
block_height = BlockProcessor.get_block_count()
|
||||||
|
|
||||||
@@ -41,6 +49,8 @@ class Inspector:
|
|||||||
rcode, message = self.check_cipher(cipher)
|
rcode, message = self.check_cipher(cipher)
|
||||||
if rcode == 0:
|
if rcode == 0:
|
||||||
rcode, message = self.check_hash_function(hash_function)
|
rcode, message = self.check_hash_function(hash_function)
|
||||||
|
if rcode == 0:
|
||||||
|
rcode, message = self.check_appointment_signature(appt, signature, public_key)
|
||||||
|
|
||||||
if rcode == 0:
|
if rcode == 0:
|
||||||
r = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
|
r = Appointment(locator, start_time, end_time, dispute_delta, encrypted_blob, cipher, hash_function)
|
||||||
@@ -245,3 +255,24 @@ class Inspector:
|
|||||||
logger.error(message)
|
logger.error(message)
|
||||||
|
|
||||||
return rcode, message
|
return rcode, message
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
# Verifies that the appointment signature is a valid signature with public key
|
||||||
|
def check_appointment_signature(appointment, signature, pk_pem):
|
||||||
|
message = None
|
||||||
|
rcode = 0
|
||||||
|
|
||||||
|
if signature is None:
|
||||||
|
rcode = errors.APPOINTMENT_EMPTY_FIELD
|
||||||
|
message = "empty signature received"
|
||||||
|
|
||||||
|
try:
|
||||||
|
sig_bytes = unhexlify(signature.encode("utf-8"))
|
||||||
|
client_pk = load_pem_public_key(pk_pem.encode("utf-8"), backend=default_backend())
|
||||||
|
data = json.dumps(appointment, sort_keys=True, separators=(",", ":")).encode("utf-8")
|
||||||
|
client_pk.verify(sig_bytes, data, ec.ECDSA(hashes.SHA256()))
|
||||||
|
|
||||||
|
except InvalidSignature:
|
||||||
|
rcode = errors.APPOINTMENT_INVALID_SIGNATURE
|
||||||
|
|
||||||
|
return rcode, message
|
||||||
|
|||||||
Reference in New Issue
Block a user