summary: replace shelve with datastore using pickle

Using the Python shelve module caused many issues in the past.
This commit is contained in:
Michael Schmoock
2023-02-09 17:48:05 +01:00
parent 07f9fcfa78
commit b52360c0cd
3 changed files with 27 additions and 88 deletions

View File

@@ -6,16 +6,14 @@ from operator import attrgetter
from summary_avail import trace_availability, addpeer
import pyln.client
import requests
import shelve
import threading
import time
import os
import glob
import pickle
import sys
plugin = Plugin(autopatch=True)
dbfile = "summary.dat"
datastore_key = ['summary', 'avail']
have_utf8 = False
@@ -51,9 +49,9 @@ class PeerThread(threading.Thread):
try:
rpcpeers = plugin.rpc.listpeers()
trace_availability(plugin, rpcpeers)
plugin.persist.sync()
plugin.log("[PeerThread] Peerstate availability persisted and "
"synced. Sleeping now...", 'debug')
write_datastore(plugin)
plugin.log("[PeerThread] Peerstate wrote to datastore. "
"Sleeping now...", 'debug')
time.sleep(plugin.avail_interval)
except Exception as ex:
plugin.log("[PeerThread] " + str(ex), 'warn')
@@ -248,70 +246,20 @@ def summary(plugin, exclude='', sortkey=None):
return reply
def remove_db():
    """Delete every on-disk file belonging to the shelve database.

    Per https://stackoverflow.com/a/16231228/10854225 the shelve backend may
    create files with OS-dependent extensions (e.g. ``summary.dat.db``), so we
    remove everything whose name starts with the db filename.
    """
    # FIXME: There is better option to obtain the same result
    pattern = os.path.join(".", f"{dbfile}*")
    for stale_file in glob.glob(pattern):
        os.remove(stale_file)
def load_datastore(plugin):
    """Read the persisted peer state from the lightningd datastore.

    Returns the unpickled state dict, or a fresh default state when the
    datastore has no entry under ``datastore_key`` yet.
    """
    result = plugin.rpc.listdatastore(key=datastore_key)['datastore']
    if not result:
        plugin.log(f"Creating a new datastore '{datastore_key}'", 'debug')
        return {'version': 1, 'peerstate': {}, 'availcount': 0}
    blob = bytearray.fromhex(result[0]["hex"])
    persist = pickle.loads(blob)
    plugin.log(f"Reopened datastore '{datastore_key}' with {persist['availcount']} "
               f"runs and {len(persist['peerstate'])} entries", 'debug')
    return persist
def init_db(plugin, retry_time=4, sleep_time=1):
    """
    Open the shelve database, retrying on transient open failures.

    On some OSes we receive errors of the form
    [Errno 79] Inappropriate file type or format: 'summary.dat.db'.
    We retry the open up to `retry_time` times, sleeping `sleep_time`
    seconds between attempts; just before the final attempt the database
    files are removed so a fresh database can be recreated.

    Returns the opened shelve object; raises RuntimeError when every
    attempt fails.
    """
    db = None
    retry = 0
    while (db is None and retry < retry_time):
        try:
            db = shelve.open(dbfile, writeback=True)
        except IOError as ex:
            plugin.log("Error during db initialization: {}".format(ex))
            time.sleep(sleep_time)
            # On the second-to-last failure wipe the db files so that the
            # final attempt starts from a clean slate.
            if retry == retry_time - 2:
                plugin.log("As last attempt we try to delete the db.")
                # In case we cannot access the file we can safely delete
                # the db and recreate a new one.
                remove_db()
        retry += 1
    if db is None:
        raise RuntimeError("db initialization error")
    else:
        # Sometimes a re-opened database will throw `_dbm.error: cannot add item`
        # on first write, maybe because of shelve binary format changes.
        # In this case, remove and recreate.
        try:
            db['test_touch'] = "just_some_data"
            del db['test_touch']
        except Exception:
            try:  # still give it a try to close it gracefully
                db.close()
            except Exception:
                pass
            remove_db()
            # NOTE(review): unbounded recursion if the recreated db keeps
            # failing the touch test — presumably rare; confirm.
            return init_db(plugin)
    return db
