pygossmap: cleanups and optimizations

- moves offset into GossipHeader hdr which is passed to all constuctors
 - reads .flags as u16 instead of extracting it from the .length, see 0274d88ba
 - adds zombie and ratelimit flag to GossipHeader
 - bytes_read start at 0 instead of 1 which is more correct,
   the one byte is then corrected for when setting the offset of new header.
 - bytes_read is increased in pull_bytes as this is the only place where
   something is read
 - use new style for various format-strings
This commit is contained in:
Michael Schmoock
2023-02-14 18:28:45 +01:00
committed by Rusty Russell
parent 882cafd3c7
commit 3f651b08d5

View File

@@ -3,7 +3,7 @@
from pyln.spec.bolt7 import (channel_announcement, channel_update, from pyln.spec.bolt7 import (channel_announcement, channel_update,
node_announcement) node_announcement)
from pyln.proto import ShortChannelId, PublicKey from pyln.proto import ShortChannelId, PublicKey
from typing import Any, Dict, List, Optional, Union, cast from typing import Any, Dict, List, Optional, Union
import io import io
import struct import struct
@@ -11,10 +11,10 @@ import struct
# These duplicate constants in lightning/common/gossip_store.h # These duplicate constants in lightning/common/gossip_store.h
GOSSIP_STORE_MAJOR_VERSION = (0 << 5) GOSSIP_STORE_MAJOR_VERSION = (0 << 5)
GOSSIP_STORE_MAJOR_VERSION_MASK = 0xE0 GOSSIP_STORE_MAJOR_VERSION_MASK = 0xE0
GOSSIP_STORE_LEN_DELETED_BIT = 0x80000000 GOSSIP_STORE_LEN_DELETED_BIT = 0x8000
GOSSIP_STORE_LEN_PUSH_BIT = 0x40000000 GOSSIP_STORE_LEN_PUSH_BIT = 0x4000
GOSSIP_STORE_LEN_RATELIMIT_BIT = 0x20000000 GOSSIP_STORE_LEN_RATELIMIT_BIT = 0x2000
GOSSIP_STORE_LEN_MASK = (0x0000FFFF) GOSSIP_STORE_ZOMBIE_BIT = 0x1000
# These duplicate constants in lightning/gossipd/gossip_store_wiregen.h # These duplicate constants in lightning/gossipd/gossip_store_wiregen.h
WIRE_GOSSIP_STORE_PRIVATE_CHANNEL = 4104 WIRE_GOSSIP_STORE_PRIVATE_CHANNEL = 4104
@@ -25,33 +25,35 @@ WIRE_GOSSIP_STORE_CHANNEL_AMOUNT = 4101
class GossipStoreHeader(object): class GossipStoreHeader(object):
def __init__(self, buf: bytes): def __init__(self, buf: bytes, off: int):
length, self.crc, self.timestamp = struct.unpack('>III', buf) self.flags, self.length, self.crc, self.timestamp = struct.unpack('>HHII', buf)
self.deleted = (length & GOSSIP_STORE_LEN_DELETED_BIT) != 0 self.off = off
self.length = (length & GOSSIP_STORE_LEN_MASK) self.deleted = (self.flags & GOSSIP_STORE_LEN_DELETED_BIT) != 0
self.ratelimit = (self.flags & GOSSIP_STORE_LEN_RATELIMIT_BIT) != 0
self.zombie = (self.flags & GOSSIP_STORE_ZOMBIE_BIT) != 0
class GossmapHalfchannel(object): class GossmapHalfchannel(object):
"""One direction of a GossmapChannel.""" """One direction of a GossmapChannel."""
def __init__(self, channel: 'GossmapChannel', direction: int, def __init__(self, channel: 'GossmapChannel', direction: int,
timestamp: int, cltv_expiry_delta: int, fields: Dict[str, Any], hdr: GossipStoreHeader):
htlc_minimum_msat: int, htlc_maximum_msat: int, assert direction in [0, 1], "direction can only be 0 or 1"
fee_base_msat: int, fee_proportional_millionths: int):
self.channel = channel self.channel = channel
self.direction = direction self.direction = direction
self.source = channel.node1 if direction == 0 else channel.node2 self.source = channel.node1 if direction == 0 else channel.node2
self.destination = channel.node2 if direction == 0 else channel.node1 self.destination = channel.node2 if direction == 0 else channel.node1
self.fields: Dict[str, Any] = fields
self.hdr: GossipStoreHeader = hdr
self.timestamp: int = timestamp self.timestamp: int = fields['timestamp']
self.cltv_expiry_delta: int = cltv_expiry_delta self.cltv_expiry_delta: int = fields['cltv_expiry_delta']
self.htlc_minimum_msat: int = htlc_minimum_msat self.htlc_minimum_msat: int = fields['htlc_minimum_msat']
self.htlc_maximum_msat: Optional[int] = htlc_maximum_msat self.htlc_maximum_msat: Optional[int] = fields.get('htlc_maximum_msat', None)
self.fee_base_msat: int = fee_base_msat self.fee_base_msat: int = fields['fee_base_msat']
self.fee_proportional_millionths: int = fee_proportional_millionths self.fee_proportional_millionths: int = fields['fee_proportional_millionths']
def __repr__(self): def __repr__(self):
return "GossmapHalfchannel[{}x{}]".format(str(self.channel.scid), self.direction) return f"GossmapHalfchannel[{self._scidd}]"
class GossmapNodeId(object): class GossmapNodeId(object):
@@ -91,45 +93,36 @@ class GossmapNodeId(object):
class GossmapChannel(object): class GossmapChannel(object):
"""A channel: fields of channel_announcement are in .fields, optional updates are in .updates_fields, which can be None if there has been no channel update.""" """A channel: fields of channel_announcement are in .fields,
optional updates are in .half_channels[0/1].fields """
def __init__(self, def __init__(self,
fields: Dict[str, Any], fields: Dict[str, Any],
announce_offset: int, scid: Union[ShortChannelId, str],
scid,
node1: 'GossmapNode', node1: 'GossmapNode',
node2: 'GossmapNode', node2: 'GossmapNode',
is_private: bool): is_private: bool,
self.fields = fields hdr: GossipStoreHeader):
self.announce_offset = announce_offset self.fields: Dict[str, Any] = fields
self.hdr: GossipStoreHeader = hdr
self.is_private = is_private self.is_private = is_private
self.scid = scid self.scid = ShortChannelId.from_str(scid) if isinstance(scid, str) else scid
self.node1 = node1 self.node1 = node1
self.node2 = node2 self.node2 = node2
self.updates_fields: List[Optional[Dict[str, Any]]] = [None, None]
self.updates_offset: List[Optional[int]] = [None, None]
self.satoshis = None self.satoshis = None
self.half_channels: List[Optional[GossmapHalfchannel]] = [None, None] self.half_channels: List[Optional[GossmapHalfchannel]] = [None, None]
def _update_channel(self, def _update_channel(self,
direction: int, direction: int,
fields: Dict[str, Any], fields: Dict[str, Any],
off: int): hdr: GossipStoreHeader):
self.updates_fields[direction] = fields
self.updates_offset[direction] = off
half = GossmapHalfchannel(self, direction, half = GossmapHalfchannel(self, direction, fields, hdr)
fields['timestamp'],
fields['cltv_expiry_delta'],
fields['htlc_minimum_msat'],
fields.get('htlc_maximum_msat', None),
fields['fee_base_msat'],
fields['fee_proportional_millionths'])
self.half_channels[direction] = half self.half_channels[direction] = half
def get_direction(self, direction: int): def get_direction(self, direction: int):
""" returns the GossmapHalfchannel if known by channel_update """ """ returns the GossmapHalfchannel if known by channel_update """
if not 0 <= direction <= 1: assert direction in [0, 1], "direction can only be 0 or 1"
raise ValueError("direction can only be 0 or 1")
return self.half_channels[direction] return self.half_channels[direction]
def __repr__(self): def __repr__(self):
@@ -137,20 +130,19 @@ class GossmapChannel(object):
class GossmapNode(object): class GossmapNode(object):
"""A node: fields of node_announcement are in .announce_fields, which can be None of there has been no node announcement. """A node: fields of node_announcement are in .fields,
which can be None if there has been no node announcement.
.channels is a list of the GossmapChannels attached to this node. .channels is a list of the GossmapChannels attached to this node."""
"""
def __init__(self, node_id: Union[GossmapNodeId, bytes, str]): def __init__(self, node_id: Union[GossmapNodeId, bytes, str]):
if isinstance(node_id, bytes) or isinstance(node_id, str): if isinstance(node_id, bytes) or isinstance(node_id, str):
node_id = GossmapNodeId(node_id) node_id = GossmapNodeId(node_id)
self.announce_fields: Optional[Dict[str, Any]] = None self.fields: Optional[Dict[str, Any]] = None
self.announce_offset: Optional[int] = None self.hdr: GossipStoreHeader = None
self.channels: List[GossmapChannel] = [] self.channels: List[GossmapChannel] = []
self.node_id = node_id self.node_id = node_id
def __repr__(self): def __repr__(self):
return "GossmapNode[{}]".format(self.node_id.nodeid.hex()) return f"GossmapNode[{self.node_id.nodeid.hex()}]"
def __eq__(self, other): def __eq__(self, other):
if not isinstance(other, GossmapNode): if not isinstance(other, GossmapNode):
@@ -169,25 +161,25 @@ class Gossmap(object):
self.store_filename = store_filename self.store_filename = store_filename
self.store_file = open(store_filename, "rb") self.store_file = open(store_filename, "rb")
self.store_buf = bytes() self.store_buf = bytes()
self.bytes_read = 0
self.nodes: Dict[GossmapNodeId, GossmapNode] = {} self.nodes: Dict[GossmapNodeId, GossmapNode] = {}
self.channels: Dict[ShortChannelId, GossmapChannel] = {} self.channels: Dict[ShortChannelId, GossmapChannel] = {}
self._last_scid: Optional[str] = None self._last_scid: Optional[str] = None
version = self.store_file.read(1)[0] version = self.store_file.read(1)[0]
if (version & GOSSIP_STORE_MAJOR_VERSION_MASK) != GOSSIP_STORE_MAJOR_VERSION: if (version & GOSSIP_STORE_MAJOR_VERSION_MASK) != GOSSIP_STORE_MAJOR_VERSION:
raise ValueError("Invalid gossip store version {}".format(version)) raise ValueError("Invalid gossip store version {}".format(version))
self.bytes_read = 1 self.processing_time = 0
self.orphan_channel_updates = set()
self.refresh() self.refresh()
def _new_channel(self, def _new_channel(self,
fields: Dict[str, Any], fields: Dict[str, Any],
announce_offset: int,
scid: ShortChannelId, scid: ShortChannelId,
node1: GossmapNode, node1: GossmapNode,
node2: GossmapNode, node2: GossmapNode,
is_private: bool): is_private: bool,
c = GossmapChannel(fields, announce_offset, hdr: GossipStoreHeader):
scid, node1, node2, c = GossmapChannel(fields, scid, node1, node2, is_private, hdr)
is_private)
self._last_scid = scid self._last_scid = scid
self.channels[scid] = c self.channels[scid] = c
node1.channels.append(c) node1.channels.append(c)
@@ -204,7 +196,7 @@ class Gossmap(object):
if len(c.node2.channels) == 0: if len(c.node2.channels) == 0:
del self.nodes[c.node2.node_id] del self.nodes[c.node2.node_id]
def _add_channel(self, rec: bytes, off: int, is_private: bool): def _add_channel(self, rec: bytes, is_private: bool, hdr: GossipStoreHeader):
fields = channel_announcement.read(io.BytesIO(rec[2:]), {}) fields = channel_announcement.read(io.BytesIO(rec[2:]), {})
# Add nodes one the fly # Add nodes one the fly
node1_id = GossmapNodeId(fields['node_id_1']) node1_id = GossmapNodeId(fields['node_id_1'])
@@ -213,17 +205,17 @@ class Gossmap(object):
self.nodes[node1_id] = GossmapNode(node1_id) self.nodes[node1_id] = GossmapNode(node1_id)
if node2_id not in self.nodes: if node2_id not in self.nodes:
self.nodes[node2_id] = GossmapNode(node2_id) self.nodes[node2_id] = GossmapNode(node2_id)
self._new_channel(fields, off, self._new_channel(fields,
ShortChannelId.from_int(fields['short_channel_id']), ShortChannelId.from_int(fields['short_channel_id']),
self.get_node(node1_id), self.get_node(node2_id), self.get_node(node1_id), self.get_node(node2_id),
is_private) is_private, hdr)
def _set_channel_amount(self, rec: bytes): def _set_channel_amount(self, rec: bytes):
""" Sets channel capacity of last added channel """ """ Sets channel capacity of last added channel """
sats, = struct.unpack(">Q", rec[2:]) sats, = struct.unpack(">Q", rec[2:])
self.channels[self._last_scid].satoshis = sats self.channels[self._last_scid].satoshis = sats
def get_channel(self, short_channel_id: ShortChannelId): def get_channel(self, short_channel_id: Union[ShortChannelId, str]):
""" Resolves a channel by its short channel id """ """ Resolves a channel by its short channel id """
if isinstance(short_channel_id, str): if isinstance(short_channel_id, str):
short_channel_id = ShortChannelId.from_str(short_channel_id) short_channel_id = ShortChannelId.from_str(short_channel_id)
@@ -233,23 +225,29 @@ class Gossmap(object):
""" Resolves a node by its public key node_id """ """ Resolves a node by its public key node_id """
if isinstance(node_id, str): if isinstance(node_id, str):
node_id = GossmapNodeId.from_str(node_id) node_id = GossmapNodeId.from_str(node_id)
return self.nodes.get(cast(GossmapNodeId, node_id)) return self.nodes.get(node_id)
def _update_channel(self, rec: bytes, off: int): def _update_channel(self, rec: bytes, hdr: GossipStoreHeader):
fields = channel_update.read(io.BytesIO(rec[2:]), {}) fields = channel_update.read(io.BytesIO(rec[2:]), {})
direction = fields['channel_flags'] & 1 direction = fields['channel_flags'] & 1
c = self.channels[ShortChannelId.from_int(fields['short_channel_id'])] scid = ShortChannelId.from_int(fields['short_channel_id'])
c._update_channel(direction, fields, off) if scid in self.channels:
c = self.channels[scid]
c._update_channel(direction, fields, hdr)
else:
self.orphan_channel_updates.add(scid)
def _add_node_announcement(self, rec: bytes, off: int): def _add_node_announcement(self, rec: bytes, hdr: GossipStoreHeader):
fields = node_announcement.read(io.BytesIO(rec[2:]), {}) fields = node_announcement.read(io.BytesIO(rec[2:]), {})
node_id = GossmapNodeId(fields['node_id']) node_id = GossmapNodeId(fields['node_id'])
self.nodes[node_id].announce_fields = fields if node_id not in self.nodes:
self.nodes[node_id].announce_offset = off self.nodes[node_id] = GossmapNode(node_id)
node = self.nodes[node_id]
node.fields = fields
node.hdr = hdr
def reopen_store(self): def reopen_store(self):
"""FIXME: Implement!""" assert False, "FIXME: Implement!"
assert False
def _remove_channel_by_deletemsg(self, rec: bytes): def _remove_channel_by_deletemsg(self, rec: bytes):
scidint, = struct.unpack(">Q", rec[2:]) scidint, = struct.unpack(">Q", rec[2:])
@@ -261,52 +259,51 @@ class Gossmap(object):
def _pull_bytes(self, length: int) -> bool: def _pull_bytes(self, length: int) -> bool:
"""Pull bytes from file into our internal buffer""" """Pull bytes from file into our internal buffer"""
if len(self.store_buf) < length: if len(self.store_buf) < length:
self.store_buf += self.store_file.read(length self.store_buf += self.store_file.read(length - len(self.store_buf))
- len(self.store_buf)) self.bytes_read += len(self.store_buf)
return len(self.store_buf) >= length return len(self.store_buf) >= length
def _read_record(self) -> Optional[bytes]: def _read_record(self) -> Optional[bytes]:
"""If a whole record is not in the file, returns None. """If a whole record is not in the file, returns None.
If deleted, returns empty.""" If deleted, returns empty."""
off = self.bytes_read + 1
if not self._pull_bytes(12): if not self._pull_bytes(12):
return None return None, None
hdr = GossipStoreHeader(self.store_buf[:12]) hdr = GossipStoreHeader(self.store_buf[:12], off)
if not self._pull_bytes(12 + hdr.length): if not self._pull_bytes(12 + hdr.length):
return None return None, hdr
self.bytes_read += len(self.store_buf) rec = self.store_buf[12:]
ret = self.store_buf[12:]
self.store_buf = bytes() self.store_buf = bytes()
if hdr.deleted: return rec, hdr
ret = bytes()
return ret
def refresh(self): def refresh(self):
"""Catch up with any changes to the gossip store""" """Catch up with any changes to the gossip store"""
while True: while True:
off = self.bytes_read rec, hdr = self._read_record()
rec = self._read_record() if rec is None: # EOF
# EOF?
if rec is None:
break break
# Deleted? if hdr.deleted: # Skip deleted records
if len(rec) == 0: continue
if hdr.zombie:
continue continue
rectype, = struct.unpack(">H", rec[:2]) rectype, = struct.unpack(">H", rec[:2])
if rectype == channel_announcement.number: if rectype == channel_announcement.number:
self._add_channel(rec, off, False) self._add_channel(rec, False, hdr)
elif rectype == WIRE_GOSSIP_STORE_PRIVATE_CHANNEL: elif rectype == WIRE_GOSSIP_STORE_PRIVATE_CHANNEL:
self._add_channel(rec[2 + 8 + 2:], off + 2 + 8 + 2, True) hdr.off += 2 + 8 + 2
self._add_channel(rec[2 + 8 + 2:], True, hdr)
elif rectype == WIRE_GOSSIP_STORE_CHANNEL_AMOUNT: elif rectype == WIRE_GOSSIP_STORE_CHANNEL_AMOUNT:
self._set_channel_amount(rec) self._set_channel_amount(rec)
elif rectype == channel_update.number: elif rectype == channel_update.number:
self._update_channel(rec, off) self._update_channel(rec, hdr)
elif rectype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE: elif rectype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE:
self._update_channel(rec[2 + 2:], off + 2 + 2) hdr.off += 2 + 2
self._update_channel(rec[2 + 2:], hdr)
elif rectype == WIRE_GOSSIP_STORE_DELETE_CHAN: elif rectype == WIRE_GOSSIP_STORE_DELETE_CHAN:
self._remove_channel_by_deletemsg(rec) self._remove_channel_by_deletemsg(rec)
elif rectype == node_announcement.number: elif rectype == node_announcement.number:
self._add_node_announcement(rec, off) self._add_node_announcement(rec, hdr)
elif rectype == WIRE_GOSSIP_STORE_ENDED: elif rectype == WIRE_GOSSIP_STORE_ENDED:
self.reopen_store() self.reopen_store()
else: else: