diff --git a/backup/backup.py b/backup/backup.py index 64cd9e2..3d10377 100755 --- a/backup/backup.py +++ b/backup/backup.py @@ -98,9 +98,9 @@ def on_init(options, **kwargs): # IMPORTANT NOTE # Putting RPC stuff in init() like the following can cause deadlocks! # See: https://github.com/lightningd/plugins/issues/209 - #configs = plugin.rpc.listconfigs() - #if not configs['wallet'].startswith('sqlite3'): - # kill("The backup plugin only works with the sqlite3 database.") + # configs = plugin.rpc.listconfigs() + # if not configs['wallet'].startswith('sqlite3'): + # kill("The backup plugin only works with the sqlite3 database.") def kill(message: str): diff --git a/backup/filebackend.py b/backup/filebackend.py index d8683b8..3f0bc57 100644 --- a/backup/filebackend.py +++ b/backup/filebackend.py @@ -1,11 +1,13 @@ -import logging, os, struct +import logging +import os +import struct import shutil import tempfile from typing import Iterator from urllib.parse import urlparse - from backend import Backend, Change + class FileBackend(Backend): def __init__(self, destination: str, create: bool): self.version = None diff --git a/backup/protocol.py b/backup/protocol.py index c3ff8c6..8157074 100644 --- a/backup/protocol.py +++ b/backup/protocol.py @@ -8,6 +8,7 @@ import zlib from backend import Change + class PacketType: CHANGE = 0x01 SNAPSHOT = 0x02 @@ -21,8 +22,10 @@ class PacketType: COMPACT = 0x0a COMPACT_RES = 0x0b + PKT_CHANGE_TYPES = {PacketType.CHANGE, PacketType.SNAPSHOT} + def recvall(sock: socket.socket, n: int) -> bytearray: '''Receive exactly n bytes from a socket.''' buf = bytearray(n) @@ -35,28 +38,32 @@ def recvall(sock: socket.socket, n: int) -> bytearray: ptr += count return buf + def send_packet(sock: socket.socket, typ: int, payload: bytes) -> None: sock.sendall(struct.pack('!BI', typ, len(payload))) sock.sendall(payload) + def recv_packet(sock: socket.socket) -> Tuple[int, bytes]: (typ, length) = struct.unpack('!BI', recvall(sock, 5)) payload = recvall(sock, length) return (typ, payload) + def change_from_packet(typ, payload): '''Convert a network packet to a Change object.''' if typ == PacketType.CHANGE: (version, ) = struct.unpack('!I', payload[0:4]) payload = zlib.decompress(payload[4:]) return Change(version=version, snapshot=None, - transaction=[t.decode('UTF-8') for t in payload.split(b'\x00')]) + transaction=[t.decode('UTF-8') for t in payload.split(b'\x00')]) elif typ == PacketType.SNAPSHOT: (version, ) = struct.unpack('!I', payload[0:4]) payload = zlib.decompress(payload[4:]) return Change(version=version, snapshot=payload, transaction=None) raise ValueError('Not a change (typ {})'.format(typ)) + def packet_from_change(entry): '''Convert a Change object to a network packet.''' if entry.snapshot is None: diff --git a/backup/server.py b/backup/server.py index 6d22160..8a03712 100644 --- a/backup/server.py +++ b/backup/server.py @@ -1,10 +1,13 @@ -import logging, socket, struct +import logging +import socket +import struct import json import sys from typing import Tuple from backend import Backend -from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet +from protocol import PacketType, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet + class SystemdHandler(logging.Handler): PREFIX = { @@ -31,6 +34,7 @@ class SystemdHandler(logging.Handler): except Exception: self.handleError(record) + def setup_server_logging(mode, level): root_logger = logging.getLogger() root_logger.setLevel(level.upper()) @@ -42,6 +46,7 @@ def setup_server_logging(mode, level): else: assert(mode == 'plain') + class SocketServer: def __init__(self, addr: Tuple[str, int], backend: Backend) -> None: self.backend = backend @@ -63,7 +68,7 @@ class SocketServer: while True: try: (typ, payload) = self._recv_packet() - except IOError as e: + except IOError: logging.info('Connection closed') break if typ in PKT_CHANGE_TYPES: @@ -86,8 +91,8 @@ class SocketServer: elif typ == PacketType.REQ_METADATA: logging.debug('Received REQ_METADATA') blob = struct.pack("!IIIQ", 0x01, self.backend.version, - self.backend.prev_version, - self.backend.version_count) + self.backend.prev_version, + self.backend.version_count) self._send_packet(PacketType.METADATA, blob) elif typ == PacketType.RESTORE: logging.info('Received RESTORE') @@ -118,7 +123,7 @@ class SocketServer: conn, _ = self.bind.accept() try: self._handle_conn(conn) - except Exception as e: + except Exception: logging.exception('Got exception') finally: conn.close() diff --git a/backup/socketbackend.py b/backup/socketbackend.py index f6a747c..85be586 100644 --- a/backup/socketbackend.py +++ b/backup/socketbackend.py @@ -1,37 +1,47 @@ from collections import namedtuple -import json, logging, socket, re, struct, time +import json +import logging +import socket +import re +import struct +import time from typing import Tuple, Iterator from urllib.parse import urlparse, parse_qs from backend import Backend, Change -from protocol import PacketType, recvall, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet +from protocol import PacketType, PKT_CHANGE_TYPES, change_from_packet, packet_from_change, send_packet, recv_packet # Total number of reconnection tries -RECONNECT_TRIES=5 +RECONNECT_TRIES = 5 # Delay in seconds between reconnections (initial) -RECONNECT_DELAY=5 +RECONNECT_DELAY = 5 # Scale delay factor after each failure -RECONNECT_DELAY_BACKOFF=1.5 +RECONNECT_DELAY_BACKOFF = 1.5 HostPortInfo = namedtuple('HostPortInfo', ['host', 'port', 'addrtype']) SocketURLInfo = namedtuple('SocketURLInfo', ['target', 'proxytype', 'proxytarget']) # Network address type. + + class AddrType: IPv4 = 0 IPv6 = 1 NAME = 2 # Proxy type. Only SOCKS5 supported at the moment as this is sufficient for Tor. + + class ProxyType: DIRECT = 0 SOCKS5 = 1 + def parse_host_port(path: str) -> HostPortInfo: '''Parse a host:port pair.''' - if path.startswith('['): # bracketed IPv6 address + if path.startswith('['): # bracketed IPv6 address eidx = path.find(']') if eidx == -1: raise ValueError('Unterminated bracketed host address.') @@ -46,7 +56,7 @@ def parse_host_port(path: str) -> HostPortInfo: if eidx == -1: raise ValueError('Port number missing.') host = path[0:eidx] - if re.match('\d+\.\d+\.\d+\.\d+$', host): # matches IPv4 address format + if re.match(r'\d+\.\d+\.\d+\.\d+$', host): # matches IPv4 address format addrtype = AddrType.IPv4 else: addrtype = AddrType.NAME @@ -59,6 +69,7 @@ def parse_host_port(path: str) -> HostPortInfo: return HostPortInfo(host=host, port=port, addrtype=addrtype) + def parse_socket_url(destination: str) -> SocketURLInfo: '''Parse a socket: URL to extract the information contained in it.''' url = urlparse(destination) @@ -73,7 +84,7 @@ def parse_socket_url(destination: str) -> SocketURLInfo: # reject unknown parameters (currently all of them) qs = parse_qs(url.query) for (key, values) in qs.items(): - if key == 'proxy': # proxy=socks5:127.0.0.1:9050 + if key == 'proxy': # proxy=socks5:127.0.0.1:9050 if len(values) != 1: raise ValueError('Proxy can only have one value') @@ -88,6 +99,7 @@ def parse_socket_url(destination: str) -> SocketURLInfo: return SocketURLInfo(target=target, proxytype=proxytype, proxytarget=proxytarget) + class SocketBackend(Backend): def __init__(self, destination: str, create: bool): self.version = None @@ -100,7 +112,7 @@ class SocketBackend(Backend): if self.url.proxytype == ProxyType.DIRECT: if self.url.target.addrtype == AddrType.IPv6: self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - else: # TODO NAME is assumed to be IPv4 for now + else: # TODO NAME is assumed to be IPv4 for now self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) else: assert(self.url.proxytype == ProxyType.SOCKS5) @@ -109,8 +121,8 @@ class SocketBackend(Backend): self.sock.set_proxy(socks.SOCKS5, self.url.proxytarget.host, self.url.proxytarget.port) logging.info('Connecting to {}:{} (addrtype {}, proxytype {}, proxytarget {})...'.format( - self.url.target.host, self.url.target.port, self.url.target.addrtype, - self.url.proxytype, self.url.proxytarget)) + self.url.target.host, self.url.target.port, self.url.target.addrtype, + self.url.proxytype, self.url.proxytarget)) self.sock.connect((self.url.target.host, self.url.target.port)) logging.info('Connected to {}'.format(self.destination)) @@ -144,7 +156,7 @@ class SocketBackend(Backend): retry = 0 retry_delay = RECONNECT_DELAY need_connect = False - while True: # Retry loop + while True: # Retry loop try: if need_connect: self.connect() diff --git a/backup/test_backup.py b/backup/test_backup.py index 180c96d..95ffcea 100644 --- a/backup/test_backup.py +++ b/backup/test_backup.py @@ -1,5 +1,4 @@ from backend import Backend -from filebackend import FileBackend import socketbackend from flaky import flaky from pyln.testing.fixtures import * # noqa: F401,F403 @@ -7,6 +6,7 @@ from pyln.testing.utils import sync_blockheight import os import pytest import subprocess +import tempfile plugin_dir = os.path.dirname(__file__) @@ -236,6 +236,7 @@ class DummyBackend(Backend): def __init__(self): pass + def test_rewrite(): tests = [ ( @@ -281,6 +282,7 @@ def test_compact(bitcoind, directory, node_factory): tmp = tempfile.TemporaryDirectory() subprocess.check_call([cli_path, "restore", bdest, tmp.name]) + def test_parse_socket_url(): with pytest.raises(ValueError): # fail: invalid url scheme