def close_db(plugin) -> bool:
    """Close the plugin's database and report whether closing succeeded.

    Returns False only when ``close()`` raised a ValueError; returns True
    otherwise, including when no database was ever opened.
    """
    if plugin.persist is None:
        plugin.log("There is no db opened for the plugin")
        return True
    try:
        plugin.persist.close()
    except ValueError as ex:
        plugin.log("An exception occurs during the db closing operation with the following message: {}".format(ex))
        return False
    plugin.log("Database sync and closed with success")
    return True
def write_datastore(plugin):
    """Pickle ``plugin.persist`` and upsert it into the lightningd datastore."""
    payload = pickle.dumps(plugin.persist)
    plugin.rpc.datastore(key=datastore_key, hex=payload.hex(), mode="create-or-replace")
@plugin.init()
@@ -325,14 +273,7 @@ def init(options, configuration, plugin):
plugin.avail_interval = float(options['summary-availability-interval'])
plugin.avail_window = 60 * 60 * int(options['summary-availability-window'])
plugin.persist = init_db(plugin)
if 'peerstate' not in plugin.persist:
plugin.log(f"Creating a new {dbfile} shelve", 'debug')
plugin.persist['peerstate'] = {}
plugin.persist['availcount'] = 0
else:
plugin.log(f"Reopened {dbfile} shelve with {plugin.persist['availcount']} "
f"runs and {len(plugin.persist['peerstate'])} entries", 'debug')
plugin.persist = load_datastore(plugin)
info = plugin.rpc.getinfo()
config = plugin.rpc.listconfigs()
@@ -371,8 +312,9 @@ def init(options, configuration, plugin):
@plugin.subscribe("shutdown")
def on_rpc_command_callback(plugin, **kwargs):
    """Handle lightningd's `shutdown` notification: close the db and exit."""
    plugin.log("Closing db before lightningd exit")
    close_db(plugin)
    # FIXME: Writing datastore does not work on exit, as daemon is already lost.
    # plugin.log("Writing out datastore before shutting down")
    # write_datastore(plugin)
    sys.exit()

View File

@@ -1,13 +1,9 @@
from datetime import datetime
# ensure an rpc peer is added
def addpeer(p, rpcpeer):
    """Register `rpcpeer` in p.persist['peerstate'] if not yet tracked."""
    pid = rpcpeer['id']
    if pid not in p.persist['peerstate']:
        # Seed the availability record: a currently connected peer starts
        # at full availability (1.0), a disconnected one at 0.0.
        p.persist['peerstate'][pid] = {
            'connected': rpcpeer['connected'],
            'last_seen': datetime.now() if rpcpeer['connected'] else None,
            'avail': 1.0 if rpcpeer['connected'] else 0.0
        }
@@ -25,7 +21,6 @@ def trace_availability(p, rpcpeers):
addpeer(p, rpcpeer)
if rpcpeer['connected']:
p.persist['peerstate'][pid]['last_seen'] = datetime.now()
p.persist['peerstate'][pid]['connected'] = True
p.persist['peerstate'][pid]['avail'] = 1.0 * alpha + p.persist['peerstate'][pid]['avail'] * beta
else:

View File

@@ -35,7 +35,7 @@ def test_summary_peer_thread(node_factory):
l2.stop() # we stop l2 and wait for l1 to see that
l1.daemon.wait_for_log(f".*{l2id}.*Peer connection lost.*")
wait_for(lambda: l1.rpc.listpeers(l2id)['peers'][0]['connected'] is False)
l1.daemon.wait_for_log(r".*availability persisted and synced.*")
l1.daemon.wait_for_log("Peerstate wrote to datastore")
s2 = l1.rpc.summary()
# then
@@ -179,13 +179,15 @@ def test_summary_persist(node_factory):
l1, l2 = node_factory.line_graph(2, opts=opts)
# when
l1.daemon.wait_for_log(r".*availability persisted and synced.*")
l1.daemon.logsearch_start = 0
l1.daemon.wait_for_log("Creating a new datastore")
l1.daemon.wait_for_log("Peerstate wrote to datastore")
s1 = l1.rpc.summary()
l2.stop()
l1.restart()
assert l1.daemon.is_in_log(r".*Reopened summary.dat shelve.*")
assert l1.daemon.is_in_log("Reopened datastore")
l1.daemon.logsearch_start = len(l1.daemon.logs)
l1.daemon.wait_for_log(r".*availability persisted and synced.*")
l1.daemon.wait_for_log("Peerstate wrote to datastore")
s2 = l1.rpc.summary()
# then