#!/usr/bin/env python3
from subprocess import Popen, PIPE
import sys
import json
import os
import argparse
from pathlib import Path
import shutil
import tempfile
import requests
from typing import Union

repos = ['https://github.com/lightningd/plugins']


def py_entry_guesses(name):
    """Return the candidate entrypoint filenames for a python plugin `name`,
    in order of preference."""
    return [name, f'{name}.py', '__init__.py']


def unsupported_entry(name):
    """Return entrypoint filenames that are recognized for `name` but cannot
    be installed yet."""
    return [f'{name}.go', f'{name}.sh']


class InstInfo:
    """Installation details of a single plugin: where to clone it from, which
    file is its entrypoint and which file describes its dependencies."""

    def __init__(self, name, url, git_url):
        self.name = name
        self.repo = url     # Used for 'git clone'
        self.git_url = git_url  # API access for github repos
        self.entry = None   # entrypoint filename, set by get_inst_details
        self.deps = None    # dependency file name, set by get_inst_details
        self.subdir = None
        self.commit = None

    def __repr__(self):
        return (f'\n'
                f'name: {self.name}\nrepo: {self.repo}\ngit: {self.git_url}'
                f'\nentry:{self.entry}\ndepency source:{self.deps}')

    def get_inst_details(self):
        """
        Populate installation details from a github repo url.
        Return True if all data is found.

        Sets `self.entry` to the first name from `py_entry_guesses` that is
        present in the listing and `self.deps` to the first dependency file
        found. Returns False if the request fails, if only an unsupported
        entrypoint is present, or if either value could not be determined.
        """
        # A timeout keeps an unresponsive API from hanging the tool forever.
        r = requests.get(self.git_url, timeout=5)
        if r.status_code != 200:
            return False
        listing = r.json()
        # The git tree API wraps the file listing in a 'tree' key.
        tree = listing['tree'] if 'git/tree' in self.git_url else listing
        paths = {f['path'] for f in tree}

        for guess in py_entry_guesses(self.name):
            if guess in paths:
                self.entry = guess
                break
        if self.entry is None:
            for guess in unsupported_entry(self.name):
                if guess in paths:
                    # FIXME: This should be easier to implement
                    print(f'entrypoint {guess} is not yet supported')
                    return False

        for dep_file in ('requirements.txt', 'pyproject.toml'):
            if dep_file in paths:
                self.deps = dep_file
                break

        if not self.entry:
            return False
        if not self.deps:
            return False
        return True


def create_dir(r: int, directory: str) -> bool:
    """Create the directory at path `directory`, creating at most `r` new
    directory levels (the directory itself counts as one).

    Returns True if the directory exists when the call finishes and False if
    creating it would require more than `r` new levels.
    """
    if os.path.exists(directory):
        return True
    if r <= 0:
        return False
    # Make sure the parent exists first, spending one level of the budget.
    if create_dir(r-1, os.path.split(directory)[0]):
        os.mkdir(directory, 0o777)
        print(f'created directory {directory}')
        assert os.path.exists(directory)
        return True
    return False
def remove_dir(target: str) -> bool:
    """Recursively delete the directory `target`.

    Returns True on success. A missing path, a path that is not a directory
    or a permission problem is reported on stdout and yields False.
    """
    try:
        shutil.rmtree(target)
        return True
    except (FileNotFoundError, NotADirectoryError):
        # rmtree raises FileNotFoundError for a path that does not exist.
        print(f"Tried to remove directory {target} that does not exist.")
    except PermissionError:
        print(f"Permission denied removing dir: {target}")
    return False


