watcher - wipes completed/expired appointments from the Gatekeeper

This commit is contained in:
Sergi Delgado Segura
2020-04-22 14:55:10 +02:00
parent f66e4785f2
commit e7141b295e
2 changed files with 23 additions and 11 deletions

View File

@@ -44,8 +44,8 @@ class Watcher:
Attributes: Attributes:
appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment appointments (:obj:`dict`): a dictionary containing a summary of the appointments (:obj:`ExtendedAppointment
<teos.extended_appointment.ExtendedAppointment>` instances) accepted by the tower (``locator``, <teos.extended_appointment.ExtendedAppointment>` instances) accepted by the tower (``locator`` and
``user_id``, and ``size``). It's populated trough ``add_appointment``. ``user_id``). It's populated trough ``add_appointment``.
locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several locator_uuid_map (:obj:`dict`): a ``locator:uuid`` map used to allow the :obj:`Watcher` to deal with several
appointments with the same ``locator``. appointments with the same ``locator``.
block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is block_queue (:obj:`Queue`): A queue used by the :obj:`Watcher` to receive block hashes from ``bitcoind``. It is
@@ -59,7 +59,6 @@ class Watcher:
responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance. responder (:obj:`Responder <teos.responder.Responder>`): a ``Responder`` instance.
signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments. signing_key (:mod:`PrivateKey`): a private key used to sign accepted appointments.
max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time. max_appointments (:obj:`int`): the maximum amount of appointments accepted by the ``Watcher`` at the same time.
expiry_delta (:obj:`int`): the additional time the ``Watcher`` will keep an expired appointment around.
last_known_block (:obj:`str`): the last block known by the ``Watcher``. last_known_block (:obj:`str`): the last block known by the ``Watcher``.
Raises: Raises:
@@ -131,11 +130,8 @@ class Watcher:
# If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps). # If an appointment is requested by the user the uuid can be recomputed and queried straightaway (no maps).
uuid = hash_160("{}{}".format(appointment.locator, user_id)) uuid = hash_160("{}{}".format(appointment.locator, user_id))
# The third argument is the previous version of the same appointment (optional, returns None if missing) # Add the appointment to the Gatekeeper
available_slots = self.gatekeeper.update_available_slots( available_slots = self.gatekeeper.add_update_appointment(user_id, uuid, appointment)
user_id, appointment.get_summary(), self.appointments.get(uuid)
)
self.gatekeeper.registered_users[user_id].appointments.append(uuid)
self.appointments[uuid] = appointment.get_summary() self.appointments[uuid] = appointment.get_summary()
if appointment.locator in self.locator_uuid_map: if appointment.locator in self.locator_uuid_map:
@@ -191,6 +187,11 @@ class Watcher:
# Make sure we only try to delete what is on the Watcher (some appointments may have been triggered) # Make sure we only try to delete what is on the Watcher (some appointments may have been triggered)
expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys())) expired_appointments = list(set(expired_appointments).intersection(self.appointments.keys()))
# Keep track of the expired appointments before deleting them from memory
appointments_to_delete_gatekeeper = {
uuid: self.appointments[uuid].get("user_id") for uuid in expired_appointments
}
Cleaner.delete_expired_appointments( Cleaner.delete_expired_appointments(
expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager expired_appointments, self.appointments, self.locator_uuid_map, self.db_manager
) )
@@ -230,10 +231,18 @@ class Watcher:
appointments_to_delete.extend(invalid_breaches) appointments_to_delete.extend(invalid_breaches)
self.db_manager.batch_create_triggered_appointment_flag(triggered_flags) self.db_manager.batch_create_triggered_appointment_flag(triggered_flags)
# Update the dictionary with the completed appointments
appointments_to_delete_gatekeeper.update(
{uuid: self.appointments[uuid].get("user_id") for uuid in appointments_to_delete}
)
Cleaner.delete_completed_appointments( Cleaner.delete_completed_appointments(
appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager appointments_to_delete, self.appointments, self.locator_uuid_map, self.db_manager
) )
# Remove expired and completed appointments from the Gatekeeper
Cleaner.delete_gatekeeper_appointments(self.gatekeeper, appointments_to_delete_gatekeeper)
if len(self.appointments) != 0: if len(self.appointments) != 0:
logger.info("No more pending appointments") logger.info("No more pending appointments")

View File

@@ -217,8 +217,9 @@ def test_do_watch(watcher, temp_db_manager):
# Add the appointments # Add the appointments
for uuid, appointment in appointments.items(): for uuid, appointment in appointments.items():
watcher.appointments[uuid] = {"locator": appointment.locator, "user_id": user_id, "size": 200} watcher.appointments[uuid] = {"locator": appointment.locator, "user_id": user_id}
watcher.gatekeeper.registered_users[user_id].appointments.append(uuid) # Assume the appointment only takes one slot
watcher.gatekeeper.registered_users[user_id].appointments[uuid] = 1
watcher.db_manager.store_watcher_appointment(uuid, appointment.to_dict()) watcher.db_manager.store_watcher_appointment(uuid, appointment.to_dict())
watcher.db_manager.create_append_locator_map(appointment.locator, uuid) watcher.db_manager.create_append_locator_map(appointment.locator, uuid)
@@ -237,9 +238,11 @@ def test_do_watch(watcher, temp_db_manager):
# The rest of appointments will timeout after the subscription times-out (9 more blocks) + EXPIRY_DELTA # The rest of appointments will timeout after the subscription times-out (9 more blocks) + EXPIRY_DELTA
# Wait for an additional block to be safe # Wait for an additional block to be safe
generate_blocks_w_delay(10 + config.get("EXPIRY_DELTA")) generate_blocks_w_delay(10 + config.get("EXPIRY_DELTA"))
assert len(watcher.appointments) == 0 assert len(watcher.appointments) == 0
# Check that they are not in the Gatekeeper either, only the two that passed to the Responder should remain
assert len(watcher.gatekeeper.registered_users[user_id].appointments) == 2
# FIXME: We should also add cases where the transactions are invalid. bitcoind_mock needs to be extended for this. # FIXME: We should also add cases where the transactions are invalid. bitcoind_mock needs to be extended for this.