mirror of
https://github.com/aljazceru/plugins.git
synced 2025-12-19 14:14:20 +01:00
210 lines
7.8 KiB
Python
import logging, os, struct
|
|
import shutil
|
|
import tempfile
|
|
from typing import Iterator
|
|
from urllib.parse import urlparse
|
|
|
|
from backend import Backend, Change
|
|
|
|
class FileBackend(Backend):
    """Backup backend that stores changes in a single append-only file.

    File layout: a fixed 512-byte header (see `write_metadata`) followed
    by a sequence of length-prefixed change records (see `add_change`).
    """

    def __init__(self, destination: str, create: bool):
        """Open (or create) the backup file at `destination`.

        :param destination: URL of the backup file; only the path
            component is used.
        :param create: if True, initialize a fresh backup file (the file
            must not exist yet); if False, open an existing one.
        :raises ValueError: if `create` conflicts with whether the file
            already exists.
        """
        # version/prev_version track the last and second-to-last DB
        # versions written; offsets[0] is the write position of the next
        # record, offsets[1] the start of the previous record (for rewind).
        self.version = None
        self.prev_version = None
        self.destination = destination
        self.offsets = [0, 0]
        self.version_count = 0
        self.url = urlparse(self.destination)

        if os.path.exists(self.url.path) and create:
            raise ValueError("Attempted to create a FileBackend, but file already exists.")

        if not os.path.exists(self.url.path) and not create:
            # Fixed grammar of the user-facing error message
            # ("doesn't already exists").
            raise ValueError("Attempted to open a FileBackend but the file doesn't exist yet, use `backup-cli init` to initialize it first.")

        if create:
            # Initialize a new backup file: version 0, write position
            # immediately after the 512-byte header.
            self.version, self.prev_version = 0, 0
            self.offsets = [512, 0]
            self.version_count = 0
            self.write_metadata()
|
|
|
def initialize(self) -> bool:
    """Load the on-disk header into memory.

    :return: True if the header was read and parsed successfully,
        False otherwise (see `read_metadata`).
    """
    return self.read_metadata()
|
|
|
|
def write_metadata(self):
    """Serialize the in-memory state into the fixed 512-byte file header."""
    header = struct.pack(
        "!IIQIQQ",
        0x01,                 # file-format version
        self.version,
        self.offsets[0],
        self.prev_version,
        self.offsets[1],
        self.version_count,
    )
    # Zero-pad the header up to its full fixed size.
    padding = b'\x00' * (512 - len(header))

    # Update in place when the file exists, otherwise create it.
    if os.path.exists(self.url.path):
        mode = "rb+"
    else:
        mode = "wb+"

    with open(self.url.path, mode) as fd:
        fd.seek(0)
        fd.write(header + padding)
        fd.flush()
|
|
|
|
def read_metadata(self):
    """Read and parse the 512-byte header, populating in-memory state.

    :return: True on success, False if the header is truncated or has
        an unknown file-format version.
    """
    with open(self.url.path, 'rb') as f:
        blob = f.read(512)
        if len(blob) != 512:
            # logging.warn is deprecated; use logging.warning instead.
            logging.warning("Corrupt FileBackend header, expected 512 bytes, got {} bytes".format(len(blob)))
            return False

        # First field is the file-format version; only v1 is understood.
        file_version, = struct.unpack_from("!I", blob)
        if file_version != 1:
            logging.warning("Unknown FileBackend version {}".format(file_version))
            return False

        self.version, self.offsets[0], self.prev_version, self.offsets[1], self.version_count, = struct.unpack_from("!IQIQQ", blob, offset=4)

        return True
|
|
|
|
def add_change(self, entry: Change) -> bool:
    """Write one change record at the current write offset.

    Record layout: 4-byte payload length, 4-byte DB version, 1-byte type
    (0x01 = transaction, 0x02 = snapshot), then the payload itself.

    :param entry: the Change to persist; a snapshot entry has
        `entry.snapshot` set, a transaction entry has `entry.transaction`.
    :return: True once the record and the updated header are written.
    """
    typ = b'\x01' if entry.snapshot is None else b'\x02'
    if typ == b'\x01':
        # Transactions are a NUL-separated list of UTF-8 statements.
        payload = b'\x00'.join([t.encode('UTF-8') for t in entry.transaction])
    else:
        payload = entry.snapshot

    length = struct.pack("!I", len(payload))
    version = struct.pack("!I", entry.version)
    # BUGFIX: the file used to be opened in 'ab' (append) mode, in which
    # the seek below is ignored and every write lands at EOF. After a
    # rewind() moved offsets[0] back before EOF, the new record would be
    # appended after the stale one, corrupting the record chain. 'rb+'
    # honors the seek; the file is guaranteed to exist because the header
    # was written at init time.
    with open(self.url.path, 'rb+') as f:
        f.seek(self.offsets[0])
        f.write(length)
        f.write(version)
        f.write(typ)
        f.write(payload)

    # Shift the rewind window: current becomes previous, and the write
    # position advances past the 9-byte record header plus payload.
    self.prev_version, self.offsets[1] = self.version, self.offsets[0]
    self.version = entry.version
    self.offsets[0] += 9 + len(payload)
    self.version_count += 1
    self.write_metadata()

    return True