class Config():
    """A generic class for procuring, reading and editing config files"""

    def obtain_config(self, config_path: str, default_text: str,
                      warn: bool = False) -> Union[str, list]:
        """Return a config file from the desired location. Create one with
        default_text if it cannot be found.

        An existing file is returned as the list of its lines; a newly
        created file is returned as `default_text`. With `warn` set, the user
        is asked for confirmation before a file is created and the program
        exits with status 1 if they decline. Raises FileNotFoundError when
        the parent directory cannot be created.
        """
        if config_path is None:
            raise Exception("Generic config must be passed a config_path.")
        assert isinstance(config_path, str)
        # FIXME: warn if reckless dir exists, but conf not found
        if os.path.exists(config_path):
            with open(config_path, 'r+') as f:
                config_content = f.readlines()
            return config_content
        print(f'config file not found: {config_path}')
        if warn:
            confirm = input('press [Y] to create one now.\n').upper() == 'Y'
        else:
            confirm = True
        if not confirm:
            sys.exit(1)
        parent_path = os.path.split(config_path)[0]
        # Create up to one parent in the directory tree.
        if create_dir(1, parent_path):
            # Write to the path we were asked for rather than self.conf_fp.
            with open(config_path, 'w') as f:
                f.write(default_text)
                # FIXME: Handle write failure
            return default_text
        verbose(f'could not create the parent directory {parent_path}')
        raise FileNotFoundError('invalid parent directory')

    def editConfigFile(self, addline: Union[str, None],
                       removeline: Union[str, None]):
        """Rewrite the config file, dropping `removeline` and adding `addline`.

        Every line whose stripped text equals `removeline` is removed and
        runs of consecutive blank lines are collapsed to one. `addline` is
        appended unless an equal (stripped) line is already present, so the
        call is idempotent. Pass None to skip either operation.
        """
        with open(self.conf_fp, 'r') as reckless_conf:
            original = reckless_conf.readlines()
        remove_these_lines = set()
        empty_lines = set()
        for n, l in enumerate(original):
            stripped = l.strip()
            if stripped == removeline:
                remove_these_lines.add(n)
            elif stripped == '':
                empty_lines.add(n)
                if n-1 in empty_lines:
                    # The white space is getting excessive.
                    remove_these_lines.add(n)

        kept = [l.strip() for n, l in enumerate(original)
                if n not in remove_these_lines]
        # no need to write if passed 'None'; compare stripped text so that a
        # trailing newline in the file does not defeat the idempotency check.
        if addline and addline.strip() not in kept:
            kept.append(addline)
        with open(self.conf_fp, 'w') as conf_write:
            conf_write.write('\n'.join(kept))

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None,
                 warn: bool = False):
        assert path is not None
        assert default_text is not None
        self.conf_fp = path
        self.content = self.obtain_config(self.conf_fp, default_text,
                                          warn=warn)


class RecklessConfig(Config):
    """Reckless config (by default, specific to the bitcoin network only.)
    This is inherited by the lightningd config and contains all reckless
    maintained plugins."""

    def enable_plugin(self, plugin_path: str):
        """Handle persistent plugin loading via config"""
        self.editConfigFile(f'plugin={plugin_path}',
                            f'disable-plugin={plugin_path}')

    def disable_plugin(self, plugin_path: str):
        """Handle persistent plugin disabling via config"""
        self.editConfigFile(f'disable-plugin={plugin_path}',
                            f'plugin={plugin_path}')

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None):
        if path is None:
            path = os.path.join(LIGHTNING_DIR, 'reckless',
                                'bitcoin-reckless.conf')
        if default_text is None:
            default_text = '# This configuration file is managed by reckles' +\
                           's to activate and disable\n# reckless-installed' +\
                           ' plugins\n\n'
        Config.__init__(self, path=str(path), default_text=default_text)
        # Installed plugins live next to the reckless config file.
        self.reckless_dir = os.path.split(path)[0]


class LightningBitcoinConfig(Config):
    """lightningd config specific to the bitcoin network. This is inherited by
    the main lightningd config and in turn, inherits bitcoin-reckless.conf."""

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None,
                 warn: bool = True):
        if path is None:
            path = os.path.join(LIGHTNING_DIR, 'bitcoin', 'config')
        if default_text is None:
            default_text = "# This config was autopopulated by reckless\n\n"
        Config.__init__(self, path=str(path),
                        default_text=default_text, warn=warn)


