diff --git a/.gitignore b/.gitignore
index d8a1bb0..38411fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 __pycache__
 .coverage
+.mypy_cache
diff --git a/backup/backup.py b/backup/backup.py
new file mode 100755
index 0000000..2c0bbfa
--- /dev/null
+++ b/backup/backup.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+from pyln.client import Plugin
+from pprint import pprint
+from collections import namedtuple
+from urllib.parse import urlparse
+import struct
+import os
+from typing import Mapping, Type, Optional
+import logging
+import sys
+from binascii import hexlify
+
+
+plugin = Plugin()
+
+root = logging.getLogger()
+root.setLevel(logging.DEBUG)
+
+handler = logging.StreamHandler(sys.stdout)
+handler.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(message)s')
+handler.setFormatter(formatter)
+root.addHandler(handler)
+
+# A change that was proposed by c-lightning that needs saving to the
+# backup. `version` is the database version before the transaction was
+# applied. `snapshot` is an optional full copy of the database; if it is
+# None the change is just the list of statements in `transaction`.
+Change = namedtuple('Change', ['version', 'snapshot', 'transaction'])
+
+
+class Backend(object):
+    def __init__(self, destination: str):
+        raise NotImplementedError
+
+    def snapshot(self, filename: str) -> bool:
+        raise NotImplementedError
+
+    def add_change(self, change: Change) -> bool:
+        raise NotImplementedError
+
+    def initialize(self) -> bool:
+        raise NotImplementedError
+
+
+class FileBackend(Backend):
+    def __init__(self, destination: str):
+        self.version = None
+        self.prev_version = None
+        self.destination = destination
+        self.offsets = [0, 0]
+        self.version_count = 0
+        self.url = urlparse(self.destination)
+
+    def initialize(self) -> bool:
+        if not os.path.exists(self.url.path):
+            return False
+        return self.read_metadata()
+
+    def write_metadata(self):
+        blob = struct.pack("!IIQIQQ", 0x01, self.version, self.offsets[0],
+                           self.prev_version, self.offsets[1],
+                           self.version_count)
+
+        # Pad the header to a fixed 512 bytes
+        blob += b'\x00' * (512 - len(blob))
+        mode = "rb+" if os.path.exists(self.url.path) else "wb+"
+
+        with open(self.url.path, mode) as f:
+            f.seek(0)
+            f.write(blob)
+            f.flush()
+
+    def read_metadata(self):
+        with open(self.url.path, 'rb') as f:
+            blob = f.read(512)
+            if len(blob) != 512:
+                logging.warning("Corrupt FileBackend header, expected 512 bytes, got {} bytes".format(len(blob)))
+                return False
+
+            file_version, = struct.unpack_from("!I", blob)
+            if file_version != 1:
+                logging.warning("Unknown FileBackend version {}".format(file_version))
+                return False
+
+            self.version, self.offsets[0], self.prev_version, self.offsets[1], self.version_count = struct.unpack_from("!IQIQQ", blob, offset=4)
+
+            return True
+
+    def add_change(self, entry: Change) -> bool:
+        typ = b'\x01' if entry.snapshot is None else b'\x02'
+        if typ == b'\x01':
+            payload = b'\x00'.join([t.encode('ASCII') for t in entry.transaction])
+        elif typ == b'\x02':
+            payload = entry.snapshot
+
+        # Append the length-prefixed payload and then update the header.
+        length = struct.pack("!I", len(payload))
+        with open(self.url.path, 'ab') as f:
+            f.seek(self.offsets[0])
+            f.write(length)
+            f.write(payload)
+            self.prev_version, self.offsets[1] = self.version, self.offsets[0]
+            self.version = entry.version
+            self.offsets[0] += 4 + len(payload)
+        self.write_metadata()
+
+        return True
+
+    def rewind(self):
+        # After rewinding we set offsets[1] and prev_version to 0 (best-effort
+        # result). If either of these is already 0 we'd have two consecutive
+        # rewinds, which cannot be done safely (we'd be rewinding more than the
+        # one in-flight transaction).
+        if self.offsets[1] == 0 or self.prev_version == 0:
+            logging.warning("Cannot rewind multiple times.")
+            return False
+
+        self.version, self.offsets[0] = self.prev_version, self.offsets[1]
+        self.prev_version, self.offsets[1] = 0, 0
+        return True
+
+
+backend_map: Mapping[str, Type[Backend]] = {
+    'file': FileBackend,
+}
+
+
+def abort(reason: str) -> None:
+    plugin.log(reason)
+    plugin.rpc.stop()
+    raise ValueError()
+
+
+def check_first_write(plugin, data_version):
+    """Verify that we are up-to-date and c-lightning didn't forget writes.
+
+    We may be at most one write off:
+
+     - c-lightning and the backup are at the same version (happy case)
+     - c-lightning is 1 write behind: it must have crashed in between calling
+       the hook and committing the DB transaction.
+     - c-lightning is one or more writes ahead: either we lost some writes, or
+       c-lightning was running without the plugin at some point -> crash!
+     - c-lightning is more than 1 write behind: c-lightning had a lobotomy, or
+       was restored from an old backup -> crash!
+    """
+    backend = plugin.backend
+
+    logging.info("Comparing backup version {} versus first write version {}".format(
+        backend.version, data_version
+    ))
+
+    if backend.version == data_version - 1:
+        logging.info("Versions match up")
+        return True
+
+    elif backend.prev_version == data_version - 1 and plugin.backend.rewind():
+        logging.info("Last changes not applied, rewinding non-committed transaction")
+        return True
+
+    elif backend.prev_version > data_version - 1:
+        abort("c-lightning seems to have lost some state (failed restore?). Emergency shutdown.")
+
+    else:
+        abort("Backup is out of date, we cannot continue safely. Emergency shutdown.")
+
+
+@plugin.hook('db_write')
+def on_db_write(writes, data_version, plugin, **kwargs):
+    change = Change(data_version, None, writes)
+    if not hasattr(plugin, 'backend'):
+        plugin.early_writes.append(change)
+        return True
+    else:
+        return apply_write(plugin, change)
+
+
+def apply_write(plugin, change):
+    if not plugin.initialized:
+        assert(check_first_write(plugin, change.version))
+        plugin.initialized = True
+
+    return plugin.backend.add_change(change)
+
+
+@plugin.init()
+def on_init(options: Mapping[str, str], plugin: Plugin, **kwargs):
+    # Reach into the configuration to find the database path and the
+    # requested backup destination.
+    configs = plugin.rpc.listconfigs()
+    plugin.db_path = configs['wallet']
+    destination = options['backup-destination']
+
+    if not plugin.db_path.startswith('sqlite3'):
+        abort("The backup plugin only works with the sqlite3 database.")
+
+    if destination == 'null':
+        abort("You must specify a backup destination, possibly on a secondary disk.")
+
+    # Let's initialize the backend. First we need to figure out which backend to use.
+    p = urlparse(destination)
+    backend_cl = backend_map.get(p.scheme, None)
+    if backend_cl is None:
+        abort("Could not find a backend for scheme {p.scheme}".format(p=p))
+
+    plugin.backend = backend_cl(destination)
+    if not plugin.backend.initialize():
+        abort("Could not initialize the backup {}, please use 'backup-cli' to initialize the backup first.".format(destination))
+
+    # Replay the writes that arrived before we were initialized.
+    for c in plugin.early_writes:
+        apply_write(plugin, c)
+
+
+plugin.add_option(
+    'backup-destination', None,
+    'Destination of the database backups (file:///filename/on/another/disk/).'
+)
+
+
+if __name__ == "__main__":
+    # Did we perform the version check of backend versus the first write?
+    plugin.initialized = False
+    plugin.early_writes = []
+    plugin.run()
diff --git a/backup/fixtures/backup.dbak b/backup/fixtures/backup.dbak
new file mode 100644
index 0000000..06b3e4e
Binary files /dev/null and b/backup/fixtures/backup.dbak differ
diff --git a/backup/test_backup.py b/backup/test_backup.py
new file mode 100644
index 0000000..db2bf19
--- /dev/null
+++ b/backup/test_backup.py
@@ -0,0 +1,135 @@
+from flaky import flaky
+from pyln.client import RpcError
+from pyln.testing.fixtures import *
+import os
+import pytest
+import shutil
+import subprocess
+import time
+
+plugin_dir = os.path.dirname(__file__)
+plugin_path = os.path.join(plugin_dir, "backup.py")
+# The backup-cli helper used to initialize and restore backups is expected to
+# live next to the plugin.
+cli_path = os.path.join(plugin_dir, "backup-cli")
+
+
+def test_start(node_factory, directory):
+    opts = {
+        'plugin': plugin_path,
+        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
+    }
+    shutil.copyfile(
+        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
+        os.path.join(directory, "backup.dbak")
+    )
+    l1 = node_factory.get_node(options=opts)
+
+    l1.daemon.wait_for_log(r'backup.py')
+
+    # Restart the node a couple of times, to check that we can resume normally
+    for i in range(5):
+        l1.restart()
+        l1.daemon.wait_for_log(r'Versions match up')
+
+
+def test_tx_abort(node_factory, directory):
+    """Simulate a crash between the hook call and the DB commit.
+
+    We simulate this by decrementing the data_version variable in the database
+    before restarting the node. This desyncs the node from the backup, and
+    restoring may not work (depending on which transaction was
+    pretend-rolled-back), but continuing should work fine, since we may crash
+    just in between the hook call and the DB transaction.
+
+    """
+    opts = {
+        'plugin': plugin_path,
+        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
+    }
+    shutil.copyfile(
+        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
+        os.path.join(directory, "backup.dbak")
+    )
+    l1 = node_factory.get_node(options=opts)
+    l1.stop()
+
+    print(l1.db.query("SELECT * FROM vars;"))
+
+    # Now fudge the data_version:
+    l1.db.execute("UPDATE vars SET intval = intval - 1 WHERE name = 'data_version'")
+
+    print(l1.db.query("SELECT * FROM vars;"))
+
+    l1.restart()
+    l1.daemon.wait_for_log(r'Last changes not applied')
+
+
+@flaky
+def test_failing_restore(node_factory, directory):
+    """The node database is having memory loss, make sure we abort.
+
+    We simulate a loss of transactions by manually resetting the data_version
+    in the database back to n-2, which is non-recoverable.
+
+    """
+    opts = {
+        'plugin': plugin_path,
+        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
+    }
+    shutil.copyfile(
+        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
+        os.path.join(directory, "backup.dbak")
+    )
+    l1 = node_factory.get_node(options=opts)
+    l1.stop()
+
+    # Now fudge the data_version:
+    l1.db.execute("UPDATE vars SET intval = intval - 2 WHERE name = 'data_version'")
+
+    with pytest.raises(Exception):
+        l1.start()
+
+    l1.daemon.proc.wait()
+    assert(l1.daemon.is_in_log(r'lost some state') is not None)
+
+
+def test_intermittent_backup(node_factory, directory):
+    """Simulate intermittent use of the backup, or an old file backup."""
+    opts = {
+        'plugin': plugin_path,
+        'backup-destination': 'file://' + os.path.join(directory, 'backup.dbak')
+    }
+    shutil.copyfile(
+        os.path.join(plugin_dir, 'fixtures', "backup.dbak"),
+        os.path.join(directory, "backup.dbak")
+    )
+    l1 = node_factory.get_node(options=opts)
+
+    # Now start without the plugin. This should work fine.
+    del l1.daemon.opts['plugin']
+    del l1.daemon.opts['backup-destination']
+    l1.restart()
+
+    # Now restart adding the plugin again, and it should fail due to gaps in
+    # the backup.
+    l1.stop()
+    with pytest.raises(Exception):
+        l1.daemon.opts.update(opts)
+        l1.start()
+
+    l1.daemon.proc.wait()
+    assert(l1.daemon.is_in_log(r'Backup is out of date') is not None)
+
+
+def test_restore(node_factory, directory):
+    bpath = os.path.join(directory, 'lightning-1', 'regtest')
+    bdest = 'file://' + os.path.join(bpath, 'backup.dbak')
+    os.makedirs(bpath)
+    subprocess.check_call([cli_path, "init", bpath, bdest])
+    opts = {
+        'plugin': plugin_path,
+        'backup-destination': bdest,
+    }
+    l1 = node_factory.get_node(options=opts, cleandir=False)
+    l1.stop()
+
+    rdest = os.path.join(bpath, 'lightningd.sqlite.restore')
+    subprocess.check_call([cli_path, "restore", bdest, rdest])
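
As a quick illustration of the on-disk layout the FileBackend produces (a 512-byte metadata header followed by length-prefixed change records), a minimal sketch for dumping the header could look like the following. It only relies on the struct formats used in write_metadata()/read_metadata() above; the path 'backup.dbak' is a placeholder, not a file shipped with this change.

    import struct

    # Read the fixed-size header written by FileBackend.write_metadata().
    with open('backup.dbak', 'rb') as f:
        blob = f.read(512)

    # "!IIQIQQ" = file_version, version, offsets[0], prev_version, offsets[1], version_count
    fields = struct.unpack_from("!IIQIQQ", blob)
    print("file_version={} version={} offset={} prev_version={} prev_offset={} count={}".format(*fields))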