Files
plugins/historian/historian.py
Peter Sebastian Nordholt ca75d7e132 Fix string formating.
2021-02-05 09:05:16 +01:00

204 lines
6.2 KiB
Python
Executable File

#!/usr/bin/env python3
from inotify import constants
from inotify.adapters import Inotify
from pyln.client import Plugin
from sqlalchemy import create_engine
from sqlalchemy import desc
from sqlalchemy.orm import sessionmaker
from threading import Thread
from common import Base, ChannelAnnouncement, ChannelUpdate, NodeAnnouncement
import logging
import gossipd
import struct
import time
# Module-level plugin object: the `@plugin.init()` / `@plugin.method(...)`
# decorators, the option registration and the final `plugin.run()` call in
# this file all operate on this single instance.
plugin = Plugin()
class FsMonitor(Thread):
    """Thread that keeps an inotify watch on the ``gossip_store`` file.

    The only event acted upon is ``IN_DELETE_SELF``: when it is seen the
    watch is removed and registered again under the same path.
    """

    def __init__(self):
        # The base initializer must run, otherwise Thread.start() (and even
        # attribute access such as .name) fails on this instance.
        super().__init__()

    def run(self):
        """Watch ``gossip_store`` indefinitely, re-adding the watch on deletion."""
        watch_mask = constants.IN_ALL_EVENTS
        print("Starting FsMonitor")
        i = Inotify()
        i.add_watch('gossip_store', mask=watch_mask)
        for event in i.event_gen(yield_nones=False):
            # Only the first element of the event tuple (the header carrying
            # the event mask) is needed here.
            header = event[0]
            if header.mask & constants.IN_DELETE_SELF:
                i.remove_watch('gossip_store')
                i.add_watch('gossip_store', mask=watch_mask)
class FileTailer():
    """Incrementally read length-prefixed messages from a growing store file.

    The first byte of the file is read as the format version; records follow
    from offset 1. ``pos`` is the offset of the next unread record, so
    repeated calls to :meth:`resume` only return messages that were not
    returned before.
    """

    # The two top bits of the 32-bit length word are flags (the original
    # comments name them "deleted" and "important"), not part of the length.
    _LENGTH_MASK = 0x3FFFFFFF
    # Largest payload accepted before the record is considered corrupt.
    _MAX_LENGTH = 1000
    # Message types whose first 4 bytes are stripped for versions <= 3.
    _WRAPPER_TYPES = (4096, 4097, 4098)
    # Message types that are consumed but never yielded.
    _SKIPPED_TYPES = (4101, 3503)

    def __init__(self, filename):
        self.filename = filename
        self.pos = 1  # Offset 0 holds the version byte.
        self.version = None

    def resume(self):
        """Yield every complete message stored at or after ``self.pos``.

        ``self.pos`` is advanced past each record as it is consumed, including
        records of a skipped type. An empty file yields nothing instead of
        failing on the missing version byte. A record whose payload is not
        fully written yet is retried once per second until it is complete.

        Yields:
            bytes: the message payload, minus the 4-byte wrapper prefix for
            wrapper types in files of version 3 or lower.

        Raises:
            ValueError: if a record announces a payload above 1000 bytes.
        """
        ev_count = 0
        with open(self.filename, 'rb') as f:
            version_byte = f.read(1)
            if not version_byte:
                # Nothing has been written yet (not even the version byte);
                # leave pos untouched so the next call starts over.
                logging.debug(
                    f"{self.filename} is empty, waiting for new fs event"
                )
                return
            self.version, = struct.unpack("!B", version_byte)
            f.seek(self.pos)
            while True:
                diff = 8
                hdr = f.read(8)
                if len(hdr) < 8:
                    # End of file (or a header still being written).
                    break

                length, _crc = struct.unpack("!II", hdr)
                if self.version > 3:
                    # Newer files carry 4 extra header bytes that are not
                    # needed here. NOTE(review): the original comment called
                    # this "the CRC", although a crc word was already
                    # unpacked above -- confirm what the field holds.
                    f.read(4)
                    diff += 4

                length &= self._LENGTH_MASK

                if length > self._MAX_LENGTH:
                    raise ValueError(
                        f"Unreasonably large message: {length} bytes long"
                    )

                msg = f.read(length)

                # Incomplete write, will try again
                if len(msg) < length:
                    logging.debug(
                        f"Partial read: {len(msg)}<{length}, waiting 1 second"
                    )
                    time.sleep(1)
                    f.seek(self.pos)
                    continue

                diff += length

                # Strip eventual wrappers:
                typ, = struct.unpack("!H", msg[:2])
                if self.version <= 3 and typ in self._WRAPPER_TYPES:
                    msg = msg[4:]

                self.pos += diff
                if typ in self._SKIPPED_TYPES:
                    f.seek(self.pos)
                    continue

                ev_count += 1
                yield msg

        logging.debug(
            f"Reached end of {self.filename} at {self.pos} after {ev_count} "
            "new messages, waiting for new fs event"
        )

    def wait_actionable(self, i):
        """Block on inotify instance ``i`` until an event we react to arrives.

        Returns:
            ``'swap'`` when the watched file reports ``IN_DELETE_SELF``,
            ``'append'`` when it reports ``IN_MODIFY``, and ``None`` if the
            event generator ends without either.
        """
        for event in i.event_gen(yield_nones=False):
            mask = event[0].mask
            if mask & constants.IN_DELETE_SELF:
                return 'swap'
            if mask & constants.IN_MODIFY:
                return 'append'
        return None

    def tail(self):
        """Yield messages forever, following appends and file replacement.

        After draining the file with :meth:`resume` the generator waits for an
        inotify event. On ``'append'`` it simply reads again; on ``'swap'`` it
        re-registers the watch and restarts reading from offset 1.
        """
        # Clear the access/open/close-nowrite bits explicitly. Presumably they
        # are excluded because merely reading the file would trigger them.
        ignored = (constants.IN_ACCESS | constants.IN_OPEN |
                   constants.IN_CLOSE_NOWRITE)
        watch_mask = constants.IN_ALL_EVENTS & ~ignored
        i = Inotify()
        i.add_watch(self.filename, mask=watch_mask)
        while True:
            # Consume as much as possible.
            yield from self.resume()

            # Now wait for a change that we can react to
            ev = self.wait_actionable(i)

            if ev == 'swap':
                # Need to reach around since file-deletion removes C watches,
                # but not the python one...
                try:
                    i.remove_watch(self.filename)
                except Exception as e:
                    # Best effort: the watch may already be gone.
                    logging.debug(
                        f"Ignoring failure to remove watch on "
                        f"{self.filename}: {e}"
                    )
                i.add_watch(self.filename, mask=watch_mask)
                self.pos = 1
            # 'append' (or no actionable event): just read again.