class InferInstall():
    """Once a plugin is installed, we may need its directory and entrypoint"""

    def __init__(self, name: str):
        reck_contents = os.listdir(RECKLESS_CONFIG.reckless_dir)
        # Accept either the plugin name or its '<name>.py' entrypoint.
        if name.endswith('.py'):
            name = name[:-3]
        if name in reck_contents:
            self.dir = os.path.join(RECKLESS_CONFIG.reckless_dir, name)
        else:
            raise Exception(f"Could not find a reckless directory for {name}")
        plug_contents = os.listdir(self.dir)
        for n in py_entry_guesses(name):
            if n in plug_contents:
                self.entry = os.path.join(self.dir, n)
                self.name = n
                return
        raise Exception(f'plugin entrypoint not found in {self.dir}')


def help_alias(targets: list):
    """Print general help, or a hint on how to get help for `targets`, then
    exit with status 1."""
    if not targets:
        parser.print_help(sys.stdout)
    else:
        print('try "reckless {} -h"'.format(' '.join(targets)))
    sys.exit(1)


def verbose(*args):
    """Print `args` only when verbose output was requested."""
    if not IS_VERBOSE:
        return
    print(*args)


def _search_repo(name: str, url: str) -> Union['InstInfo', bool]:
    """look in given repo and, if found, populate InstInfo

    Returns the populated InstInfo, or False when the repository is
    unavailable, is not a github repository, or does not provide a usable
    plugin called `name`.
    """
    # Remove api subdomain, subdirectories, etc.
    repo = [part for part in url.split('/') if part != '']
    repo_user = None
    repo_name = None
    for i, part in enumerate(repo):
        if 'github.com' in part:
            # Extract user and repo name
            start = i + 1
            if start < len(repo) and repo[start] == 'repo':
                # Maybe we were passed an api.github.com/repo/<user> url
                start = start + 1
            # A truncated url has no user/repo pair to extract.
            if start + 1 < len(repo):
                repo_user = repo[start]
                repo_name = repo[start+1]
            break
    # FIXME: Handle non-github repos.
    if repo_name is None:
        return False

    # Get details from the github API.
    api_url = f'https://api.github.com/repos/{repo_user}/' + \
              f'{repo_name}/contents/'
    r = requests.get(api_url, timeout=5)
    if r.status_code != 200:
        print("Plugin repository unavailable")
        return False
    # Repo is for this plugin
    if repo_name == name:
        plugin = InstInfo(name,
                          f'https://github.com/{repo_user}/{repo_name}',
                          api_url)
        if not plugin.get_inst_details():
            return False
        return plugin
    # Repo contains multiple plugins?
    for x in r.json():
        if x["name"] != name:
            continue
        # Look for the rest of the install details
        # These are in lightningd/plugins directly
        if 'lightningd/plugins/' in x['html_url']:
            plugin = InstInfo(name,
                              'https://github.com/lightningd/plugins',
                              x['git_url'])
            plugin.subdir = x['name']
        # submodules from another github repo
        else:
            plugin = InstInfo(name, x['html_url'], x['git_url'])
        # Submodule URLs are appended with /tree/<commit hash>
        if plugin.repo.split('/')[-2] == 'tree':
            plugin.commit = plugin.repo.split('/')[-1]
            plugin.repo = plugin.repo.split('/tree/')[0]
        verbose(f'repo using commit: {plugin.commit}')
        if not plugin.get_inst_details():
            return False
        return plugin
    return False


