diff --git a/contrib/pyln-client/pyln/client/gossmap.py b/contrib/pyln-client/pyln/client/gossmap.py index 4d72209e8..79e1d26c5 100755 --- a/contrib/pyln-client/pyln/client/gossmap.py +++ b/contrib/pyln-client/pyln/client/gossmap.py @@ -3,7 +3,7 @@ from pyln.spec.bolt7 import (channel_announcement, channel_update, node_announcement) from pyln.proto import ShortChannelId, PublicKey -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Union import io import struct @@ -11,10 +11,10 @@ import struct # These duplicate constants in lightning/common/gossip_store.h GOSSIP_STORE_MAJOR_VERSION = (0 << 5) GOSSIP_STORE_MAJOR_VERSION_MASK = 0xE0 -GOSSIP_STORE_LEN_DELETED_BIT = 0x80000000 -GOSSIP_STORE_LEN_PUSH_BIT = 0x40000000 -GOSSIP_STORE_LEN_RATELIMIT_BIT = 0x20000000 -GOSSIP_STORE_LEN_MASK = (0x0000FFFF) +GOSSIP_STORE_LEN_DELETED_BIT = 0x8000 +GOSSIP_STORE_LEN_PUSH_BIT = 0x4000 +GOSSIP_STORE_LEN_RATELIMIT_BIT = 0x2000 +GOSSIP_STORE_ZOMBIE_BIT = 0x1000 # These duplicate constants in lightning/gossipd/gossip_store_wiregen.h WIRE_GOSSIP_STORE_PRIVATE_CHANNEL = 4104 @@ -25,33 +25,35 @@ WIRE_GOSSIP_STORE_CHANNEL_AMOUNT = 4101 class GossipStoreHeader(object): - def __init__(self, buf: bytes): - length, self.crc, self.timestamp = struct.unpack('>III', buf) - self.deleted = (length & GOSSIP_STORE_LEN_DELETED_BIT) != 0 - self.length = (length & GOSSIP_STORE_LEN_MASK) + def __init__(self, buf: bytes, off: int): + self.flags, self.length, self.crc, self.timestamp = struct.unpack('>HHII', buf) + self.off = off + self.deleted = (self.flags & GOSSIP_STORE_LEN_DELETED_BIT) != 0 + self.ratelimit = (self.flags & GOSSIP_STORE_LEN_RATELIMIT_BIT) != 0 + self.zombie = (self.flags & GOSSIP_STORE_ZOMBIE_BIT) != 0 class GossmapHalfchannel(object): """One direction of a GossmapChannel.""" def __init__(self, channel: 'GossmapChannel', direction: int, - timestamp: int, cltv_expiry_delta: int, - htlc_minimum_msat: int, htlc_maximum_msat: int, - fee_base_msat: int, fee_proportional_millionths: int): - + fields: Dict[str, Any], hdr: GossipStoreHeader): + assert direction in [0, 1], "direction can only be 0 or 1" self.channel = channel self.direction = direction self.source = channel.node1 if direction == 0 else channel.node2 self.destination = channel.node2 if direction == 0 else channel.node1 + self.fields: Dict[str, Any] = fields + self.hdr: GossipStoreHeader = hdr - self.timestamp: int = timestamp - self.cltv_expiry_delta: int = cltv_expiry_delta - self.htlc_minimum_msat: int = htlc_minimum_msat - self.htlc_maximum_msat: Optional[int] = htlc_maximum_msat - self.fee_base_msat: int = fee_base_msat - self.fee_proportional_millionths: int = fee_proportional_millionths + self.timestamp: int = fields['timestamp'] + self.cltv_expiry_delta: int = fields['cltv_expiry_delta'] + self.htlc_minimum_msat: int = fields['htlc_minimum_msat'] + self.htlc_maximum_msat: Optional[int] = fields.get('htlc_maximum_msat', None) + self.fee_base_msat: int = fields['fee_base_msat'] + self.fee_proportional_millionths: int = fields['fee_proportional_millionths'] def __repr__(self): - return "GossmapHalfchannel[{}x{}]".format(str(self.channel.scid), self.direction) + return f"GossmapHalfchannel[{self._scidd}]" class GossmapNodeId(object): @@ -91,45 +93,36 @@ class GossmapNodeId(object): class GossmapChannel(object): - """A channel: fields of channel_announcement are in .fields, optional updates are in .updates_fields, which can be None if there has been no channel update.""" + """A channel: fields of channel_announcement are in .fields, + optional updates are in .half_channels[0/1].fields """ def __init__(self, fields: Dict[str, Any], - announce_offset: int, - scid, + scid: Union[ShortChannelId, str], node1: 'GossmapNode', node2: 'GossmapNode', - is_private: bool): - self.fields = fields - self.announce_offset = announce_offset + is_private: bool, + hdr: GossipStoreHeader): + self.fields: Dict[str, Any] = fields + self.hdr: GossipStoreHeader = hdr + self.is_private = is_private - self.scid = scid + self.scid = ShortChannelId.from_str(scid) if isinstance(scid, str) else scid self.node1 = node1 self.node2 = node2 - self.updates_fields: List[Optional[Dict[str, Any]]] = [None, None] - self.updates_offset: List[Optional[int]] = [None, None] self.satoshis = None self.half_channels: List[Optional[GossmapHalfchannel]] = [None, None] def _update_channel(self, direction: int, fields: Dict[str, Any], - off: int): - self.updates_fields[direction] = fields - self.updates_offset[direction] = off + hdr: GossipStoreHeader): - half = GossmapHalfchannel(self, direction, - fields['timestamp'], - fields['cltv_expiry_delta'], - fields['htlc_minimum_msat'], - fields.get('htlc_maximum_msat', None), - fields['fee_base_msat'], - fields['fee_proportional_millionths']) + half = GossmapHalfchannel(self, direction, fields, hdr) self.half_channels[direction] = half def get_direction(self, direction: int): """ returns the GossmapHalfchannel if known by channel_update """ - if not 0 <= direction <= 1: - raise ValueError("direction can only be 0 or 1") + assert direction in [0, 1], "direction can only be 0 or 1" return self.half_channels[direction] def __repr__(self): @@ -137,20 +130,19 @@ class GossmapChannel(object): class GossmapNode(object): - """A node: fields of node_announcement are in .announce_fields, which can be None of there has been no node announcement. - -.channels is a list of the GossmapChannels attached to this node. -""" + """A node: fields of node_announcement are in .fields, + which can be None if there has been no node announcement. + .channels is a list of the GossmapChannels attached to this node.""" def __init__(self, node_id: Union[GossmapNodeId, bytes, str]): if isinstance(node_id, bytes) or isinstance(node_id, str): node_id = GossmapNodeId(node_id) - self.announce_fields: Optional[Dict[str, Any]] = None - self.announce_offset: Optional[int] = None + self.fields: Optional[Dict[str, Any]] = None + self.hdr: GossipStoreHeader = None self.channels: List[GossmapChannel] = [] self.node_id = node_id def __repr__(self): - return "GossmapNode[{}]".format(self.node_id.nodeid.hex()) + return f"GossmapNode[{self.node_id.nodeid.hex()}]" def __eq__(self, other): if not isinstance(other, GossmapNode): @@ -169,25 +161,25 @@ class Gossmap(object): self.store_filename = store_filename self.store_file = open(store_filename, "rb") self.store_buf = bytes() + self.bytes_read = 0 self.nodes: Dict[GossmapNodeId, GossmapNode] = {} self.channels: Dict[ShortChannelId, GossmapChannel] = {} self._last_scid: Optional[str] = None version = self.store_file.read(1)[0] if (version & GOSSIP_STORE_MAJOR_VERSION_MASK) != GOSSIP_STORE_MAJOR_VERSION: raise ValueError("Invalid gossip store version {}".format(version)) - self.bytes_read = 1 + self.processing_time = 0 + self.orphan_channel_updates = set() self.refresh() def _new_channel(self, fields: Dict[str, Any], - announce_offset: int, scid: ShortChannelId, node1: GossmapNode, node2: GossmapNode, - is_private: bool): - c = GossmapChannel(fields, announce_offset, - scid, node1, node2, - is_private) + is_private: bool, + hdr: GossipStoreHeader): + c = GossmapChannel(fields, scid, node1, node2, is_private, hdr) self._last_scid = scid self.channels[scid] = c node1.channels.append(c) @@ -204,7 +196,7 @@ class Gossmap(object): if len(c.node2.channels) == 0: del self.nodes[c.node2.node_id] - def _add_channel(self, rec: bytes, off: int, is_private: bool): + def _add_channel(self, rec: bytes, is_private: bool, hdr: GossipStoreHeader): fields = channel_announcement.read(io.BytesIO(rec[2:]), {}) # Add nodes one the fly node1_id = GossmapNodeId(fields['node_id_1']) @@ -213,17 +205,17 @@ class Gossmap(object): self.nodes[node1_id] = GossmapNode(node1_id) if node2_id not in self.nodes: self.nodes[node2_id] = GossmapNode(node2_id) - self._new_channel(fields, off, + self._new_channel(fields, ShortChannelId.from_int(fields['short_channel_id']), self.get_node(node1_id), self.get_node(node2_id), - is_private) + is_private, hdr) def _set_channel_amount(self, rec: bytes): """ Sets channel capacity of last added channel """ sats, = struct.unpack(">Q", rec[2:]) self.channels[self._last_scid].satoshis = sats - def get_channel(self, short_channel_id: ShortChannelId): + def get_channel(self, short_channel_id: Union[ShortChannelId, str]): """ Resolves a channel by its short channel id """ if isinstance(short_channel_id, str): short_channel_id = ShortChannelId.from_str(short_channel_id) @@ -233,23 +225,29 @@ class Gossmap(object): """ Resolves a node by its public key node_id """ if isinstance(node_id, str): node_id = GossmapNodeId.from_str(node_id) - return self.nodes.get(cast(GossmapNodeId, node_id)) + return self.nodes.get(node_id) - def _update_channel(self, rec: bytes, off: int): + def _update_channel(self, rec: bytes, hdr: GossipStoreHeader): fields = channel_update.read(io.BytesIO(rec[2:]), {}) direction = fields['channel_flags'] & 1 - c = self.channels[ShortChannelId.from_int(fields['short_channel_id'])] - c._update_channel(direction, fields, off) + scid = ShortChannelId.from_int(fields['short_channel_id']) + if scid in self.channels: + c = self.channels[scid] + c._update_channel(direction, fields, hdr) + else: + self.orphan_channel_updates.add(scid) - def _add_node_announcement(self, rec: bytes, off: int): + def _add_node_announcement(self, rec: bytes, hdr: GossipStoreHeader): fields = node_announcement.read(io.BytesIO(rec[2:]), {}) node_id = GossmapNodeId(fields['node_id']) - self.nodes[node_id].announce_fields = fields - self.nodes[node_id].announce_offset = off + if node_id not in self.nodes: + self.nodes[node_id] = GossmapNode(node_id) + node = self.nodes[node_id] + node.fields = fields + node.hdr = hdr def reopen_store(self): - """FIXME: Implement!""" - assert False + assert False, "FIXME: Implement!" def _remove_channel_by_deletemsg(self, rec: bytes): scidint, = struct.unpack(">Q", rec[2:]) @@ -261,52 +259,51 @@ class Gossmap(object): def _pull_bytes(self, length: int) -> bool: """Pull bytes from file into our internal buffer""" if len(self.store_buf) < length: - self.store_buf += self.store_file.read(length - - len(self.store_buf)) + self.store_buf += self.store_file.read(length - len(self.store_buf)) + self.bytes_read += len(self.store_buf) return len(self.store_buf) >= length def _read_record(self) -> Optional[bytes]: """If a whole record is not in the file, returns None. If deleted, returns empty.""" + off = self.bytes_read + 1 if not self._pull_bytes(12): - return None - hdr = GossipStoreHeader(self.store_buf[:12]) + return None, None + hdr = GossipStoreHeader(self.store_buf[:12], off) if not self._pull_bytes(12 + hdr.length): - return None - self.bytes_read += len(self.store_buf) - ret = self.store_buf[12:] + return None, hdr + rec = self.store_buf[12:] self.store_buf = bytes() - if hdr.deleted: - ret = bytes() - return ret + return rec, hdr def refresh(self): """Catch up with any changes to the gossip store""" while True: - off = self.bytes_read - rec = self._read_record() - # EOF? - if rec is None: + rec, hdr = self._read_record() + if rec is None: # EOF break - # Deleted? - if len(rec) == 0: + if hdr.deleted: # Skip deleted records + continue + if hdr.zombie: continue rectype, = struct.unpack(">H", rec[:2]) if rectype == channel_announcement.number: - self._add_channel(rec, off, False) + self._add_channel(rec, False, hdr) elif rectype == WIRE_GOSSIP_STORE_PRIVATE_CHANNEL: - self._add_channel(rec[2 + 8 + 2:], off + 2 + 8 + 2, True) + hdr.off += 2 + 8 + 2 + self._add_channel(rec[2 + 8 + 2:], True, hdr) elif rectype == WIRE_GOSSIP_STORE_CHANNEL_AMOUNT: self._set_channel_amount(rec) elif rectype == channel_update.number: - self._update_channel(rec, off) + self._update_channel(rec, hdr) elif rectype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE: - self._update_channel(rec[2 + 2:], off + 2 + 2) + hdr.off += 2 + 2 + self._update_channel(rec[2 + 2:], hdr) elif rectype == WIRE_GOSSIP_STORE_DELETE_CHAN: self._remove_channel_by_deletemsg(rec) elif rectype == node_announcement.number: - self._add_node_announcement(rec, off) + self._add_node_announcement(rec, hdr) elif rectype == WIRE_GOSSIP_STORE_ENDED: self.reopen_store() else: