mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-21 16:14:23 +01:00
lightning.py: parse multiple JSON RPC commands accurately.
We need to keep the remaining buffer, and we need to try to parse it before we read the next. I first tried keeping it in the object, but its lifetime is that of the *socket*, which we actually reopen for every command. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
committed by
Christian Decker
parent
fe11ee5406
commit
0c3f85d931
@@ -20,6 +20,7 @@ changes.
|
|||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- JSON API: uppercase invoices now parsed correctly (broken in 0.6.2).
|
- JSON API: uppercase invoices now parsed correctly (broken in 0.6.2).
|
||||||
|
- pylightning: handle multiple simultanous RPC replies reliably.
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
|
|
||||||
|
|||||||
@@ -25,26 +25,23 @@ class UnixDomainSocketRpc(object):
|
|||||||
s = json.dumps(obj)
|
s = json.dumps(obj)
|
||||||
sock.sendall(bytearray(s, 'UTF-8'))
|
sock.sendall(bytearray(s, 'UTF-8'))
|
||||||
|
|
||||||
def _readobj(self, sock):
|
def _readobj(self, sock, buff=b''):
|
||||||
buff = b''
|
"""Read a JSON object, starting with buff; returns object and any buffer left over"""
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
b = sock.recv(1024)
|
|
||||||
buff += b
|
|
||||||
if len(b) == 0:
|
|
||||||
return {'error': 'Connection to RPC server lost.'}
|
|
||||||
|
|
||||||
if buff[-3:] != b' }\n':
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Convert late to UTF-8 so glyphs split across recvs do not
|
# Convert late to UTF-8 so glyphs split across recvs do not
|
||||||
# impact us
|
# impact us
|
||||||
objs, _ = self.decoder.raw_decode(buff.decode("UTF-8"))
|
objs, len_used = self.decoder.raw_decode(buff.decode("UTF-8"))
|
||||||
return objs
|
return objs, buff[len_used:].lstrip()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# Probably didn't read enough
|
# Probably didn't read enough
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
b = sock.recv(1024)
|
||||||
|
buff += b
|
||||||
|
if len(b) == 0:
|
||||||
|
return {'error': 'Connection to RPC server lost.'}, buff.lstrip()
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
"""Intercept any call that is not explicitly defined and call @call
|
"""Intercept any call that is not explicitly defined and call @call
|
||||||
|
|
||||||
@@ -65,6 +62,7 @@ class UnixDomainSocketRpc(object):
|
|||||||
# Filter out arguments that are None
|
# Filter out arguments that are None
|
||||||
payload = {k: v for k, v in payload.items() if v is not None}
|
payload = {k: v for k, v in payload.items() if v is not None}
|
||||||
|
|
||||||
|
# FIXME: we open a new socket for every readobj call...
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
sock.connect(self.socket_path)
|
sock.connect(self.socket_path)
|
||||||
self._writeobj(sock, {
|
self._writeobj(sock, {
|
||||||
@@ -72,7 +70,7 @@ class UnixDomainSocketRpc(object):
|
|||||||
"params": payload,
|
"params": payload,
|
||||||
"id": 0
|
"id": 0
|
||||||
})
|
})
|
||||||
resp = self._readobj(sock)
|
resp, _ = self._readobj(sock)
|
||||||
sock.close()
|
sock.close()
|
||||||
|
|
||||||
self.logger.debug("Received response for %s call: %r", method, resp)
|
self.logger.debug("Received response for %s call: %r", method, resp)
|
||||||
|
|||||||
@@ -611,8 +611,9 @@ def test_multirpc(node_factory):
|
|||||||
|
|
||||||
sock.sendall(b'\n'.join(commands))
|
sock.sendall(b'\n'.join(commands))
|
||||||
|
|
||||||
|
buff = b''
|
||||||
for i in commands:
|
for i in commands:
|
||||||
l1.rpc._readobj(sock)
|
_, buff = l1.rpc._readobj(sock, buff)
|
||||||
sock.close()
|
sock.close()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user