From 8575bc62098b71aaaef12a3970e2ed91b142c0b8 Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Wed, 29 Apr 2020 13:08:49 +0200 Subject: [PATCH] plugin - redesigns retrier using backoff --- watchtower-plugin/requirements.txt | 3 +- watchtower-plugin/retrier.py | 99 ++++++++++++++++-------------- 2 files changed, 56 insertions(+), 46 deletions(-) diff --git a/watchtower-plugin/requirements.txt b/watchtower-plugin/requirements.txt index d1f1adf..84ba49d 100644 --- a/watchtower-plugin/requirements.txt +++ b/watchtower-plugin/requirements.txt @@ -3,4 +3,5 @@ requests coincurve cryptography==2.8 pyzbase32 -plyvel \ No newline at end of file +plyvel +backoff \ No newline at end of file diff --git a/watchtower-plugin/retrier.py b/watchtower-plugin/retrier.py index d778c9e..a4cbf3d 100644 --- a/watchtower-plugin/retrier.py +++ b/watchtower-plugin/retrier.py @@ -1,59 +1,68 @@ +import backoff +from threading import Thread + from tower_info import TowerInfo -from net.http import send_appointment -from exceptions import TowerConnectionError, TowerResponseError +from net.http import add_appointment + + +MAX_RETRIES = None + + +def on_backoff(details): + plugin = details.get("args")[1] + tower_id = details.get("args")[2] + plugin.log("Retry {} failed for tower {}, backing off".format(details.get("tries"), tower_id)) + + +def on_giveup(details): + plugin = details.get("args")[1] + tower_id = details.get("args")[2] + tower_info = details.get("args")[3] + + plugin.log("Max retries reached, abandoning tower {}".format(tower_id)) + + tower_info.status = "unreachable" + plugin.wt_client.towers[tower_id]["status"] = "unreachable" + plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) + + +def set_max_retries(max_retries): + global MAX_RETRIES + MAX_RETRIES = max_retries + + +def max_retries(): + return MAX_RETRIES class Retrier: - def __init__(self, retry_delta, max_retries, temp_unreachable_towers): - self.retry_delta = retry_delta - self.max_retries = max_retries + def __init__(self, max_retries, temp_unreachable_towers): self.temp_unreachable_towers = temp_unreachable_towers - self.retry_count = {} + set_max_retries(max_retries) - def do_retry(self, plugin): + def manage_retry(self, plugin): while True: tower_id = self.temp_unreachable_towers.get() tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id)) - try: - for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]: - plugin.log("Retrying: sending appointment to {}".format(tower_id)) - response = send_appointment(tower_id, tower_info, appointment_dict, signature) - plugin.log("Appointment accepted and signed by {})".format(tower_id)) - plugin.log("Remaining slots: {}".format(response.get("available_slots"))) + Thread(target=self.do_retry, args=[plugin, tower_id, tower_info], daemon=True).start() - tower_info.appointments[appointment_dict.get("locator")] = response.get("signature") - tower_info.available_slots = response.get("available_slots") + @backoff.on_predicate( + backoff.expo, + lambda x: x == "temporarily unreachable", + max_tries=max_retries, + on_backoff=on_backoff, + on_giveup=on_giveup, + ) + def do_retry(self, plugin, tower_id, tower_info): + for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]: + status = add_appointment(plugin, tower_id, tower_info, appointment_dict, signature) - # Update memory and TowersDB - tower_info.pending_appointments.remove([appointment_dict, signature]) - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - plugin.wt_client.towers[tower_id] = tower_info.get_summary() + if status in ["reachable", "misbehaving"]: + tower_info.pending_appointments.remove([appointment_dict, signature]) - if tower_id in self.retry_count: - self.retry_count.pop(tower_id) + # Update memory and TowersDB + plugin.wt_client.update_tower_state(tower_id, tower_info) - tower_info.status = "reachable" - plugin.wt_client.towers[tower_id]["status"] = "reachable" - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - - except TowerConnectionError: - if tower_id not in self.retry_count: - self.retry_count[tower_id] = 1 - else: - plugin.log("Retry {} failed for tower {}, backing off".format(self.retry_count[tower_id], tower_id)) - self.retry_count[tower_id] += 1 - - if self.retry_count[tower_id] <= self.max_retries: - self.temp_unreachable_towers.put(tower_id) - else: - plugin.log("Max retries reached, abandoning tower {}".format(tower_id)) - self.retry_count.pop(tower_id) - - tower_info.status = "unreachable" - plugin.wt_client.towers[tower_id]["status"] = "unreachable" - plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info) - - except TowerResponseError as e: - # FIXME: deal with tower errors, such as no available slots - plugin.log(str(e)) + else: + return status