mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
Gets rid of blob classes. Close #66
This commit is contained in:
@@ -15,7 +15,6 @@ from cli.exceptions import InvalidKey, InvalidParameter, TowerResponseError
|
||||
from cli.help import show_usage, help_add_appointment, help_get_appointment, help_register, help_get_all_appointments
|
||||
|
||||
import common.cryptographer
|
||||
from common.blob import Blob
|
||||
from common import constants
|
||||
from common.logger import Logger
|
||||
from common.appointment import Appointment
|
||||
@@ -104,7 +103,7 @@ def add_appointment(appointment_data, cli_sk, teos_pk, teos_url):
|
||||
raise InvalidParameter("The provided data is missing the transaction")
|
||||
|
||||
appointment_data["locator"] = compute_locator(tx_id)
|
||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(tx), tx_id)
|
||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(tx, tx_id)
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import struct
|
||||
from binascii import unhexlify
|
||||
|
||||
from common.encrypted_blob import EncryptedBlob
|
||||
|
||||
|
||||
class Appointment:
|
||||
"""
|
||||
@@ -15,9 +13,8 @@ class Appointment:
|
||||
end_time (:obj:`int`): The block height where the tower will stop watching for breaches.
|
||||
to_self_delay (:obj:`int`): The ``to_self_delay`` encoded in the ``csv`` of the ``to_remote`` output of the
|
||||
commitment transaction that this appointment is covering.
|
||||
encrypted_blob (:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>`): An ``EncryptedBlob`` object
|
||||
containing an encrypted penalty transaction. The tower will decrypt it and broadcast the penalty transaction
|
||||
upon seeing a breach on the blockchain.
|
||||
encrypted_blob (:obj:`str`): An encrypted blob of data containing a penalty transaction. The tower will decrypt
|
||||
it and broadcast the penalty transaction upon seeing a breach on the blockchain.
|
||||
"""
|
||||
|
||||
def __init__(self, locator, start_time, end_time, to_self_delay, encrypted_blob):
|
||||
@@ -25,7 +22,7 @@ class Appointment:
|
||||
self.start_time = start_time # ToDo: #4-standardize-appointment-fields
|
||||
self.end_time = end_time # ToDo: #4-standardize-appointment-fields
|
||||
self.to_self_delay = to_self_delay
|
||||
self.encrypted_blob = EncryptedBlob(encrypted_blob)
|
||||
self.encrypted_blob = encrypted_blob
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, appointment_data):
|
||||
@@ -73,7 +70,7 @@ class Appointment:
|
||||
"start_time": self.start_time,
|
||||
"end_time": self.end_time,
|
||||
"to_self_delay": self.to_self_delay,
|
||||
"encrypted_blob": self.encrypted_blob.data,
|
||||
"encrypted_blob": self.encrypted_blob,
|
||||
}
|
||||
|
||||
return appointment
|
||||
@@ -95,5 +92,5 @@ class Appointment:
|
||||
+ struct.pack(">I", self.start_time)
|
||||
+ struct.pack(">I", self.end_time)
|
||||
+ struct.pack(">I", self.to_self_delay)
|
||||
+ unhexlify(self.encrypted_blob.data)
|
||||
+ unhexlify(self.encrypted_blob)
|
||||
)
|
||||
|
||||
@@ -1,9 +0,0 @@
|
||||
import re
|
||||
|
||||
|
||||
class Blob:
|
||||
def __init__(self, data):
|
||||
if type(data) is not str or re.search(r"^[0-9A-Fa-f]+$", data) is None:
|
||||
raise ValueError("Non-Hex character found in transaction.")
|
||||
|
||||
self.data = data
|
||||
@@ -180,7 +180,7 @@ class Cryptographer:
|
||||
:obj:`ValueError`: if either the ``secret`` or ``encrypted_blob`` is not properly formatted.
|
||||
"""
|
||||
|
||||
Cryptographer.check_data_key_format(encrypted_blob.data, secret)
|
||||
Cryptographer.check_data_key_format(encrypted_blob, secret)
|
||||
|
||||
# sk is the H(txid) (32-byte) and nonce is set to 0 (12-byte)
|
||||
sk = sha256(unhexlify(secret)).digest()
|
||||
@@ -195,7 +195,7 @@ class Cryptographer:
|
||||
|
||||
# Decrypt
|
||||
cipher = ChaCha20Poly1305(sk)
|
||||
data = unhexlify(encrypted_blob.data)
|
||||
data = unhexlify(encrypted_blob)
|
||||
|
||||
try:
|
||||
blob = cipher.decrypt(nonce=nonce, data=data, associated_data=None)
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
class EncryptedBlob:
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, EncryptedBlob) and self.data == other.data
|
||||
@@ -189,7 +189,7 @@ class API:
|
||||
|
||||
if appointment_summary:
|
||||
used_slots = ceil(appointment_summary.get("size") / ENCRYPTED_BLOB_MAX_SIZE_HEX)
|
||||
required_slots = ceil(len(appointment.encrypted_blob.data) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
|
||||
required_slots = ceil(len(appointment.encrypted_blob) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
|
||||
slot_diff = required_slots - used_slots
|
||||
|
||||
# For updates we only reserve the slot difference provided the new one is bigger.
|
||||
@@ -198,7 +198,7 @@ class API:
|
||||
else:
|
||||
# For regular appointments 1 slot is reserved per ENCRYPTED_BLOB_MAX_SIZE_HEX block.
|
||||
slot_diff = 0
|
||||
required_slots = ceil(len(appointment.encrypted_blob.data) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
|
||||
required_slots = ceil(len(appointment.encrypted_blob) / ENCRYPTED_BLOB_MAX_SIZE_HEX)
|
||||
|
||||
# Slots are reserved before adding the appointments to prevent race conditions.
|
||||
# DISCUSS: It may be worth using signals here to avoid race conditions anyway.
|
||||
|
||||
@@ -20,8 +20,8 @@ class Watcher:
|
||||
|
||||
The :class:`Watcher` keeps track of the accepted appointments in ``appointments`` and, for new received block,
|
||||
checks if any breach has happened by comparing the txids with the appointment locators. If a breach is seen, the
|
||||
:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` of the corresponding appointment is decrypted and the
|
||||
data is passed to the :obj:`Responder <teos.responder.Responder>`.
|
||||
``encrypted_blob`` of the corresponding appointment is decrypted and the data is passed to the
|
||||
:obj:`Responder <teos.responder.Responder>`.
|
||||
|
||||
If an appointment reaches its end with no breach, the data is simply deleted.
|
||||
|
||||
@@ -102,9 +102,8 @@ class Watcher:
|
||||
``add_appointment`` is the entry point of the ``Watcher``. Upon receiving a new appointment it will start
|
||||
monitoring the blockchain (``do_watch``) until ``appointments`` is empty.
|
||||
|
||||
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding
|
||||
:obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` and pass the information to the
|
||||
:obj:`Responder <teos.responder.Responder>`.
|
||||
Once a breach is seen on the blockchain, the :obj:`Watcher` will decrypt the corresponding ``encrypted_blob``
|
||||
and pass the information to the :obj:`Responder <teos.responder.Responder>`.
|
||||
|
||||
The tower may store multiple appointments with the same ``locator`` to avoid DoS attacks based on data
|
||||
rewriting. `locators`` should be derived from the ``dispute_txid``, but that task is performed by the user, and
|
||||
@@ -133,7 +132,7 @@ class Watcher:
|
||||
self.appointments[uuid] = {
|
||||
"locator": appointment.locator,
|
||||
"end_time": appointment.end_time,
|
||||
"size": len(appointment.encrypted_blob.data),
|
||||
"size": len(appointment.encrypted_blob),
|
||||
}
|
||||
|
||||
if appointment.locator in self.locator_uuid_map:
|
||||
@@ -268,9 +267,8 @@ class Watcher:
|
||||
"""
|
||||
Filters what of the found breaches contain valid transaction data.
|
||||
|
||||
The :obj:`Watcher` cannot if a given :obj:`EncryptedBlob <common.encrypted_blob.EncryptedBlob>` contains a valid
|
||||
transaction until a breach if seen. Blobs that contain arbitrary data are dropped and not sent to the
|
||||
:obj:`Responder <teos.responder.Responder>`.
|
||||
The :obj:`Watcher` cannot if a given ``encrypted_blob`` contains a valid transaction until a breach if seen.
|
||||
Blobs that contain arbitrary data are dropped and not sent to the :obj:`Responder <teos.responder.Responder>`.
|
||||
|
||||
Args:
|
||||
breaches (:obj:`dict`): a dictionary containing channel breaches (``locator:txid``).
|
||||
@@ -292,8 +290,8 @@ class Watcher:
|
||||
for uuid in self.locator_uuid_map[locator]:
|
||||
appointment = Appointment.from_dict(self.db_manager.load_watcher_appointment(uuid))
|
||||
|
||||
if appointment.encrypted_blob.data in decrypted_blobs:
|
||||
penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob.data]
|
||||
if appointment.encrypted_blob in decrypted_blobs:
|
||||
penalty_tx, penalty_rawtx = decrypted_blobs[appointment.encrypted_blob]
|
||||
|
||||
else:
|
||||
try:
|
||||
@@ -303,7 +301,7 @@ class Watcher:
|
||||
penalty_rawtx = None
|
||||
|
||||
penalty_tx = self.block_processor.decode_raw_transaction(penalty_rawtx)
|
||||
decrypted_blobs[appointment.encrypted_blob.data] = (penalty_tx, penalty_rawtx)
|
||||
decrypted_blobs[appointment.encrypted_blob] = (penalty_tx, penalty_rawtx)
|
||||
|
||||
if penalty_tx is not None:
|
||||
valid_breaches[uuid] = {
|
||||
|
||||
@@ -7,7 +7,6 @@ from binascii import hexlify
|
||||
from coincurve import PrivateKey
|
||||
from requests.exceptions import ConnectionError, Timeout
|
||||
|
||||
from common.blob import Blob
|
||||
import common.cryptographer
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
@@ -50,9 +49,7 @@ dummy_appointment_dict = {
|
||||
"start_time": dummy_appointment_data.get("start_time"),
|
||||
"end_time": dummy_appointment_data.get("end_time"),
|
||||
"to_self_delay": dummy_appointment_data.get("to_self_delay"),
|
||||
"encrypted_blob": Cryptographer.encrypt(
|
||||
Blob(dummy_appointment_data.get("tx")), dummy_appointment_data.get("tx_id")
|
||||
),
|
||||
"encrypted_blob": Cryptographer.encrypt(dummy_appointment_data.get("tx"), dummy_appointment_data.get("tx_id")),
|
||||
}
|
||||
|
||||
dummy_appointment = Appointment.from_dict(dummy_appointment_dict)
|
||||
|
||||
@@ -3,12 +3,10 @@ import binascii
|
||||
from pytest import fixture
|
||||
|
||||
from common.appointment import Appointment
|
||||
from common.encrypted_blob import EncryptedBlob
|
||||
from common.constants import LOCATOR_LEN_BYTES
|
||||
|
||||
from test.common.unit.conftest import get_random_value_hex
|
||||
|
||||
from common.constants import LOCATOR_LEN_BYTES
|
||||
|
||||
|
||||
# Not much to test here, adding it for completeness
|
||||
@fixture
|
||||
@@ -46,7 +44,7 @@ def test_init_appointment(appointment_data):
|
||||
and appointment_data["start_time"] == appointment.start_time
|
||||
and appointment_data["end_time"] == appointment.end_time
|
||||
and appointment_data["to_self_delay"] == appointment.to_self_delay
|
||||
and EncryptedBlob(appointment_data["encrypted_blob"]) == appointment.encrypted_blob
|
||||
and appointment_data["encrypted_blob"] == appointment.encrypted_blob
|
||||
)
|
||||
|
||||
|
||||
@@ -66,7 +64,7 @@ def test_to_dict(appointment_data):
|
||||
and appointment_data["start_time"] == dict_appointment["start_time"]
|
||||
and appointment_data["end_time"] == dict_appointment["end_time"]
|
||||
and appointment_data["to_self_delay"] == dict_appointment["to_self_delay"]
|
||||
and EncryptedBlob(appointment_data["encrypted_blob"]) == EncryptedBlob(dict_appointment["encrypted_blob"])
|
||||
and appointment_data["encrypted_blob"] == dict_appointment["encrypted_blob"]
|
||||
)
|
||||
|
||||
|
||||
@@ -110,4 +108,4 @@ def test_serialize(appointment_data):
|
||||
assert struct.unpack(">I", start_time)[0] == appointment.start_time
|
||||
assert struct.unpack(">I", end_time)[0] == appointment.end_time
|
||||
assert struct.unpack(">I", to_self_delay)[0] == appointment.to_self_delay
|
||||
assert binascii.hexlify(encrypted_blob).decode() == appointment.encrypted_blob.data
|
||||
assert binascii.hexlify(encrypted_blob).decode() == appointment.encrypted_blob
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
from binascii import unhexlify
|
||||
|
||||
from common.blob import Blob
|
||||
from test.common.unit.conftest import get_random_value_hex
|
||||
|
||||
|
||||
def test_init_blob():
|
||||
data = get_random_value_hex(64)
|
||||
blob = Blob(data)
|
||||
assert isinstance(blob, Blob)
|
||||
|
||||
# Wrong data
|
||||
try:
|
||||
Blob(unhexlify(get_random_value_hex(64)))
|
||||
assert False, "Able to create blob with wrong data"
|
||||
|
||||
except ValueError:
|
||||
assert True
|
||||
@@ -6,10 +6,8 @@ from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from coincurve import PrivateKey, PublicKey
|
||||
import common.cryptographer
|
||||
from common.blob import Blob
|
||||
from common.logger import Logger
|
||||
from common.cryptographer import Cryptographer
|
||||
from common.encrypted_blob import EncryptedBlob
|
||||
from test.common.unit.conftest import get_random_value_hex
|
||||
|
||||
common.cryptographer.logger = Logger(actor="Cryptographer", log_name_prefix="")
|
||||
@@ -61,7 +59,7 @@ def test_check_data_key_format():
|
||||
|
||||
|
||||
def test_encrypt_odd_length_data():
|
||||
blob = Blob(get_random_value_hex(64)[-1])
|
||||
blob = get_random_value_hex(64)[-1]
|
||||
key = get_random_value_hex(32)
|
||||
|
||||
try:
|
||||
@@ -73,7 +71,7 @@ def test_encrypt_odd_length_data():
|
||||
|
||||
|
||||
def test_encrypt_wrong_key_size():
|
||||
blob = Blob(get_random_value_hex(64))
|
||||
blob = get_random_value_hex(64)
|
||||
key = get_random_value_hex(31)
|
||||
|
||||
try:
|
||||
@@ -85,15 +83,13 @@ def test_encrypt_wrong_key_size():
|
||||
|
||||
|
||||
def test_encrypt():
|
||||
blob = Blob(data)
|
||||
|
||||
assert Cryptographer.encrypt(blob, key) == encrypted_data
|
||||
assert Cryptographer.encrypt(data, key) == encrypted_data
|
||||
|
||||
|
||||
def test_decrypt_invalid_tag():
|
||||
random_key = get_random_value_hex(32)
|
||||
random_encrypted_data = get_random_value_hex(64)
|
||||
random_encrypted_blob = EncryptedBlob(random_encrypted_data)
|
||||
random_encrypted_blob = random_encrypted_data
|
||||
|
||||
# Trying to decrypt random data should result in an InvalidTag exception. Our decrypt function
|
||||
# returns None
|
||||
@@ -104,7 +100,7 @@ def test_decrypt_invalid_tag():
|
||||
def test_decrypt_odd_length_data():
|
||||
random_key = get_random_value_hex(32)
|
||||
random_encrypted_data_odd = get_random_value_hex(64)[:-1]
|
||||
random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd)
|
||||
random_encrypted_blob_odd = random_encrypted_data_odd
|
||||
|
||||
try:
|
||||
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||
@@ -117,7 +113,7 @@ def test_decrypt_odd_length_data():
|
||||
def test_decrypt_wrong_key_size():
|
||||
random_key = get_random_value_hex(31)
|
||||
random_encrypted_data_odd = get_random_value_hex(64)
|
||||
random_encrypted_blob_odd = EncryptedBlob(random_encrypted_data_odd)
|
||||
random_encrypted_blob_odd = random_encrypted_data_odd
|
||||
|
||||
try:
|
||||
Cryptographer.decrypt(random_encrypted_blob_odd, random_key)
|
||||
@@ -129,7 +125,7 @@ def test_decrypt_wrong_key_size():
|
||||
|
||||
def test_decrypt():
|
||||
# Valid data should run with no InvalidTag and verify
|
||||
assert Cryptographer.decrypt(EncryptedBlob(encrypted_data), key) == data
|
||||
assert Cryptographer.decrypt(encrypted_data, key) == data
|
||||
|
||||
|
||||
def test_load_key_file():
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
from common.encrypted_blob import EncryptedBlob
|
||||
from test.common.unit.conftest import get_random_value_hex
|
||||
|
||||
|
||||
def test_init_encrypted_blob():
|
||||
# No much to test here, basically that the object is properly created
|
||||
data = get_random_value_hex(64)
|
||||
assert EncryptedBlob(data).data == data
|
||||
|
||||
|
||||
def test_equal():
|
||||
data = get_random_value_hex(64)
|
||||
e_blob1 = EncryptedBlob(data)
|
||||
e_blob2 = EncryptedBlob(data)
|
||||
|
||||
assert e_blob1 == e_blob2 and id(e_blob1) != id(e_blob2)
|
||||
@@ -9,7 +9,6 @@ from cli.exceptions import TowerResponseError
|
||||
from cli import teos_cli, DATA_DIR, DEFAULT_CONF, CONF_FILE_NAME
|
||||
|
||||
import common.cryptographer
|
||||
from common.blob import Blob
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.appointment import Appointment
|
||||
@@ -245,7 +244,7 @@ def test_appointment_wrong_decryption_key(bitcoin_cli):
|
||||
# We can't use teos_cli.add_appointment here since it computes the locator internally, so let's do it manually.
|
||||
# We will encrypt the blob using the random value and derive the locator from the commitment tx.
|
||||
appointment_data["locator"] = compute_locator(bitcoin_cli.decoderawtransaction(commitment_tx).get("txid"))
|
||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(Blob(penalty_tx), get_random_value_hex(32))
|
||||
appointment_data["encrypted_blob"] = Cryptographer.encrypt(penalty_tx, get_random_value_hex(32))
|
||||
appointment = Appointment.from_dict(appointment_data)
|
||||
|
||||
signature = Cryptographer.sign(appointment.serialize(), cli_sk)
|
||||
|
||||
@@ -20,7 +20,6 @@ from teos.block_processor import BlockProcessor
|
||||
from teos.appointments_dbm import AppointmentsDBM
|
||||
|
||||
import common.cryptographer
|
||||
from common.blob import Blob
|
||||
from common.logger import Logger
|
||||
from common.tools import compute_locator
|
||||
from common.appointment import Appointment
|
||||
@@ -138,9 +137,8 @@ def generate_dummy_appointment(real_height=True, start_time_offset=5, end_time_o
|
||||
}
|
||||
|
||||
locator = compute_locator(dispute_txid)
|
||||
blob = Blob(dummy_appointment_data.get("tx"))
|
||||
|
||||
encrypted_blob = Cryptographer.encrypt(blob, dummy_appointment_data.get("tx_id"))
|
||||
encrypted_blob = Cryptographer.encrypt(dummy_appointment_data.get("tx"), dummy_appointment_data.get("tx_id"))
|
||||
|
||||
appointment_data = {
|
||||
"locator": locator,
|
||||
|
||||
@@ -219,7 +219,7 @@ def test_add_appointment_registered_not_enough_free_slots(api, client, appointme
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
|
||||
# Let's create a big blob
|
||||
appointment.encrypted_blob.data = TWO_SLOTS_BLOTS
|
||||
appointment.encrypted_blob = TWO_SLOTS_BLOTS
|
||||
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -279,7 +279,7 @@ def test_add_appointment_update_same_size(api, client, appointment):
|
||||
|
||||
# The user has no additional slots, but it should be able to update
|
||||
# Let's just reverse the encrypted blob for example
|
||||
appointment.encrypted_blob.data = appointment.encrypted_blob.data[::-1]
|
||||
appointment.encrypted_blob = appointment.encrypted_blob[::-1]
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -298,7 +298,7 @@ def test_add_appointment_update_bigger(api, client, appointment):
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 1
|
||||
|
||||
# The user has one slot, so it should be able to update as long as it only takes 1 additional slot
|
||||
appointment.encrypted_blob.data = TWO_SLOTS_BLOTS
|
||||
appointment.encrypted_blob = TWO_SLOTS_BLOTS
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -307,7 +307,7 @@ def test_add_appointment_update_bigger(api, client, appointment):
|
||||
|
||||
# Check that it'll fail if no enough slots are available
|
||||
# Double the size from before
|
||||
appointment.encrypted_blob.data = TWO_SLOTS_BLOTS + TWO_SLOTS_BLOTS
|
||||
appointment.encrypted_blob = TWO_SLOTS_BLOTS + TWO_SLOTS_BLOTS
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -320,7 +320,7 @@ def test_add_appointment_update_smaller(api, client, appointment):
|
||||
api.gatekeeper.registered_users[compressed_client_pk] = {"available_slots": 2}
|
||||
|
||||
# This should take 2 slots
|
||||
appointment.encrypted_blob.data = TWO_SLOTS_BLOTS
|
||||
appointment.encrypted_blob = TWO_SLOTS_BLOTS
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
@@ -328,7 +328,7 @@ def test_add_appointment_update_smaller(api, client, appointment):
|
||||
assert r.status_code == HTTP_OK and r.json.get("available_slots") == 0
|
||||
|
||||
# Let's update with one just small enough
|
||||
appointment.encrypted_blob.data = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2)
|
||||
appointment.encrypted_blob = "A" * (ENCRYPTED_BLOB_MAX_SIZE_HEX - 2)
|
||||
appointment_signature = Cryptographer.sign(appointment.serialize(), client_sk)
|
||||
r = add_appointment(
|
||||
client, {"appointment": appointment.to_dict(), "signature": appointment_signature}, compressed_client_pk
|
||||
|
||||
@@ -300,7 +300,7 @@ def test_inspect(run_bitcoind):
|
||||
and appointment.start_time == start_time
|
||||
and appointment.end_time == end_time
|
||||
and appointment.to_self_delay == to_self_delay
|
||||
and appointment.encrypted_blob.data == encrypted_blob
|
||||
and appointment.encrypted_blob == encrypted_blob
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -278,7 +278,7 @@ def test_filter_valid_breaches(watcher):
|
||||
)
|
||||
|
||||
dummy_appointment, _ = generate_dummy_appointment()
|
||||
dummy_appointment.encrypted_blob.data = encrypted_blob
|
||||
dummy_appointment.encrypted_blob = encrypted_blob
|
||||
dummy_appointment.locator = compute_locator(dispute_txid)
|
||||
uuid = uuid4().hex
|
||||
|
||||
|
||||
Reference in New Issue
Block a user