def _install_plugin(src: 'InstInfo') -> bool:
    """make sure the repo exists and clone it.

    Clones `src.repo` into a temporary directory, optionally checks out
    `src.commit`, installs the dependencies with pip, runs the entrypoint
    once as a test and finally copies the plugin into the reckless
    directory. Returns True on success and False on any failed step.
    """
    verbose(f'Install requested from {src}.')
    if RECKLESS_CONFIG is None:
        print('error: reckless install directory unavailable')
        sys.exit(2)
    req = requests.get(src.repo, timeout=20)
    if req.status_code != 200:
        print('plugin source repository unavailable')
        sys.exit(1)

    # Use a unique directory for each cloned repo.
    clone_path = 'reckless-{}'.format(str(hash(os.times()))[-9:])
    clone_path = os.path.join(tempfile.gettempdir(), clone_path)
    inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir, src.name)
    if os.path.exists(clone_path):
        verbose(f'{clone_path} already exists - deleting')
        shutil.rmtree(clone_path)
    # clone git repository to /tmp/reckless-...
    if ('http' in src.repo[:4]) or ('github.com' in src.repo):
        # Ugly, but interactively handling stderr gets hairy.
        if IS_VERBOSE:
            git = Popen(['git', 'clone', src.repo, clone_path],
                        stdout=PIPE)
        else:
            git = Popen(['git', 'clone', src.repo, clone_path],
                        stdout=PIPE, stderr=PIPE)
        # communicate() drains the pipes so a chatty child cannot block.
        _, git_err = git.communicate()
        if git.returncode != 0:
            if git_err:
                print(git_err.decode(errors='replace'))
            if os.path.exists(clone_path):
                remove_dir(clone_path)
            print('Error: Failed to clone repo')
            return False
    plugin_path = clone_path
    if src.subdir is not None:
        plugin_path = os.path.join(clone_path, src.subdir)
    # git checkout and pip below operate on the current directory.
    os.chdir(plugin_path)
    if src.commit:
        verbose(f"Checking out commit {src.commit}")
        checkout = Popen(['git', 'checkout', src.commit],
                         stdout=PIPE, stderr=PIPE)
        checkout.communicate()
        if checkout.returncode != 0:
            print(f'failed to checkout referenced commit {src.commit}')
            return False

    # Install dependencies via requirements.txt or pyproject.toml
    mypip = 'pip3' if shutil.which('pip3') else 'pip'
    if not shutil.which(mypip):
        raise Exception(f'{mypip} not found in PATH')
    install_methods = {
        'requirements.txt': [mypip, 'install', '-r', 'requirements.txt'],
        'pyproject.toml': [mypip, 'install', '-e', '.']
    }

    if src.deps is not None:
        verbose(f'installing dependencies using {src.deps}')
        procedure = install_methods[src.deps]
        pip = Popen(procedure, stdout=PIPE)
        # pip can print far more than a pipe buffer holds; wait() alone
        # would deadlock once the buffer fills up.
        pip_out, _ = pip.communicate()
        if pip.returncode == 0:
            print('dependencies installed successfully')
        else:
            print('error encountered installing dependencies')
            verbose(pip_out.decode(errors='replace'))
            return False
    test = Popen([os.path.join(plugin_path, src.entry)], stdout=PIPE,
                 stderr=PIPE, universal_newlines=True)
    _, test_err = test.communicate()
    test_log = test_err.splitlines()
    # FIXME: add noexec test/warning.  Maybe try chmod entrypoint.
    if test.returncode != 0:
        verbose("plugin testing error:")
        for line in test_log:
            verbose(f'  {line}')
        print('plugin testing failed')
        return False

    # Find this cute little plugin a forever home
    shutil.copytree(plugin_path, inst_path)
    print(f'plugin installed: {inst_path}')
    remove_dir(clone_path)
    return True


def install(plugin_name: str):
    """downloads plugin from source repos, installs and activates plugin"""
    assert isinstance(plugin_name, str)
    src = search(plugin_name)
    if src:
        verbose(f'Retrieving {plugin_name} from {src.repo}')
        if not _install_plugin(src):
            print('installation aborted')
            sys.exit(1)
        inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir,
                                 src.name,
                                 src.entry)
        RECKLESS_CONFIG.enable_plugin(inst_path)
        enable(plugin_name)


def uninstall(plugin_name: str):
    """disables plugin and deletes the plugin's reckless dir"""
    assert isinstance(plugin_name, str)
    print(f'Uninstalling plugin {plugin_name}')
    disable(plugin_name)
    plugin_dir = os.path.join(RECKLESS_CONFIG.reckless_dir, plugin_name)
    verbose(f'looking for {plugin_dir}')
    if remove_dir(plugin_dir):
        print(f"{plugin_name} uninstalled successfully.")


