mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Adds basic authentication logic.
+ Users need to be registered in order to send appointments (free registration for now) + The tower gives them a number of appointments to work with + Non-registered users and users with no enough appoitnemnts slots return the same error (to avoid proving) - Authentication does not cover get_* requests yet - No tests - No docs
This commit is contained in:
@@ -1,9 +1,7 @@
|
||||
import re
|
||||
from binascii import unhexlify
|
||||
|
||||
import common.cryptographer
|
||||
from common.constants import LOCATOR_LEN_HEX
|
||||
from common.cryptographer import Cryptographer, PublicKey
|
||||
|
||||
from teos import errors, LOG_PREFIX
|
||||
from common.logger import Logger
|
||||
@@ -19,7 +17,6 @@ common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix=LOG_
|
||||
|
||||
|
||||
BLOCKS_IN_A_MONTH = 4320 # 4320 = roughly a month in blocks
|
||||
ENCRYPTED_BLOB_MAX_SIZE_HEX = 2 * 2048
|
||||
|
||||
|
||||
class Inspector:
|
||||
@@ -36,14 +33,13 @@ class Inspector:
|
||||
self.block_processor = block_processor
|
||||
self.min_to_self_delay = min_to_self_delay
|
||||
|
||||
def inspect(self, appointment_data, signature, public_key):
|
||||
def inspect(self, appointment_data):
|
||||
"""
|
||||
Inspects whether the data provided by the user is correct.
|
||||
|
||||
Args:
|
||||
appointment_data (:obj:`dict`): a dictionary containing the appointment data.
|
||||
signature (:obj:`str`): the appointment signature provided by the user (hex encoded).
|
||||
public_key (:obj:`str`): the user's public key (hex encoded).
|
||||
|
||||
|
||||
Returns:
|
||||
:obj:`Appointment <teos.appointment.Appointment>` or :obj:`tuple`: An appointment initialized with the
|
||||
@@ -72,8 +68,6 @@ class Inspector:
|
||||
rcode, message = self.check_to_self_delay(appointment_data.get("to_self_delay"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_blob(appointment_data.get("encrypted_blob"))
|
||||
if rcode == 0:
|
||||
rcode, message = self.check_appointment_signature(appointment_data, signature, public_key)
|
||||
|
||||
if rcode == 0:
|
||||
r = Appointment.from_dict(appointment_data)
|
||||
@@ -330,10 +324,6 @@ class Inspector:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_TYPE
|
||||
message = "wrong encrypted_blob data type ({})".format(t)
|
||||
|
||||
elif len(encrypted_blob) > ENCRYPTED_BLOB_MAX_SIZE_HEX:
|
||||
rcode = errors.APPOINTMENT_FIELD_TOO_BIG
|
||||
message = "encrypted_blob has to be 2Kib at most (current {})".format(len(encrypted_blob) // 2)
|
||||
|
||||
elif re.search(r"^[0-9A-Fa-f]+$", encrypted_blob) is None:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD_FORMAT
|
||||
message = "wrong encrypted_blob format ({})".format(encrypted_blob)
|
||||
@@ -342,51 +332,3 @@ class Inspector:
|
||||
logger.error(message)
|
||||
|
||||
return rcode, message
|
||||
|
||||
@staticmethod
|
||||
# Verifies that the appointment signature is a valid signature with public key
|
||||
def check_appointment_signature(appointment_data, signature, pk):
|
||||
"""
|
||||
Checks if the provided user signature is correct.
|
||||
|
||||
Args:
|
||||
appointment_data (:obj:`dict`): the appointment that was signed by the user.
|
||||
signature (:obj:`str`): the user's signature (hex encoded).
|
||||
pk (:obj:`str`): the user's public key (hex encoded).
|
||||
|
||||
Returns:
|
||||
:obj:`tuple`: A tuple (return code, message) as follows:
|
||||
|
||||
- ``(0, None)`` if the ``signature`` is correct.
|
||||
- ``!= (0, None)`` otherwise.
|
||||
|
||||
The possible return errors are: ``APPOINTMENT_EMPTY_FIELD``, ``APPOINTMENT_WRONG_FIELD_TYPE``, and
|
||||
``APPOINTMENT_WRONG_FIELD_FORMAT``.
|
||||
"""
|
||||
|
||||
message = None
|
||||
rcode = 0
|
||||
|
||||
if signature is None:
|
||||
rcode = errors.APPOINTMENT_EMPTY_FIELD
|
||||
message = "empty signature received"
|
||||
|
||||
elif pk is None:
|
||||
rcode = errors.APPOINTMENT_EMPTY_FIELD
|
||||
message = "empty public key received"
|
||||
|
||||
elif re.match(r"^[0-9A-Fa-f]{66}$", pk) is None:
|
||||
rcode = errors.APPOINTMENT_WRONG_FIELD
|
||||
message = "public key must be a hex encoded 33-byte long value"
|
||||
|
||||
else:
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
rpk = Cryptographer.recover_pk(appointment.serialize(), signature)
|
||||
pk = PublicKey(unhexlify(pk))
|
||||
valid_sig = Cryptographer.verify_rpk(pk, rpk)
|
||||
|
||||
if not valid_sig:
|
||||
rcode = errors.APPOINTMENT_INVALID_SIGNATURE
|
||||
message = "invalid signature"
|
||||
|
||||
return rcode, message
|
||||
|
||||
Reference in New Issue
Block a user