mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-19 15:14:23 +01:00
pyln.proto.message.*: add type annotations.
Other changes along the way:
1. In a couple of places we passed None as a dummy for for
`otherfields` where {} is just as good.
2. Turned bytes into hex for errors.
3. Remove nonsensical (unused) get_tlv_by_number() function from MessageNamespace
4. Renamed unrelated-but-overlapping `field_from_csv` and
`type_from_csv` static methods, since mypy thought they should have
the same type.
5. Unknown tlv fields are placed in dict as strings, not ints, for
type simplicity.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
committed by
Christian Decker
parent
da070e73b2
commit
f52065201b
@@ -1,19 +1,20 @@
|
||||
import struct
|
||||
import io
|
||||
from .fundamental_types import fundamental_types, BigSizeType, split_field, try_unpack
|
||||
from io import BufferedIOBase, BytesIO
|
||||
from .fundamental_types import fundamental_types, BigSizeType, split_field, try_unpack, FieldType
|
||||
from .array_types import (
|
||||
SizedArrayType, DynamicArrayType, LengthFieldType, EllipsisArrayType
|
||||
)
|
||||
from typing import Dict, List, Optional, Tuple, Any, cast
|
||||
|
||||
|
||||
class MessageNamespace(object):
|
||||
"""A class which contains all FieldTypes and Messages in a particular
|
||||
domain, such as within a given BOLT"""
|
||||
def __init__(self, csv_lines=[]):
|
||||
self.subtypes = {}
|
||||
self.fundamentaltypes = {}
|
||||
self.tlvtypes = {}
|
||||
self.messagetypes = {}
|
||||
def __init__(self, csv_lines: List[str] = []):
|
||||
self.subtypes: Dict[str, SubtypeType] = {}
|
||||
self.fundamentaltypes: Dict[str, SubtypeType] = {}
|
||||
self.tlvtypes: Dict[str, TlvStreamType] = {}
|
||||
self.messagetypes: Dict[str, MessageType] = {}
|
||||
|
||||
# For convenience, basic types go in every namespace
|
||||
for t in fundamental_types():
|
||||
@@ -21,7 +22,7 @@ domain, such as within a given BOLT"""
|
||||
|
||||
self.load_csv(csv_lines)
|
||||
|
||||
def __add__(self, other):
|
||||
def __add__(self, other: 'MessageNamespace'):
|
||||
ret = MessageNamespace()
|
||||
ret.subtypes = self.subtypes.copy()
|
||||
for v in other.subtypes.values():
|
||||
@@ -34,57 +35,57 @@ domain, such as within a given BOLT"""
|
||||
ret.add_messagetype(v)
|
||||
return ret
|
||||
|
||||
def add_subtype(self, t):
|
||||
def add_subtype(self, t: 'SubtypeType') -> None:
|
||||
prev = self.get_type(t.name)
|
||||
if prev:
|
||||
return ValueError('Already have {}'.format(prev))
|
||||
raise ValueError('Already have {}'.format(prev))
|
||||
self.subtypes[t.name] = t
|
||||
|
||||
def add_fundamentaltype(self, t):
|
||||
def add_fundamentaltype(self, t: 'SubtypeType') -> None:
|
||||
assert not self.get_type(t.name)
|
||||
self.fundamentaltypes[t.name] = t
|
||||
|
||||
def add_tlvtype(self, t):
|
||||
def add_tlvtype(self, t: 'TlvStreamType') -> None:
|
||||
prev = self.get_type(t.name)
|
||||
if prev:
|
||||
return ValueError('Already have {}'.format(prev))
|
||||
raise ValueError('Already have {}'.format(prev))
|
||||
self.tlvtypes[t.name] = t
|
||||
|
||||
def add_messagetype(self, m):
|
||||
def add_messagetype(self, m: 'MessageType') -> None:
|
||||
if self.get_msgtype(m.name):
|
||||
return ValueError('{}: message already exists'.format(m.name))
|
||||
raise ValueError('{}: message already exists'.format(m.name))
|
||||
if self.get_msgtype_by_number(m.number):
|
||||
return ValueError('{}: message {} already number {}'.format(
|
||||
m.name, self.get_msg_by_number(m.number), m.number))
|
||||
raise ValueError('{}: message {} already number {}'.format(
|
||||
m.name, self.get_msgtype_by_number(m.number), m.number))
|
||||
self.messagetypes[m.name] = m
|
||||
|
||||
def get_msgtype(self, name):
|
||||
def get_msgtype(self, name: str) -> Optional['MessageType']:
|
||||
if name in self.messagetypes:
|
||||
return self.messagetypes[name]
|
||||
return None
|
||||
|
||||
def get_msgtype_by_number(self, num):
|
||||
def get_msgtype_by_number(self, num: int) -> Optional['MessageType']:
|
||||
for m in self.messagetypes.values():
|
||||
if m.number == num:
|
||||
return m
|
||||
return None
|
||||
|
||||
def get_fundamentaltype(self, name):
|
||||
def get_fundamentaltype(self, name: str) -> Optional['SubtypeType']:
|
||||
if name in self.fundamentaltypes:
|
||||
return self.fundamentaltypes[name]
|
||||
return None
|
||||
|
||||
def get_subtype(self, name):
|
||||
def get_subtype(self, name: str) -> Optional['SubtypeType']:
|
||||
if name in self.subtypes:
|
||||
return self.subtypes[name]
|
||||
return None
|
||||
|
||||
def get_tlvtype(self, name):
|
||||
def get_tlvtype(self, name: str) -> Optional['TlvStreamType']:
|
||||
if name in self.tlvtypes:
|
||||
return self.tlvtypes[name]
|
||||
return None
|
||||
|
||||
def get_type(self, name):
|
||||
def get_type(self, name: str) -> Optional['SubtypeType']:
|
||||
t = self.get_fundamentaltype(name)
|
||||
if t is None:
|
||||
t = self.get_subtype(name)
|
||||
@@ -92,20 +93,14 @@ domain, such as within a given BOLT"""
|
||||
t = self.get_tlvtype(name)
|
||||
return t
|
||||
|
||||
def get_tlv_by_number(self, num):
|
||||
for t in self.tlvtypes:
|
||||
if t.number == num:
|
||||
return t
|
||||
return None
|
||||
|
||||
def load_csv(self, lines):
|
||||
def load_csv(self, lines: List[str]) -> None:
|
||||
"""Load a series of comma-separate-value lines into the namespace"""
|
||||
vals = {'msgtype': [],
|
||||
'msgdata': [],
|
||||
'tlvtype': [],
|
||||
'tlvdata': [],
|
||||
'subtype': [],
|
||||
'subtypedata': []}
|
||||
vals: Dict[str, List[List[str]]] = {'msgtype': [],
|
||||
'msgdata': [],
|
||||
'tlvtype': [],
|
||||
'tlvdata': [],
|
||||
'subtype': [],
|
||||
'subtypedata': []}
|
||||
for l in lines:
|
||||
parts = l.split(',')
|
||||
if parts[0] not in vals:
|
||||
@@ -114,39 +109,39 @@ domain, such as within a given BOLT"""
|
||||
|
||||
# Types can refer to other types, so add data last.
|
||||
for parts in vals['msgtype']:
|
||||
self.add_messagetype(MessageType.type_from_csv(parts))
|
||||
self.add_messagetype(MessageType.msgtype_from_csv(parts))
|
||||
|
||||
for parts in vals['subtype']:
|
||||
self.add_subtype(SubtypeType.type_from_csv(parts))
|
||||
self.add_subtype(SubtypeType.subtype_from_csv(parts))
|
||||
|
||||
for parts in vals['tlvtype']:
|
||||
TlvStreamType.type_from_csv(self, parts)
|
||||
TlvStreamType.tlvtype_from_csv(self, parts)
|
||||
|
||||
for parts in vals['msgdata']:
|
||||
MessageType.field_from_csv(self, parts)
|
||||
MessageType.msgfield_from_csv(self, parts)
|
||||
|
||||
for parts in vals['subtypedata']:
|
||||
SubtypeType.field_from_csv(self, parts)
|
||||
SubtypeType.subfield_from_csv(self, parts)
|
||||
|
||||
for parts in vals['tlvdata']:
|
||||
TlvStreamType.field_from_csv(self, parts)
|
||||
TlvStreamType.tlvfield_from_csv(self, parts)
|
||||
|
||||
|
||||
class MessageTypeField(object):
|
||||
"""A field within a particular message type or subtype"""
|
||||
def __init__(self, ownername, name, fieldtype, option=None):
|
||||
def __init__(self, ownername: str, name: str, fieldtype: FieldType, option: Optional[str] = None):
|
||||
self.full_name = "{}.{}".format(ownername, name)
|
||||
self.name = name
|
||||
self.fieldtype = fieldtype
|
||||
self.option = option
|
||||
|
||||
def missing_fields(self, fields):
|
||||
def missing_fields(self, fieldvals: Dict[str, Any]):
|
||||
"""Return this field if it's not in fields"""
|
||||
if self.name not in fields and not self.option and not self.fieldtype.is_optional():
|
||||
if self.name not in fieldvals and not self.option and not self.fieldtype.is_optional():
|
||||
return [self]
|
||||
return []
|
||||
|
||||
def len_fields_bad(self, fieldname, otherfields):
|
||||
def len_fields_bad(self, fieldname: str, otherfields: Dict[str, Any]) -> List[str]:
|
||||
return self.fieldtype.len_fields_bad(fieldname, otherfields)
|
||||
|
||||
def __str__(self):
|
||||
@@ -163,17 +158,17 @@ other types. Since 'msgtype' and 'tlvtype' are almost identical, they
|
||||
inherit from this too.
|
||||
|
||||
"""
|
||||
def __init__(self, name):
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
self.fields = []
|
||||
self.fields: List[FieldType] = []
|
||||
|
||||
def find_field(self, fieldname):
|
||||
def find_field(self, fieldname: str):
|
||||
for f in self.fields:
|
||||
if f.name == fieldname:
|
||||
return f
|
||||
return None
|
||||
|
||||
def add_field(self, field):
|
||||
def add_field(self, field: FieldType):
|
||||
if self.find_field(field.name):
|
||||
raise ValueError("{}: duplicate field {}".format(self, field))
|
||||
self.fields.append(field)
|
||||
@@ -181,8 +176,8 @@ inherit from this too.
|
||||
def __str__(self):
|
||||
return "subtype-{}".format(self.name)
|
||||
|
||||
def len_fields_bad(self, fieldname, otherfields):
|
||||
bad_fields = []
|
||||
def len_fields_bad(self, fieldname: str, otherfields: Dict[str, Any]) -> List[str]:
|
||||
bad_fields: List[str] = []
|
||||
for f in self.fields:
|
||||
bad_fields += f.len_fields_bad('{}.{}'.format(fieldname, f.name),
|
||||
otherfields)
|
||||
@@ -190,14 +185,14 @@ inherit from this too.
|
||||
return bad_fields
|
||||
|
||||
@staticmethod
|
||||
def type_from_csv(parts):
|
||||
def subtype_from_csv(parts: List[str]) -> 'SubtypeType':
|
||||
"""e.g subtype,channel_update_timestamps"""
|
||||
if len(parts) != 1:
|
||||
raise ValueError("subtype expected 2 CSV parts, not {}"
|
||||
.format(parts))
|
||||
return SubtypeType(parts[0])
|
||||
|
||||
def _field_from_csv(self, namespace, parts, ellipsisok=False, option=None):
|
||||
def _field_from_csv(self, namespace: MessageNamespace, parts: List[str], ellipsisok=False, option: str = None) -> MessageTypeField:
|
||||
"""Takes msgdata/subtypedata after first two fields
|
||||
e.g. [...]timestamp_node_id_1,u32,
|
||||
|
||||
@@ -236,12 +231,12 @@ inherit from this too.
|
||||
|
||||
return field
|
||||
|
||||
def val_from_str(self, s):
|
||||
def val_from_str(self, s: str) -> Tuple[Dict[str, Any], str]:
|
||||
if not s.startswith('{'):
|
||||
raise ValueError("subtype {} must be wrapped in '{{}}': bad {}"
|
||||
.format(self, s))
|
||||
s = s[1:]
|
||||
ret = {}
|
||||
ret: Dict[str, Any] = {}
|
||||
# FIXME: perhaps allow unlabelled fields to imply assign fields in order?
|
||||
while not s.startswith('}'):
|
||||
fieldname, s = s.split('=', 1)
|
||||
@@ -259,7 +254,7 @@ inherit from this too.
|
||||
|
||||
return ret, s[1:]
|
||||
|
||||
def _raise_if_badvals(self, v):
|
||||
def _raise_if_badvals(self, v: Dict[str, Any]) -> None:
|
||||
# Every non-optional value must be specified, and no others.
|
||||
defined = set([f.name for f in self.fields])
|
||||
have = set(v)
|
||||
@@ -272,7 +267,7 @@ inherit from this too.
|
||||
if not f.fieldtype.is_optional():
|
||||
raise ValueError("Missing value for {}".format(f))
|
||||
|
||||
def val_to_str(self, v, otherfields):
|
||||
def val_to_str(self, v: Dict[str, Any], otherfields: Dict[str, Any]) -> str:
|
||||
self._raise_if_badvals(v)
|
||||
s = ''
|
||||
sep = ''
|
||||
@@ -283,13 +278,13 @@ inherit from this too.
|
||||
|
||||
return '{' + s + '}'
|
||||
|
||||
def write(self, io_out, v, otherfields):
|
||||
def write(self, io_out: BufferedIOBase, v: Dict[str, Any], otherfields: Dict[str, Any]) -> None:
|
||||
self._raise_if_badvals(v)
|
||||
for fname, val in v.items():
|
||||
field = self.find_field(fname)
|
||||
field.fieldtype.write(io_out, val, otherfields)
|
||||
|
||||
def read(self, io_in, otherfields):
|
||||
def read(self, io_in: BufferedIOBase, otherfields: Dict[str, Any]) -> Dict[str, Any]:
|
||||
vals = {}
|
||||
for field in self.fields:
|
||||
val = field.fieldtype.read(io_in, otherfields)
|
||||
@@ -302,7 +297,7 @@ inherit from this too.
|
||||
return vals
|
||||
|
||||
@staticmethod
|
||||
def field_from_csv(namespace, parts):
|
||||
def subfield_from_csv(namespace: MessageNamespace, parts: List[str]) -> None:
|
||||
"""e.g
|
||||
subtypedata,channel_update_timestamps,timestamp_node_id_1,u32,"""
|
||||
if len(parts) != 4:
|
||||
@@ -330,12 +325,12 @@ class MessageType(SubtypeType):
|
||||
'NODE': 0x2000,
|
||||
'UPDATE': 0x1000}
|
||||
|
||||
def __init__(self, name, value, option=None):
|
||||
def __init__(self, name: str, value: str, option: Optional[str] = None):
|
||||
super().__init__(name)
|
||||
self.number = self.parse_value(value)
|
||||
self.option = option
|
||||
|
||||
def parse_value(self, value):
|
||||
def parse_value(self, value: str) -> int:
|
||||
result = 0
|
||||
for token in value.split('|'):
|
||||
if token in self.onion_types.keys():
|
||||
@@ -349,7 +344,7 @@ class MessageType(SubtypeType):
|
||||
return "msgtype-{}".format(self.name)
|
||||
|
||||
@staticmethod
|
||||
def type_from_csv(parts):
|
||||
def msgtype_from_csv(parts: List[str]) -> 'MessageType':
|
||||
"""e.g msgtype,open_channel,32,option_foo"""
|
||||
option = None
|
||||
if len(parts) == 3:
|
||||
@@ -360,7 +355,7 @@ class MessageType(SubtypeType):
|
||||
return MessageType(parts[0], parts[1], option)
|
||||
|
||||
@staticmethod
|
||||
def field_from_csv(namespace, parts):
|
||||
def msgfield_from_csv(namespace: MessageNamespace, parts: List[str]) -> None:
|
||||
"""e.g msgdata,open_channel,temporary_channel_id,byte,32[,opt]"""
|
||||
option = None
|
||||
if len(parts) == 5:
|
||||
@@ -390,18 +385,18 @@ confusingly) refers to them.
|
||||
def __str__(self):
|
||||
return "tlvstreamtype-{}".format(self.name)
|
||||
|
||||
def find_field_by_number(self, num):
|
||||
def find_field_by_number(self, num: int) -> Optional['TlvMessageType']:
|
||||
for f in self.fields:
|
||||
if f.number == num:
|
||||
return f
|
||||
return None
|
||||
|
||||
def is_optional(self):
|
||||
def is_optional(self) -> bool:
|
||||
"""You can omit a tlvstream= altogether"""
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def type_from_csv(namespace, parts):
|
||||
def tlvtype_from_csv(namespace: MessageNamespace, parts: List[str]) -> None:
|
||||
"""e.g tlvtype,reply_channel_range_tlvs,timestamps_tlv,1"""
|
||||
if len(parts) != 3:
|
||||
raise ValueError("tlvtype expected 4 CSV parts, not {}"
|
||||
@@ -414,7 +409,7 @@ confusingly) refers to them.
|
||||
tlvstream.add_field(TlvMessageType(parts[1], parts[2]))
|
||||
|
||||
@staticmethod
|
||||
def field_from_csv(namespace, parts):
|
||||
def tlvfield_from_csv(namespace: MessageNamespace, parts: List[str]) -> None:
|
||||
"""e.g
|
||||
tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
|
||||
@@ -435,20 +430,21 @@ tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
subfield = field._field_from_csv(namespace, parts[2:], ellipsisok=True)
|
||||
field.add_field(subfield)
|
||||
|
||||
def val_from_str(self, s):
|
||||
def val_from_str(self, s: str) -> Tuple[Dict[str, Any], str]:
|
||||
"""{fieldname={...},...}. Returns dict of fieldname->val"""
|
||||
if not s.startswith('{'):
|
||||
raise ValueError("tlvtype {} must be wrapped in '{{}}': bad {}"
|
||||
.format(self, s))
|
||||
s = s[1:]
|
||||
ret = {}
|
||||
ret: Dict[str, Any] = {}
|
||||
while not s.startswith('}'):
|
||||
fieldname, s = s.split('=', 1)
|
||||
f = self.find_field(fieldname)
|
||||
if f is None:
|
||||
# Unknown fields are number=hexstring
|
||||
hexstring, s = split_field(s)
|
||||
ret[int(fieldname)] = bytes.fromhex(hexstring)
|
||||
# Make sure it is actually a valid int!
|
||||
ret[str(int(fieldname))] = bytes.fromhex(hexstring)
|
||||
else:
|
||||
ret[fieldname], s = f.val_from_str(s)
|
||||
if s[0] == ',':
|
||||
@@ -456,7 +452,7 @@ tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
|
||||
return ret, s[1:]
|
||||
|
||||
def val_to_str(self, v, otherfields):
|
||||
def val_to_str(self, v: Dict[str, Any], otherfields: Dict[str, Any]) -> str:
|
||||
s = ''
|
||||
sep = ''
|
||||
for fieldname in v:
|
||||
@@ -470,14 +466,14 @@ tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
|
||||
return '{' + s + '}'
|
||||
|
||||
def write(self, iobuf, v, otherfields):
|
||||
def write(self, io_out: BufferedIOBase, v: Optional[Dict[str, Any]], otherfields: Dict[str, Any]) -> None:
|
||||
# If they didn't specify this tlvstream, it's empty.
|
||||
if v is None:
|
||||
return
|
||||
|
||||
# Make a tuple of (fieldnum, val_to_bin, val) so we can sort into
|
||||
# ascending order as TLV spec requires.
|
||||
def write_raw_val(iobuf, val, otherfields):
|
||||
def write_raw_val(iobuf, val, otherfields: Dict[str, Any]):
|
||||
iobuf.write(val)
|
||||
|
||||
def get_value(tup):
|
||||
@@ -496,14 +492,14 @@ tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
ordered.sort(key=get_value)
|
||||
|
||||
for typenum, writefunc, val in ordered:
|
||||
buf = io.BytesIO()
|
||||
buf = BytesIO()
|
||||
writefunc(buf, val, otherfields)
|
||||
BigSizeType.write(iobuf, typenum)
|
||||
BigSizeType.write(iobuf, len(buf.getvalue()))
|
||||
iobuf.write(buf.getvalue())
|
||||
BigSizeType.write(io_out, typenum)
|
||||
BigSizeType.write(io_out, len(buf.getvalue()))
|
||||
io_out.write(buf.getvalue())
|
||||
|
||||
def read(self, io_in, otherfields):
|
||||
vals = {}
|
||||
def read(self, io_in: BufferedIOBase, otherfields: Dict[str, Any]) -> Dict[str, Any]:
|
||||
vals: Dict[str, Any] = {}
|
||||
|
||||
while True:
|
||||
tlv_type = BigSizeType.read(io_in)
|
||||
@@ -522,17 +518,18 @@ tlvdata,reply_channel_range_tlvs,timestamps_tlv,encoding_type,u8,
|
||||
# Raw fields are allowed, just index by number.
|
||||
vals[tlv_type] = binval
|
||||
else:
|
||||
vals[f.name] = f.read(io.BytesIO(binval), otherfields)
|
||||
# FIXME: Why doesn't mypy think BytesIO is a valid BufferedIOBase?
|
||||
vals[f.name] = f.read(cast(BufferedIOBase, BytesIO(binval)), otherfields)
|
||||
|
||||
def name_and_val(self, name, v):
|
||||
def name_and_val(self, name: str, v: Dict[str, Any]) -> str:
|
||||
"""This is overridden by LengthFieldType to return nothing"""
|
||||
return " {}={}".format(name, self.val_to_str(v, None))
|
||||
return " {}={}".format(name, self.val_to_str(v, {}))
|
||||
|
||||
|
||||
class TlvMessageType(MessageType):
|
||||
"""A 'tlvtype' in BOLT-speak"""
|
||||
|
||||
def __init__(self, name, value):
|
||||
def __init__(self, name: str, value: str):
|
||||
super().__init__(name, value)
|
||||
|
||||
def __str__(self):
|
||||
@@ -541,10 +538,10 @@ class TlvMessageType(MessageType):
|
||||
|
||||
class Message(object):
|
||||
"""A particular message instance"""
|
||||
def __init__(self, messagetype, **kwargs):
|
||||
def __init__(self, messagetype: MessageType, **kwargs):
|
||||
"""MessageType is the type of this msg, with fields. Fields can either be valid values for the type, or if they are strings they are converted according to the field type"""
|
||||
self.messagetype = messagetype
|
||||
self.fields = {}
|
||||
self.fields: Dict[str, Any] = {}
|
||||
|
||||
# Convert arguments from strings to values if necessary.
|
||||
for field in kwargs:
|
||||
@@ -564,16 +561,16 @@ class Message(object):
|
||||
if bad_lens:
|
||||
raise ValueError("Inconsistent length fields: {}".format(bad_lens))
|
||||
|
||||
def missing_fields(self):
|
||||
def missing_fields(self) -> List[str]:
|
||||
"""Are any required fields missing?"""
|
||||
missing = []
|
||||
missing: List[str] = []
|
||||
for ftype in self.messagetype.fields:
|
||||
missing += ftype.missing_fields(self.fields)
|
||||
|
||||
return missing
|
||||
|
||||
@staticmethod
|
||||
def read(namespace, io_in):
|
||||
def read(namespace: MessageNamespace, io_in: BufferedIOBase) -> Optional['Message']:
|
||||
"""Read and decode a Message within that namespace.
|
||||
|
||||
Returns None on EOF
|
||||
@@ -587,7 +584,7 @@ Returns None on EOF
|
||||
if mtype is None:
|
||||
raise ValueError('Unknown message type number {}'.format(typenum))
|
||||
|
||||
fields = {}
|
||||
fields: Dict[str, Any] = {}
|
||||
for f in mtype.fields:
|
||||
fields[f.name] = f.fieldtype.read(io_in, fields)
|
||||
if fields[f.name] is None:
|
||||
@@ -598,7 +595,7 @@ Returns None on EOF
|
||||
return Message(mtype, **fields)
|
||||
|
||||
@staticmethod
|
||||
def from_str(namespace, s, incomplete_ok=False):
|
||||
def from_str(namespace: MessageNamespace, s: str, incomplete_ok=False) -> 'Message':
|
||||
"""Decode a string to a Message within that namespace.
|
||||
|
||||
Format is msgname [ field=...]*.
|
||||
@@ -624,7 +621,7 @@ Format is msgname [ field=...]*.
|
||||
|
||||
return m
|
||||
|
||||
def write(self, io_out):
|
||||
def write(self, io_out: BufferedIOBase) -> None:
|
||||
"""Write a Message into its wire format.
|
||||
|
||||
Must not have missing fields.
|
||||
@@ -645,7 +642,7 @@ Must not have missing fields.
|
||||
val = None
|
||||
f.fieldtype.write(io_out, val, self.fields)
|
||||
|
||||
def to_str(self):
|
||||
def to_str(self) -> str:
|
||||
"""Encode a Message into a string"""
|
||||
ret = "{}".format(self.messagetype.name)
|
||||
for f in self.messagetype.fields:
|
||||
|
||||
Reference in New Issue
Block a user