def search(plugin_name: str) -> Union['InstInfo', None]:
    """searches plugin index for plugin

    Returns the InstInfo of the first source repository providing
    `plugin_name`, or None if no source provides it.
    """
    wanted = plugin_name.lower()
    # Search repos named after the plugin first. sorted() is stable and works
    # on a copy, so the global source list is neither reordered nor mutated
    # while it is being iterated.
    ordered_repos = sorted(
        RECKLESS_SOURCES,
        key=lambda r: r.split('/')[-1].lower() != wanted)
    for r in ordered_repos:
        p = _search_repo(plugin_name, r)
        if p:
            print(f"found {p.name} in repo: {p.repo}")
            verbose(f"entry: {p.entry}")
            if p.subdir:
                verbose(f'sub-directory: {p.subdir}')
            return p
    print(f'Unable to locate source for plugin {plugin_name}')
    return None


def lightning_cli_available() -> bool:
    """returns True if lightning-cli rpc available with current config"""
    from subprocess import TimeoutExpired
    try:
        clncli = Popen(LIGHTNING_CLI_CALL, stdout=PIPE, stderr=PIPE)
    except OSError:
        # lightning-cli is not installed or cannot be executed.
        return False
    try:
        clncli.communicate(timeout=1)
    except TimeoutExpired:
        # An unresponsive lightning-cli counts as unavailable.
        clncli.kill()
        clncli.communicate()
        return False
    return clncli.returncode == 0


def enable(plugin_name: str):
    """dynamically activates plugin and adds to config (persistent)"""
    assert isinstance(plugin_name, str)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not os.path.exists(path):
        print('cannot find installed plugin at expected path {}'
              .format(path))
        sys.exit(1)
    verbose('activating {}'.format(plugin_name))

    if not lightning_cli_available():
        # Config update should not be dependent upon lightningd running
        RECKLESS_CONFIG.enable_plugin(path)
        return

    cmd = LIGHTNING_CLI_CALL.copy()
    cmd.extend(['plugin', 'start', path])
    clncli = Popen(cmd, stdout=PIPE)
    cli_out, _ = clncli.communicate(timeout=3)
    if clncli.returncode == 0:
        RECKLESS_CONFIG.enable_plugin(path)
        print('{} enabled'.format(plugin_name))
        return

    raw = cli_out.decode(errors='replace')
    # Parse the reply as JSON; never eval() the output of another process.
    try:
        err = json.loads(raw)['message']
    except (ValueError, KeyError, TypeError):
        err = raw
    if ': already registered' in err:
        RECKLESS_CONFIG.enable_plugin(path)
        verbose(f'{inst.name} already registered with lightningd')
        print('{} enabled'.format(plugin_name))
    else:
        print(f'reckless: {inst.name} failed to start!')
        print(err)
        sys.exit(clncli.returncode)


def disable(plugin_name: str):
    """reckless disable <plugin>
    deactivates an installed plugin"""
    assert isinstance(plugin_name, str)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not os.path.exists(path):
        sys.stderr.write(f'Could not find plugin at {path}\n')
        sys.exit(1)

    if not lightning_cli_available():
        RECKLESS_CONFIG.disable_plugin(path)
        print(f'{plugin_name} disabled')
        return

    cmd = LIGHTNING_CLI_CALL.copy()
    cmd.extend(['plugin', 'stop', path])
    clncli = Popen(cmd, stdout=PIPE, stderr=PIPE)
    cli_out, cli_err = clncli.communicate(timeout=3)
    try:
        output = json.loads(cli_out.decode(errors='replace'))
    except ValueError:
        # Empty or non-JSON output; fall back to the return code below.
        output = {}
    if isinstance(output, dict) and output.get('code') == -32602:
        print('plugin not currently running')
    elif clncli.returncode != 0:
        print('lightning-cli plugin stop failed')
        sys.stderr.write(cli_err.decode(errors='replace'))
        sys.exit(clncli.returncode)
    RECKLESS_CONFIG.disable_plugin(path)
    print(f'{plugin_name} disabled')


