Files
lightning/tools/reckless
Alex Myers 651c5b6de0 reckless: use config that was explicitly passed to lightningd
Regtest environments commonly use explicit definition of the config
file for lightningd.  This can be queried and utilized by default,
saving redundant definitions between lightning and reckless.
2022-11-08 13:19:36 +01:00

711 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
from subprocess import Popen, PIPE
import sys
import json
import os
import argparse
from pathlib import Path
import shutil
import tempfile
import requests
repos = ['https://github.com/lightningd/plugins']
def py_entry_guesses(name):
return [name, f'{name}.py', '__init__.py']
def unsupported_entry(name):
return [f'{name}.go', f'{name}.sh']
class InstInfo:
def __init__(self, name, url, git_url):
self.name = name
self.repo = url # Used for 'git clone'
self.git_url = git_url # API access for github repos
self.entry = None
self.deps = None
self.subdir = None
self.commit = None
def __repr__(self):
return f'<InstInfo object>\n'\
f'name: {self.name}\nrepo: {self.repo}\ngit: {self.git_url}'\
f'\nentry:{self.entry}\ndepency source:{self.deps}'
def get_inst_details(self):
"""
Populate installation details from a github repo url.
Return True if all data is found.
"""
r = requests.get(self.git_url)
if r.status_code != 200:
return False
if 'git/tree' in self.git_url:
tree = r.json()['tree']
else:
tree = r.json()
entry_guesses = py_entry_guesses(self.name)
for g in entry_guesses:
for f in tree:
if f['path'] == g:
self.entry = g
break
if self.entry is not None:
break
if self.entry is None:
for g in unsupported_entry(self.name):
for f in tree:
if f['path'] == g:
# FIXME: This should be easier to implement
print(f'entrypoint {g} is not yet supported')
return False
dependency_info = ['requirements.txt', 'pyproject.toml']
for d in dependency_info:
for f in tree:
if f['path'] == d:
self.deps = d
break
if self.deps is not None:
break
if not self.entry:
return False
if not self.deps:
return False
return True
def create_dir(r, directory):
"""Creation of a directory at path `d` with a maximum new dir depth `r`"""
if os.path.exists(directory):
return True
elif r <= 0:
return False
elif create_dir(r-1, os.path.split(directory)[0]):
os.mkdir(directory, 0o777)
print(f'created directory {directory}')
if os.path.exists(directory):
return True
def remove_dir(target):
try:
shutil.rmtree(target)
return True
except NotADirectoryError:
print(f"Tried to remove directory {target} that does not exist.")
except PermissionError:
print(f"Permission denied removing dir: {target}")
return False
class Config():
"""A generic class for procuring, reading and editing config files"""
def obtain_config(self, config_path, default_text, warn=False):
"""Return a config file from the desired location. Create one with
default_text if it cannot be found."""
if isinstance(config_path, type(None)):
raise Exception("Generic config must be passed a config_path.")
assert isinstance(config_path, str)
# FIXME: warn if reckless dir exists, but conf not found
if os.path.exists(config_path):
with open(config_path, 'r+') as f:
config_content = f.readlines()
return config_content
print(f'config file not found: {config_path}')
if warn:
confirm = input('press [Y] to create one now.\n').upper() == 'Y'
else:
confirm = True
if not confirm:
sys.exit(1)
parent_path = os.path.split(config_path)[0]
# Create up to one parent in the directory tree.
if create_dir(1, parent_path):
with open(self.conf_fp, 'w') as f:
f.write(default_text)
# FIXME: Handle write failure
return default_text
else:
print('could not create the parent directory')
return None
def editConfigFile(self, addline, removeline):
remove_these_lines = []
with open(self.conf_fp, 'r') as reckless_conf:
original = reckless_conf.readlines()
empty_lines = []
for n, l in enumerate(original):
if l.strip() == removeline:
remove_these_lines.append(n)
continue
if l.strip() == '':
empty_lines.append(n)
if n-1 in empty_lines:
# The white space is getting excessive.
remove_these_lines.append(n)
continue
with open(self.conf_fp, 'w') as conf_write:
# no need to write if passed 'None'
line_exists = not bool(addline)
for n, l in enumerate(original):
if n not in remove_these_lines:
if n > 0:
conf_write.write(f'\n{l.strip()}')
else:
conf_write.write(l.strip())
if addline == l:
# addline is idempotent
line_exists = True
if not line_exists:
conf_write.write(f'\n{addline}')
def __init__(self, path=None, default_text=None, warn=False):
assert path is not None
assert default_text is not None
self.conf_fp = path
self.content = self.obtain_config(self.conf_fp, default_text,
warn=warn)
class RecklessConfig(Config):
"""Reckless config (by default, specific to the bitcoin network only.)
This is inherited by the lightningd config and contains all reckless
maintained plugins."""
def enable_plugin(self, plugin_path):
"""Handle persistent plugin loading via config"""
self.editConfigFile(f'plugin={plugin_path}',
f'disable-plugin={plugin_path}')
def disable_plugin(self, plugin_path):
"""Handle persistent plugin disabling via config"""
self.editConfigFile(f'disable-plugin={plugin_path}',
f'plugin={plugin_path}')
def __init__(self, path=None, default_text=None):
if path is None:
path = os.path.join(LIGHTNING_DIR, 'reckless',
'bitcoin-reckless.conf')
if default_text is None:
default_text = '# This configuration file is managed by reckles' +\
's to activate and disable\n# reckless-installed' +\
' plugins\n\n'
Config.__init__(self, path=str(path), default_text=default_text)
self.reckless_dir = os.path.split(path)[0]
class LightningBitcoinConfig(Config):
"""lightningd config specific to the bitcoin network. This is inherited by
the main lightningd config and in turn, inherits bitcoin-reckless.conf."""
def __init__(self, path=None, default_text=None, warn=True):
if path is None:
path = os.path.join(LIGHTNING_DIR, 'bitcoin', 'config')
if default_text is None:
default_text = "# This config was autopopulated by reckless\n\n"
Config.__init__(self, path=str(path),
default_text=default_text, warn=warn)
class InferInstall():
"""Once a plugin is installed, we may need its directory and entrypoint"""
def __init__(self, name):
reck_contents = os.listdir(RECKLESS_CONFIG.reckless_dir)
if name[-3:] == '.py':
name = name[:-3]
if name in reck_contents:
self.dir = os.path.join(RECKLESS_CONFIG.reckless_dir, name)
else:
raise Exception(f"Could not find a reckless directory for {name}")
plug_contents = os.listdir(os.path.join(RECKLESS_CONFIG.reckless_dir,
name))
for n in py_entry_guesses(name):
if n in plug_contents:
self.entry = os.path.join(self.dir, n)
self.name = n
return
raise Exception(f'plugin entrypoint not found in {self.dir}')
def help(target):
if len(target) > 0:
print(globals()[target[0]].__doc__)
else:
parser.print_help(sys.stdout)
def verbose(*args):
if not IS_VERBOSE:
return
print(*args)
def _search_repo(name, url):
"""look in given repo and, if found, populate InstInfo"""
# Remove api subdomain, subdirectories, etc.
repo = url.split('/')
while '' in repo:
repo.remove('')
repo_name = None
for i in range(len(repo)):
if 'github.com' in repo[i]:
# Extract user and repo name
start = i + 1
if repo[start] == 'repo':
# Maybe we were passed an api.github.com/repo/<user> url
start = start + 1
repo_user = repo[start]
repo_name = repo[start+1]
break
# FIXME: Handle non-github repos.
# Get details from the github API.
if repo_name is not None:
api_url = f'https://api.github.com/repos/{repo_user}/' + \
f'{repo_name}/contents/'
plugins_cont = api_url
r = requests.get(plugins_cont, timeout=5)
if r.status_code != 200:
print("Plugin repository unavailable")
return False
# Repo is for this plugin
if repo_name == name:
MyPlugin = InstInfo(name, f'https://github.com/{repo_user}/'
f'{repo_name}', api_url)
if not MyPlugin.get_inst_details():
return False
return MyPlugin
# Repo contains multiple plugins?
for x in r.json():
if x["name"] == name:
# Look for the rest of the install details
# These are in lightningd/plugins directly
if 'lightningd/plugins/' in x['html_url']:
MyPlugin = InstInfo(name,
'https://github.com/lightningd/plugins',
x['git_url'])
MyPlugin.subdir = x['name']
# submodules from another github repo
else:
MyPlugin = InstInfo(name, x['html_url'], x['git_url'])
# Submodule URLs are appended with /tree/<commit hash>
if MyPlugin.repo.split('/')[-2] == 'tree':
MyPlugin.commit = MyPlugin.repo.split('/')[-1]
MyPlugin.repo = MyPlugin.repo.split('/tree/')[0]
verbose(f'repo using commit: {MyPlugin.commit}')
if not MyPlugin.get_inst_details():
return False
return MyPlugin
return False
def _install_plugin(src):
"""make sure the repo exists and clone it."""
verbose(f'Install requested from {src}.')
if RECKLESS_CONFIG is None:
print('error: reckless install directory unavailable')
sys.exit(2)
req = requests.get(src.repo, timeout=20)
if not req.status_code == 200:
print('plugin source repository unavailable')
sys.exit(1)
# Use a unique directory for each cloned repo.
clone_path = 'reckless-{}'.format(str(hash(os.times()))[-9:])
clone_path = os.path.join(tempfile.gettempdir(), clone_path)
inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir,
src.name)
if os.path.exists(clone_path):
verbose(f'{clone_path} already exists - deleting')
shutil.rmtree(clone_path)
# clone git repository to /tmp/reckless-...
if ('http' in src.repo[:4]) or ('github.com' in src.repo):
# Ugly, but interactively handling stderr gets hairy.
if IS_VERBOSE:
git = Popen(['git', 'clone', src.repo, clone_path],
stdout=PIPE)
else:
git = Popen(['git', 'clone', src.repo, clone_path],
stdout=PIPE, stderr=PIPE)
git.wait()
if git.returncode != 0:
if git.stderr:
print(git.stderr.read().decode())
if os.path.exists(clone_path):
remove_dir(clone_path)
print('Error: Failed to clone repo')
return False
plugin_path = clone_path
if src.subdir is not None:
plugin_path = os.path.join(clone_path, src.subdir)
os.chdir(plugin_path)
if src.commit:
verbose(f"Checking out commit {src.commit}")
checkout = Popen(['git', 'checkout', src.commit],
stdout=PIPE, stderr=PIPE)
checkout.wait()
if checkout.returncode != 0:
print(f'failed to checkout referenced commit {src.commit}')
return False
# Install dependencies via requirements.txt or pyproject.toml
mypip = 'pip3' if shutil.which('pip3') else 'pip'
if not shutil.which(mypip):
raise Exception(f'{mypip} not found in PATH')
install_methods = {
'requirements.txt': [mypip, 'install', '-r', 'requirements.txt'],
'pyproject.toml': [mypip, 'install', '-e', '.']
}
if src.deps is not None:
verbose(f'installing dependencies using {src.deps}')
procedure = install_methods[src.deps]
pip = Popen(procedure, stdout=PIPE)
pip.wait()
if pip.returncode == 0:
print('dependencies installed successfully')
else:
print('error encountered installing dependencies')
verbose(pip.stdout.read())
return False
test = Popen([os.path.join(plugin_path, src.entry)],
stdout=PIPE, stderr=PIPE, universal_newlines=True)
test_log = []
with test.stderr:
for line in test.stderr:
test_log.append(line.strip('\n'))
test.wait()
# FIXME: add noexec test/warning. Maybe try chmod entrypoint.
if test.returncode != 0:
verbose("plugin testing error:")
for line in test_log:
verbose(f' {line}')
print('plugin testing failed')
return False
# Find this cute little plugin a forever home
shutil.copytree(plugin_path, inst_path)
print(f'plugin installed: {inst_path}')
remove_dir(clone_path)
return True
def install(plugin_name):
"""reckless install <plugin>
downloads plugin from source repos, installs and activates plugin"""
assert isinstance(plugin_name, list)
plugin_name = plugin_name[0]
if plugin_name is None:
print('missing argument: plugin_name')
src = search([plugin_name])
if src:
verbose(f'Retrieving {plugin_name} from {src.repo}')
if not _install_plugin(src):
print('installation aborted')
sys.exit(1)
inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir,
src.name,
src.entry)
RECKLESS_CONFIG.enable_plugin(inst_path)
enable([plugin_name])
def uninstall(plugin_name):
"""reckless uninstall <plugin>
disables plugin and deletes the plugin's reckless dir"""
assert isinstance(plugin_name, list)
plugin_name = plugin_name[0]
if plugin_name is not None:
# FIXME: Do something here.
print('Uninstalling plugin {}'.format(plugin_name))
disable([plugin_name])
plugin_dir = os.path.join(RECKLESS_CONFIG.reckless_dir, plugin_name)
verbose("looking for {}".format(plugin_dir))
if remove_dir(plugin_dir):
print(f"{plugin_name} uninstalled successfully.")
def search(plugin_name):
"""reckless search <plugin>
searches plugin index for plugin"""
plugin_name = plugin_name[0]
if plugin_name is None:
print('plugin name required')
return None
ordered_repos = RECKLESS_SOURCES
for r in RECKLESS_SOURCES:
if r.split('/')[-1].lower() == plugin_name.lower():
ordered_repos.remove(r)
ordered_repos.insert(0, r)
for r in ordered_repos:
p = _search_repo(plugin_name, r)
if p:
print(f"found {p.name} in repo: {p.repo}")
verbose(f"entry: {p.entry}")
if p.subdir:
verbose(f'sub-directory: {p.subdir}')
return p
print(f'Unable to locate source for plugin {plugin_name}')
def lightning_cli_available():
clncli = Popen(LIGHTNING_CLI_CALL, stdout=PIPE, stderr=PIPE)
clncli.wait(timeout=1)
if clncli.returncode == 0:
return True
else:
return False
def enable(plugin_name):
"""reckless enable <plugin>
dynamically activates plugin and adds to config (persistent)"""
assert isinstance(plugin_name, list)
plugin_name = plugin_name[0]
if plugin_name is None:
sys.stderr.write('Plugin name required.')
sys.exit(1)
inst = InferInstall(plugin_name)
path = inst.entry
if not os.path.exists(path):
print('cannot find installed plugin at expected path {}'
.format(path))
sys.exit(1)
verbose('activating {}'.format(plugin_name))
if not lightning_cli_available():
# Config update should not be dependent upon lightningd running
RECKLESS_CONFIG.enable_plugin(path)
return
cmd = LIGHTNING_CLI_CALL.copy()
cmd.extend(['plugin', 'start', path])
clncli = Popen(cmd, stdout=PIPE)
clncli.wait(timeout=3)
if clncli.returncode == 0:
RECKLESS_CONFIG.enable_plugin(path)
print('{} enabled'.format(plugin_name))
else:
err = eval(clncli.stdout.read().decode().replace('\n', ''))['message']
if ': already registered' in err:
RECKLESS_CONFIG.enable_plugin(path)
verbose(f'{inst.name} already registered with lightningd')
print('{} enabled'.format(plugin_name))
else:
print(f'reckless: {inst.name} failed to start!')
print(err)
sys.exit(clncli.returncode)
def disable(plugin_name):
"""reckless disable <plugin>
deactivates an installed plugin"""
assert isinstance(plugin_name, list)
plugin_name = plugin_name[0]
if plugin_name is None:
sys.stderr.write('Plugin name required.')
sys.exit(1)
inst = InferInstall(plugin_name)
path = inst.entry
if not os.path.exists(path):
sys.stderr.write(f'Could not find plugin at {path}\n')
sys.exit(1)
if not lightning_cli_available():
RECKLESS_CONFIG.disable_plugin(path)
print(f'{plugin_name} disabled')
return
cmd = LIGHTNING_CLI_CALL.copy()
cmd.extend(['plugin', 'stop', path])
clncli = Popen(cmd, stdout=PIPE, stderr=PIPE)
clncli.wait(timeout=3)
output = json.loads(clncli.stdout.read().decode()
.replace('\n', '').replace(' ', ''))
# print(output)
if ('code' in output.keys() and output['code'] == -32602):
print('plugin not currently running')
elif clncli.returncode != 0:
print('lightning-cli plugin stop failed')
sys.stderr.write(clncli.stderr.read().decode())
sys.exit(clncli.returncode)
RECKLESS_CONFIG.disable_plugin(path)
print(f'{plugin_name} disabled')
def load_config(reckless_dir=None, network='bitcoin'):
"""Initial directory discovery and config file creation."""
# Does the lightning-cli already reference an explicit config?
net_conf = None
if lightning_cli_available():
cmd = LIGHTNING_CLI_CALL
cmd.extend(['listconfigs'])
clncli = Popen(cmd, stdout=PIPE, stderr=PIPE)
clncli.wait(timeout=3)
if clncli.returncode == 0:
output = json.loads(clncli.stdout.read().decode()
.replace('\n', '').replace(' ', ''))
if 'conf' in output:
net_conf = LightningBitcoinConfig(path=output['conf'])
if reckless_dir is None:
reckless_dir = str(os.path.join(LIGHTNING_DIR, 'reckless'))
else:
if not os.path.isabs(reckless_dir):
reckless_dir = os.path.join(os.getcwd(), reckless_dir)
# Reckless applies to the bitcoin network configuration by default.
if network == 'bitcoin':
reck_conf_path = os.path.join(reckless_dir, 'bitcoin-reckless.conf')
if not net_conf:
# This config file inherits the RecklessConfig.
net_conf = LightningBitcoinConfig()
elif network == 'regtest':
reck_conf_path = os.path.join(reckless_dir, 'regtest-reckless.conf')
regtest_path = os.path.join(LIGHTNING_DIR, 'regtest', 'config')
if not net_conf:
# Actually the regtest network config
net_conf = LightningBitcoinConfig(path=regtest_path)
# Reckless manages plugins here.
reckless_conf = RecklessConfig(path=reck_conf_path)
if not reckless_conf:
print('Error: reckless config file could not be written')
sys.exit(1)
if not net_conf:
print('Error: could not load or create the network specific lightningd'
' config (default .lightning/bitcoin)')
sys.exit(1)
net_conf.editConfigFile(f'include {reckless_conf.conf_fp}', None)
return reckless_conf
def get_sources_file():
return os.path.join(RECKLESS_DIR, '.sources')
def sources_from_file():
sources_file = get_sources_file()
read_sources = []
with open(sources_file, 'r') as f:
for src in f.readlines():
if len(src.strip()) > 0:
read_sources.append(src.strip())
# print('loaded sources:', repos)
return read_sources
def loadSources():
"""Look for the repo sources file"""
sources_file = get_sources_file()
# This would have been created if possible
if not os.path.exists(sources_file):
print('Warning: Reckless requires write access')
Config(path=sources_file,
default_text='https://github.com/lightningd/plugins')
return ['https://github.com/lightningd/plugins']
return sources_from_file()
def add_source(src):
"""Additional git repositories, directories, etc. are passed here
singly."""
# Is it a file?
assert isinstance(src, str)
maybe_path = os.path.realpath(src)
if os.path.exists(maybe_path):
# FIXME: This should handle either a directory or a git repo
if os.path.isdir(maybe_path):
print(f'Plugin source directory found: {maybe_path}')
elif 'github.com' in src:
my_file = Config(path=str(get_sources_file()),
default_text='https://github.com/lightningd/plugins')
my_file.editConfigFile(src, None)
def remove_source(src):
assert isinstance(src, str)
if src in sources_from_file():
my_file = Config(path=get_sources_file(),
default_text='https://github.com/lightningd/plugins')
my_file.editConfigFile(None, src)
print('plugin source removed')
else:
print(f'source not found: {src}')
def list_source():
for src in sources_from_file():
print(src)
def source(cmd):
"""reckless source <add/remove/list>
reckless source add <github repository url> adds a source to search
reckless source remove <github repository url> removes a source
reckless source list lists all sources
"""
assert isinstance(cmd, list)
if cmd[0] == 'add':
for src in cmd[1:]:
add_source(src)
elif cmd[0] == 'remove' or cmd[0] == 'rem' or cmd[0] == 'rm':
for src in cmd[1:]:
remove_source(src)
elif cmd[0] == 'list':
list_source()
else:
print('unrecognized argument to reckless source: {cmd[0]}')
sys.exit(1)
def sources(cmd):
# alias to improve ux
source(cmd)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(dest='command', help='install/uninstall/search/enable'
'/disable/source/help',
type=str)
# Here we pass a list of argument in as target. This is useful for
# subcommands (i.e., reckless source add <repo>) as well as for passing
# lists of plugins to handle in sequence (to be implemented.)
parser.add_argument(dest='target', nargs='*', default=None, help='target',
type=str)
# This default depends on the .lightning directory
parser.add_argument('-d', '--reckless-dir',
help='specify a data directory for reckless to use',
type=str, default=None)
parser.add_argument('-l', '--lightning',
help='lightning data directory (default:~/.lightning)',
type=str,
default=Path.home().joinpath('.lightning'))
parser.add_argument('-r', '--regtest', action='store_true')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()
if hasattr(args, 'command'):
if args.command in ['install', 'uninstall', 'search', 'enable',
'disable', 'help', 'source', 'sources']:
NETWORK = 'regtest' if args.regtest else 'bitcoin'
LIGHTNING_DIR = Path(args.lightning)
LIGHTNING_CLI_CALL = ['lightning-cli']
if NETWORK != 'bitcoin':
LIGHTNING_CLI_CALL.append(f'--network={NETWORK}')
if LIGHTNING_DIR != Path.home().joinpath('.lightning'):
LIGHTNING_CLI_CALL.append(f'--lightning-dir={LIGHTNING_DIR}')
if args.reckless_dir:
RECKLESS_DIR = args.reckless_dir
else:
RECKLESS_DIR = os.path.join(LIGHTNING_DIR, 'reckless')
RECKLESS_CONFIG = load_config(reckless_dir=RECKLESS_DIR,
network=NETWORK)
RECKLESS_SOURCES = loadSources()
IS_VERBOSE = bool(args.verbose)
globals()[args.command](args.target)
sys.exit(0)
else:
print(f'{args.command}: command unrecognized')