|
|
|
|
def rewind(self):
    """Undo the last (in-flight) change by restoring the previous position.

    After rewinding we set offsets[1] and prev_version to 0 (best effort
    result). If either of these are already 0 we would have two
    consecutive rewinds, which cannot be safely done (we'd be rewinding
    more than the one in-flight transaction).

    :return: True if we rewound, False if a rewind was not possible.
    """
    if self.offsets[1] == 0 or self.prev_version == 0:
        # logging.warn is deprecated; use logging.warning instead.
        logging.warning("Cannot rewind multiple times.")
        return False

    self.version, self.offsets[0] = self.prev_version, self.offsets[1]
    self.prev_version, self.offsets[1] = 0, 0
    return True
|
|
|
|
def stream_changes(self) -> Iterator[Change]:
    """Iterate over all change records in the backup, in write order.

    :yield: one Change per record, up to and including the header's
        current version.
    :raises ValueError: on a truncated file, an unknown entry type, or a
        final version that doesn't match the header.
    """
    self.read_metadata()
    version = -1
    with open(self.url.path, 'rb') as f:
        # Skip the fixed-size header.
        f.seek(512)
        while version < self.version:
            hdr = f.read(9)
            if len(hdr) != 9:
                # Raise a descriptive error instead of letting
                # struct.unpack fail with an opaque struct.error on a
                # truncated file.
                raise ValueError("Truncated FileBackend entry, expected 9 header bytes, got {}".format(len(hdr)))
            length, version, typ = struct.unpack("!IIb", hdr)
            payload = f.read(length)
            if typ == 1:
                yield Change(
                    version=version,
                    snapshot=None,
                    transaction=[t.decode('UTF-8') for t in payload.split(b'\x00')]
                )
            elif typ == 2:
                yield Change(version=version, snapshot=payload, transaction=None)
            else:
                raise ValueError("Unknown FileBackend entry type {}".format(typ))

        if version != self.version:
            raise ValueError("Versions do not match up: restored version {}, backend version {}".format(version, self.version))
        # The redundant `assert(version == self.version)` was removed:
        # the check above already raises on mismatch, and asserts are
        # stripped under `python -O`.
|
|
|
|
def compact(self):
    """Rebuild the backup as one snapshot plus the latest change.

    Replays all but the last change into a scratch database, writes a
    fresh backup consisting of a snapshot at version n-1 followed by the
    last change for version n, then atomically swaps it in place of the
    old backup.

    :return: a dict with before/after backup size and version-count stats.
    """
    stop = self.version  # Stop one version short of the head when compacting

    # Use the context manager so the scratch directory is removed
    # deterministically instead of relying on garbage collection.
    with tempfile.TemporaryDirectory() as tmpdir:
        backupdir, clonename = os.path.split(self.url.path)

        # Path of the backup clone that we're trying to build up. We
        # are trying to put this right next to the original backup, to
        # maximize the chances of both being on the same FS, which
        # makes the move below atomic.
        clonepath = os.path.join(backupdir, clonename + ".compacting")

        # Location we extract the snapshot to and then apply
        # incremental changes.
        snapshotpath = os.path.join(tmpdir, "lightningd.sqlite3")

        stats = {
            'before': {
                'backupsize': os.stat(self.url.path).st_size,
                'version_count': self.version_count,
            },
        }

        print("Starting compaction: stats={}".format(stats))
        self.db = self._db_open(snapshotpath)

        # Ensure `change` is defined even if stream_changes() yields
        # nothing, so the degenerate case trips the assertion below
        # instead of a NameError.
        change = None
        for change in self.stream_changes():
            if change.version == stop:
                break

            if change.snapshot is not None:
                self._restore_snapshot(change.snapshot, snapshotpath)

            if change.transaction is not None:
                self._restore_transaction(change.transaction)
            self.db.commit()

        # If this assertion fails we are in a degenerate state: we
        # have less than two changes in the backup (starting
        # Core-Lightning alone produces 6 changes), and compacting an
        # almost empty backup is not useful.
        assert change is not None

        # Remember `change`, it's the rewindable change we need to
        # stash on top of the new snapshot.
        clone = FileBackend(clonepath, create=True)
        clone.offsets = [512, 0]

        # We are about to add the snapshot n-1 on top of n-2 (init),
        # followed by the last change for n on top of
        # n-1. prev_version trails that by one.
        clone.version = change.version - 2
        clone.prev_version = clone.version - 1
        clone.version_count = 0
        clone.write_metadata()

        # Read the snapshot via a context manager so the file handle is
        # closed promptly (the original leaked it).
        with open(snapshotpath, 'rb') as snapfd:
            snapshot = Change(
                version=change.version - 1,
                snapshot=snapfd.read(),
                transaction=None
            )
        # Typo fix in the progress message: "intial" -> "initial".
        print("Adding initial snapshot with {} bytes for version {}".format(
            len(snapshot.snapshot),
            snapshot.version
        ))
        clone.add_change(snapshot)

        assert clone.version == change.version - 1
        assert clone.prev_version == change.version - 2
        clone.add_change(change)

        assert self.version == clone.version
        assert self.prev_version == clone.prev_version

        stats['after'] = {
            'version_count': clone.version_count,
            'backupsize': os.stat(clonepath).st_size,
        }

        print("Compacted {} changes, saving {} bytes, swapping backups".format(
            stats['before']['version_count'] - stats['after']['version_count'],
            stats['before']['backupsize'] - stats['after']['backupsize'],
        ))
        shutil.move(clonepath, self.url.path)

        # Re-initialize ourselves so we have the correct metadata
        self.read_metadata()

    return stats
|