From b52360c0cddd37d8f23819f3d6670cd3783c42dd Mon Sep 17 00:00:00 2001
From: Michael Schmoock
Date: Thu, 9 Feb 2023 17:48:05 +0100
Subject: [PATCH] summary: replace shelve with datastore using pickle

Usage of python shelve caused a lot of issues in the past.
---
 summary/summary.py       | 100 ++++++++-------------------------------
 summary/summary_avail.py |   5 --
 summary/test_summary.py  |  10 ++--
 3 files changed, 27 insertions(+), 88 deletions(-)

diff --git a/summary/summary.py b/summary/summary.py
index f17af7f..754c07f 100755
--- a/summary/summary.py
+++ b/summary/summary.py
@@ -6,16 +6,14 @@
 from operator import attrgetter
 from summary_avail import trace_availability, addpeer
 import pyln.client
 import requests
-import shelve
 import threading
 import time
-import os
-import glob
+import pickle
 import sys
 
 plugin = Plugin(autopatch=True)
 
-dbfile = "summary.dat"
+datastore_key = ['summary', 'avail']
 
 have_utf8 = False
@@ -51,9 +49,9 @@ class PeerThread(threading.Thread):
             try:
                 rpcpeers = plugin.rpc.listpeers()
                 trace_availability(plugin, rpcpeers)
-                plugin.persist.sync()
-                plugin.log("[PeerThread] Peerstate availability persisted and "
-                           "synced. Sleeping now...", 'debug')
+                write_datastore(plugin)
+                plugin.log("[PeerThread] Peerstate wrote to datastore. "
+                           "Sleeping now...", 'debug')
                 time.sleep(plugin.avail_interval)
             except Exception as ex:
                 plugin.log("[PeerThread] " + str(ex), 'warn')
@@ -248,70 +246,20 @@ def summary(plugin, exclude='', sortkey=None):
     return reply
 
 
-def remove_db():
-    # From this reference https://stackoverflow.com/a/16231228/10854225
-    # the file can have different extension and depends from the os target
-    # in this way we say to remove any file that start with summary.dat*
-    # FIXME: There is better option to obtain the same result
-    for db_file in glob.glob(os.path.join(".", f"{dbfile}*")):
-        os.remove(db_file)
+def load_datastore(plugin):
+    entries = plugin.rpc.listdatastore(key=datastore_key)['datastore']
+    if len(entries) == 0:
+        plugin.log(f"Creating a new datastore '{datastore_key}'", 'debug')
+        return {'version': 1, 'peerstate': {}, 'availcount': 0}
+    persist = pickle.loads(bytearray.fromhex(entries[0]["hex"]))
+    plugin.log(f"Reopened datastore '{datastore_key}' with {persist['availcount']} "
+               f"runs and {len(persist['peerstate'])} entries", 'debug')
+    return persist
 
 
-def init_db(plugin, retry_time=4, sleep_time=1):
-    """
-    On some os we receive some error of type [Errno 79] Inappropriate file type or format: 'summary.dat.db'
-    With this function we retry the call to open db 4 time, and if the last time we obtain an error
-    We will remove the database and recreate a new one.
-    """
-    db = None
-    retry = 0
-    while (db is None and retry < retry_time):
-        try:
-            db = shelve.open(dbfile, writeback=True)
-        except IOError as ex:
-            plugin.log("Error during db initialization: {}".format(ex))
-            time.sleep(sleep_time)
-            if retry == retry_time - 2:
-                plugin.log("As last attempt we try to delete the db.")
-                # In case we can not access to the file
-                # we can safely delete the db and recreate a new one
-                remove_db()
-        retry += 1
-
-    if db is None:
-        raise RuntimeError("db initialization error")
-    else:
-        # Sometimes a re-opened databse will throw `_dmb.error: cannot add item`
-        # on first write maybe because of shelve binary format changes.
-        # In this case, remove and recreate.
-        try:
-            db['test_touch'] = "just_some_data"
-            del db['test_touch']
-        except Exception:
-            try: # still give it a try to close it gracefully
-                db.close()
-            except Exception:
-                pass
-            remove_db()
-            return init_db(plugin)
-        return db
-
-
-def close_db(plugin) -> bool:
-    """
-    This method contains the logic to close the database
-    and print some error message that can happen.
-    """
-    if plugin.persist is not None:
-        try:
-            plugin.persist.close()
-            plugin.log("Database sync and closed with success")
-        except ValueError as ex:
-            plugin.log("An exception occurs during the db closing operation with the following message: {}".format(ex))
-            return False
-    else:
-        plugin.log("There is no db opened for the plugin")
-    return True
+def write_datastore(plugin):
+    hexstr = pickle.dumps(plugin.persist).hex()
+    plugin.rpc.datastore(key=datastore_key, hex=hexstr, mode="create-or-replace")
 
 
 @plugin.init()
