Mirror of https://github.com/aljazceru/plugins.git (synced 2025-12-19 14:14:20 +01:00)
backup: Initial version of the backup plugin
.gitignore (vendored): 1 line changed
@@ -1,2 +1,3 @@
 __pycache__
 .coverage
+.mypy_cache
backup/backup.py (new executable file): 218 lines
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
from pyln.client import Plugin
from pprint import pprint
from collections import namedtuple
from urllib.parse import urlparse
import struct
import os
from typing import Mapping, Type, Optional
import logging
import sys
from binascii import hexlify


plugin = Plugin()

root = logging.getLogger()
root.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)

# A change that was proposed by c-lightning that needs saving to the
# backup. `version` is the database version before the transaction was
# applied. `snapshot` is an optional full database snapshot and
# `transaction` is the list of SQL statements; `add_change` below relies
# on the `snapshot` field to tell the two kinds of entries apart.
Change = namedtuple('Change', ['version', 'snapshot', 'transaction'])

class Backend(object):
    def __init__(self, destination: str):
        raise NotImplementedError

    def snapshot(self, filename: str) -> bool:
        raise NotImplementedError

    def add_change(self, change: Change) -> bool:
        raise NotImplementedError

    def initialize(self) -> bool:
        raise NotImplementedError

class FileBackend(Backend):
    def __init__(self, destination: str):
        self.version = None
        self.prev_version = None
        self.destination = destination
        self.offsets = [0, 0]
        self.version_count = 0
        self.url = urlparse(self.destination)

    def initialize(self) -> bool:
        if not os.path.exists(self.url.path):
            return False
        return self.read_metadata()

    def write_metadata(self):
        blob = struct.pack("!IIQIQQ", 0x01, self.version, self.offsets[0],
                           self.prev_version, self.offsets[1],
                           self.version_count)

        # Pad the header to a full 512-byte block
        blob += b'\x00' * (512 - len(blob))
        mode = "rb+" if os.path.exists(self.url.path) else "wb+"

        with open(self.url.path, mode) as f:
            f.seek(0)
            f.write(blob)
            f.flush()

    def read_metadata(self):
        with open(self.url.path, 'rb') as f:
            blob = f.read(512)
            if len(blob) != 512:
                logging.warning("Corrupt FileBackend header, expected 512 bytes, got {} bytes".format(len(blob)))
                return False

            file_version, = struct.unpack_from("!I", blob)
            if file_version != 1:
                logging.warning("Unknown FileBackend version {}".format(file_version))
                return False

            self.version, self.offsets[0], self.prev_version, self.offsets[1], self.version_count, = struct.unpack_from("!IQIQQ", blob, offset=4)

            return True

    def add_change(self, entry: Change) -> bool:
        typ = b'\x01' if entry.snapshot is None else b'\x02'
        if typ == b'\x01':
            payload = b'\x00'.join([t.encode('ASCII') for t in entry.transaction])
        elif typ == b'\x02':
            payload = entry.snapshot

        length = struct.pack("!I", len(payload))
        # Open read/write rather than append: after a rewind the write
        # offset points back into the file, and in append mode the seek
        # below would be ignored.
        with open(self.url.path, 'rb+') as f:
            f.seek(self.offsets[0])
            f.write(length)
            f.write(payload)
            self.prev_version, self.offsets[1] = self.version, self.offsets[0]
            self.version = entry.version
            self.offsets[0] += 4 + len(payload)
            self.write_metadata()

        return True

    def rewind(self):
        # After rewinding we set offsets[1] and prev_version to 0 (best
        # effort result). If either of these is already 0 we would have two
        # consecutive rewinds, which cannot be done safely (we'd be rewinding
        # more than the one in-flight transaction).
        if self.offsets[1] == 0 or self.prev_version == 0:
            logging.warning("Cannot rewind multiple times.")
            return False

        self.version, self.offsets[0] = self.prev_version, self.offsets[1]
        self.prev_version, self.offsets[1] = 0, 0
        return True

backend_map: Mapping[str, Type[Backend]] = {
    'file': FileBackend,
}


def abort(reason: str) -> None:
    plugin.log(reason)
    plugin.rpc.stop()
    raise ValueError()


def check_first_write(plugin, data_version):
    """Verify that we are up-to-date and c-lightning didn't forget writes.

    We may be at most 1 write off:

    - c-lightning and backup are at the same version (happy case)
    - c-lightning is 1 write behind: it must've crashed in between calling
      the hook and committing the DB transaction.
    - c-lightning is one or more writes ahead: either we lost some writes,
      or c-lightning was running without the plugin at some point -> crash!
    - c-lightning is more than 1 write behind: c-lightning had a lobotomy,
      or was restored from an old backup -> crash!
    """
    backend = plugin.backend

    logging.info("Comparing backup version {} versus first write version {}".format(
        backend.version, data_version
    ))

    if backend.version == data_version - 1:
        logging.info("Versions match up")
        return True

    elif backend.prev_version == data_version - 1 and plugin.backend.rewind():
        logging.info("Last changes not applied, rewinding non-committed transaction")
        return True

    elif backend.prev_version > data_version - 1:
        abort("c-lightning seems to have lost some state (failed restore?). Emergency shutdown.")

    else:
        abort("Backup is out of date, we cannot continue safely. Emergency shutdown.")


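# A concrete walk-through of the check above (illustrative numbers, not
# from the plugin): if the backup header records version=5, an incoming
# write with data_version=6 is the happy case, since the backup is exactly
# the one proposed write behind. data_version=5 with prev_version=4 means
# the last hooked transaction never committed, so we rewind it. Anything
# else means the node and the backup have diverged and we abort.
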
@plugin.hook('db_write')
def on_db_write(writes, data_version, plugin, **kwargs):
    change = Change(data_version, None, writes)
    if not hasattr(plugin, 'backend'):
        # The plugin is still initializing; buffer the writes and replay
        # them once the backend is ready.
        plugin.early_writes.append(change)
        return True
    else:
        return apply_write(plugin, change)


def apply_write(plugin, change):
    if not plugin.initialized:
        assert(check_first_write(plugin, change.version))
        plugin.initialized = True

    return plugin.backend.add_change(change)


@plugin.init()
def on_init(options: Mapping[str, str], plugin: Plugin, **kwargs):
    # Reach into the configuration to figure out which DB file
    # c-lightning is using.
    configs = plugin.rpc.listconfigs()
    plugin.db_path = configs['wallet']
    destination = options['backup-destination']

    if not plugin.db_path.startswith('sqlite3'):
        abort("The backup plugin only works with the sqlite3 database.")

    if destination == 'null':
        abort("You must specify a backup destination, possibly on a secondary disk.")

    # Let's initialize the backend. First we need to figure out which backend to use.
    p = urlparse(destination)
    backend_cl = backend_map.get(p.scheme, None)
    if backend_cl is None:
        abort("Could not find a backend for scheme {p.scheme}".format(p=p))

    plugin.backend = backend_cl(destination)
    if not plugin.backend.initialize():
        abort("Could not initialize the backup {}, please use 'backup-cli' to initialize the backup first.".format(destination))

    # Replay any writes that arrived before the backend was ready.
    for c in plugin.early_writes:
        apply_write(plugin, c)


plugin.add_option(
    'backup-destination', None,
    'Destination of the database backups (file:///filename/on/another/disk/).'
)


if __name__ == "__main__":
    # Did we perform the version check of backend versus the first write?
    plugin.initialized = False
    plugin.early_writes = []
    plugin.run()
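The on-disk layout above is simple enough to inspect by hand. Below is a minimal standalone sketch (our own illustration, not part of the commit; `dump_header` and `iter_changes` are hypothetical helpers) that decodes the 512-byte header written by `write_metadata` and walks the length-prefixed change records, assuming the first record starts directly after the header:

#!/usr/bin/env python3
import struct
import sys


def dump_header(blob: bytes):
    # Mirrors write_metadata: "!IIQIQQ", zero-padded to 512 bytes.
    (file_version, version, offset,
     prev_version, prev_offset, version_count) = struct.unpack_from("!IIQIQQ", blob)
    assert file_version == 1, "unknown FileBackend version"
    return version, offset


def iter_changes(f, end_offset):
    # Each record is a 4-byte big-endian length followed by the payload
    # (SQL statements separated by NUL bytes). We assume records start
    # directly after the 512-byte header.
    f.seek(512)
    while f.tell() < end_offset:
        length, = struct.unpack("!I", f.read(4))
        yield f.read(length).split(b'\x00')


if __name__ == "__main__":
    with open(sys.argv[1], 'rb') as f:
        version, offset = dump_header(f.read(512))
        print("backup is at db version", version)
        for stmts in iter_changes(f, offset):
            print([s.decode('ASCII', 'replace') for s in stmts])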
backup/fixtures/backup.dbak (new binary file)
Binary file not shown.
backup/test_backup.py (new file): 135 lines
@@ -0,0 +1,135 @@
from flaky import flaky
from pyln.client import RpcError
from pyln.testing.fixtures import *
import os
import pytest
import shutil
import subprocess
import time


plugin_dir = os.path.dirname(__file__)
plugin_path = os.path.join(plugin_dir, "backup.py")
# Companion command-line tool used to initialize and restore backups. The
# tests reference it, but it is not included in this commit; we assume it
# lives next to the plugin.
cli_path = os.path.join(plugin_dir, "backup-cli")


def test_start(node_factory, directory):
    opts = {
        'plugin': plugin_path,
        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
    }
    shutil.copyfile(
        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
        os.path.join(directory, "backup.dbak")
    )
    l1 = node_factory.get_node(options=opts)

    l1.daemon.wait_for_log(r'backup.py')

    # Restart the node a couple of times, to check that we can resume normally
    for i in range(5):
        l1.restart()
        l1.daemon.wait_for_log(r'Versions match up')


def test_tx_abort(node_factory, directory):
    """Simulate a crash between hook call and DB commit.

    We simulate this by updating the data_version var in the database before
    restarting the node. This desyncs the node from the backup, and restoring
    may not work (depending on which transaction was pretend-rolled-back), but
    continuing should work fine, since it can happen that we crash just in
    between the hook call and the DB transaction.
    """
    opts = {
        'plugin': plugin_path,
        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
    }
    shutil.copyfile(
        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
        os.path.join(directory, "backup.dbak")
    )
    l1 = node_factory.get_node(options=opts)
    l1.stop()

    print(l1.db.query("SELECT * FROM vars;"))

    # Now fudge the data_version:
    l1.db.execute("UPDATE vars SET intval = intval - 1 WHERE name = 'data_version'")

    print(l1.db.query("SELECT * FROM vars;"))

    l1.restart()
    l1.daemon.wait_for_log(r'Last changes not applied')


@flaky
def test_failing_restore(node_factory, directory):
    """The node database is having memory loss, make sure we abort.

    We simulate a loss of transactions by manually resetting the data_version
    in the database back to n-2, which is non-recoverable.
    """
    opts = {
        'plugin': plugin_path,
        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
    }
    shutil.copyfile(
        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
        os.path.join(directory, "backup.dbak")
    )
    l1 = node_factory.get_node(options=opts)
    l1.stop()

    # Now fudge the data_version:
    l1.db.execute("UPDATE vars SET intval = intval - 2 WHERE name = 'data_version'")

    with pytest.raises(Exception):
        l1.start()

    l1.daemon.proc.wait()
    assert(l1.daemon.is_in_log(r'lost some state') is not None)


def test_intermittent_backup(node_factory, directory):
    """Simulate intermittent use of the backup, or an old file backup."""
    opts = {
        'plugin': plugin_path,
        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
    }
    shutil.copyfile(
        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
        os.path.join(directory, "backup.dbak")
    )
    l1 = node_factory.get_node(options=opts)

    # Now start without the plugin. This should work fine.
    del l1.daemon.opts['plugin']
    del l1.daemon.opts['backup-destination']
    l1.restart()

    # Now restart adding the plugin again, and it should fail due to gaps in
    # the backup.
    l1.stop()
    with pytest.raises(Exception):
        l1.daemon.opts.update(opts)
        l1.start()

    l1.daemon.proc.wait()
    assert(l1.daemon.is_in_log(r'Backup is out of date') is not None)


def test_restore(node_factory, directory):
    bpath = os.path.join(directory, 'lightning-1', 'regtest')
    bdest = 'file://' + os.path.join(bpath, 'backup.dbak')
    os.makedirs(bpath)
    subprocess.check_call([cli_path, "init", bpath, bdest])
    opts = {
        'plugin': plugin_path,
        'backup-destination': bdest,
    }
    l1 = node_factory.get_node(options=opts, cleandir=False)
    l1.stop()

    rdest = os.path.join(bpath, 'lightningd.sqlite.restore')
    subprocess.check_call([cli_path, "restore", bdest, rdest])
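For a feel of the metadata round-trip without spinning up a node, here is a small exercise of `FileBackend` directly (our own sketch, not part of the commit; it assumes `backup.py` is importable and that change records start right after the 512-byte header):

from backup import Change, FileBackend


def demo(path='/tmp/demo.dbak'):
    b = FileBackend('file://' + path)
    # Hand-roll a fresh header: db version 0, records begin right after
    # the 512-byte header (our assumption for an empty backup).
    b.version, b.prev_version = 0, 0
    b.offsets = [512, 0]
    b.version_count = 0
    b.write_metadata()          # creates the file with a fresh header

    assert b.initialize()       # re-reads the header we just wrote
    b.add_change(Change(version=1, snapshot=None,
                        transaction=["UPDATE vars SET intval=1"]))
    assert b.version == 1 and b.prev_version == 0


demo()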