#!/usr/bin/env python3
"""Export snapshots of Lightning Network gossip from a historian database.

Messages are written as a raw stream of varint-length-prefixed gossip
messages: first channel_announcements interleaved with their two directed
channel_updates, then node_announcements.
"""
from contextlib import contextmanager
from datetime import datetime, timedelta
import os

import click
from pyln.proto.primitives import varint_encode
from sqlalchemy import create_engine, func, text
from sqlalchemy.orm import load_only, sessionmaker

from common import Base, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement

# Default DSN; `$HOME` is expanded with os.path.expandvars at connect time.
default_db = "sqlite:///$HOME/.lightning/bitcoin/historian.sqlite3"


@contextmanager
def db_session(dsn):
    """Tiny contextmanager to facilitate sqlalchemy session management.

    Yields a session bound to `dsn` (falling back to `default_db` when
    `dsn` is None), commits on clean exit, rolls back and re-raises on any
    error, and always closes the session.
    """
    if dsn is None:
        dsn = default_db
    dsn = os.path.expandvars(dsn)
    engine = create_engine(dsn, echo=False)
    # Ensure the schema exists so a fresh database is usable immediately.
    Base.metadata.create_all(engine)
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    try:
        yield session
        session.commit()
    except BaseException:
        # Roll back on *any* interruption (including KeyboardInterrupt),
        # then let the exception propagate to the caller.
        session.rollback()
        raise
    finally:
        session.close()


@click.group()
def cli():
    pass


@cli.group()
def snapshot():
    pass


dt_fmt = "%Y-%m-%d %H:%M:%S"
# NOTE: evaluated once at import time; fine for a one-shot CLI, but a
# long-lived process would see a stale default.
default_since = datetime.utcnow() - timedelta(hours=1)


@snapshot.command()
@click.argument('destination', type=click.File('wb'))
@click.argument(
    'since',
    type=click.DateTime(formats=[dt_fmt]),
    default=default_since.strftime(dt_fmt)
)
@click.option('--db', type=str, default=default_db)
def incremental(since, destination, db):
    """Write all gossip touched since `since` to `destination`.

    Channels come first (announcement + both directed updates, each
    varint-length-prefixed), then the latest node_announcement per node.
    """
    with db_session(db) as session:
        # The following is a mouthful, but it is relatively simple. The inner
        # query selects all scids that had any updates in the timerange we're
        # interested in. The outer query then selects announcements and
        # channel_updates that match those scids. The results are ordered by
        # scid, and update timestamps, so we can deduplicate them while
        # iterating through them. This last step is a bit wasteful, since we
        # end up iterating over many more entries than strictly necessary, but
        # a `GROUP BY a.scid` may filter out the latest updates, which is not
        # what we want and we were unable to find a `HAVING` statement that'd
        # filter only on the group rather than the table.
        #
        # `since` is passed as a bound parameter rather than interpolated
        # into the SQL string.
        rows = session.execute(text("""
        SELECT a.scid, a.raw, u1.raw, u2.raw
        FROM channel_announcements a
        LEFT JOIN channel_updates u1 ON (a.scid = u1.scid)
        LEFT JOIN channel_updates u2 ON (a.scid = u2.scid)
        WHERE (u1.direction IS NULL OR u1.direction = 0)
          AND (u2.direction IS NULL OR u2.direction = 1)
          AND a.scid IN (
            SELECT u.scid
            FROM channel_updates u
            WHERE u.timestamp >= DATETIME(:since)
            GROUP BY u.scid
          )
        ORDER BY a.scid, u1.timestamp DESC, u2.timestamp DESC
        """), {"since": since.strftime(dt_fmt)})

        chan_count = 0
        last_scid = None
        for scid, cann, u1, u2 in rows:
            # Rows are sorted newest-first per scid; keep only the first
            # (latest) row for each channel.
            if scid == last_scid:
                continue
            last_scid = scid
            chan_count += 1
            # NOTE(review): the LEFT JOINs mean u1/u2 can be NULL when a
            # channel only ever had updates in one direction, which would
            # raise on len(None) here — confirm whether the snapshot format
            # requires both directions or whether NULLs should be skipped.
            varint_encode(len(cann), destination)
            destination.write(cann)
            varint_encode(len(u1), destination)
            destination.write(u1)
            varint_encode(len(u2), destination)
            destination.write(u2)

        # Now get and return the node_announcements in the timerange. These
        # come after the channels since no node without a
        # channel_announcements and channel_update is allowed.
        rows = session.execute(text("""
        SELECT n.node_id, n.timestamp, n.raw
        FROM node_announcements n
        WHERE n.timestamp >= DATETIME(:since)
        GROUP BY n.node_id
        HAVING n.timestamp = MAX(n.timestamp)
        ORDER BY timestamp DESC
        """), {"since": since.strftime(dt_fmt)})

        last_nid = None
        node_count = 0
        for nid, ts, nann in rows:
            if nid == last_nid:
                continue
            last_nid = nid
            node_count += 1
            varint_encode(len(nann), destination)
            destination.write(nann)

        click.echo(
            f'Wrote {chan_count} channels and {node_count} nodes to {destination.name}',
            err=True
        )


@snapshot.command()
@click.argument('destination', type=click.File('wb'))
@click.pass_context
@click.option('--db', type=str, default=default_db)
def full(ctx, destination, db):
    """Write a "full" snapshot, i.e., everything from the last two weeks."""
    since = datetime.utcnow() - timedelta(weeks=2)
    ctx.invoke(incremental, since=since, destination=destination, db=db)


if __name__ == "__main__":
    cli()