@@ -325,14 +273,7 @@ def init(options, configuration, plugin):
     plugin.avail_interval = float(options['summary-availability-interval'])
     plugin.avail_window = 60 * 60 * int(options['summary-availability-window'])
 
-    plugin.persist = init_db(plugin)
-    if 'peerstate' not in plugin.persist:
-        plugin.log(f"Creating a new {dbfile} shelve", 'debug')
-        plugin.persist['peerstate'] = {}
-        plugin.persist['availcount'] = 0
-    else:
-        plugin.log(f"Reopened {dbfile} shelve with {plugin.persist['availcount']} "
-                   f"runs and {len(plugin.persist['peerstate'])} entries", 'debug')
+    plugin.persist = load_datastore(plugin)
 
     info = plugin.rpc.getinfo()
     config = plugin.rpc.listconfigs()
@@ -371,8 +312,9 @@
 
 @plugin.subscribe("shutdown")
 def on_rpc_command_callback(plugin, **kwargs):
-    plugin.log("Closing db before lightningd exit")
-    close_db(plugin)
+    # FIXME: Writing datastore does not work on exit, as daemon is already lost.
+    # plugin.log("Writing out datastore before shutting down")
+    # write_datastore(plugin)
     sys.exit()
 
diff --git a/summary/summary_avail.py b/summary/summary_avail.py
index 71a33e2..d327a95 100644
--- a/summary/summary_avail.py
+++ b/summary/summary_avail.py
@@ -1,13 +1,9 @@
-from datetime import datetime
-
-
 # ensure an rpc peer is added
 def addpeer(p, rpcpeer):
     pid = rpcpeer['id']
     if pid not in p.persist['peerstate']:
         p.persist['peerstate'][pid] = {
             'connected': rpcpeer['connected'],
-            'last_seen': datetime.now() if rpcpeer['connected'] else None,
             'avail': 1.0 if rpcpeer['connected'] else 0.0
         }
 
@@ -25,7 +21,6 @@ def trace_availability(p, rpcpeers):
         addpeer(p, rpcpeer)
 
         if rpcpeer['connected']:
-            p.persist['peerstate'][pid]['last_seen'] = datetime.now()
             p.persist['peerstate'][pid]['connected'] = True
             p.persist['peerstate'][pid]['avail'] = 1.0 * alpha + p.persist['peerstate'][pid]['avail'] * beta
         else:
diff --git a/summary/test_summary.py b/summary/test_summary.py
index 37ce08b..707b49b 100644
--- a/summary/test_summary.py
+++ b/summary/test_summary.py
@@ -35,7 +35,7 @@ def test_summary_peer_thread(node_factory):
     l2.stop()  # we stop l2 and wait for l1 to see that
     l1.daemon.wait_for_log(f".*{l2id}.*Peer connection lost.*")
     wait_for(lambda: l1.rpc.listpeers(l2id)['peers'][0]['connected'] is False)
-    l1.daemon.wait_for_log(r".*availability persisted and synced.*")
+    l1.daemon.wait_for_log("Peerstate wrote to datastore")
     s2 = l1.rpc.summary()
 
     # then
@@ -179,13 +179,15 @@ def test_summary_persist(node_factory):
     l1, l2 = node_factory.line_graph(2, opts=opts)
 
     # when
-    l1.daemon.wait_for_log(r".*availability persisted and synced.*")
+    l1.daemon.logsearch_start = 0
+    l1.daemon.wait_for_log("Creating a new datastore")
+    l1.daemon.wait_for_log("Peerstate wrote to datastore")
     s1 = l1.rpc.summary()
     l2.stop()
     l1.restart()
-    assert l1.daemon.is_in_log(r".*Reopened summary.dat shelve.*")
+    assert l1.daemon.is_in_log("Reopened datastore")
     l1.daemon.logsearch_start = len(l1.daemon.logs)
-    l1.daemon.wait_for_log(r".*availability persisted and synced.*")
+    l1.daemon.wait_for_log("Peerstate wrote to datastore")
     s2 = l1.rpc.summary()
 
     # then