From b1338ef5d40921f255d67a08371420a5cd838a80 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Sun, 11 Oct 2020 21:57:16 +0200 Subject: [PATCH] historian: Add loading functionality to historian-cli --- historian/historian-cli | 116 +++++++++++++++++++++++++++++++++++++ historian/requirements.txt | 1 + 2 files changed, 117 insertions(+) diff --git a/historian/historian-cli b/historian/historian-cli index 19fc2bd..d77d204 100755 --- a/historian/historian-cli +++ b/historian/historian-cli @@ -1,14 +1,22 @@ #!/usr/bin/env python3 +from tqdm import tqdm from contextlib import contextmanager from sqlalchemy import create_engine from common import Base, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement from sqlalchemy.orm import sessionmaker from sqlalchemy import func from datetime import datetime, timedelta +from collections import namedtuple import click from pyln.proto.primitives import varint_encode, varint_decode import os from sqlalchemy.orm import load_only +import re +import io +import logging +import socket +from pyln.proto import wire + default_db = "sqlite:///$HOME/.lightning/bitcoin/historian.sqlite3" @@ -108,6 +116,9 @@ ORDER BY a.scid """.format(since.strftime("%Y-%m-%d %H:%M:%S"))) + # Write the header now that we know we'll be writing something. + destination.write(b"GSP\x01") + chan_count = 0 last_scid = None for scid, cann, u1, u2 in rows: @@ -170,6 +181,17 @@ def full(ctx, destination, db): @snapshot.command() @click.argument('snapshot', type=click.File('rb')) def read(snapshot): + header = snapshot.read(4) + if len(header) < 4: + raise ValueError("Could not read header") + + tag, version = header[0:3], header[3] + if tag != b'GSP': + raise ValueError(f"Header mismatch, expected GSP, got {repr(tag)}") + + if version != 1: + raise ValueError(f"Unsupported version {version}, only support up to version 1") + while True: l = varint_decode(snapshot) if l is None: @@ -182,5 +204,99 @@ def read(snapshot): print(msg.hex()) +LightningAddress = namedtuple('LightningAddress', ['node_id', 'host', 'port']) + + +class LightningAddressParam(click.ParamType): + def convert(self, value, param, ctx): + m = re.match(r"(0[23][a-fA-F0-9]+)@([a-zA-Z0-9\.:]+):([0-9]+)?", value) + if m is None: + self.fail( + f"{value} isn't a valid lightning connection string, " + "expected \"[node_id]@[host]:[port]\"" + ) + return + + if len(m.groups()) < 3: + return LightningAddress(m[1], m[2], 9735) + else: + return LightningAddress(m[1], m[2], int(m[3])) + + +class LightningPeer: + def __init__(self, node_id: str, address: str, port: int = 9735): + self.node_id = node_id + self.address = address + self.port = port + self.connection = None + self.local_privkey = wire.PrivateKey(os.urandom(32)) + + def connect(self): + sock = socket.create_connection((self.address, self.port), timeout=30) + self.connection = wire.LightningConnection( + sock, + remote_pubkey=wire.PublicKey(bytes.fromhex(self.node_id)), + local_privkey=self.local_privkey, + is_initiator=True, + ) + self.connection.shake() + + # Send an init message, with no global features, and 0b10101010 as + # local features. + self.connection.send_message(b"\x00\x10\x00\x00\x00\x01\xaa") + + def send(self, packet: bytes) -> None: + if self.connection is None: + raise ValueError("Not connected to peer") + + logging.debug("Sending {}".format(packet.hex())) + self.connection.send_message(packet) + + def send_all(self, packets) -> None: + assert self.connection is not None + for p in packets: + self.send(p) + + def disconnect(self): + self.connection.connection.close() + + +def split_gossip(reader: io.BytesIO): + while True: + length = varint_decode(reader) + if length is None: + break + + msg = reader.read(length) + if len(msg) != length: + raise ValueError("Incomplete read at end of file") + + yield msg + + +@snapshot.command() +@click.argument('snapshot', type=click.File('rb')) +@click.argument('destination', type=LightningAddressParam()) +def load(snapshot, destination): + header = snapshot.read(4) + if len(header) < 4: + raise ValueError("Could not read header") + + tag, version = header[0:3], header[3] + if tag != b'GSP': + raise ValueError(f"Header mismatch, expected GSP, got {repr(tag)}") + + if version != 1: + raise ValueError(f"Unsupported version {version}, only support up to version 1") + + logging.debug(f"Connecting to {destination}") + peer = LightningPeer(destination.node_id, destination.host, destination.port) + peer.connect() + logging.debug("Connected, streaming messages from snapshot") + peer.send_all(tqdm(split_gossip(snapshot))) + peer.disconnect() + logging.debug("Done streaming messages, disconnecting") + + if __name__ == "__main__": cli() diff --git a/historian/requirements.txt b/historian/requirements.txt index 193d289..1a8e63f 100644 --- a/historian/requirements.txt +++ b/historian/requirements.txt @@ -1,3 +1,4 @@ inotify==0.2.10 sqlalchemy==1.3.17 pyln-proto +tqdm