mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 14:14:22 +01:00
plugin - redesigns retrier using backoff
This commit is contained in:
@@ -4,3 +4,4 @@ coincurve
|
|||||||
cryptography==2.8
|
cryptography==2.8
|
||||||
pyzbase32
|
pyzbase32
|
||||||
plyvel
|
plyvel
|
||||||
|
backoff
|
||||||
@@ -1,59 +1,68 @@
|
|||||||
|
import backoff
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
from tower_info import TowerInfo
|
from tower_info import TowerInfo
|
||||||
from net.http import send_appointment
|
from net.http import add_appointment
|
||||||
from exceptions import TowerConnectionError, TowerResponseError
|
|
||||||
|
|
||||||
|
MAX_RETRIES = None
|
||||||
|
|
||||||
|
|
||||||
|
def on_backoff(details):
|
||||||
|
plugin = details.get("args")[1]
|
||||||
|
tower_id = details.get("args")[2]
|
||||||
|
plugin.log("Retry {} failed for tower {}, backing off".format(details.get("tries"), tower_id))
|
||||||
|
|
||||||
|
|
||||||
|
def on_giveup(details):
|
||||||
|
plugin = details.get("args")[1]
|
||||||
|
tower_id = details.get("args")[2]
|
||||||
|
tower_info = details.get("args")[3]
|
||||||
|
|
||||||
|
plugin.log("Max retries reached, abandoning tower {}".format(tower_id))
|
||||||
|
|
||||||
|
tower_info.status = "unreachable"
|
||||||
|
plugin.wt_client.towers[tower_id]["status"] = "unreachable"
|
||||||
|
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
||||||
|
|
||||||
|
|
||||||
|
def set_max_retries(max_retries):
|
||||||
|
global MAX_RETRIES
|
||||||
|
MAX_RETRIES = max_retries
|
||||||
|
|
||||||
|
|
||||||
|
def max_retries():
|
||||||
|
return MAX_RETRIES
|
||||||
|
|
||||||
|
|
||||||
class Retrier:
|
class Retrier:
|
||||||
def __init__(self, retry_delta, max_retries, temp_unreachable_towers):
|
def __init__(self, max_retries, temp_unreachable_towers):
|
||||||
self.retry_delta = retry_delta
|
|
||||||
self.max_retries = max_retries
|
|
||||||
self.temp_unreachable_towers = temp_unreachable_towers
|
self.temp_unreachable_towers = temp_unreachable_towers
|
||||||
self.retry_count = {}
|
set_max_retries(max_retries)
|
||||||
|
|
||||||
def do_retry(self, plugin):
|
def manage_retry(self, plugin):
|
||||||
while True:
|
while True:
|
||||||
tower_id = self.temp_unreachable_towers.get()
|
tower_id = self.temp_unreachable_towers.get()
|
||||||
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
tower_info = TowerInfo.from_dict(plugin.wt_client.db_manager.load_tower_record(tower_id))
|
||||||
|
|
||||||
try:
|
Thread(target=self.do_retry, args=[plugin, tower_id, tower_info], daemon=True).start()
|
||||||
for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]:
|
|
||||||
plugin.log("Retrying: sending appointment to {}".format(tower_id))
|
|
||||||
response = send_appointment(tower_id, tower_info, appointment_dict, signature)
|
|
||||||
plugin.log("Appointment accepted and signed by {})".format(tower_id))
|
|
||||||
plugin.log("Remaining slots: {}".format(response.get("available_slots")))
|
|
||||||
|
|
||||||
tower_info.appointments[appointment_dict.get("locator")] = response.get("signature")
|
@backoff.on_predicate(
|
||||||
tower_info.available_slots = response.get("available_slots")
|
backoff.expo,
|
||||||
|
lambda x: x == "temporarily unreachable",
|
||||||
|
max_tries=max_retries,
|
||||||
|
on_backoff=on_backoff,
|
||||||
|
on_giveup=on_giveup,
|
||||||
|
)
|
||||||
|
def do_retry(self, plugin, tower_id, tower_info):
|
||||||
|
for appointment_dict, signature in plugin.wt_client.towers[tower_id]["pending_appointments"]:
|
||||||
|
status = add_appointment(plugin, tower_id, tower_info, appointment_dict, signature)
|
||||||
|
|
||||||
# Update memory and TowersDB
|
if status in ["reachable", "misbehaving"]:
|
||||||
tower_info.pending_appointments.remove([appointment_dict, signature])
|
tower_info.pending_appointments.remove([appointment_dict, signature])
|
||||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
|
||||||
plugin.wt_client.towers[tower_id] = tower_info.get_summary()
|
|
||||||
|
|
||||||
if tower_id in self.retry_count:
|
# Update memory and TowersDB
|
||||||
self.retry_count.pop(tower_id)
|
plugin.wt_client.update_tower_state(tower_id, tower_info)
|
||||||
|
|
||||||
tower_info.status = "reachable"
|
else:
|
||||||
plugin.wt_client.towers[tower_id]["status"] = "reachable"
|
return status
|
||||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
|
||||||
|
|
||||||
except TowerConnectionError:
|
|
||||||
if tower_id not in self.retry_count:
|
|
||||||
self.retry_count[tower_id] = 1
|
|
||||||
else:
|
|
||||||
plugin.log("Retry {} failed for tower {}, backing off".format(self.retry_count[tower_id], tower_id))
|
|
||||||
self.retry_count[tower_id] += 1
|
|
||||||
|
|
||||||
if self.retry_count[tower_id] <= self.max_retries:
|
|
||||||
self.temp_unreachable_towers.put(tower_id)
|
|
||||||
else:
|
|
||||||
plugin.log("Max retries reached, abandoning tower {}".format(tower_id))
|
|
||||||
self.retry_count.pop(tower_id)
|
|
||||||
|
|
||||||
tower_info.status = "unreachable"
|
|
||||||
plugin.wt_client.towers[tower_id]["status"] = "unreachable"
|
|
||||||
plugin.wt_client.db_manager.store_tower_record(tower_id, tower_info)
|
|
||||||
|
|
||||||
except TowerResponseError as e:
|
|
||||||
# FIXME: deal with tower errors, such as no available slots
|
|
||||||
plugin.log(str(e))
|
|
||||||
|
|||||||
Reference in New Issue
Block a user