class Flusher(Thread):
    """Background thread that stores tailed gossip messages in the database.

    Messages read from ``gossip_store`` are merged into a session that is
    committed, and replaced by a fresh one, at most once every 10 seconds.
    The commit check only runs when a message arrives.
    """

    def __init__(self, engine):
        Thread.__init__(self)
        self.engine = engine
        self.session_maker = sessionmaker(bind=engine)
        self.session = None

    def run(self):
        """Tail ``gossip_store`` and persist every message it produces."""
        logging.info("Starting flusher")
        ft = FileTailer('gossip_store')
        last_flush = time.time()

        self.session = self.session_maker()
        for e in ft.tail():
            self.store(e)

            # Commit in batches: only once the last flush is over 10s old.
            if last_flush < time.time() - 10:
                self.session.commit()
                self.session = self.session_maker()
                last_flush = time.time()

        # logging.warn is a deprecated alias that newer Python versions no
        # longer provide; logging.warning is the supported spelling.
        logging.warning("Filetailer exited...")

    def store(self, raw: bytes) -> None:
        """Parse ``raw`` and merge it into the current session.

        Messages that are not a channel update, channel announcement or node
        announcement are ignored. Any exception raised while parsing or
        merging is logged as a warning and swallowed, so one bad message does
        not stop the thread.
        """
        try:
            msg = gossipd.parse(raw)

            if isinstance(msg, gossipd.ChannelUpdate):
                cls = ChannelUpdate
            elif isinstance(msg, gossipd.ChannelAnnouncement):
                cls = ChannelAnnouncement
            elif isinstance(msg, gossipd.NodeAnnouncement):
                cls = NodeAnnouncement
            else:
                return

            self.session.merge(cls.from_gossip(msg, raw))
        except Exception as e:
            logging.warning(f"Exception parsing gossip message: {e}")
@plugin.init()
def init(plugin, configuration, options):
    """Create the database engine and schema, then start the flusher thread.

    The engine is built from the ``historian-dsn`` option, all tables known
    to ``Base.metadata`` are created on it, and it is kept on the plugin
    object as ``plugin.engine`` for later use.
    """
    print(options)

    dsn = options['historian-dsn']
    db = create_engine(dsn, echo=False)
    Base.metadata.create_all(db)

    # Only published once the schema has been created successfully.
    plugin.engine = db

    flusher = Flusher(db)
    flusher.start()
# Handler for the `historian-stats` method: returns row counts for the three
# stored message kinds plus the node announcement and channel update with the
# highest timestamp.
# Documented with comments rather than a docstring on purpose.
# NOTE(review): the plugin framework may surface docstrings as method help
# text -- confirm before converting this into a docstring.
# NOTE(review): the session opened here is never closed explicitly.
@plugin.method('historian-stats')
def stats(plugin):
    db_session = sessionmaker(bind=plugin.engine)()

    def _count(model):
        return db_session.query(model).count()

    def _latest(model):
        newest_first = db_session.query(model).order_by(desc(model.timestamp))
        return newest_first.limit(1).first()

    # Filled in one key at a time so the queries run in a fixed order.
    summary = {}
    summary['channel_announcements'] = _count(ChannelAnnouncement)
    summary['channel_updates'] = _count(ChannelUpdate)
    summary['node_announcements'] = _count(NodeAnnouncement)
    summary['latest_node_announcement'] = _latest(NodeAnnouncement)
    summary['latest_channel_update'] = _latest(ChannelUpdate)
    return summary
# Register the `historian-dsn` option that `init` reads to build the database
# engine. The second argument is presumably the default value (a SQLite file
# named historian.sqlite3) -- confirm against the plugin library's signature.
plugin.add_option(
'historian-dsn',
'sqlite:///historian.sqlite3',
"SQL DSN defining where the gossip data should be stored."
)
# Start the plugin; this runs at import time, there is no __main__ guard.
plugin.run()