historian: Add loading functionality to historian-cli

This commit is contained in:
Christian Decker
2020-10-11 21:57:16 +02:00
parent 2e0963cf40
commit b1338ef5d4
2 changed files with 117 additions and 0 deletions

View File

@@ -1,14 +1,22 @@
#!/usr/bin/env python3
from tqdm import tqdm
from contextlib import contextmanager
from sqlalchemy import create_engine
from common import Base, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement
from sqlalchemy.orm import sessionmaker
from sqlalchemy import func
from datetime import datetime, timedelta
from collections import namedtuple
import click
from pyln.proto.primitives import varint_encode, varint_decode
import os
from sqlalchemy.orm import load_only
import re
import io
import logging
import socket
from pyln.proto import wire
default_db = "sqlite:///$HOME/.lightning/bitcoin/historian.sqlite3"
@@ -108,6 +116,9 @@ ORDER BY
a.scid
""".format(since.strftime("%Y-%m-%d %H:%M:%S")))
# Write the header now that we know we'll be writing something.
destination.write(b"GSP\x01")
chan_count = 0
last_scid = None
for scid, cann, u1, u2 in rows:
@@ -170,6 +181,17 @@ def full(ctx, destination, db):
@snapshot.command()
@click.argument('snapshot', type=click.File('rb'))
def read(snapshot):
header = snapshot.read(4)
if len(header) < 4:
raise ValueError("Could not read header")
tag, version = header[0:3], header[3]
if tag != b'GSP':
raise ValueError(f"Header mismatch, expected GSP, got {repr(tag)}")
if version != 1:
raise ValueError(f"Unsupported version {version}, only support up to version 1")
while True:
l = varint_decode(snapshot)
if l is None:
@@ -182,5 +204,99 @@ def read(snapshot):
print(msg.hex())
LightningAddress = namedtuple('LightningAddress', ['node_id', 'host', 'port'])
class LightningAddressParam(click.ParamType):
def convert(self, value, param, ctx):
m = re.match(r"(0[23][a-fA-F0-9]+)@([a-zA-Z0-9\.:]+):([0-9]+)?", value)
if m is None:
self.fail(
f"{value} isn't a valid lightning connection string, "
"expected \"[node_id]@[host]:[port]\""
)
return
if len(m.groups()) < 3:
return LightningAddress(m[1], m[2], 9735)
else:
return LightningAddress(m[1], m[2], int(m[3]))
class LightningPeer:
def __init__(self, node_id: str, address: str, port: int = 9735):
self.node_id = node_id
self.address = address
self.port = port
self.connection = None
self.local_privkey = wire.PrivateKey(os.urandom(32))
def connect(self):
sock = socket.create_connection((self.address, self.port), timeout=30)
self.connection = wire.LightningConnection(
sock,
remote_pubkey=wire.PublicKey(bytes.fromhex(self.node_id)),
local_privkey=self.local_privkey,
is_initiator=True,
)
self.connection.shake()
# Send an init message, with no global features, and 0b10101010 as
# local features.
self.connection.send_message(b"\x00\x10\x00\x00\x00\x01\xaa")
def send(self, packet: bytes) -> None:
if self.connection is None:
raise ValueError("Not connected to peer")
logging.debug("Sending {}".format(packet.hex()))
self.connection.send_message(packet)
def send_all(self, packets) -> None:
assert self.connection is not None
for p in packets:
self.send(p)
def disconnect(self):
self.connection.connection.close()
def split_gossip(reader: io.BytesIO):
while True:
length = varint_decode(reader)
if length is None:
break
msg = reader.read(length)
if len(msg) != length:
raise ValueError("Incomplete read at end of file")
yield msg
@snapshot.command()
@click.argument('snapshot', type=click.File('rb'))
@click.argument('destination', type=LightningAddressParam())
def load(snapshot, destination):
header = snapshot.read(4)
if len(header) < 4:
raise ValueError("Could not read header")
tag, version = header[0:3], header[3]
if tag != b'GSP':
raise ValueError(f"Header mismatch, expected GSP, got {repr(tag)}")
if version != 1:
raise ValueError(f"Unsupported version {version}, only support up to version 1")
logging.debug(f"Connecting to {destination}")
peer = LightningPeer(destination.node_id, destination.host, destination.port)
peer.connect()
logging.debug("Connected, streaming messages from snapshot")
peer.send_all(tqdm(split_gossip(snapshot)))
peer.disconnect()
logging.debug("Done streaming messages, disconnecting")
if __name__ == "__main__":
cli()

View File

@@ -1,3 +1,4 @@
inotify==0.2.10
sqlalchemy==1.3.17
pyln-proto
tqdm