Files
plugins/historian/historian-cli
Christian Decker 2e0963cf40 historian: Switch to nested queries from massive joins
Not only is this more correct, but it's also quicker, since we avoid the huge
joins.
2021-01-22 18:00:16 +01:00

187 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python3
from contextlib import contextmanager
from datetime import datetime, timedelta
import os

import click
from pyln.proto.primitives import varint_decode, varint_encode
from sqlalchemy import create_engine, func, text
from sqlalchemy.orm import load_only, sessionmaker

from common import Base, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement
# Default location of the gossip database; "$HOME" is expanded at runtime
# by db_session() via os.path.expandvars.
default_db = "sqlite:///$HOME/.lightning/bitcoin/historian.sqlite3"
@contextmanager
def db_session(dsn):
    """Tiny contextmanager to facilitate sqlalchemy session management.

    Yields a session bound to `dsn` (falling back to `default_db` when
    `dsn` is None).  Environment variables in the DSN (e.g. ``$HOME``)
    are expanded, and any missing tables are created so a fresh database
    works out of the box.  The session is committed if the ``with`` body
    completes normally, rolled back if it raises, and always closed.
    """
    if dsn is None:
        dsn = default_db
    # Allow DSNs such as "sqlite:///$HOME/..." to reference env vars.
    dsn = os.path.expandvars(dsn)
    engine = create_engine(dsn, echo=False)
    # Ensure the schema exists before handing the session to the caller.
    Base.metadata.create_all(engine)
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    try:
        yield session
        session.commit()
    except BaseException:
        # Explicit BaseException instead of a bare `except:` (same
        # semantics, PEP 8-compliant): roll back on *any* failure,
        # including KeyboardInterrupt, then re-raise.
        session.rollback()
        raise
    finally:
        session.close()
@click.group()
def cli():
    # Root command group for the historian CLI; subgroups (e.g. `snapshot`)
    # attach themselves via @cli.group().  No docstring on purpose: click
    # would surface it as help text, changing the CLI's output.
    pass
@cli.group()
def snapshot():
    # Container group for the snapshot subcommands defined below
    # (incremental, full, read).  No docstring so `--help` output is
    # unchanged.
    pass
# Timestamp format shared by the CLI argument parser and the SQL queries.
dt_fmt = "%Y-%m-%d %H:%M:%S"
# Default lower bound for incremental snapshots.  NOTE(review): evaluated
# once at import time, not per invocation — a long-lived process would keep
# using a stale "one hour ago".  Fine for a CLI that exits after each run.
default_since = datetime.utcnow() - timedelta(hours=1)
@snapshot.command()
@click.argument('destination', type=click.File('wb'))
@click.argument(
    'since',
    type=click.DateTime(formats=[dt_fmt]),
    default=default_since.strftime(dt_fmt)
)
@click.option('--db', type=str, default=default_db)
def incremental(since, destination, db):
    # Write a snapshot of all channels that saw an update since `since`:
    # each channel_announcement followed by the latest channel_update per
    # direction, then the node_announcements from the same timerange.
    # Every message is emitted as a varint length prefix + raw bytes.
    with db_session(db) as session:
        # Single canonical string form of the cutoff, reused by both
        # queries below (previously formatted twice).
        since_str = since.strftime(dt_fmt)
        # Several nested queries here because join was a bit too
        # restrictive. The inner SELECT in the WHERE-clause selects all scids
        # that had any updates in the desired timerange. The outer SELECT then
        # gets all the announcements and kicks off inner SELECTs that look for
        # the latest update for each direction.
        # The cutoff is passed as a bound parameter (:since) via text()
        # instead of being interpolated into the SQL string.
        rows = session.execute(text("""
        SELECT
          a.scid,
          a.raw,
          (
            SELECT u.raw
            FROM channel_updates u
            WHERE u.scid = a.scid AND direction = 0
            ORDER BY timestamp DESC
            LIMIT 1
          ) AS u0,
          (
            SELECT u.raw
            FROM channel_updates u
            WHERE u.scid = a.scid AND direction = 1
            ORDER BY timestamp DESC
            LIMIT 1
          ) AS u1
        FROM channel_announcements a
        WHERE a.scid IN (
          SELECT u.scid
          FROM channel_updates u
          WHERE u.timestamp >= DATETIME(:since)
          GROUP BY u.scid
        )
        ORDER BY a.scid
        """), {'since': since_str})

        chan_count = 0
        last_scid = None
        for scid, cann, u1, u2 in rows:
            # Rows are ordered by scid, so duplicates are adjacent and a
            # single "last seen" marker is enough to skip them.
            if scid == last_scid:
                continue
            last_scid = scid
            chan_count += 1
            varint_encode(len(cann), destination)
            destination.write(cann)
            if u1 is not None:
                varint_encode(len(u1), destination)
                destination.write(u1)
            if u2 is not None:
                varint_encode(len(u2), destination)
                destination.write(u2)

        # Now get and return the node_announcements in the timerange. These
        # come after the channels since no node without a
        # channel_announcements and channel_update is allowed.
        rows = session.execute(text("""
        SELECT
          n.node_id,
          n.timestamp,
          n.raw
        FROM node_announcements n
        WHERE n.timestamp >= DATETIME(:since)
        GROUP BY n.node_id
        HAVING n.timestamp = MAX(n.timestamp)
        ORDER BY timestamp DESC
        """), {'since': since_str})

        last_nid = None
        node_count = 0
        for nid, ts, nann in rows:
            if nid == last_nid:
                continue
            last_nid = nid
            node_count += 1
            varint_encode(len(nann), destination)
            destination.write(nann)

        click.echo(
            f'Wrote {chan_count} channels and {node_count} nodes to {destination.name}',
            err=True
        )
@snapshot.command()
@click.argument('destination', type=click.File('wb'))
@click.pass_context
@click.option('--db', type=str, default=default_db)
def full(ctx, destination, db):
    # A "full" snapshot is simply an incremental snapshot whose cutoff
    # lies two weeks in the past.
    cutoff = datetime.utcnow() - timedelta(weeks=2)
    ctx.invoke(incremental, since=cutoff, destination=destination, db=db)
@snapshot.command()
@click.argument('snapshot', type=click.File('rb'))
def read(snapshot):
    # Dump a snapshot file: each record is a varint length prefix followed
    # by the raw gossip message; print each message hex-encoded, one per
    # line.  varint_decode returns None at EOF, which `iter` uses as the
    # sentinel to stop the loop.
    for length in iter(lambda: varint_decode(snapshot), None):
        msg = snapshot.read(length)
        if len(msg) != length:
            raise ValueError("Incomplete read at end of file")
        print(msg.hex())
# Script entry point: hand control to the click command group.
if __name__ == "__main__":
    cli()