diff --git a/contrib/pyln-testing/README.md b/contrib/pyln-testing/README.md new file mode 100644 index 000000000..93d31b2bd --- /dev/null +++ b/contrib/pyln-testing/README.md @@ -0,0 +1,33 @@ +# pyln-testing: A library to write tests against c-lightning + +This library implements a number of utilities that help building tests for +c-lightning nodes. In particular it provides a number of pytest fixtures that +allow the management of a test network of a given topology and then execute a +test scenarion. + +`pyln-testing` is used by c-lightning for its internal tests, and by the +community plugin directory to exercise the plugins. + +## Installation + +`pyln-testing` is available on `pip`: + +```bash +pip install pyln-testing +``` + +Alternatively you can also install the development version to get access to +currently unreleased features by checking out the c-lightning source code and +installing into your python3 environment: + +```bash +git clone https://github.com/ElementsProject/lightning.git +cd lightning/contrib/pyln-testing +python3 setup.py develop +``` + +This will add links to the library into your environment so changing the +checked out source code will also result in the environment picking up these +changes. Notice however that unreleased versions may change API without +warning, so test thoroughly with the released version. + diff --git a/contrib/pyln-testing/pyln/testing/__init__.py b/contrib/pyln-testing/pyln/testing/__init__.py new file mode 100644 index 000000000..f102a9cad --- /dev/null +++ b/contrib/pyln-testing/pyln/testing/__init__.py @@ -0,0 +1 @@ +__version__ = "0.0.1" diff --git a/contrib/pyln-testing/pyln/testing/btcproxy.py b/contrib/pyln-testing/pyln/testing/btcproxy.py new file mode 100644 index 000000000..5fa45874a --- /dev/null +++ b/contrib/pyln-testing/pyln/testing/btcproxy.py @@ -0,0 +1,111 @@ +""" A bitcoind proxy that allows instrumentation and canned responses +""" +from flask import Flask, request +from bitcoin.rpc import JSONRPCError +from bitcoin.rpc import RawProxy as BitcoinProxy +from cheroot.wsgi import Server +from cheroot.wsgi import PathInfoDispatcher + +import decimal +import flask +import json +import logging +import threading + + +class DecimalEncoder(json.JSONEncoder): + """By default json.dumps does not handle Decimals correctly, so we override it's handling + """ + def default(self, o): + if isinstance(o, decimal.Decimal): + return "{:.8f}".format(float(o)) + return super(DecimalEncoder, self).default(o) + + +class BitcoinRpcProxy(object): + def __init__(self, bitcoind, rpcport=0): + self.app = Flask("BitcoindProxy") + self.app.add_url_rule("/", "API entrypoint", self.proxy, methods=['POST']) + self.rpcport = rpcport + self.mocks = {} + self.mock_counts = {} + self.bitcoind = bitcoind + self.request_count = 0 + + def _handle_request(self, r): + brpc = BitcoinProxy(btc_conf_file=self.bitcoind.conf_file) + method = r['method'] + + # If we have set a mock for this method reply with that instead of + # forwarding the request. + if method in self.mocks and type(self.mocks[method]) == dict: + ret = {} + ret['id'] = r['id'] + ret['error'] = None + ret['result'] = self.mocks[method] + self.mock_counts[method] += 1 + return ret + elif method in self.mocks and callable(self.mocks[method]): + self.mock_counts[method] += 1 + return self.mocks[method](r) + + try: + reply = { + "result": brpc._call(r['method'], *r['params']), + "error": None, + "id": r['id'] + } + except JSONRPCError as e: + reply = { + "error": e.error, + "code": -32603, + "id": r['id'] + } + self.request_count += 1 + return reply + + def proxy(self): + r = json.loads(request.data.decode('ASCII')) + + if isinstance(r, list): + reply = [self._handle_request(subreq) for subreq in r] + else: + reply = self._handle_request(r) + + response = flask.Response(json.dumps(reply, cls=DecimalEncoder)) + response.headers['Content-Type'] = 'application/json' + return response + + def start(self): + d = PathInfoDispatcher({'/': self.app}) + self.server = Server(('0.0.0.0', self.rpcport), d) + self.proxy_thread = threading.Thread(target=self.server.start) + self.proxy_thread.daemon = True + self.proxy_thread.start() + + # Now that bitcoind is running on the real rpcport, let's tell all + # future callers to talk to the proxyport. We use the bind_addr as a + # signal that the port is bound and accepting connections. + while self.server.bind_addr[1] == 0: + pass + self.rpcport = self.server.bind_addr[1] + logging.debug("BitcoinRpcProxy proxying incoming port {} to {}".format(self.rpcport, self.bitcoind.rpcport)) + + def stop(self): + self.server.stop() + self.proxy_thread.join() + logging.debug("BitcoinRpcProxy shut down after processing {} requests".format(self.request_count)) + + def mock_rpc(self, method, response=None): + """Mock the response to a future RPC call of @method + + The response can either be a dict with the full JSON-RPC response, or a + function that returns such a response. If the response is None the mock + is removed and future calls will be passed through to bitcoind again. + + """ + if response is not None: + self.mocks[method] = response + self.mock_counts[method] = 0 + elif method in self.mocks: + del self.mocks[method] diff --git a/contrib/pyln-testing/pyln/testing/db.py b/contrib/pyln-testing/pyln/testing/db.py new file mode 100644 index 000000000..bb153221d --- /dev/null +++ b/contrib/pyln-testing/pyln/testing/db.py @@ -0,0 +1,197 @@ +from ephemeral_port_reserve import reserve +from glob import glob + +import logging +import os +import psycopg2 +import random +import re +import shutil +import signal +import sqlite3 +import string +import subprocess +import time + + +class Sqlite3Db(object): + def __init__(self, path): + self.path = path + + def get_dsn(self): + """SQLite3 doesn't provide a DSN, resulting in no CLI-option. + """ + return None + + def query(self, query): + orig = os.path.join(self.path) + copy = self.path + ".copy" + shutil.copyfile(orig, copy) + db = sqlite3.connect(copy) + + db.row_factory = sqlite3.Row + c = db.cursor() + c.execute(query) + rows = c.fetchall() + + result = [] + for row in rows: + result.append(dict(zip(row.keys(), row))) + + db.commit() + c.close() + db.close() + return result + + def execute(self, query): + db = sqlite3.connect(self.path) + c = db.cursor() + c.execute(query) + db.commit() + c.close() + db.close() + + +class PostgresDb(object): + def __init__(self, dbname, port): + self.dbname = dbname + self.port = port + + self.conn = psycopg2.connect("dbname={dbname} user=postgres host=localhost port={port}".format( + dbname=dbname, port=port + )) + cur = self.conn.cursor() + cur.execute('SELECT 1') + cur.close() + + def get_dsn(self): + return "postgres://postgres:password@localhost:{port}/{dbname}".format( + port=self.port, dbname=self.dbname + ) + + def query(self, query): + cur = self.conn.cursor() + cur.execute(query) + + # Collect the results into a list of dicts. + res = [] + for r in cur: + t = {} + # Zip the column definition with the value to get its name. + for c, v in zip(cur.description, r): + t[c.name] = v + res.append(t) + cur.close() + return res + + def execute(self, query): + with self.conn, self.conn.cursor() as cur: + cur.execute(query) + + +class SqliteDbProvider(object): + def __init__(self, directory): + self.directory = directory + + def start(self): + pass + + def get_db(self, node_directory, testname, node_id): + path = os.path.join( + node_directory, + 'lightningd.sqlite3' + ) + return Sqlite3Db(path) + + def stop(self): + pass + + +class PostgresDbProvider(object): + def __init__(self, directory): + self.directory = directory + self.port = None + self.proc = None + print("Starting PostgresDbProvider") + + def locate_path(self): + prefix = '/usr/lib/postgresql/*' + matches = glob(prefix) + + candidates = {} + for m in matches: + g = re.search(r'([0-9]+[\.0-9]*)', m) + if not g: + continue + candidates[float(g.group(1))] = m + + if len(candidates) == 0: + raise ValueError("Could not find `postgres` and `initdb` binaries in {}. Is postgresql installed?".format(prefix)) + + # Now iterate in reverse order through matches + for k, v in sorted(candidates.items())[::-1]: + initdb = os.path.join(v, 'bin', 'initdb') + postgres = os.path.join(v, 'bin', 'postgres') + if os.path.isfile(initdb) and os.path.isfile(postgres): + logging.info("Found `postgres` and `initdb` in {}".format(os.path.join(v, 'bin'))) + return initdb, postgres + + raise ValueError("Could not find `postgres` and `initdb` in any of the possible paths: {}".format(candidates.values())) + + def start(self): + passfile = os.path.join(self.directory, "pgpass.txt") + self.pgdir = os.path.join(self.directory, 'pgsql') + # Need to write a tiny file containing the password so `initdb` can pick it up + with open(passfile, 'w') as f: + f.write('cltest\n') + + initdb, postgres = self.locate_path() + subprocess.check_call([ + initdb, + '--pwfile={}'.format(passfile), + '--pgdata={}'.format(self.pgdir), + '--auth=trust', + '--username=postgres', + ]) + self.port = reserve() + self.proc = subprocess.Popen([ + postgres, + '-k', '/tmp/', # So we don't use /var/lib/... + '-D', self.pgdir, + '-p', str(self.port), + '-F', + '-i', + ]) + # Hacky but seems to work ok (might want to make the postgres proc a TailableProc as well if too flaky). + time.sleep(1) + self.conn = psycopg2.connect("dbname=template1 user=postgres host=localhost port={}".format(self.port)) + + # Required for CREATE DATABASE to work + self.conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + + def get_db(self, node_directory, testname, node_id): + # Random suffix to avoid collisions on repeated tests + nonce = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8)) + dbname = "{}_{}_{}".format(testname, node_id, nonce) + + cur = self.conn.cursor() + cur.execute("CREATE DATABASE {};".format(dbname)) + cur.close() + db = PostgresDb(dbname, self.port) + return db + + def stop(self): + # Send fast shutdown signal see [1] for details: + # + # SIGINT + # + # This is the Fast Shutdown mode. The server disallows new connections + # and sends all existing server processes SIGTERM, which will cause + # them to abort their current transactions and exit promptly. It then + # waits for all server processes to exit and finally shuts down. If + # the server is in online backup mode, backup mode will be terminated, + # rendering the backup useless. + # + # [1] https://www.postgresql.org/docs/9.1/server-shutdown.html + self.proc.send_signal(signal.SIGINT) + self.proc.wait() diff --git a/contrib/pyln-testing/pyln/testing/fixtures.py b/contrib/pyln-testing/pyln/testing/fixtures.py new file mode 100644 index 000000000..57932515b --- /dev/null +++ b/contrib/pyln-testing/pyln/testing/fixtures.py @@ -0,0 +1,321 @@ +from concurrent import futures +from pyln.testing.db import SqliteDbProvider, PostgresDbProvider +from pyln.testing.utils import NodeFactory, BitcoinD, ElementsD, env, DEVELOPER + +import logging +import os +import pytest +import re +import shutil +import tempfile + + +# A dict in which we count how often a particular test has run so far. Used to +# give each attempt its own numbered directory, and avoid clashes. +__attempts = {} + + +@pytest.fixture(scope="session") +def test_base_dir(): + d = os.getenv("TEST_DIR", "/tmp") + + directory = tempfile.mkdtemp(prefix='ltests-', dir=d) + print("Running tests in {}".format(directory)) + + yield directory + + if os.listdir(directory) == []: + shutil.rmtree(directory) + + +@pytest.fixture +def directory(request, test_base_dir, test_name): + """Return a per-test specific directory. + + This makes a unique test-directory even if a test is rerun multiple times. + + """ + global __attempts + # Auto set value if it isn't in the dict yet + __attempts[test_name] = __attempts.get(test_name, 0) + 1 + directory = os.path.join(test_base_dir, "{}_{}".format(test_name, __attempts[test_name])) + request.node.has_errors = False + + yield directory + + # This uses the status set in conftest.pytest_runtest_makereport to + # determine whether we succeeded or failed. Outcome can be None if the + # failure occurs during the setup phase, hence the use to getattr instead + # of accessing it directly. + outcome = getattr(request.node, 'rep_call', None).outcome + failed = not outcome or request.node.has_errors or outcome != 'passed' + + if not failed: + shutil.rmtree(directory) + else: + logging.debug("Test execution failed, leaving the test directory {} intact.".format(directory)) + + +@pytest.fixture +def test_name(request): + yield request.function.__name__ + + +network_daemons = { + 'regtest': BitcoinD, + 'liquid-regtest': ElementsD, +} + + +@pytest.fixture +def bitcoind(directory, teardown_checks): + chaind = network_daemons[env('TEST_NETWORK', 'regtest')] + bitcoind = chaind(bitcoin_dir=directory) + + try: + bitcoind.start() + except Exception: + bitcoind.stop() + raise + + info = bitcoind.rpc.getnetworkinfo() + + if info['version'] < 160000: + bitcoind.rpc.stop() + raise ValueError("bitcoind is too old. At least version 16000 (v0.16.0)" + " is needed, current version is {}".format(info['version'])) + + info = bitcoind.rpc.getblockchaininfo() + # Make sure we have some spendable funds + if info['blocks'] < 101: + bitcoind.generate_block(101 - info['blocks']) + elif bitcoind.rpc.getwalletinfo()['balance'] < 1: + logging.debug("Insufficient balance, generating 1 block") + bitcoind.generate_block(1) + + yield bitcoind + + try: + bitcoind.stop() + except Exception: + bitcoind.proc.kill() + bitcoind.proc.wait() + + +class TeardownErrors(object): + def __init__(self): + self.errors = [] + self.node_errors = [] + + def add_error(self, msg): + self.errors.append(msg) + + def add_node_error(self, node, msg): + self.node_errors.append((node.daemon.prefix, msg)) + + def __str__(self): + node_errors = [" - {}: {}".format(*e) for e in self.node_errors] + errors = [" - {}".format(e) for e in self.errors] + + errors = ["\nNode errors:"] + node_errors + ["Global errors:"] + errors + return "\n".join(errors) + + def has_errors(self): + return len(self.errors) > 0 or len(self.node_errors) > 0 + + +@pytest.fixture +def teardown_checks(request): + """A simple fixture to collect errors during teardown. + + We need to collect the errors and raise them as the very last step in the + fixture tree, otherwise some fixtures may not be cleaned up + correctly. Require this fixture in all other fixtures that need to either + cleanup before reporting an error or want to add an error that is to be + reported. + + """ + errors = TeardownErrors() + yield errors + + if errors.has_errors(): + # Format a nice list of everything that went wrong and raise an exception + request.node.has_errors = True + raise ValueError(str(errors)) + + +@pytest.fixture +def node_factory(request, directory, test_name, bitcoind, executor, db_provider, teardown_checks): + nf = NodeFactory( + test_name, + bitcoind, + executor, + directory=directory, + db_provider=db_provider, + ) + + yield nf + ok, errs = nf.killall([not n.may_fail for n in nf.nodes]) + + for e in errs: + teardown_checks.add_error(e) + + def map_node_error(nodes, f, msg): + for n in nodes: + if n and f(n): + teardown_checks.add_node_error(n, msg) + + map_node_error(nf.nodes, printValgrindErrors, "reported valgrind errors") + map_node_error(nf.nodes, printCrashLog, "had crash.log files") + map_node_error(nf.nodes, lambda n: not n.allow_broken_log and n.daemon.is_in_log(r'\*\*BROKEN\*\*'), "had BROKEN messages") + map_node_error(nf.nodes, checkReconnect, "had unexpected reconnections") + map_node_error(nf.nodes, checkBadGossip, "had bad gossip messages") + map_node_error(nf.nodes, lambda n: n.daemon.is_in_log('Bad reestablish'), "had bad reestablish") + map_node_error(nf.nodes, lambda n: n.daemon.is_in_log('bad hsm request'), "had bad hsm requests") + map_node_error(nf.nodes, lambda n: n.daemon.is_in_log(r'Accessing a null column'), "Accessing a null column") + map_node_error(nf.nodes, checkMemleak, "had memleak messages") + + if not ok: + teardown_checks.add_error("At least one lightning exited with unexpected non-zero return code") + + +def getValgrindErrors(node): + for error_file in os.listdir(node.daemon.lightning_dir): + if not re.fullmatch(r"valgrind-errors.\d+", error_file): + continue + with open(os.path.join(node.daemon.lightning_dir, error_file), 'r') as f: + errors = f.read().strip() + if errors: + return errors, error_file + return None, None + + +def printValgrindErrors(node): + errors, fname = getValgrindErrors(node) + if errors: + print("-" * 31, "Valgrind errors", "-" * 32) + print("Valgrind error file:", fname) + print(errors) + print("-" * 80) + return 1 if errors else 0 + + +def getCrashLog(node): + if node.may_fail: + return None, None + try: + crashlog = os.path.join(node.daemon.lightning_dir, 'crash.log') + with open(crashlog, 'r') as f: + return f.readlines(), crashlog + except Exception: + return None, None + + +def printCrashLog(node): + errors, fname = getCrashLog(node) + if errors: + print("-" * 10, "{} (last 50 lines)".format(fname), "-" * 10) + print("".join(errors[-50:])) + print("-" * 80) + return 1 if errors else 0 + + +def checkReconnect(node): + # Without DEVELOPER, we can't suppress reconnection. + if node.may_reconnect or not DEVELOPER: + return 0 + if node.daemon.is_in_log('Peer has reconnected'): + return 1 + return 0 + + +def checkBadGossip(node): + if node.allow_bad_gossip: + return 0 + # We can get bad gossip order from inside error msgs. + if node.daemon.is_in_log('Bad gossip order from (?!error)'): + # This can happen if a node sees a node_announce after a channel + # is deleted, however. + if node.daemon.is_in_log('Deleting channel'): + return 0 + return 1 + + # Other 'Bad' messages shouldn't happen. + if node.daemon.is_in_log(r'gossipd.*Bad (?!gossip order from error)'): + return 1 + return 0 + + +def checkBroken(node): + if node.allow_broken_log: + return 0 + # We can get bad gossip order from inside error msgs. + if node.daemon.is_in_log(r'\*\*BROKEN\*\*'): + return 1 + return 0 + + +def checkBadReestablish(node): + if node.daemon.is_in_log('Bad reestablish'): + return 1 + return 0 + + +def checkBadHSMRequest(node): + if node.daemon.is_in_log('bad hsm request'): + return 1 + return 0 + + +def checkMemleak(node): + if node.daemon.is_in_log('MEMLEAK:'): + return 1 + return 0 + + +# Mapping from TEST_DB_PROVIDER env variable to class to be used +providers = { + 'sqlite3': SqliteDbProvider, + 'postgres': PostgresDbProvider, +} + + +@pytest.fixture(scope="session") +def db_provider(test_base_dir): + provider = providers[os.getenv('TEST_DB_PROVIDER', 'sqlite3')](test_base_dir) + provider.start() + yield provider + provider.stop() + + +@pytest.fixture +def executor(teardown_checks): + ex = futures.ThreadPoolExecutor(max_workers=20) + yield ex + ex.shutdown(wait=False) + + +@pytest.fixture +def chainparams(): + chainparams = { + 'regtest': { + "bip173_prefix": "bcrt", + "elements": False, + "name": "regtest", + "p2sh_prefix": '2', + "elements": False, + "example_addr": "bcrt1qeyyk6sl5pr49ycpqyckvmttus5ttj25pd0zpvg", + "feeoutput": False, + }, + 'liquid-regtest': { + "bip173_prefix": "ert", + "elements": True, + "name": "liquid-regtest", + "p2sh_prefix": 'X', + "elements": True, + "example_addr": "ert1qq8adjz4u6enf0cjey9j8yt0y490tact9fahkwf", + "feeoutput": True, + } + } + + return chainparams[env('TEST_NETWORK', 'regtest')] diff --git a/contrib/pyln-testing/pyln/testing/utils.py b/contrib/pyln-testing/pyln/testing/utils.py new file mode 100644 index 000000000..6fc5c4144 --- /dev/null +++ b/contrib/pyln-testing/pyln/testing/utils.py @@ -0,0 +1,1083 @@ +from bitcoin.rpc import RawProxy as BitcoinProxy +from pyln.testing.btcproxy import BitcoinRpcProxy +from collections import OrderedDict +from decimal import Decimal +from ephemeral_port_reserve import reserve +from pyln.client import LightningRpc + +import json +import logging +import lzma +import math +import os +import random +import re +import shutil +import sqlite3 +import string +import struct +import subprocess +import threading +import time + +BITCOIND_CONFIG = { + "regtest": 1, + "rpcuser": "rpcuser", + "rpcpassword": "rpcpass", +} + + +LIGHTNINGD_CONFIG = OrderedDict({ + "log-level": "debug", + "cltv-delta": 6, + "cltv-final": 5, + "watchtime-blocks": 5, + "rescan": 1, + 'disable-dns': None, +}) + +with open('config.vars') as configfile: + config = dict([(line.rstrip().split('=', 1)) for line in configfile]) + +DEVELOPER = os.getenv("DEVELOPER", config['DEVELOPER']) == "1" +EXPERIMENTAL_FEATURES = os.getenv("EXPERIMENTAL_FEATURES", config['EXPERIMENTAL_FEATURES']) == "1" + +# Gossip can be slow without DEVELOPER. +if DEVELOPER: + DEFAULT_TIMEOUT = 60 +else: + DEFAULT_TIMEOUT = 180 + +TIMEOUT = int(os.getenv("TIMEOUT", str(DEFAULT_TIMEOUT))) +VALGRIND = os.getenv("VALGRIND", config['VALGRIND']) == "1" +SLOW_MACHINE = os.getenv("SLOW_MACHINE", "0") == "1" +COMPAT = os.getenv("COMPAT", config['COMPAT']) == "1" + + +def wait_for(success, timeout=TIMEOUT): + start_time = time.time() + interval = 0.25 + while not success() and time.time() < start_time + timeout: + time.sleep(interval) + interval *= 2 + if interval > 5: + interval = 5 + if time.time() > start_time + timeout: + raise ValueError("Error waiting for {}", success) + + +def write_config(filename, opts, regtest_opts=None, section_name='regtest'): + with open(filename, 'w') as f: + for k, v in opts.items(): + f.write("{}={}\n".format(k, v)) + if regtest_opts: + f.write("[{}]\n".format(section_name)) + for k, v in regtest_opts.items(): + f.write("{}={}\n".format(k, v)) + + +def only_one(arr): + """Many JSON RPC calls return an array; often we only expect a single entry + """ + assert len(arr) == 1 + return arr[0] + + +def sync_blockheight(bitcoind, nodes): + height = bitcoind.rpc.getblockchaininfo()['blocks'] + for n in nodes: + wait_for(lambda: n.rpc.getinfo()['blockheight'] == height) + + +def wait_channel_quiescent(n1, n2): + wait_for(lambda: only_one(only_one(n1.rpc.listpeers(n2.info['id'])['peers'])['channels'])['htlcs'] == []) + wait_for(lambda: only_one(only_one(n2.rpc.listpeers(n1.info['id'])['peers'])['channels'])['htlcs'] == []) + + +def get_tx_p2wsh_outnum(bitcoind, tx, amount): + """Get output number of this tx which is p2wsh of amount""" + decoded = bitcoind.rpc.decoderawtransaction(tx, True) + + for out in decoded['vout']: + if out['scriptPubKey']['type'] == 'witness_v0_scripthash': + if out['value'] == Decimal(amount) / 10**8: + return out['n'] + + return None + + +class TailableProc(object): + """A monitorable process that we can start, stop and tail. + + This is the base class for the daemons. It allows us to directly + tail the processes and react to their output. + """ + + def __init__(self, outputDir=None, verbose=True): + self.logs = [] + self.logs_cond = threading.Condition(threading.RLock()) + self.env = os.environ.copy() + self.running = False + self.proc = None + self.outputDir = outputDir + self.logsearch_start = 0 + + # Should we be logging lines we read from stdout? + self.verbose = verbose + + # A filter function that'll tell us whether to filter out the line (not + # pass it to the log matcher and not print it to stdout). + self.log_filter = lambda line: False + + def start(self, stdin=None, stdout=None, stderr=None): + """Start the underlying process and start monitoring it. + """ + logging.debug("Starting '%s'", " ".join(self.cmd_line)) + self.proc = subprocess.Popen(self.cmd_line, + stdin=stdin, + stdout=stdout if stdout else subprocess.PIPE, + stderr=stderr, + env=self.env) + self.thread = threading.Thread(target=self.tail) + self.thread.daemon = True + self.thread.start() + self.running = True + + def save_log(self): + if self.outputDir: + logpath = os.path.join(self.outputDir, 'log') + with open(logpath, 'w') as f: + for l in self.logs: + f.write(l + '\n') + + def stop(self, timeout=10): + self.save_log() + self.proc.terminate() + + # Now give it some time to react to the signal + rc = self.proc.wait(timeout) + + if rc is None: + self.proc.kill() + + self.proc.wait() + self.thread.join() + + return self.proc.returncode + + def kill(self): + """Kill process without giving it warning.""" + self.proc.kill() + self.proc.wait() + self.thread.join() + + def tail(self): + """Tail the stdout of the process and remember it. + + Stores the lines of output produced by the process in + self.logs and signals that a new line was read so that it can + be picked up by consumers. + """ + for line in iter(self.proc.stdout.readline, ''): + if len(line) == 0: + break + if self.log_filter(line.decode('ASCII')): + continue + if self.verbose: + logging.debug("%s: %s", self.prefix, line.decode().rstrip()) + with self.logs_cond: + self.logs.append(str(line.rstrip())) + self.logs_cond.notifyAll() + self.running = False + self.proc.stdout.close() + if self.proc.stderr: + self.proc.stderr.close() + + def is_in_log(self, regex, start=0): + """Look for `regex` in the logs.""" + + ex = re.compile(regex) + for l in self.logs[start:]: + if ex.search(l): + logging.debug("Found '%s' in logs", regex) + return l + + logging.debug("Did not find '%s' in logs", regex) + return None + + def wait_for_logs(self, regexs, timeout=TIMEOUT): + """Look for `regexs` in the logs. + + We tail the stdout of the process and look for each regex in `regexs`, + starting from last of the previous waited-for log entries (if any). We + fail if the timeout is exceeded or if the underlying process + exits before all the `regexs` were found. + + If timeout is None, no time-out is applied. + """ + logging.debug("Waiting for {} in the logs".format(regexs)) + exs = [re.compile(r) for r in regexs] + start_time = time.time() + pos = self.logsearch_start + while True: + if timeout is not None and time.time() > start_time + timeout: + print("Time-out: can't find {} in logs".format(exs)) + for r in exs: + if self.is_in_log(r): + print("({} was previously in logs!)".format(r)) + raise TimeoutError('Unable to find "{}" in logs.'.format(exs)) + elif not self.running: + raise ValueError('Process died while waiting for logs') + + with self.logs_cond: + if pos >= len(self.logs): + self.logs_cond.wait(1) + continue + + for r in exs.copy(): + self.logsearch_start = pos + 1 + if r.search(self.logs[pos]): + logging.debug("Found '%s' in logs", r) + exs.remove(r) + break + if len(exs) == 0: + return self.logs[pos] + pos += 1 + + def wait_for_log(self, regex, timeout=TIMEOUT): + """Look for `regex` in the logs. + + Convenience wrapper for the common case of only seeking a single entry. + """ + return self.wait_for_logs([regex], timeout) + + +class SimpleBitcoinProxy: + """Wrapper for BitcoinProxy to reconnect. + + Long wait times between calls to the Bitcoin RPC could result in + `bitcoind` closing the connection, so here we just create + throwaway connections. This is easier than to reach into the RPC + library to close, reopen and reauth upon failure. + """ + def __init__(self, btc_conf_file, *args, **kwargs): + self.__btc_conf_file__ = btc_conf_file + + def __getattr__(self, name): + if name.startswith('__') and name.endswith('__'): + # Python internal stuff + raise AttributeError + + # Create a callable to do the actual call + proxy = BitcoinProxy(btc_conf_file=self.__btc_conf_file__) + + def f(*args): + return proxy._call(name, *args) + + # Make debuggers show rather than > + f.__name__ = name + return f + + +class BitcoinD(TailableProc): + + def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): + TailableProc.__init__(self, bitcoin_dir, verbose=False) + + if rpcport is None: + rpcport = reserve() + + self.bitcoin_dir = bitcoin_dir + self.rpcport = rpcport + self.prefix = 'bitcoind' + + regtestdir = os.path.join(bitcoin_dir, 'regtest') + if not os.path.exists(regtestdir): + os.makedirs(regtestdir) + + self.cmd_line = [ + 'bitcoind', + '-datadir={}'.format(bitcoin_dir), + '-printtoconsole', + '-server', + '-logtimestamps', + '-nolisten', + '-txindex', + '-addresstype=bech32' + ] + # For up to and including 0.16.1, this needs to be in main section. + BITCOIND_CONFIG['rpcport'] = rpcport + # For after 0.16.1 (eg. 3f398d7a17f136cd4a67998406ca41a124ae2966), this + # needs its own [regtest] section. + BITCOIND_REGTEST = {'rpcport': rpcport} + self.conf_file = os.path.join(bitcoin_dir, 'bitcoin.conf') + write_config(self.conf_file, BITCOIND_CONFIG, BITCOIND_REGTEST) + self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file) + self.proxies = [] + + def start(self): + TailableProc.start(self) + self.wait_for_log("Done loading", timeout=TIMEOUT) + + logging.info("BitcoinD started") + + def stop(self): + for p in self.proxies: + p.stop() + self.rpc.stop() + return TailableProc.stop(self) + + def get_proxy(self): + proxy = BitcoinRpcProxy(self) + self.proxies.append(proxy) + proxy.start() + return proxy + + # wait_for_mempool can be used to wait for the mempool before generating blocks: + # True := wait for at least 1 transation + # int > 0 := wait for at least N transactions + # 'tx_id' := wait for one transaction id given as a string + # ['tx_id1', 'tx_id2'] := wait until all of the specified transaction IDs + def generate_block(self, numblocks=1, wait_for_mempool=0): + if wait_for_mempool: + if isinstance(wait_for_mempool, str): + wait_for_mempool = [wait_for_mempool] + if isinstance(wait_for_mempool, list): + wait_for(lambda: all(txid in self.rpc.getrawmempool() for txid in wait_for_mempool)) + else: + wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool) + # As of 0.16, generate() is removed; use generatetoaddress. + return self.rpc.generatetoaddress(numblocks, self.rpc.getnewaddress()) + + def simple_reorg(self, height, shift=0): + """ + Reorganize chain by creating a fork at height=[height] and re-mine all mempool + transactions into [height + shift], where shift >= 0. Returns hashes of generated + blocks. + + Note that tx's that become invalid at [height] (because coin maturity, locktime + etc.) are removed from mempool. The length of the new chain will be original + 1 + OR original + [shift], whichever is larger. + + For example: to push tx's backward from height h1 to h2 < h1, use [height]=h2. + + Or to change the txindex of tx's at height h1: + 1. A block at height h2 < h1 should contain a non-coinbase tx that can be pulled + forward to h1. + 2. Set [height]=h2 and [shift]= h1-h2 + """ + hashes = [] + fee_delta = 1000000 + orig_len = self.rpc.getblockcount() + old_hash = self.rpc.getblockhash(height) + final_len = height + shift if height + shift > orig_len else 1 + orig_len + # TODO: raise error for insane args? + + self.rpc.invalidateblock(old_hash) + self.wait_for_log(r'InvalidChainFound: invalid block=.* height={}'.format(height)) + memp = self.rpc.getrawmempool() + + if shift == 0: + hashes += self.generate_block(1 + final_len - height) + else: + for txid in memp: + # lower priority (to effective feerate=0) so they are not mined + self.rpc.prioritisetransaction(txid, None, -fee_delta) + hashes += self.generate_block(shift) + + for txid in memp: + # restore priority so they are mined + self.rpc.prioritisetransaction(txid, None, fee_delta) + hashes += self.generate_block(1 + final_len - (height + shift)) + self.wait_for_log(r'UpdateTip: new best=.* height={}'.format(final_len)) + return hashes + + def getnewaddress(self): + return self.rpc.getnewaddress() + + +class ElementsD(BitcoinD): + def __init__(self, bitcoin_dir="/tmp/bitcoind-test", rpcport=None): + config = BITCOIND_CONFIG.copy() + if 'regtest' in config: + del config['regtest'] + + config['chain'] = 'liquid-regtest' + BitcoinD.__init__(self, bitcoin_dir, rpcport) + + self.cmd_line = [ + 'elementsd', + '-datadir={}'.format(bitcoin_dir), + '-printtoconsole', + '-server', + '-logtimestamps', + '-nolisten', + '-validatepegin=0', + '-con_blocksubsidy=5000000000', + ] + conf_file = os.path.join(bitcoin_dir, 'elements.conf') + config['rpcport'] = self.rpcport + BITCOIND_REGTEST = {'rpcport': self.rpcport} + write_config(conf_file, config, BITCOIND_REGTEST, section_name='liquid-regtest') + self.conf_file = conf_file + self.rpc = SimpleBitcoinProxy(btc_conf_file=self.conf_file) + self.prefix = 'elementsd' + + def generate_block(self, numblocks=1, wait_for_mempool=0): + if wait_for_mempool: + if isinstance(wait_for_mempool, str): + wait_for_mempool = [wait_for_mempool] + if isinstance(wait_for_mempool, list): + wait_for(lambda: all(txid in self.rpc.getrawmempool() for txid in wait_for_mempool)) + else: + wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool) + # As of 0.16, generate() is removed; use generatetoaddress. + return self.rpc.generate(numblocks) + + def getnewaddress(self): + """Need to get an address and then make it unconfidential + """ + addr = self.rpc.getnewaddress() + info = self.rpc.getaddressinfo(addr) + return info['unconfidential'] + + +class LightningD(TailableProc): + def __init__(self, lightning_dir, bitcoindproxy, port=9735, random_hsm=False, node_id=0): + TailableProc.__init__(self, lightning_dir) + self.executable = 'lightningd/lightningd' + self.lightning_dir = lightning_dir + self.port = port + self.cmd_prefix = [] + self.disconnect_file = None + + self.rpcproxy = bitcoindproxy + + self.opts = LIGHTNINGD_CONFIG.copy() + opts = { + 'lightning-dir': lightning_dir, + 'addr': '127.0.0.1:{}'.format(port), + 'allow-deprecated-apis': 'false', + 'network': config.get('TEST_NETWORK', 'regtest'), + 'ignore-fee-limits': 'false', + 'bitcoin-rpcuser': BITCOIND_CONFIG['rpcuser'], + 'bitcoin-rpcpassword': BITCOIND_CONFIG['rpcpassword'], + } + + for k, v in opts.items(): + self.opts[k] = v + + if not os.path.exists(lightning_dir): + os.makedirs(lightning_dir) + + # Last 32-bytes of final part of dir -> seed. + seed = (bytes(re.search('([^/]+)/*$', lightning_dir).group(1), encoding='utf-8') + bytes(32))[:32] + if not random_hsm: + with open(os.path.join(lightning_dir, 'hsm_secret'), 'wb') as f: + f.write(seed) + if DEVELOPER: + self.opts['dev-fast-gossip'] = None + self.opts['dev-bitcoind-poll'] = 1 + self.prefix = 'lightningd-%d' % (node_id) + + def cleanup(self): + # To force blackhole to exit, disconnect file must be truncated! + if self.disconnect_file: + with open(self.disconnect_file, "w") as f: + f.truncate() + + @property + def cmd_line(self): + + opts = [] + for k, v in self.opts.items(): + if v is None: + opts.append("--{}".format(k)) + elif isinstance(v, list): + for i in v: + opts.append("--{}={}".format(k, i)) + else: + opts.append("--{}={}".format(k, v)) + + return self.cmd_prefix + [self.executable] + opts + + def start(self, stdin=None, stdout=None, stderr=None, + wait_for_initialized=True): + self.opts['bitcoin-rpcport'] = self.rpcproxy.rpcport + TailableProc.start(self, stdin, stdout, stderr) + if wait_for_initialized: + self.wait_for_log("Server started with public key") + logging.info("LightningD started") + + def wait(self, timeout=10): + """Wait for the daemon to stop for up to timeout seconds + + Returns the returncode of the process, None if the process did + not return before the timeout triggers. + """ + self.proc.wait(timeout) + return self.proc.returncode + + +class LightningNode(object): + def __init__(self, daemon, rpc, btc, executor, may_fail=False, + may_reconnect=False, allow_broken_log=False, + allow_bad_gossip=False, db=None): + self.rpc = rpc + self.daemon = daemon + self.bitcoin = btc + self.executor = executor + self.may_fail = may_fail + self.may_reconnect = may_reconnect + self.allow_broken_log = allow_broken_log + self.allow_bad_gossip = allow_bad_gossip + self.db = db + + def connect(self, remote_node): + self.rpc.connect(remote_node.info['id'], '127.0.0.1', remote_node.daemon.port) + + def is_connected(self, remote_node): + return remote_node.info['id'] in [p['id'] for p in self.rpc.listpeers()['peers']] + + def openchannel(self, remote_node, capacity, addrtype="p2sh-segwit", confirm=True, wait_for_announce=True, connect=True): + addr, wallettxid = self.fundwallet(10 * capacity, addrtype) + + if connect and not self.is_connected(remote_node): + self.connect(remote_node) + + fundingtx = self.rpc.fundchannel(remote_node.info['id'], capacity) + + # Wait for the funding transaction to be in bitcoind's mempool + wait_for(lambda: fundingtx['txid'] in self.bitcoin.rpc.getrawmempool()) + + if confirm or wait_for_announce: + self.bitcoin.generate_block(1) + + if wait_for_announce: + self.bitcoin.generate_block(5) + + if confirm or wait_for_announce: + self.daemon.wait_for_log( + r'Funding tx {} depth'.format(fundingtx['txid'])) + return {'address': addr, 'wallettxid': wallettxid, 'fundingtx': fundingtx} + + def fundwallet(self, sats, addrtype="p2sh-segwit"): + addr = self.rpc.newaddr(addrtype)[addrtype] + txid = self.bitcoin.rpc.sendtoaddress(addr, sats / 10**8) + self.bitcoin.generate_block(1) + self.daemon.wait_for_log('Owning output .* txid {} CONFIRMED'.format(txid)) + return addr, txid + + def getactivechannels(self): + return [c for c in self.rpc.listchannels()['channels'] if c['active']] + + def db_query(self, query): + return self.db.query(query) + + # Assumes node is stopped! + def db_manip(self, query): + db = sqlite3.connect(os.path.join(self.daemon.lightning_dir, "lightningd.sqlite3")) + db.row_factory = sqlite3.Row + c = db.cursor() + c.execute(query) + db.commit() + c.close() + db.close() + + def is_synced_with_bitcoin(self, info=None): + if info is None: + info = self.rpc.getinfo() + return 'warning_bitcoind_sync' not in info and 'warning_lightningd_sync' not in info + + def start(self, wait_for_bitcoind_sync=True): + self.daemon.start() + # Cache `getinfo`, we'll be using it a lot + self.info = self.rpc.getinfo() + # This shortcut is sufficient for our simple tests. + self.port = self.info['binding'][0]['port'] + if wait_for_bitcoind_sync and not self.is_synced_with_bitcoin(self.info): + wait_for(lambda: self.is_synced_with_bitcoin()) + + def stop(self, timeout=10): + """ Attempt to do a clean shutdown, but kill if it hangs + """ + + # Tell the daemon to stop + try: + # May fail if the process already died + self.rpc.stop() + except Exception: + pass + + rc = self.daemon.wait(timeout) + + # If it did not stop be more insistent + if rc is None: + rc = self.daemon.stop() + + self.daemon.save_log() + self.daemon.cleanup() + + if rc != 0 and not self.may_fail: + raise ValueError("Node did not exit cleanly, rc={}".format(rc)) + else: + return rc + + def restart(self, timeout=10, clean=True): + """Stop and restart the lightning node. + + Keyword arguments: + timeout: number of seconds to wait for a shutdown + clean: whether to issue a `stop` RPC command before killing + """ + if clean: + self.stop(timeout) + else: + self.daemon.stop() + + self.start() + + def fund_channel(self, l2, amount, wait_for_active=True): + + # Give yourself some funds to work with + addr = self.rpc.newaddr()['bech32'] + self.bitcoin.rpc.sendtoaddress(addr, (amount + 1000000) / 10**8) + numfunds = len(self.rpc.listfunds()['outputs']) + self.bitcoin.generate_block(1) + wait_for(lambda: len(self.rpc.listfunds()['outputs']) > numfunds) + + # Now go ahead and open a channel + num_tx = len(self.bitcoin.rpc.getrawmempool()) + tx = self.rpc.fundchannel(l2.info['id'], amount)['tx'] + + wait_for(lambda: len(self.bitcoin.rpc.getrawmempool()) == num_tx + 1) + self.bitcoin.generate_block(1) + + # Hacky way to find our output. + scid = "{}x1x{}".format(self.bitcoin.rpc.getblockcount(), + get_tx_p2wsh_outnum(self.bitcoin, tx, amount)) + + if wait_for_active: + # We wait until gossipd sees both local updates, as well as status NORMAL, + # so it can definitely route through. + self.daemon.wait_for_logs([r'update for channel {}/0 now ACTIVE' + .format(scid), + r'update for channel {}/1 now ACTIVE' + .format(scid), + 'to CHANNELD_NORMAL']) + l2.daemon.wait_for_logs([r'update for channel {}/0 now ACTIVE' + .format(scid), + r'update for channel {}/1 now ACTIVE' + .format(scid), + 'to CHANNELD_NORMAL']) + return scid + + def subd_pid(self, subd): + """Get the process id of the given subdaemon, eg channeld or gossipd""" + ex = re.compile(r'lightning_{}.*: pid ([0-9]*),'.format(subd)) + # Make sure we get latest one if it's restarted! + for l in reversed(self.daemon.logs): + group = ex.search(l) + if group: + return group.group(1) + raise ValueError("No daemon {} found".format(subd)) + + def channel_state(self, other): + """Return the state of the channel to the other node. + + Returns None if there is no such peer, or a channel hasn't been funded + yet. + + """ + peers = self.rpc.listpeers(other.info['id'])['peers'] + if not peers or 'channels' not in peers[0]: + return None + channel = peers[0]['channels'][0] + return channel['state'] + + def get_channel_scid(self, other): + """Get the short_channel_id for the channel to the other node. + """ + peers = self.rpc.listpeers(other.info['id'])['peers'] + if not peers or 'channels' not in peers[0]: + return None + channel = peers[0]['channels'][0] + return channel['short_channel_id'] + + def is_channel_active(self, chanid): + channels = self.rpc.listchannels()['channels'] + active = [(c['short_channel_id'], c['channel_flags']) for c in channels if c['active']] + return (chanid, 0) in active and (chanid, 1) in active + + def wait_for_channel_onchain(self, peerid): + txid = only_one(only_one(self.rpc.listpeers(peerid)['peers'])['channels'])['scratch_txid'] + wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool()) + + def wait_channel_active(self, chanid): + wait_for(lambda: self.is_channel_active(chanid)) + + # This waits until gossipd sees channel_update in both directions + # (or for local channels, at least a local announcement) + def wait_for_channel_updates(self, scids): + # Could happen in any order... + self.daemon.wait_for_logs(['Received channel_update for channel {}/0'.format(c) + for c in scids] + + ['Received channel_update for channel {}/1'.format(c) + for c in scids]) + + def wait_for_route(self, destination, timeout=30): + """ Wait for a route to the destination to become available. + """ + start_time = time.time() + while time.time() < start_time + timeout: + try: + self.rpc.getroute(destination.info['id'], 1, 1) + return True + except Exception: + time.sleep(1) + if time.time() > start_time + timeout: + raise ValueError("Error waiting for a route to destination {}".format(destination)) + + def pay(self, dst, amt, label=None): + if not label: + label = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(20)) + + rhash = dst.rpc.invoice(amt, label, label)['payment_hash'] + invoices = dst.rpc.listinvoices(label)['invoices'] + assert len(invoices) == 1 and invoices[0]['status'] == 'unpaid' + + routestep = { + 'msatoshi': amt, + 'id': dst.info['id'], + 'delay': 5, + 'channel': '1x1x1' + } + + def wait_pay(): + # Up to 10 seconds for payment to succeed. + start_time = time.time() + while dst.rpc.listinvoices(label)['invoices'][0]['status'] != 'paid': + if time.time() > start_time + 10: + raise TimeoutError('Payment timed out') + time.sleep(0.1) + # sendpay is async now + self.rpc.sendpay([routestep], rhash) + # wait for sendpay to comply + self.rpc.waitsendpay(rhash) + + # Note: this feeds through the smoother in update_feerate, so changing + # it on a running daemon may not give expected result! + def set_feerates(self, feerates, wait_for_effect=True): + # (bitcoind returns bitcoin per kb, so these are * 4) + + def mock_estimatesmartfee(r): + params = r['params'] + if params == [2, 'CONSERVATIVE']: + feerate = feerates[0] * 4 + elif params == [4, 'ECONOMICAL']: + feerate = feerates[1] * 4 + elif params == [100, 'ECONOMICAL']: + feerate = feerates[2] * 4 + else: + raise ValueError() + return { + 'id': r['id'], + 'error': None, + 'result': { + 'feerate': Decimal(feerate) / 10**8 + }, + } + self.daemon.rpcproxy.mock_rpc('estimatesmartfee', mock_estimatesmartfee) + + # Technically, this waits until it's called, not until it's processed. + # We wait until all three levels have been called. + if wait_for_effect: + wait_for(lambda: self.daemon.rpcproxy.mock_counts['estimatesmartfee'] >= 3) + + def wait_for_onchaind_broadcast(self, name, resolve=None): + """Wait for onchaind to drop tx name to resolve (if any)""" + if resolve: + r = self.daemon.wait_for_log('Broadcasting {} .* to resolve {}' + .format(name, resolve)) + else: + r = self.daemon.wait_for_log('Broadcasting {} .* to resolve ' + .format(name)) + + rawtx = re.search(r'.* \(([0-9a-fA-F]*)\) ', r).group(1) + txid = self.bitcoin.rpc.decoderawtransaction(rawtx, True)['txid'] + + wait_for(lambda: txid in self.bitcoin.rpc.getrawmempool()) + + def query_gossip(self, querytype, *args, filters=[]): + """Generate a gossip query, feed it into this node and get responses + in hex""" + query = subprocess.run(['devtools/mkquery', + querytype] + [str(a) for a in args], + check=True, + timeout=TIMEOUT, + stdout=subprocess.PIPE).stdout.strip() + out = subprocess.run(['devtools/gossipwith', + '--timeout-after={}'.format(int(math.sqrt(TIMEOUT) * 1000)), + '{}@localhost:{}'.format(self.info['id'], + self.port), + query], + check=True, + timeout=TIMEOUT, stdout=subprocess.PIPE).stdout + + def passes_filters(hmsg, filters): + for f in filters: + if hmsg.startswith(f): + return False + return True + + msgs = [] + while len(out): + length = struct.unpack('>H', out[0:2])[0] + hmsg = out[2:2 + length].hex() + if passes_filters(hmsg, filters): + msgs.append(out[2:2 + length].hex()) + out = out[2 + length:] + return msgs + + +class NodeFactory(object): + """A factory to setup and start `lightningd` daemons. + """ + def __init__(self, testname, bitcoind, executor, directory, db_provider): + self.testname = testname + self.next_id = 1 + self.nodes = [] + self.executor = executor + self.bitcoind = bitcoind + self.directory = directory + self.lock = threading.Lock() + self.db_provider = db_provider + + def split_options(self, opts): + """Split node options from cli options + + Some options are used to instrument the node wrapper and some are passed + to the daemon on the command line. Split them so we know where to use + them. + """ + node_opt_keys = [ + 'disconnect', + 'may_fail', + 'allow_broken_log', + 'may_reconnect', + 'random_hsm', + 'log_all_io', + 'feerates', + 'wait_for_bitcoind_sync', + 'allow_bad_gossip' + ] + node_opts = {k: v for k, v in opts.items() if k in node_opt_keys} + cli_opts = {k: v for k, v in opts.items() if k not in node_opt_keys} + return node_opts, cli_opts + + def get_next_port(self): + with self.lock: + return reserve() + + def get_node_id(self): + """Generate a unique numeric ID for a lightning node + """ + with self.lock: + node_id = self.next_id + self.next_id += 1 + return node_id + + def get_nodes(self, num_nodes, opts=None): + """Start a number of nodes in parallel, each with its own options + """ + if opts is None: + # No opts were passed in, give some dummy opts + opts = [{} for _ in range(num_nodes)] + elif isinstance(opts, dict): + # A single dict was passed in, so we use these opts for all nodes + opts = [opts] * num_nodes + + assert len(opts) == num_nodes + + jobs = [] + for i in range(num_nodes): + node_opts, cli_opts = self.split_options(opts[i]) + jobs.append(self.executor.submit( + self.get_node, options=cli_opts, + node_id=self.get_node_id(), **node_opts + )) + + return [j.result() for j in jobs] + + def get_node(self, disconnect=None, options=None, may_fail=False, + may_reconnect=False, random_hsm=False, + feerates=(15000, 7500, 3750), start=True, log_all_io=False, + dbfile=None, node_id=None, allow_broken_log=False, + wait_for_bitcoind_sync=True, allow_bad_gossip=False): + if not node_id: + node_id = self.get_node_id() + + port = self.get_next_port() + + lightning_dir = os.path.join( + self.directory, "lightning-{}/".format(node_id)) + + if os.path.exists(lightning_dir): + shutil.rmtree(lightning_dir) + + socket_path = os.path.join(lightning_dir, "lightning-rpc").format(node_id) + daemon = LightningD( + lightning_dir, bitcoindproxy=self.bitcoind.get_proxy(), + port=port, random_hsm=random_hsm, node_id=node_id + ) + # If we have a disconnect string, dump it to a file for daemon. + if disconnect: + daemon.disconnect_file = os.path.join(lightning_dir, "dev_disconnect") + with open(daemon.disconnect_file, "w") as f: + f.write("\n".join(disconnect)) + daemon.opts["dev-disconnect"] = "dev_disconnect" + if log_all_io: + assert DEVELOPER + daemon.env["LIGHTNINGD_DEV_LOG_IO"] = "1" + daemon.opts["log-level"] = "io" + if DEVELOPER: + daemon.opts["dev-fail-on-subdaemon-fail"] = None + daemon.env["LIGHTNINGD_DEV_MEMLEAK"] = "1" + if os.getenv("DEBUG_SUBD"): + daemon.opts["dev-debugger"] = os.getenv("DEBUG_SUBD") + if VALGRIND: + daemon.env["LIGHTNINGD_DEV_NO_BACKTRACE"] = "1" + if not may_reconnect: + daemon.opts["dev-no-reconnect"] = None + + if options is not None: + daemon.opts.update(options) + + # Get the DB backend DSN we should be using for this test and this node. + db = self.db_provider.get_db(lightning_dir, self.testname, node_id) + dsn = db.get_dsn() + if dsn is not None: + daemon.opts['wallet'] = dsn + + rpc = LightningRpc(socket_path, self.executor) + + node = LightningNode(daemon, rpc, self.bitcoind, self.executor, may_fail=may_fail, + may_reconnect=may_reconnect, allow_broken_log=allow_broken_log, + allow_bad_gossip=allow_bad_gossip, db=db) + + # Regtest estimatefee are unusable, so override. + node.set_feerates(feerates, False) + + self.nodes.append(node) + if VALGRIND: + node.daemon.cmd_prefix = [ + 'valgrind', + '-q', + '--trace-children=yes', + '--trace-children-skip=*python*,*bitcoin-cli*,*elements-cli*', + '--error-exitcode=7', + '--log-file={}/valgrind-errors.%p'.format(node.daemon.lightning_dir) + ] + + if dbfile: + out = open(os.path.join(node.daemon.lightning_dir, 'lightningd.sqlite3'), 'xb') + with lzma.open(os.path.join('tests/data', dbfile), 'rb') as f: + out.write(f.read()) + + if start: + try: + node.start(wait_for_bitcoind_sync) + except Exception: + node.daemon.stop() + raise + return node + + def line_graph(self, num_nodes, fundchannel=True, fundamount=10**6, wait_for_announce=False, opts=None, announce_channels=True): + """ Create nodes, connect them and optionally fund channels. + """ + assert not (wait_for_announce and not announce_channels), "You've asked to wait for an announcement that's not coming. (wait_for_announce=True,announce_channels=False)" + nodes = self.get_nodes(num_nodes, opts=opts) + bitcoin = nodes[0].bitcoin + connections = [(nodes[i], nodes[i + 1]) for i in range(0, num_nodes - 1)] + + for src, dst in connections: + src.rpc.connect(dst.info['id'], 'localhost', dst.port) + + # If we're returning now, make sure dst all show connections in + # getpeers. + if not fundchannel: + for src, dst in connections: + dst.daemon.wait_for_log('openingd-{} chan #[0-9]*: Handed peer, entering loop'.format(src.info['id'])) + return nodes + + # If we got here, we want to fund channels + for src, dst in connections: + addr = src.rpc.newaddr()['bech32'] + src.bitcoin.rpc.sendtoaddress(addr, (fundamount + 1000000) / 10**8) + + bitcoin.generate_block(1) + for src, dst in connections: + wait_for(lambda: len(src.rpc.listfunds()['outputs']) > 0) + tx = src.rpc.fundchannel(dst.info['id'], fundamount, announce=announce_channels) + wait_for(lambda: tx['txid'] in bitcoin.rpc.getrawmempool()) + + # Confirm all channels and wait for them to become usable + bitcoin.generate_block(1) + scids = [] + for src, dst in connections: + wait_for(lambda: src.channel_state(dst) == 'CHANNELD_NORMAL') + scid = src.get_channel_scid(dst) + src.daemon.wait_for_log(r'Received channel_update for channel {scid}/. now ACTIVE'.format(scid=scid)) + scids.append(scid) + + if not wait_for_announce: + return nodes + + bitcoin.generate_block(5) + + def both_dirs_ready(n, scid): + resp = n.rpc.listchannels(scid) + return [a['active'] for a in resp['channels']] == [True, True] + + # Make sure everyone sees all channels: we can cheat and + # simply check the ends (since it's a line). + wait_for(lambda: both_dirs_ready(nodes[0], scids[-1])) + wait_for(lambda: both_dirs_ready(nodes[-1], scids[0])) + + # Make sure we have all node announcements, too (just check ends) + for n in nodes: + for end in (nodes[0], nodes[-1]): + wait_for(lambda: 'alias' in only_one(end.rpc.listnodes(n.info['id'])['nodes'])) + + return nodes + + def killall(self, expected_successes): + """Returns true if every node we expected to succeed actually succeeded""" + unexpected_fail = False + err_msgs = [] + for i in range(len(self.nodes)): + leaks = None + # leak detection upsets VALGRIND by reading uninitialized mem. + # If it's dead, we'll catch it below. + if not VALGRIND: + try: + # This also puts leaks in log. + leaks = self.nodes[i].rpc.dev_memleak()['leaks'] + except Exception: + pass + + try: + self.nodes[i].stop() + except Exception: + if expected_successes[i]: + unexpected_fail = True + + if leaks is not None and len(leaks) != 0: + unexpected_fail = True + err_msgs.append("Node {} has memory leaks: {}".format( + self.nodes[i].daemon.lightning_dir, + json.dumps(leaks, sort_keys=True, indent=4) + )) + + return not unexpected_fail, err_msgs diff --git a/contrib/pyln-testing/requirements.txt b/contrib/pyln-testing/requirements.txt new file mode 100644 index 000000000..04e81630e --- /dev/null +++ b/contrib/pyln-testing/requirements.txt @@ -0,0 +1,5 @@ +pytest==5.0.1 +Flask==1.1.1 +cheroot==6.5.5 +ephemeral-port-reserve==1.1.1 +python-bitcoinlib==0.10.1 diff --git a/contrib/pyln-testing/setup.py b/contrib/pyln-testing/setup.py new file mode 100644 index 000000000..7ffd25166 --- /dev/null +++ b/contrib/pyln-testing/setup.py @@ -0,0 +1,22 @@ +from setuptools import setup +from pyln.testing import __version__ + + +with open('README.md', encoding='utf-8') as f: + long_description = f.read() + +with open('requirements.txt', 'r') as f: + requirements = [l.strip() for l in f] + +setup(name='pyln-testing', + version=__version__, + description='Library to facilitate writing tests for for lightningd', + long_description=long_description, + long_description_content_type='text/markdown', + url='http://github.com/ElementsProject/lightning', + author='Christian Decker', + author_email='decker.christian@gmail.com', + install_requires=requirements, + license='MIT', + packages=['pyln.testing'], + zip_safe=True)