def load_config(reckless_dir: Union[str, None] = None,
                network: str = 'bitcoin') -> Config:
    """Initial directory discovery and config file creation.

    Locates (or creates) the network specific lightningd config and the
    reckless config, makes the former include the latter and returns the
    reckless config. Exits with status 1 if either cannot be set up.
    """
    # Does the lightning-cli already reference an explicit config?
    net_conf = None
    if lightning_cli_available():
        # Work on a copy: extending the shared command list would append
        # 'listconfigs' to every later lightning-cli invocation.
        cmd = LIGHTNING_CLI_CALL.copy()
        cmd.extend(['listconfigs'])
        clncli = Popen(cmd, stdout=PIPE, stderr=PIPE)
        cli_out, _ = clncli.communicate(timeout=3)
        if clncli.returncode == 0:
            # Parse the output untouched; stripping spaces would corrupt a
            # config path that contains them.
            output = json.loads(cli_out.decode())
            if 'conf' in output:
                net_conf = LightningBitcoinConfig(path=output['conf'])
    if reckless_dir is None:
        reckless_dir = str(os.path.join(LIGHTNING_DIR, 'reckless'))
    elif not os.path.isabs(reckless_dir):
        reckless_dir = os.path.join(os.getcwd(), reckless_dir)
    # Reckless applies to the bitcoin network configuration by default.
    if network == 'bitcoin':
        reck_conf_path = os.path.join(reckless_dir, 'bitcoin-reckless.conf')
        if not net_conf:
            # This config file inherits the RecklessConfig.
            net_conf = LightningBitcoinConfig()
    elif network == 'regtest':
        reck_conf_path = os.path.join(reckless_dir, 'regtest-reckless.conf')
        regtest_path = os.path.join(LIGHTNING_DIR, 'regtest', 'config')
        if not net_conf:
            # Actually the regtest network config
            net_conf = LightningBitcoinConfig(path=regtest_path)
    else:
        raise ValueError(f'unsupported network: {network}')
    # Reckless manages plugins here.
    try:
        reckless_conf = RecklessConfig(path=reck_conf_path)
    except FileNotFoundError:
        print('Error: reckless config file could not be written: ',
              str(reck_conf_path))
        sys.exit(1)
    if not net_conf:
        print('Error: could not load or create the network specific lightningd'
              ' config (default .lightning/bitcoin)')
        sys.exit(1)
    net_conf.editConfigFile(f'include {reckless_conf.conf_fp}', None)
    return reckless_conf


def get_sources_file() -> str:
    """Return the path of the file listing the plugin source repositories."""
    return os.path.join(RECKLESS_DIR, '.sources')


def sources_from_file() -> list:
    """Return the non-empty, stripped lines of the sources file."""
    sources_file = get_sources_file()
    with open(sources_file, 'r') as f:
        stripped = (src.strip() for src in f)
        return [src for src in stripped if src]


def loadSources() -> list:
    """Look for the repo sources file."""
    sources_file = get_sources_file()
    # This would have been created if possible
    if not os.path.exists(sources_file):
        print('Warning: Reckless requires write access')
        Config(path=sources_file,
               default_text='https://github.com/lightningd/plugins')
        return ['https://github.com/lightningd/plugins']
    return sources_from_file()


def add_source(src: str):
    """Additional git repositories, directories, etc. are passed here."""
    assert isinstance(src, str)
    # Is it a file?
    maybe_path = os.path.realpath(src)
    if os.path.exists(maybe_path):
        # FIXME: This should handle either a directory or a git repo
        if os.path.isdir(maybe_path):
            print(f'Plugin source directory found: {maybe_path}')
    elif 'github.com' in src:
        my_file = Config(path=str(get_sources_file()),
                         default_text='https://github.com/lightningd/plugins')
        my_file.editConfigFile(src, None)


def remove_source(src: str):
    """Remove a source from the sources file."""
    assert isinstance(src, str)
    if src in sources_from_file():
        my_file = Config(path=get_sources_file(),
                         default_text='https://github.com/lightningd/plugins')
        my_file.editConfigFile(None, src)
        print('plugin source removed')
    else:
        print(f'source not found: {src}')


