diff --git a/backup/backend.py b/backup/backend.py new file mode 100644 index 0000000..d044e5f --- /dev/null +++ b/backup/backend.py @@ -0,0 +1,125 @@ +from collections import namedtuple +import os +import re +from typing import Iterator + +import sqlite3 +from tqdm import tqdm + +# A change that was proposed by c-lightning that needs saving to the +# backup. `version` is the database version before the transaction was +# applied. The optional snapshot reqpresents a complete copy of the database, +# as it was before applying the `transaction`. This is used by the plugin from +# time to time to allow the backend to compress the changelog and forms a new +# basis for the backup. +Change = namedtuple('Change', ['version', 'snapshot', 'transaction']) + + +class Backend(object): + def __init__(self, destination: str): + """Read the metadata from the destination and prepare any necesary resources. + + After this call the following members must be initialized: + + - backend.version: the last data version we wrote to the backend + - backend.prev_version: the previous data version in case we need to + roll back the last one + """ + self.version = None + self.prev_version = None + raise NotImplementedError + + def add_change(self, change: Change) -> bool: + """Add a single change to the backend. + + This call should always make sure that the change has been correctly + written and flushed before returning. + """ + raise NotImplementedError + + def initialize(self) -> bool: + """Set up any resources needed by this backend. + + """ + raise NotImplementedError + + def stream_changes(self) -> Iterator[Change]: + """Retrieve changes from the backend in order to perform a restore. + """ + raise NotImplementedError + + def rewind(self) -> bool: + """Remove the last change that was added to the backup + + Because the transaction is reported to the backup plugin before it is + being committed to the database it can happen that we get notified + about a transaction but then `lightningd` is stopped and the + transaction is not committed. This means the backup includes an + extraneous transaction which needs to be removed. A backend must allow + a single rewind operation, and should fail additional calls to rewind + (we may have at most one pending transaction not being committed at + any time). + + """ + raise NotImplementedError + + def compact(self): + """Apply some incremental changes to the snapshot to reduce our size. + """ + raise NotImplementedError + + def _db_open(self, dest: str) -> sqlite3.Connection: + db = sqlite3.connect(dest) + db.execute("PRAGMA foreign_keys = 1") + return db + + def _restore_snapshot(self, snapshot: bytes, dest: str): + if os.path.exists(dest): + os.unlink(dest) + with open(dest, 'wb') as f: + f.write(snapshot) + self.db = self._db_open(dest) + + def _rewrite_stmt(self, stmt: str) -> str: + """We had a stmt expansion bug in c-lightning, this replicates the fix. + + We were expanding statements incorrectly, missing some + whitespace between a param and the `WHERE` keyword. This + re-inserts the space. 
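The rewrite is purely mechanical; a quick standalone illustration using the same two regular expressions (the statement text itself is made up):

```python
import re

def rewrite_stmt(stmt: str) -> str:
    # Re-insert the space that the expansion bug dropped between the
    # parameter value and the WHERE keyword (same patterns as _rewrite_stmt).
    stmt = re.sub(r'reserved_til=([0-9]+)WHERE', r'reserved_til=\1 WHERE', stmt)
    stmt = re.sub(r'peer_id=([0-9]+)WHERE channels.id=', r'peer_id=\1 WHERE channels.id=', stmt)
    return stmt

# Hypothetical statement exhibiting the bug:
print(rewrite_stmt("UPDATE outputs SET reserved_til=104WHERE prev_out_tx=? AND prev_out_index=?"))
# -> UPDATE outputs SET reserved_til=104 WHERE prev_out_tx=? AND prev_out_index=?
```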
+ + """ + stmt = re.sub(r'reserved_til=([0-9]+)WHERE', r'reserved_til=\1 WHERE', stmt) + stmt = re.sub(r'peer_id=([0-9]+)WHERE channels.id=', r'peer_id=\1 WHERE channels.id=', stmt) + return stmt + + def _restore_transaction(self, tx: Iterator[str]): + assert(self.db) + cur = self.db.cursor() + for q in tx: + q = self._rewrite_stmt(q) + cur.execute(q) + self.db.commit() + + def restore(self, dest: str, remove_existing: bool = False): + """Restore the backup in this backend to its former glory. + + If `dest` is a directory, we assume the default database filename: + lightningd.sqlite3 + """ + if os.path.isdir(dest): + dest = os.path.join(dest, "lightningd.sqlite3") + if os.path.exists(dest): + if not remove_existing: + raise ValueError( + "Destination for backup restore exists: {dest}".format( + dest=dest + ) + ) + os.unlink(dest) + + self.db = self._db_open(dest) + for c in tqdm(self.stream_changes()): + if c.snapshot is not None: + self._restore_snapshot(c.snapshot, dest) + if c.transaction is not None: + self._restore_transaction(c.transaction) diff --git a/backup/backends.py b/backup/backends.py new file mode 100644 index 0000000..3576543 --- /dev/null +++ b/backup/backends.py @@ -0,0 +1,33 @@ +'''Create a backend instance based on URI scheme dispatch.''' +from typing import Type +from urllib.parse import urlparse + +from backend import Backend +from socketbackend import SocketBackend +from filebackend import FileBackend + + +def resolve_backend_class(backend_url): + + backend_map: Mapping[str, Type[Backend]] = { + 'file': FileBackend, + 'socket': SocketBackend, + } + p = urlparse(backend_url) + backend_cl = backend_map.get(p.scheme, None) + return backend_cl + + +def get_backend(destination, create=False, require_init=False): + backend_cl = resolve_backend_class(destination) + if backend_cl is None: + raise ValueError("No backend implementation found for {destination}".format( + destination=destination, + )) + backend = backend_cl(destination, create=create) + initialized = backend.initialize() + if require_init and not initialized: + kill("Could not initialize the backup {}, please use 'backup-cli' to initialize the backup first.".format(destination)) + assert(backend.version is not None) + assert(backend.prev_version is not None) + return backend \ No newline at end of file diff --git a/backup/backup-cli b/backup/backup-cli index a6c2fc8..d6e68bd 100755 --- a/backup/backup-cli +++ b/backup/backup-cli @@ -1,54 +1,61 @@ #!/usr/bin/env python3 -from backup import FileBackend, get_backend, Change +from backends import get_backend +from backend import Change +from server import SocketServer + import os import click import json +import logging import sqlite3 import sys +root = logging.getLogger() +root.setLevel(logging.INFO) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(message)s') +handler.setFormatter(formatter) +root.addHandler(handler) @click.command() -@click.argument("lightning-dir", type=click.Path(exists=True)) @click.argument("backend-url") +@click.option('--lightning-dir', type=click.Path(exists=True), default=None, help='Use an existing lightning directory (default: initialize an empty backup).') def init(lightning_dir, backend_url): destination = backend_url backend = get_backend(destination, create=True) - backend.version, backend.prev_version = 0, 0 - backend.offsets = [512, 0] - backend.version_count = 0 - backend.write_metadata() - lock_file = os.path.join(lightning_dir, "backup.lock") - db_file = 
os.path.join(lightning_dir, "lightningd.sqlite3") + if lightning_dir is not None: + lock_file = os.path.join(lightning_dir, "backup.lock") + db_file = os.path.join(lightning_dir, "lightningd.sqlite3") - with open(lock_file, "w") as f: - f.write(json.dumps({ - 'backend_url': destination, - })) + with open(lock_file, "w") as f: + f.write(json.dumps({ + 'backend_url': destination, + })) - # TODO Take a snapshot + data_version = 0 + if os.path.exists(db_file): + print("Found an existing database at {db_file}, initializing the backup with a snapshot".format(db_file=db_file)) + # Peek into the DB to see if we have + db = sqlite3.connect(db_file) + cur = db.cursor() + rows = cur.execute("SELECT intval FROM vars WHERE name = 'data_version'") + data_version = rows.fetchone()[0] - data_version = 0 - if os.path.exists(db_file): - print("Found an existing database at {db_file}, initializing the backup with a snapshot".format(db_file=db_file)) - # Peek into the DB to see if we have - db = sqlite3.connect(db_file) - cur = db.cursor() - rows = cur.execute("SELECT intval FROM vars WHERE name = 'data_version'") - data_version = rows.fetchone()[0] - - snapshot = Change( - version=data_version, - snapshot=open(db_file, 'rb').read(), - transaction=None - ) - if not backend.add_change(snapshot): - print("Could not write snapshot to backend") - sys.exit(1) + snapshot = Change( + version=data_version, + snapshot=open(db_file, 'rb').read(), + transaction=None + ) + if not backend.add_change(snapshot): + print("Could not write snapshot to backend") + sys.exit(1) + else: + print("Successfully written initial snapshot to {destination}".format(destination=destination)) else: - print("Successfully written initial snapshot to {destination}".format(destination=destination)) - else: - print("Database does not exist yet, created an empty backup file") + print("Database does not exist yet, created an empty backup file") print("Initialized backup backend {destination}, you can now start c-lightning".format( destination=destination, @@ -64,6 +71,17 @@ def restore(backend_url, restore_destination): backend.restore(restore_destination) +@click.command() +@click.argument("backend-url") +@click.argument("addr") +def server(backend_url, addr): + backend = get_backend(backend_url) + addr, port = addr.split(':') + port = int(port) + server = SocketServer((addr, port), backend) + server.run() + + @click.group() def cli(): pass @@ -71,6 +89,7 @@ def cli(): cli.add_command(init) cli.add_command(restore) +cli.add_command(server) if __name__ == "__main__": cli() diff --git a/backup/backup.py b/backup/backup.py index 85b6687..6216e8b 100755 --- a/backup/backup.py +++ b/backup/backup.py @@ -1,21 +1,14 @@ #!/usr/bin/env python3 -from collections import namedtuple from pyln.client import Plugin -from tqdm import tqdm -from typing import Mapping, Type, Iterator -from urllib.parse import urlparse import json import logging import os -import re -import shutil -import struct import sys -import sqlite3 -import tempfile import time import psutil +from backend import Change +from backends import get_backend plugin = Plugin() @@ -28,347 +21,6 @@ formatter = logging.Formatter('%(message)s') handler.setFormatter(formatter) root.addHandler(handler) -# A change that was proposed by c-lightning that needs saving to the -# backup. `version` is the database version before the transaction was -# applied. The optional snapshot reqpresents a complete copy of the database, -# as it was before applying the `transaction`. 
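A `Change` therefore comes in exactly two shapes: a full snapshot (as `backup-cli init` builds from an existing database) or a list of SQL statements. A purely illustrative sketch of both; the path, version numbers and statement are made up:

```python
from backend import Change

db_file = '/home/user/.lightning/bitcoin/lightningd.sqlite3'  # placeholder path

# Full snapshot of the database, no statements attached:
snapshot = Change(version=42, snapshot=open(db_file, 'rb').read(), transaction=None)

# Incremental change: the statements of one database transaction, no snapshot:
incremental = Change(
    version=43,
    snapshot=None,
    transaction=["UPDATE vars SET intval=43 WHERE name='data_version'"],
)
```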
This is used by the plugin from -# time to time to allow the backend to compress the changelog and forms a new -# basis for the backup. -Change = namedtuple('Change', ['version', 'snapshot', 'transaction']) - - -class Backend(object): - def __init__(self, destination: str): - """Read the metadata from the destination and prepare any necesary resources. - - After this call the following members must be initialized: - - - backend.version: the last data version we wrote to the backend - - backend.prev_version: the previous data version in case we need to - roll back the last one - """ - self.version = None - self.prev_version = None - raise NotImplementedError - - def add_change(self, change: Change) -> bool: - """Add a single change to the backend. - - This call should always make sure that the change has been correctly - written and flushed before returning. - """ - raise NotImplementedError - - def initialize(self) -> bool: - """Set up any resources needed by this backend. - - """ - raise NotImplementedError - - def stream_changes(self) -> Iterator[Change]: - """Retrieve changes from the backend in order to perform a restore. - """ - raise NotImplementedError - - def rewind(self) -> bool: - """Remove the last change that was added to the backup - - Because the transaction is reported to the backup plugin before it is - being committed to the database it can happen that we get notified - about a transaction but then `lightningd` is stopped and the - transaction is not committed. This means the backup includes an - extraneous transaction which needs to be removed. A backend must allow - a single rewind operation, and should fail additional calls to rewind - (we may have at most one pending transaction not being committed at - any time). - - """ - raise NotImplementedError - - def compact(self): - """Apply some incremental changes to the snapshot to reduce our size. - """ - raise NotImplementedError - - def _db_open(self, dest: str) -> sqlite3.Connection: - db = sqlite3.connect(dest) - db.execute("PRAGMA foreign_keys = 1") - return db - - def _restore_snapshot(self, snapshot: bytes, dest: str): - if os.path.exists(dest): - os.unlink(dest) - with open(dest, 'wb') as f: - f.write(snapshot) - self.db = self._db_open(dest) - - def _rewrite_stmt(self, stmt: bytes) -> bytes: - """We had a stmt expansion bug in c-lightning, this replicates the fix. - - We were expanding statements incorrectly, missing some - whitespace between a param and the `WHERE` keyword. This - re-inserts the space. - - """ - stmt = re.sub(r'reserved_til=([0-9]+)WHERE', r'reserved_til=\1 WHERE', stmt) - stmt = re.sub(r'peer_id=([0-9]+)WHERE channels.id=', r'peer_id=\1 WHERE channels.id=', stmt) - return stmt - - def _restore_transaction(self, tx: Iterator[bytes]): - assert(self.db) - cur = self.db.cursor() - for q in tx: - q = self._rewrite_stmt(q.decode('UTF-8')) - cur.execute(q) - self.db.commit() - - def restore(self, dest: str, remove_existing: bool = False): - """Restore the backup in this backend to its former glory. 
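For orientation, restoring is a thin wrapper around these backend calls; `backup-cli restore` does roughly the following (paths are placeholders):

```python
from backends import get_backend

# Open the existing backup and replay it into a fresh database.
backend = get_backend('file:///path/to/backup')
# If the destination is a directory, restore() appends "lightningd.sqlite3".
backend.restore('/home/user/.lightning/bitcoin')
```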
- - If `dest` is a directory, we assume the default database filename: - lightningd.sqlite3 - """ - if os.path.isdir(dest): - dest = os.path.join(dest, "lightningd.sqlite3") - if os.path.exists(dest): - if not remove_existing: - raise ValueError( - "Destination for backup restore exists: {dest}".format( - dest=dest - ) - ) - os.unlink(dest) - - self.db = self._db_open(dest) - for c in tqdm(self.stream_changes()): - if c.snapshot is not None: - self._restore_snapshot(c.snapshot, dest) - if c.transaction is not None: - self._restore_transaction(c.transaction) - - -class FileBackend(Backend): - def __init__(self, destination: str, create: bool): - self.version = None - self.prev_version = None - self.destination = destination - self.offsets = [0, 0] - self.version_count = 0 - self.url = urlparse(self.destination) - - if os.path.exists(self.url.path) and create: - raise ValueError("Attempted to create a FileBackend, but file already exists.") - - def initialize(self) -> bool: - if not os.path.exists(self.url.path): - self.version = 0 - self.prev_version = 0 - return False - return self.read_metadata() - - def write_metadata(self): - blob = struct.pack("!IIQIQQ", 0x01, self.version, self.offsets[0], - self.prev_version, self.offsets[1], - self.version_count) - - # Pad the header - blob += b'\x00' * (512 - len(blob)) - mode = "rb+" if os.path.exists(self.url.path) else "wb+" - - with open(self.url.path, mode) as f: - f.seek(0) - f.write(blob) - f.flush() - - def read_metadata(self): - with open(self.url.path, 'rb') as f: - blob = f.read(512) - if len(blob) != 512: - logging.warn("Corrupt FileBackend header, expected 512 bytes, got {} bytes".format(len(blob))) - return False - - file_version, = struct.unpack_from("!I", blob) - if file_version != 1: - logging.warn("Unknown FileBackend version {}".format(file_version)) - return False - - self.version, self.offsets[0], self.prev_version, self.offsets[1], self.version_count, = struct.unpack_from("!IQIQQ", blob, offset=4) - - return True - - def add_change(self, entry: Change) -> bool: - typ = b'\x01' if entry.snapshot is None else b'\x02' - if typ == b'\x01': - stmts = [t.encode('UTF-8') if isinstance(t, str) else t for t in entry.transaction] - payload = b'\x00'.join(stmts) - elif typ == b'\x02': - payload = entry.snapshot - - length = struct.pack("!I", len(payload)) - version = struct.pack("!I", entry.version) - with open(self.url.path, 'ab') as f: - f.seek(self.offsets[0]) - f.write(length) - f.write(version) - f.write(typ) - f.write(payload) - self.prev_version, self.offsets[1] = self.version, self.offsets[0] - self.version = entry.version - self.offsets[0] += 9 + len(payload) - self.version_count += 1 - self.write_metadata() - - return True - - def rewind(self): - # After rewinding we set offsets[0] and prev_version to 0 (best effort - # result). If either of these are set to 0 we have two consecutive - # rewinds which cannot be safely done (we'd be rewinding more than the - # one in-flight transaction). 
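The single-rewind contract described here is easiest to see end to end. A short sketch against a throwaway `FileBackend` from `filebackend.py` (version numbers and statements are illustrative):

```python
import os
import tempfile

from backend import Change
from filebackend import FileBackend

path = os.path.join(tempfile.mkdtemp(), 'backup.dat')
backend = FileBackend('file://' + path, create=True)
backend.initialize()
backend.add_change(Change(version=1, snapshot=None, transaction=["CREATE TABLE t (a INTEGER)"]))
backend.add_change(Change(version=2, snapshot=None, transaction=["INSERT INTO t VALUES (1)"]))

assert backend.version == 2 and backend.prev_version == 1
assert backend.rewind() is True    # version 2 was never committed by lightningd, drop it
assert backend.rewind() is False   # refused: at most one in-flight transaction to undo
```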
- if self.offsets[1] == 0 or self.prev_version == 0: - logging.warn("Cannot rewind multiple times.") - return False - - self.version, self.offsets[0] = self.prev_version, self.offsets[1] - self.prev_version, self.offsets[1] = 0, 0 - return True - - def stream_changes(self) -> Iterator[Change]: - self.read_metadata() - version = -1 - with open(self.url.path, 'rb') as f: - # Skip the header - f.seek(512) - while version < self.version: - length, version, typ = struct.unpack("!IIb", f.read(9)) - payload = f.read(length) - if typ == 1: - yield Change( - version=version, - snapshot=None, - transaction=payload.split(b'\x00') - ) - elif typ == 2: - yield Change(version=version, snapshot=payload, transaction=None) - else: - raise ValueError("Unknown FileBackend entry type {}".format(typ)) - - if version != self.version: - raise ValueError("Versions do not match up: restored version {}, backend version {}".format(version, self.version)) - assert(version == self.version) - - def compact(self): - stop = self.version # Stop one version short of the head when compacting - tmp = tempfile.TemporaryDirectory() - backupdir, clonename = os.path.split(self.url.path) - - # Path of the backup clone that we're trying to build up. We - # are trying to put this right next to the original backup, to - # maximize the chances of both being on the same FS, which - # makes the move below atomic. - clonepath = os.path.join(backupdir, clonename + ".compacting") - - # Location we extract the snapshot to and then apply - # incremental changes. - snapshotpath = os.path.join(tmp.name, "lightningd.sqlite3") - - stats = { - 'before': { - 'backupsize': os.stat(self.url.path).st_size, - 'version_count': self.version_count, - }, - } - - print("Starting compaction: stats={}".format(stats)) - self.db = self._db_open(snapshotpath) - - for change in self.stream_changes(): - if change.version == stop: - break - - if change.snapshot is not None: - self._restore_snapshot(change.snapshot, snapshotpath) - - if change.transaction is not None: - self._restore_transaction(change.transaction) - - # If this assertion fails we are in a degenerate state: we - # have less than two changes in the backup (starting - # c-lightning alone produces 6 changes), and compacting an - # almost empty backup is not useful. - assert change is not None - - # Remember `change`, it's the rewindable change we need to - # stash on top of the new snapshot. - clone = FileBackend(clonepath, create=True) - clone.offsets = [512, 0] - - # We are about to add the snapshot n-1 on top of n-2 (init), - # followed by the last change for n on top of - # n-1. prev_version trails that by one. 
- clone.version = change.version - 2 - clone.prev_version = clone.version - 1 - clone.version_count = 0 - clone.write_metadata() - - snapshot = Change( - version=change.version - 1, - snapshot=open(snapshotpath, 'rb').read(), - transaction=None - ) - print("Adding intial snapshot with {} bytes for version {}".format( - len(snapshot.snapshot), - snapshot.version - )) - clone.add_change(snapshot) - - assert clone.version == change.version - 1 - assert clone.prev_version == change.version - 2 - clone.add_change(change) - - assert self.version == clone.version - assert self.prev_version == clone.prev_version - - stats['after'] = { - 'version_count': clone.version_count, - 'backupsize': os.stat(clonepath).st_size, - } - - print("Compacted {} changes, saving {} bytes, swapping backups".format( - stats['before']['version_count'] - stats['after']['version_count'], - stats['before']['backupsize'] - stats['after']['backupsize'], - )) - shutil.move(clonepath, self.url.path) - - # Re-initialize ourselves so we have the correct metadata - self.read_metadata() - - return stats - - -def resolve_backend_class(backend_url): - backend_map: Mapping[str, Type[Backend]] = { - 'file': FileBackend, - } - p = urlparse(backend_url) - backend_cl = backend_map.get(p.scheme, None) - return backend_cl - - -def get_backend(destination, create=False, require_init=False): - backend_cl = resolve_backend_class(destination) - if backend_cl is None: - raise ValueError("No backend implementation found for {destination}".format( - destination=destination, - )) - backend = backend_cl(destination, create=create) - initialized = backend.initialize() - if require_init and not initialized: - kill("Could not initialize the backup {}, please use 'backup-cli' to initialize the backup first.".format(destination)) - assert(backend.version is not None) - assert(backend.prev_version is not None) - return backend - def check_first_write(plugin, data_version): """Verify that we are up-to-date and c-lightning didn't forget writes. 
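`check_first_write` guards the stream of `db_write` hook calls that feed the backend. As a hedged sketch only (this is not the plugin's actual handler), a `db_write` payload maps onto a `Change` roughly like this; it assumes the `plugin`, `Change` and `kill` names defined in this file, and the `writes`/`data_version` fields of c-lightning's `db_write` hook payload. The exact return convention for hooks varies across c-lightning versions.

```python
@plugin.hook('db_write')
def on_db_write(writes, data_version, plugin, **kwargs):
    # One hook call corresponds to one database transaction that lightningd is
    # about to commit; it must be stored before we let lightningd continue.
    change = Change(version=data_version, snapshot=None, transaction=writes)
    if not plugin.backend.add_change(change):
        kill('Backend could not store the change, terminating lightningd')
    return {'result': 'continue'}
```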
@@ -482,7 +134,11 @@ if __name__ == "__main__": if not os.path.exists("backup.lock"): kill("Could not find backup.lock in the lightning-dir") - d = json.load(open("backup.lock", 'r')) - destination = d['backend_url'] - plugin.backend = get_backend(destination, require_init=True) - plugin.run() + try: + d = json.load(open("backup.lock", 'r')) + destination = d['backend_url'] + plugin.backend = get_backend(destination, require_init=True) + plugin.run() + except Exception: + logging.exception('Exception while initializing backup plugin') + kill('Exception while initializing plugin, terminating lightningd') diff --git a/backup/filebackend.py b/backup/filebackend.py new file mode 100644 index 0000000..3c53964 --- /dev/null +++ b/backup/filebackend.py @@ -0,0 +1,208 @@ +import logging, os, struct +import shutil +import tempfile +from typing import Iterator +from urllib.parse import urlparse + +from backend import Backend, Change + +class FileBackend(Backend): + def __init__(self, destination: str, create: bool): + self.version = None + self.prev_version = None + self.destination = destination + self.offsets = [0, 0] + self.version_count = 0 + self.url = urlparse(self.destination) + + if os.path.exists(self.url.path) and create: + raise ValueError("Attempted to create a FileBackend, but file already exists.") + if not os.path.exists(self.url.path) and not create: + raise ValueError("Attempted to open a FileBackend but file doesn't already exists, use `backup-cli init` to initialize it first.") + if create: + # Initialize a new backup file + self.version, self.prev_version = 0, 0 + self.offsets = [512, 0] + self.version_count = 0 + self.write_metadata() + + def initialize(self) -> bool: + return self.read_metadata() + + def write_metadata(self): + blob = struct.pack("!IIQIQQ", 0x01, self.version, self.offsets[0], + self.prev_version, self.offsets[1], + self.version_count) + + # Pad the header + blob += b'\x00' * (512 - len(blob)) + mode = "rb+" if os.path.exists(self.url.path) else "wb+" + + with open(self.url.path, mode) as f: + f.seek(0) + f.write(blob) + f.flush() + + def read_metadata(self): + with open(self.url.path, 'rb') as f: + blob = f.read(512) + if len(blob) != 512: + logging.warn("Corrupt FileBackend header, expected 512 bytes, got {} bytes".format(len(blob))) + return False + + file_version, = struct.unpack_from("!I", blob) + if file_version != 1: + logging.warn("Unknown FileBackend version {}".format(file_version)) + return False + + self.version, self.offsets[0], self.prev_version, self.offsets[1], self.version_count, = struct.unpack_from("!IQIQQ", blob, offset=4) + + return True + + def add_change(self, entry: Change) -> bool: + typ = b'\x01' if entry.snapshot is None else b'\x02' + if typ == b'\x01': + payload = b'\x00'.join([t.encode('UTF-8') for t in entry.transaction]) + elif typ == b'\x02': + payload = entry.snapshot + + length = struct.pack("!I", len(payload)) + version = struct.pack("!I", entry.version) + with open(self.url.path, 'ab') as f: + f.seek(self.offsets[0]) + f.write(length) + f.write(version) + f.write(typ) + f.write(payload) + self.prev_version, self.offsets[1] = self.version, self.offsets[0] + self.version = entry.version + self.offsets[0] += 9 + len(payload) + self.version_count += 1 + self.write_metadata() + + return True + + def rewind(self): + # After rewinding we set offsets[0] and prev_version to 0 (best effort + # result). 
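The on-disk layout produced by `write_metadata()` and `add_change()` above is simple enough to inspect by hand. A small standalone reader, useful for debugging (the path is a placeholder):

```python
import struct

def dump_backup(path: str) -> None:
    # Header (512 bytes, "!IIQIQQ"): file_version, version, offsets[0],
    # prev_version, offsets[1], version_count.
    # Each record ("!IIb"): payload length, version, type (1 = change,
    # 2 = snapshot), followed by the payload itself.
    with open(path, 'rb') as f:
        hdr = f.read(512)
        file_version, version, off0, prev_version, off1, count = struct.unpack_from("!IIQIQQ", hdr)
        print("file_version={} version={} prev_version={} version_count={}".format(
            file_version, version, prev_version, count))
        while f.tell() < off0:
            length, ver, typ = struct.unpack("!IIb", f.read(9))
            print("  record: version={} type={} payload={} bytes".format(ver, typ, length))
            f.seek(length, 1)  # skip over the payload

dump_backup('/path/to/backup')  # placeholder path
```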
If either of these are set to 0 we have two consecutive + # rewinds which cannot be safely done (we'd be rewinding more than the + # one in-flight transaction). + if self.offsets[1] == 0 or self.prev_version == 0: + logging.warn("Cannot rewind multiple times.") + return False + + self.version, self.offsets[0] = self.prev_version, self.offsets[1] + self.prev_version, self.offsets[1] = 0, 0 + return True + + def stream_changes(self) -> Iterator[Change]: + self.read_metadata() + version = -1 + with open(self.url.path, 'rb') as f: + # Skip the header + f.seek(512) + while version < self.version: + length, version, typ = struct.unpack("!IIb", f.read(9)) + payload = f.read(length) + if typ == 1: + yield Change( + version=version, + snapshot=None, + transaction=[t.decode('UTF-8') for t in payload.split(b'\x00')] + ) + elif typ == 2: + yield Change(version=version, snapshot=payload, transaction=None) + else: + raise ValueError("Unknown FileBackend entry type {}".format(typ)) + + if version != self.version: + raise ValueError("Versions do not match up: restored version {}, backend version {}".format(version, self.version)) + assert(version == self.version) + + def compact(self): + stop = self.version # Stop one version short of the head when compacting + tmp = tempfile.TemporaryDirectory() + backupdir, clonename = os.path.split(self.url.path) + + # Path of the backup clone that we're trying to build up. We + # are trying to put this right next to the original backup, to + # maximize the chances of both being on the same FS, which + # makes the move below atomic. + clonepath = os.path.join(backupdir, clonename + ".compacting") + + # Location we extract the snapshot to and then apply + # incremental changes. + snapshotpath = os.path.join(tmp.name, "lightningd.sqlite3") + + stats = { + 'before': { + 'backupsize': os.stat(self.url.path).st_size, + 'version_count': self.version_count, + }, + } + + print("Starting compaction: stats={}".format(stats)) + self.db = self._db_open(snapshotpath) + + for change in self.stream_changes(): + if change.version == stop: + break + + if change.snapshot is not None: + self._restore_snapshot(change.snapshot, snapshotpath) + + if change.transaction is not None: + self._restore_transaction(change.transaction) + + # If this assertion fails we are in a degenerate state: we + # have less than two changes in the backup (starting + # c-lightning alone produces 6 changes), and compacting an + # almost empty backup is not useful. + assert change is not None + + # Remember `change`, it's the rewindable change we need to + # stash on top of the new snapshot. + clone = FileBackend(clonepath, create=True) + clone.offsets = [512, 0] + + # We are about to add the snapshot n-1 on top of n-2 (init), + # followed by the last change for n on top of + # n-1. prev_version trails that by one. 
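A concrete illustration of the version arithmetic this comment describes (the numbers are arbitrary):

```python
n = 100                                    # version of the newest change in the backup
clone_version, clone_prev = n - 2, n - 3   # clone metadata as written by write_metadata()
clone_version, clone_prev = n - 1, n - 2   # after add_change() of the consolidated snapshot
clone_version, clone_prev = n, n - 1       # after re-adding the head change itself
# The clone now reports the same (version, prev_version) as the original backup,
# which is exactly what the assertions below check.
assert (clone_version, clone_prev) == (100, 99)
```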
+ clone.version = change.version - 2 + clone.prev_version = clone.version - 1 + clone.version_count = 0 + clone.write_metadata() + + snapshot = Change( + version=change.version - 1, + snapshot=open(snapshotpath, 'rb').read(), + transaction=None + ) + print("Adding intial snapshot with {} bytes for version {}".format( + len(snapshot.snapshot), + snapshot.version + )) + clone.add_change(snapshot) + + assert clone.version == change.version - 1 + assert clone.prev_version == change.version - 2 + clone.add_change(change) + + assert self.version == clone.version + assert self.prev_version == clone.prev_version + + stats['after'] = { + 'version_count': clone.version_count, + 'backupsize': os.stat(clonepath).st_size, + } + + print("Compacted {} changes, saving {} bytes, swapping backups".format( + stats['before']['version_count'] - stats['after']['version_count'], + stats['before']['backupsize'] - stats['after']['backupsize'], + )) + shutil.move(clonepath, self.url.path) + + # Re-initialize ourselves so we have the correct metadata + self.read_metadata() + + return stats diff --git a/backup/protocol.py b/backup/protocol.py new file mode 100644 index 0000000..c3ff8c6 --- /dev/null +++ b/backup/protocol.py @@ -0,0 +1,70 @@ +''' +Socket-based remote backup protocol. This is used to create a connection to a backup backend, and send it incremental database updates. +''' +import socket +import struct +from typing import Tuple +import zlib + +from backend import Change + +class PacketType: + CHANGE = 0x01 + SNAPSHOT = 0x02 + REWIND = 0x03 + REQ_METADATA = 0x04 + RESTORE = 0x05 + ACK = 0x06 + NACK = 0x07 + METADATA = 0x08 + DONE = 0x09 + COMPACT = 0x0a + COMPACT_RES = 0x0b + +PKT_CHANGE_TYPES = {PacketType.CHANGE, PacketType.SNAPSHOT} + +def recvall(sock: socket.socket, n: int) -> bytearray: + '''Receive exactly n bytes from a socket.''' + buf = bytearray(n) + view = memoryview(buf) + ptr = 0 + while ptr < n: + count = sock.recv_into(view[ptr:]) + if count == 0: + raise IOError('Premature end of stream') + ptr += count + return buf + +def send_packet(sock: socket.socket, typ: int, payload: bytes) -> None: + sock.sendall(struct.pack('!BI', typ, len(payload))) + sock.sendall(payload) + +def recv_packet(sock: socket.socket) -> Tuple[int, bytes]: + (typ, length) = struct.unpack('!BI', recvall(sock, 5)) + payload = recvall(sock, length) + return (typ, payload) + +def change_from_packet(typ, payload): + '''Convert a network packet to a Change object.''' + if typ == PacketType.CHANGE: + (version, ) = struct.unpack('!I', payload[0:4]) + payload = zlib.decompress(payload[4:]) + return Change(version=version, snapshot=None, + transaction=[t.decode('UTF-8') for t in payload.split(b'\x00')]) + elif typ == PacketType.SNAPSHOT: + (version, ) = struct.unpack('!I', payload[0:4]) + payload = zlib.decompress(payload[4:]) + return Change(version=version, snapshot=payload, transaction=None) + raise ValueError('Not a change (typ {})'.format(typ)) + +def packet_from_change(entry): + '''Convert a Change object to a network packet.''' + if entry.snapshot is None: + typ = PacketType.CHANGE + payload = b'\x00'.join([t.encode('UTF-8') for t in entry.transaction]) + else: + typ = PacketType.SNAPSHOT + payload = entry.snapshot + + version = struct.pack("!I", entry.version) + return typ, version + zlib.compress(payload) diff --git a/backup/remote.md b/backup/remote.md new file mode 100644 index 0000000..ba7d4ac --- /dev/null +++ b/backup/remote.md @@ -0,0 +1,172 @@ +Remote backup backend for c-lightning 
+=====================================
+
+Introduction
+------------
+
+The purpose of this backend is to allow hassle-free incremental remote backups of a c-lightning
+daemon's state.
+
+The remote backup system consists of two parts:
+
+- A `backup.py` plugin backend that listens for changes to c-lightning's database and communicates them
+  to a remote server.
+
+- A server daemon that receives changes from the backup backend and stores them through a local backup
+  backend. The server side does not need to be running c-lightning, nor have it installed.
+
+The backend URL format is `socket:<host>:<port>`, for example `socket:127.0.0.1:1234`.
+
+To run the server against a local backend use `backup-cli server file://.../ 127.0.0.1:1234`.
+
+Usage
+-----
+
+First initialize an empty file backend on the server side, then start the server:
+
+```bash
+backup-cli init file:///path/to/backup
+backup-cli server file:///path/to/backup 127.0.0.1:8700
+```
+
+On the client side:
+
+```bash
+# Make sure c-lightning is not running
+lightning-cli stop
+# Initialize the socket backend (this makes an initial snapshot and creates a configuration file for the plugin)
+backup-cli init socket:127.0.0.1:8700 --lightning-dir "$HOME/.lightning/bitcoin"
+# Start c-lightning with the backup plugin as an important plugin, so that any issue with it stops the daemon
+lightningd ... \
+  --important-plugin /path/to/plugins/backup/backup.py
+```
+
+The easiest way to connect the server and client if they are not running on the same host is an SSH
+forward. For example, when connecting from another machine to the one running c-lightning use:
+
+```bash
+ssh mylightninghost -R 8700:127.0.0.1:8700
+```
+
+Or when it is the other way around:
+
+```bash
+ssh backupserver -L 8700:127.0.0.1:8700
+```
+
+Goals
+-----
+
+- Hassle-free incremental remote backup of c-lightning's database over a simple TCP protocol.
+
+- Safety. lightningd will only proceed once the remote backend has acknowledged storing a change, and will halt when there is no connection to the backup server.
+
+- Bandwidth efficiency. Updates can be large, and SQL statements compress well, so bandwidth is saved by applying zlib compression to changes and snapshots.
+
+Non-goals
+---------
+
+- Encryption. This is out of scope: a VPN (say, a WireGuard tunnel), an SSH tunnel (`-L` or `-R`), or even a Tor onion service is more flexible, avoids the pitfalls of custom cryptography code, and spares the user from learning yet another way to configure secure transport.
+
+Protocol details
+================
+
+A bidirectional TCP protocol is used to synchronize state between the client and server. It is documented here in case anyone wants to write a custom server implementation.
+
+Packet format:
+
+    <type (u8)> <length (u32, big-endian)> <payload (length bytes)>
+
+Every packet has a type and a 32-bit length. Defined packet types are:
+
+    0x01  CHANGE        Change
+    0x02  SNAPSHOT      Snapshot
+    0x03  REWIND        Rewind a version (can only be done once)
+    0x04  REQ_METADATA  Request metadata
+    0x05  RESTORE       Request stream of changes to restore
+    0x06  ACK           Acknowledge change, snapshot or rewind
+    0x07  NACK          An error happened (e.g. rewind too far)
+    0x08  METADATA      Metadata response
+    0x09  DONE          Restore is complete
+    0x0A  COMPACT       Do backup compaction
+    0x0B  COMPACT_RES   Database compaction result
+
+CHANGE
+------
+
+A database update.
+
+Fields:
+
+- version (u32)
+- a list of SQL statements to be executed for this update, encoded as UTF-8, separated by NULL bytes. The last statement will not be terminated with a NULL byte.
(zlib compressed) + +SNAPSHOT +-------- + +A full database snapshot, replacing the previous incremental backup. + +Fields: + +- version (u32) +- a raw dump of the sqlite database (zlib compressed) + +REQ_METADATA +------------ + +Request metadata from server. The server should respond with a `METADATA` packet. + +No fields. + +RESTORE +------- + +Request a stream of changes to restore the database. + +The server should respond with a stream of `CHANGE` and `SNAPSHOT` packets, finishing with a `DONE` packet. + +Unlike when sending a change to backup, the client is not required to (but may) respond to these with `ACK`. + +No fields. + +ACK +--- + +General succss response. Acknowledge having processed a `CHANGE` and `SNAPSHOT` packet. + +Fields: + +- new version (u32) + +NACK +---- + +Indicates an error processing the last packet. + +No fields. + +METADATA +-------- + +Metadata response, sent as response to `REQ_METADATA`. + +Fields: + +- protocol (should be 0x01) (u32) +- version (u32) +- prev_version (u32) +- version_count (u64) + +COMPACT +-------- + +Do a database compaction. Sends `COMPACT_RES` on succesful completion, `NACK` otherwise. + +COMPACT_RES +----------- + +Result of a database compaction. + +Fields + +- A UTF-8 encoded JSON data structure with statistics as returned by Backend.compact() diff --git a/backup/server.py b/backup/server.py new file mode 100644 index 0000000..afa5a1c --- /dev/null +++ b/backup/server.py @@ -0,0 +1,87 @@ +import logging, socket, struct +import json +from typing import Tuple + +from backend import Backend +from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet + +class SocketServer: + def __init__(self, addr: Tuple[str, int], backend: Backend) -> None: + self.backend = backend + self.addr = addr + self.bind = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.bind.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.bind.bind(addr) + + def _send_packet(self, typ: int, payload: bytes) -> None: + send_packet(self.sock, typ, payload) + + def _recv_packet(self) -> Tuple[int, bytes]: + return recv_packet(self.sock) + + def _handle_conn(self, conn) -> None: + # Can only handle one connection at a time + logging.info('Servicing incoming connection') + self.sock = conn + while True: + try: + (typ, payload) = self._recv_packet() + except IOError as e: + logging.info('Connection closed') + break + if typ in PKT_CHANGE_TYPES: + change = change_from_packet(typ, payload) + if typ == PacketType.CHANGE: + logging.info('Received CHANGE {}'.format(change.version)) + else: + logging.info('Received SNAPSHOT {}'.format(change.version)) + self.backend.add_change(change) + self._send_packet(PacketType.ACK, struct.pack("!I", self.backend.version)) + elif typ == PacketType.REWIND: + logging.info('Received REWIND') + to_version, = struct.unpack('!I', payload) + if to_version != self.backend.prev_version: + logging.info('Cannot rewind to version {}'.format(to_version)) + self._send_packet(PacketType.NACK, struct.pack("!I", self.backend.version)) + else: + self.backend.rewind() + self._send_packet(PacketType.ACK, struct.pack("!I", self.backend.version)) + elif typ == PacketType.REQ_METADATA: + logging.info('Received REQ_METADATA') + blob = struct.pack("!IIIQ", 0x01, self.backend.version, + self.backend.prev_version, + self.backend.version_count) + self._send_packet(PacketType.METADATA, blob) + elif typ == PacketType.RESTORE: + logging.info('Received RESTORE') + for change in 
self.backend.stream_changes(): + (typ, payload) = packet_from_change(change) + self._send_packet(typ, payload) + self._send_packet(PacketType.DONE, b'') + elif typ == PacketType.COMPACT: + logging.info('Received COMPACT') + stats = self.backend.compact() + self._send_packet(PacketType.COMPACT_RES, json.dumps(stats).encode()) + elif typ == PacketType.ACK: + logging.info('Received ACK') + elif typ == PacketType.NACK: + logging.info('Received NACK') + elif typ == PacketType.METADATA: + logging.info('Received METADATA') + elif typ == PacketType.COMPACT_RES: + logging.info('Received COMPACT_RES') + else: + raise Exception('Unknown or unexpected packet type {}'.format(typ)) + self.conn = None + + def run(self) -> None: + self.bind.listen(1) + logging.info('Waiting for connection on {}'.format(self.addr)) + while True: + conn, _ = self.bind.accept() + try: + self._handle_conn(conn) + except Exception as e: + logging.exception('Got exception') + finally: + conn.close() diff --git a/backup/socketbackend.py b/backup/socketbackend.py new file mode 100644 index 0000000..c020a5a --- /dev/null +++ b/backup/socketbackend.py @@ -0,0 +1,80 @@ +import json +import logging, socket, struct +from typing import Tuple, Iterator +from urllib.parse import urlparse + +from backend import Backend, Change +from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet + +class SocketBackend(Backend): + def __init__(self, destination: str, create: bool): + self.version = None + self.prev_version = None + self.destination = destination + self.url = urlparse(self.destination) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + (host, port) = self.url.path.split(':') + logging.info('Initialized socket backend') + self.sock.connect((host, int(port))) + logging.info('Connected to {}'.format(destination)) + + def _send_packet(self, typ: int, payload: bytes) -> None: + send_packet(self.sock, typ, payload) + + def _recv_packet(self) -> Tuple[int, bytes]: + return recv_packet(self.sock) + + def initialize(self) -> bool: + ''' + Initialize socket backend by request current metadata from server. + ''' + logging.info('Initializing backend') + self._send_packet(PacketType.REQ_METADATA, b'') + (typ, payload) = self._recv_packet() + assert(typ == PacketType.METADATA) + self.protocol, self.version, self.prev_version, self.version_count = struct.unpack("!IIIQ", payload) + logging.info('Initialized SocketBackend: protocol={}, version={}, prev_version={}, version_count={}'.format( + self.protocol, self.version, self.prev_version, self.version_count + )) + return True + + def add_change(self, entry: Change) -> bool: + typ, payload = packet_from_change(entry) + self._send_packet(typ, payload) + # Wait for change to be acknowledged before continuing. + (typ, _) = self._recv_packet() + assert(typ == PacketType.ACK) + return True + + def rewind(self) -> bool: + '''Rewind to previous version.''' + version = struct.pack("!I", self.prev_version) + self._send_packet(PacketType.REWIND, version) + # Wait for change to be acknowledged before continuing. 
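The request/response pattern used here (and throughout `SocketBackend`) can also be exercised standalone with the helpers from `protocol.py`. A minimal sketch that asks a running `backup-cli server` for its metadata; host and port are placeholders:

```python
import socket
import struct

from protocol import PacketType, send_packet, recv_packet

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 8700))          # placeholder address
send_packet(sock, PacketType.REQ_METADATA, b'')
typ, payload = recv_packet(sock)
assert typ == PacketType.METADATA
proto, version, prev_version, version_count = struct.unpack("!IIIQ", payload)
print("protocol={} version={} prev_version={} version_count={}".format(
    proto, version, prev_version, version_count))
sock.close()
```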
+ (typ, _) = self._recv_packet() + assert(typ == PacketType.ACK) + return True + + def stream_changes(self) -> Iterator[Change]: + self._send_packet(PacketType.RESTORE, b'') + version = -1 + while True: + (typ, payload) = self._recv_packet() + if typ in PKT_CHANGE_TYPES: + change = change_from_packet(typ, payload) + version = change.version + yield change + elif typ == PacketType.DONE: + break + else: + raise ValueError("Unknown entry type {}".format(typ)) + + if version != self.version: + raise ValueError("Versions do not match up: restored version {}, backend version {}".format(version, self.version)) + assert(version == self.version) + + def compact(self): + self._send_packet(PacketType.COMPACT, b'') + (typ, payload) = self._recv_packet() + assert(typ == PacketType.COMPACT_RES) + return json.loads(payload.decode()) diff --git a/backup/test_backup.py b/backup/test_backup.py index c27280a..05a5421 100644 --- a/backup/test_backup.py +++ b/backup/test_backup.py @@ -1,4 +1,4 @@ -from backup import FileBackend +from filebackend import FileBackend from flaky import flaky from pyln.testing.fixtures import * # noqa: F401,F403 from pyln.testing.utils import sync_blockheight
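For reference, a self-contained round trip in the spirit of these tests: write a snapshot and one incremental change into a fresh `FileBackend`, then stream them back (the payloads are illustrative, not a real database):

```python
import os
import tempfile

from backend import Change
from filebackend import FileBackend

path = os.path.join(tempfile.mkdtemp(), 'backup.dat')
backend = FileBackend('file://' + path, create=True)
backend.initialize()
backend.add_change(Change(version=1, snapshot=b'not a real sqlite3 file', transaction=None))
backend.add_change(Change(version=2, snapshot=None, transaction=["INSERT INTO t VALUES (1)"]))

for change in backend.stream_changes():
    print(change.version, 'snapshot' if change.snapshot is not None else 'change')
# Expected output:
# 1 snapshot
# 2 change
```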