mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-24 01:24:26 +01:00
pyln-testing: use files for stdout and stderr, not threads.
Some flakes are caused by weird races in this code. Plus, if we get things to write straight to files, we might see things in there on post-mortem which happen after the python runner exits. It's a bit less efficient, but much simpler. Let's see if it helps! Some tests need a rework now, since we don't get a failure (except eventual timeout), but they're simpler. Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
@@ -174,13 +174,20 @@ class TailableProc(object):
|
||||
tail the processes and react to their output.
|
||||
"""
|
||||
|
||||
def __init__(self, outputDir=None, verbose=True):
|
||||
def __init__(self, outputDir, verbose=True):
|
||||
self.logs = []
|
||||
self.logs_cond = threading.Condition(threading.RLock())
|
||||
self.env = os.environ.copy()
|
||||
self.running = False
|
||||
self.proc = None
|
||||
self.outputDir = outputDir
|
||||
if not os.path.exists(outputDir):
|
||||
os.makedirs(outputDir)
|
||||
# Create and open them.
|
||||
self.stdout_filename = os.path.join(outputDir, "log")
|
||||
self.stderr_filename = os.path.join(outputDir, "errlog")
|
||||
self.stdout_write = open(self.stdout_filename, "wt")
|
||||
self.stderr_write = open(self.stderr_filename, "wt")
|
||||
self.stdout_read = open(self.stdout_filename, "rt")
|
||||
self.stderr_read = open(self.stderr_filename, "rt")
|
||||
self.logsearch_start = 0
|
||||
self.err_logs = []
|
||||
self.prefix = ""
|
||||
@@ -192,29 +199,17 @@ class TailableProc(object):
|
||||
# pass it to the log matcher and not print it to stdout).
|
||||
self.log_filter = lambda line: False
|
||||
|
||||
def start(self, stdin=None, stdout=None, stderr=None):
|
||||
def start(self, stdin=None):
|
||||
"""Start the underlying process and start monitoring it.
|
||||
"""
|
||||
logging.debug("Starting '%s'", " ".join(self.cmd_line))
|
||||
self.proc = subprocess.Popen(self.cmd_line,
|
||||
stdin=stdin,
|
||||
stdout=stdout if stdout else subprocess.PIPE,
|
||||
stderr=stderr,
|
||||
stdout=self.stdout_write,
|
||||
stderr=self.stderr_write,
|
||||
env=self.env)
|
||||
self.thread = threading.Thread(target=self.tail)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.running = True
|
||||
|
||||
def save_log(self):
|
||||
if self.outputDir:
|
||||
logpath = os.path.join(self.outputDir, 'log')
|
||||
with open(logpath, 'w') as f:
|
||||
for l in self.logs:
|
||||
f.write(l + '\n')
|
||||
|
||||
def stop(self, timeout=10):
|
||||
self.save_log()
|
||||
self.proc.terminate()
|
||||
|
||||
# Now give it some time to react to the signal
|
||||
@@ -224,56 +219,32 @@ class TailableProc(object):
|
||||
self.proc.kill()
|
||||
|
||||
self.proc.wait()
|
||||
self.thread.join()
|
||||
|
||||
return self.proc.returncode
|
||||
|
||||
def kill(self):
|
||||
"""Kill process without giving it warning."""
|
||||
self.proc.kill()
|
||||
self.proc.wait()
|
||||
self.thread.join()
|
||||
|
||||
def tail(self):
|
||||
"""Tail the stdout of the process and remember it.
|
||||
|
||||
Stores the lines of output produced by the process in
|
||||
self.logs and signals that a new line was read so that it can
|
||||
be picked up by consumers.
|
||||
def logs_catchup(self):
|
||||
"""Save the latest stdout / stderr contents; return true if we got anything.
|
||||
"""
|
||||
for line in iter(self.proc.stdout.readline, ''):
|
||||
if len(line) == 0:
|
||||
break
|
||||
|
||||
line = line.decode('UTF-8', 'replace').rstrip()
|
||||
|
||||
if self.log_filter(line):
|
||||
continue
|
||||
|
||||
if self.verbose:
|
||||
sys.stdout.write("{}: {}\n".format(self.prefix, line))
|
||||
|
||||
with self.logs_cond:
|
||||
self.logs.append(line)
|
||||
self.logs_cond.notifyAll()
|
||||
|
||||
self.running = False
|
||||
self.proc.stdout.close()
|
||||
|
||||
if self.proc.stderr:
|
||||
for line in iter(self.proc.stderr.readline, ''):
|
||||
|
||||
if line is None or len(line) == 0:
|
||||
break
|
||||
|
||||
line = line.rstrip().decode('UTF-8', 'replace')
|
||||
self.err_logs.append(line)
|
||||
|
||||
self.proc.stderr.close()
|
||||
new_stdout = self.stdout_read.readlines()
|
||||
if self.verbose:
|
||||
for line in new_stdout:
|
||||
sys.stdout.write("{}: {}".format(self.prefix, line))
|
||||
self.logs += [l.rstrip() for l in new_stdout]
|
||||
new_stderr = self.stderr_read.readlines()
|
||||
if self.verbose:
|
||||
for line in new_stderr:
|
||||
sys.stderr.write("{}-stderr: {}".format(self.prefix, line))
|
||||
self.err_logs += [l.rstrip() for l in new_stderr]
|
||||
return len(new_stdout) > 0 or len(new_stderr) > 0
|
||||
|
||||
def is_in_log(self, regex, start=0):
|
||||
"""Look for `regex` in the logs."""
|
||||
|
||||
self.logs_catchup()
|
||||
ex = re.compile(regex)
|
||||
for l in self.logs[start:]:
|
||||
if ex.search(l):
|
||||
@@ -286,6 +257,7 @@ class TailableProc(object):
|
||||
def is_in_stderr(self, regex):
|
||||
"""Look for `regex` in stderr."""
|
||||
|
||||
self.logs_catchup()
|
||||
ex = re.compile(regex)
|
||||
for l in self.err_logs:
|
||||
if ex.search(l):
|
||||
@@ -311,31 +283,29 @@ class TailableProc(object):
|
||||
logging.debug("Waiting for {} in the logs".format(regexs))
|
||||
exs = [re.compile(r) for r in regexs]
|
||||
start_time = time.time()
|
||||
pos = self.logsearch_start
|
||||
while True:
|
||||
if timeout is not None and time.time() > start_time + timeout:
|
||||
print("Time-out: can't find {} in logs".format(exs))
|
||||
for r in exs:
|
||||
if self.is_in_log(r):
|
||||
print("({} was previously in logs!)".format(r))
|
||||
raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
|
||||
if self.logsearch_start >= len(self.logs):
|
||||
if not self.logs_catchup():
|
||||
time.sleep(0.25)
|
||||
|
||||
with self.logs_cond:
|
||||
if pos >= len(self.logs):
|
||||
if not self.running:
|
||||
raise ValueError('Process died while waiting for logs')
|
||||
self.logs_cond.wait(1)
|
||||
continue
|
||||
if timeout is not None and time.time() > start_time + timeout:
|
||||
print("Time-out: can't find {} in logs".format(exs))
|
||||
for r in exs:
|
||||
if self.is_in_log(r):
|
||||
print("({} was previously in logs!)".format(r))
|
||||
raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
|
||||
continue
|
||||
|
||||
for r in exs.copy():
|
||||
self.logsearch_start = pos + 1
|
||||
if r.search(self.logs[pos]):
|
||||
logging.debug("Found '%s' in logs", r)
|
||||
exs.remove(r)
|
||||
break
|
||||
if len(exs) == 0:
|
||||
return self.logs[pos]
|
||||
pos += 1
|
||||
line = self.logs[self.logsearch_start]
|
||||
self.logsearch_start += 1
|
||||
for r in exs.copy():
|
||||
if r.search(line):
|
||||
logging.debug("Found '%s' in logs", r)
|
||||
exs.remove(r)
|
||||
if len(exs) == 0:
|
||||
return line
|
||||
# Don't match same line with different regexs!
|
||||
break
|
||||
|
||||
def wait_for_log(self, regex, timeout=TIMEOUT):
|
||||
"""Look for `regex` in the logs.
|
||||
@@ -620,10 +590,9 @@ class LightningD(TailableProc):
|
||||
|
||||
return self.cmd_prefix + [self.executable] + opts
|
||||
|
||||
def start(self, stdin=None, stdout=None, stderr=None,
|
||||
wait_for_initialized=True):
|
||||
def start(self, stdin=None, wait_for_initialized=True):
|
||||
self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport
|
||||
TailableProc.start(self, stdin, stdout, stderr)
|
||||
TailableProc.start(self, stdin)
|
||||
if wait_for_initialized:
|
||||
self.wait_for_log("Server started with public key")
|
||||
logging.info("LightningD started")
|
||||
@@ -852,8 +821,8 @@ class LightningNode(object):
|
||||
info = self.rpc.getinfo()
|
||||
return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info
|
||||
|
||||
def start(self, wait_for_bitcoind_sync=True, stderr=None):
|
||||
self.daemon.start(stderr=stderr)
|
||||
def start(self, wait_for_bitcoind_sync=True):
|
||||
self.daemon.start()
|
||||
# Cache `getinfo`, we'll be using it a lot
|
||||
self.info = self.rpc.getinfo()
|
||||
# This shortcut is sufficient for our simple tests.
|
||||
@@ -878,7 +847,6 @@ class LightningNode(object):
|
||||
if self.rc is None:
|
||||
self.rc = self.daemon.stop()
|
||||
|
||||
self.daemon.save_log()
|
||||
self.daemon.cleanup()
|
||||
|
||||
if self.rc != 0 and not self.may_fail:
|
||||
@@ -1417,12 +1385,7 @@ class NodeFactory(object):
|
||||
|
||||
if start:
|
||||
try:
|
||||
# Capture stderr if we're failing
|
||||
if expect_fail:
|
||||
stderr = subprocess.PIPE
|
||||
else:
|
||||
stderr = None
|
||||
node.start(wait_for_bitcoind_sync, stderr=stderr)
|
||||
node.start(wait_for_bitcoind_sync)
|
||||
except Exception:
|
||||
if expect_fail:
|
||||
return node
|
||||
|
||||
Reference in New Issue
Block a user