Files
python-teos/watchtower-plugin/retrier.py
Sergi Delgado Segura 94e36ebeda plugin - Adds docstrings
2020-05-05 20:11:04 +02:00

149 lines
5.4 KiB
Python

import backoff
from threading import Thread
from common.exceptions import SignatureError
from net.http import add_appointment
from exceptions import TowerConnectionError, TowerResponseError
MAX_RETRIES = None
def check_retry(status):
"""
Checks is the job needs to be retried. Jobs are retried if max_retries is not reached and the tower status is
temporarily unreachable.
Args:
status (:obj:`str`): the tower status.
Returns:
:obj:`bool`: True is the status is "temporarily unreachable", False otherwise.
"""
return status == "temporarily unreachable"
def on_backoff(details):
"""
Function called when backing off after a retry. Logs data regarding the retry.
Args:
details: the retry details (check backoff library for more info).
"""
plugin = details.get("args")[1]
tower_id = details.get("args")[2]
plugin.log(f"Retry {details.get('tries')} failed for tower {tower_id}, backing off")
def on_giveup(details):
"""
Function called when giving up after the last retry. Logs data regarding the retry and flags the tower as
unreachable.
Args:
details: the retry details (check backoff library for more info).
"""
plugin = details.get("args")[1]
tower_id = details.get("args")[2]
plugin.log(f"Max retries reached, abandoning tower {tower_id}")
tower_update = {"status": "unreachable"}
plugin.wt_client.update_tower_state(tower_id, tower_update)
def set_max_retries(max_retries):
"""Workaround to set max retries from Retrier to the backoff.on_predicate decorator"""
global MAX_RETRIES
MAX_RETRIES = max_retries
def max_retries():
"""Workaround to set max retries from Retrier to the backoff.on_predicate decorator"""
return MAX_RETRIES
class Retrier:
"""
The Retrier is in charge of the retry process for appointments that were sent to towers that were temporarily
unreachable.
Args:
max_retries (:obj:`int`): the maximum number of times that a tower will be retried.
temp_unreachable_towers (:obj:`Queue`): a queue of temporarily unreachable towers populated by the plugin on
failing to deliver an appointment.
"""
def __init__(self, max_retries, temp_unreachable_towers):
self.temp_unreachable_towers = temp_unreachable_towers
set_max_retries(max_retries)
def manage_retry(self, plugin):
"""
Listens to the temporarily unreachable towers queue and creates a thread to manage each tower it gets.
Args:
plugin (:obj:`Plugin`): the plugin object.
"""
while True:
tower_id = self.temp_unreachable_towers.get()
tower = plugin.wt_client.towers[tower_id]
Thread(target=self.do_retry, args=[plugin, tower_id, tower], daemon=True).start()
@backoff.on_predicate(backoff.expo, check_retry, max_tries=max_retries, on_backoff=on_backoff, on_giveup=on_giveup)
def do_retry(self, plugin, tower_id, tower):
"""
Retries to send a list of pending appointments to a temporarily unreachable tower. This function is managed by
manage_retries and run in a different thread per tower.
For every pending appointment the worker thread tries to send the data to the tower. If the tower keeps being
unreachable, the job is retries up to MAX_RETRIES. If MAX_RETRIES is reached, the worker thread gives up and the
tower is flagged as unreachable.
Args:
plugin (:obj:`Plugin`): the plugin object.
tower_id (:obj:`str`): the id of the tower managed by the thread.
tower: (:obj:`TowerSummary`): the tower data.
Returns:
:obj:`str`: the tower status if it is not reachable.
"""
for appointment_dict, signature in plugin.wt_client.towers[tower_id].pending_appointments:
tower_update = {}
try:
tower_signature, available_slots = add_appointment(plugin, tower_id, tower, appointment_dict, signature)
tower_update["status"] = "reachable"
tower_update["appointment"] = (appointment_dict.get("locator"), tower_signature)
tower_update["available_slots"] = available_slots
except SignatureError as e:
tower_update["status"] = "misbehaving"
tower_update["misbehaving_proof"] = {
"appointment": appointment_dict,
"signature": e.kwargs.get("signature"),
"recovered_id": e.kwargs.get("recovered_id"),
}
except TowerConnectionError:
tower_update["status"] = "temporarily unreachable"
except TowerResponseError as e:
tower_update["status"] = e.kwargs.get("status")
if e.kwargs.get("invalid_appointment"):
tower_update["invalid_appointment"] = (appointment_dict, signature)
if tower_update["status"] in ["reachable", "misbehaving"]:
tower_update["pending_appointment"] = ([appointment_dict, signature], "remove")
if tower_update["status"] != "temporarily unreachable":
# Update memory and TowersDB
plugin.wt_client.update_tower_state(tower_id, tower_update)
# Continue looping if reachable, return for either retry or stop otherwise
if tower_update["status"] != "reachable":
return tower_update.get("status")