def list_source():
    """Provide the user with all stored source repositories."""
    for src in sources_from_file():
        print(src)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # This default depends on the .lightning directory
    parser.add_argument('-d', '--reckless-dir',
                        help='specify a data directory for reckless to use',
                        type=str, default=None)
    parser.add_argument('-l', '--lightning',
                        help='lightning data directory (default:~/.lightning)',
                        type=str,
                        default=Path.home().joinpath('.lightning'))
    parser.add_argument('-r', '--regtest', action='store_true')
    parser.add_argument('-v', '--verbose', action='store_true')
    cmd1 = parser.add_subparsers(dest='cmd1', help='command',
                                 required=True)

    install_cmd = cmd1.add_parser('install', help='search for and install a '
                                  'plugin, then test and activate')
    install_cmd.add_argument('targets', type=str, nargs='*')
    install_cmd.set_defaults(func=install)

    uninstall_cmd = cmd1.add_parser('uninstall', help='deactivate a plugin '
                                    'and remove it from the directory')
    uninstall_cmd.add_argument('targets', type=str, nargs='*')
    uninstall_cmd.set_defaults(func=uninstall)

    search_cmd = cmd1.add_parser('search', help='search for a plugin from '
                                 'the available source repositories')
    search_cmd.add_argument('targets', type=str, nargs='*')
    search_cmd.set_defaults(func=search)

    enable_cmd = cmd1.add_parser('enable', help='dynamically enable a plugin '
                                 'and update config')
    enable_cmd.add_argument('targets', type=str, nargs='*')
    enable_cmd.set_defaults(func=enable)
    disable_cmd = cmd1.add_parser('disable', help='disable a plugin')
    disable_cmd.add_argument('targets', type=str, nargs='*')
    disable_cmd.set_defaults(func=disable)
    source_parser = cmd1.add_parser('source', help='manage plugin search '
                                    'sources')
    source_subs = source_parser.add_subparsers(dest='source_subs',
                                               required=True)
    list_parse = source_subs.add_parser('list', help='list available plugin '
                                        'sources (repositories)')
    list_parse.set_defaults(func=list_source)
    source_add = source_subs.add_parser('add', help='add a source repository')
    source_add.add_argument('targets', type=str, nargs='*')
    source_add.set_defaults(func=add_source)
    source_rem = source_subs.add_parser('remove', aliases=['rem', 'rm'],
                                        help='remove a plugin source '
                                        'repository')
    source_rem.add_argument('targets', type=str, nargs='*')
    source_rem.set_defaults(func=remove_source)

    help_cmd = cmd1.add_parser('help', help='for contextual help, use '
                               '"reckless <cmd> -h"')
    help_cmd.add_argument('targets', type=str, nargs='*')
    help_cmd.set_defaults(func=help_alias)

    args = parser.parse_args()

    # verbose() may be called while the configs are loaded below, so the
    # flag it reads has to exist before load_config() runs.
    IS_VERBOSE = bool(args.verbose)

    NETWORK = 'regtest' if args.regtest else 'bitcoin'
    LIGHTNING_DIR = Path(args.lightning)
    LIGHTNING_CLI_CALL = ['lightning-cli']
    if NETWORK != 'bitcoin':
        LIGHTNING_CLI_CALL.append(f'--network={NETWORK}')
    if LIGHTNING_DIR != Path.home().joinpath('.lightning'):
        LIGHTNING_CLI_CALL.append(f'--lightning-dir={LIGHTNING_DIR}')
    if args.reckless_dir:
        RECKLESS_DIR = args.reckless_dir
    else:
        RECKLESS_DIR = os.path.join(LIGHTNING_DIR, 'reckless')
    RECKLESS_CONFIG = load_config(reckless_dir=RECKLESS_DIR,
                                  network=NETWORK)
    RECKLESS_SOURCES = loadSources()

    if 'targets' in args:
        # FIXME: Catch missing argument
        if args.func.__name__ == 'help_alias':
            args.func(args.targets)
            sys.exit(0)
        for target in args.targets:
            args.func(target)
    else:
        args.func()