#!/usr/bin/env python3
"""Command-line tool that searches for, installs, enables and disables
lightningd plugins, and records the result in lightningd config files."""

from subprocess import Popen, PIPE, TimeoutExpired
import sys
import json
import os
import argparse
from pathlib import Path
import shutil
import tempfile
import requests


repos = ['https://github.com/lightningd/plugins']


def py_entry_guesses(name):
    """Return the filenames tried, in order, as a python plugin entrypoint."""
    return [name, f'{name}.py', '__init__.py']


def unsupported_entry(name):
    """Return entrypoint filenames that are recognized but not installable."""
    return [f'{name}.go', f'{name}.sh']


class InstInfo:
    """Installation details of one plugin: where to clone it from, which file
    starts it and which file describes its dependencies."""

    def __init__(self, name, url, git_url):
        self.name = name
        self.repo = url     # Used for 'git clone'
        self.git_url = git_url  # API access for github repos
        self.entry = None   # entrypoint filename, set by get_inst_details
        self.deps = None    # dependency file name, set by get_inst_details
        self.subdir = None  # sub-directory of the repo holding the plugin
        self.commit = None  # commit to check out after cloning, if any

    def __repr__(self):
        return (f'\nname: {self.name}\nrepo: {self.repo}\n'
                f'git: {self.git_url}'
                f'\nentry:{self.entry}\ndepency source:{self.deps}')

    def get_inst_details(self):
        """
        Populate installation details from a github repo url.
        Return True if all data is found.

        Sets `self.entry` to the first of py_entry_guesses() present in the
        listing and `self.deps` to 'requirements.txt' or 'pyproject.toml',
        whichever is found first.  Returns False if the request fails, if
        only an unsupported entrypoint exists, or if either is missing.
        """
        # A timeout keeps an unresponsive API from hanging the tool.
        r = requests.get(self.git_url, timeout=5)
        if r.status_code != 200:
            return False
        # The git tree API nests the listing under 'tree'; the contents API
        # returns the listing directly.
        if 'git/tree' in self.git_url:
            tree = r.json()['tree']
        else:
            tree = r.json()
        paths = {f['path'] for f in tree}

        self.entry = next(
            (g for g in py_entry_guesses(self.name) if g in paths), None)
        if self.entry is None:
            for g in unsupported_entry(self.name):
                if g in paths:
                    # FIXME: This should be easier to implement
                    print(f'entrypoint {g} is not yet supported')
                    return False

        dependency_info = ['requirements.txt', 'pyproject.toml']
        self.deps = next((d for d in dependency_info if d in paths), None)
        return bool(self.entry) and bool(self.deps)


def create_dir(r, directory):
    """Create `directory`, making at most `r` new levels of the path.

    Return True if the directory exists when the call finishes and False
    otherwise (including when more than `r` levels would be needed).
    """
    if os.path.exists(directory):
        return True
    if r <= 0:
        return False
    if not create_dir(r-1, os.path.split(directory)[0]):
        return False
    os.mkdir(directory, 0o777)
    print(f'created directory {directory}')
    return os.path.exists(directory)


def remove_dir(target):
    """Recursively delete `target`.  Return True on success; print the
    reason and return False if it is missing, not a directory, or
    protected."""
    try:
        shutil.rmtree(target)
        return True
    except (FileNotFoundError, NotADirectoryError):
        # rmtree raises FileNotFoundError for a path that does not exist.
        print(f"Tried to remove directory {target} that does not exist.")
    except PermissionError:
        print(f"Permission denied removing dir: {target}")
    return False


class Config():
    """A generic class for procuring, reading and editing config files"""
    def obtain_config(self, config_path, default_text):
        """Return a config file from the desired location. Create one with
        default_text if it cannot be found.

        Returns the list of lines of an existing file, `default_text` when
        the file was just created, or None when the parent directory could
        not be created.  Exits with status 1 if the user declines creation.
        """
        if config_path is None:
            raise Exception("Generic config must be passed a config_path.")
        assert isinstance(config_path, str)
        # FIXME: warn if reckless dir exists, but conf not found
        if os.path.exists(config_path):
            # Read-only access is enough here; the file is only read.
            with open(config_path, 'r') as f:
                return f.readlines()
        print(f'config file not found: {config_path}')
        confirm = input('press [Y] to create one now.\n')
        if confirm.upper() != 'Y':
            sys.exit(1)
        parent_path = os.path.split(config_path)[0]
        # Create up to one parent in the directory tree.
        if create_dir(1, parent_path):
            with open(config_path, 'w') as f:
                f.write(default_text)
                # FIXME: Handle write failure
            return default_text
        print('could not create the parent directory')
        return None

    def editConfigFile(self, addline, removeline):
        """Rewrite the config file, dropping `removeline` and adding `addline`.

        Every line is written back stripped of surrounding whitespace, lines
        equal to `removeline` are dropped, and runs of blank lines are
        collapsed to one.  `addline` is appended unless an identical line is
        already present.  Either argument may be None to skip that step.
        """
        with open(self.conf_fp, 'r') as reckless_conf:
            original = reckless_conf.readlines()

        kept = []
        for raw in original:
            line = raw.strip()
            if line == removeline:
                continue
            if line == '' and kept and kept[-1] == '':
                # The white space is getting excessive.
                continue
            kept.append(line)

        # addline is idempotent.  Compare stripped text: lines read from the
        # file carry a trailing newline that would never match `addline`.
        if addline and addline.strip() not in kept:
            kept.append(addline)

        with open(self.conf_fp, 'w') as conf_write:
            conf_write.write('\n'.join(kept))

    def __init__(self, path=None, default_text=None):
        assert path is not None
        assert default_text is not None
        self.conf_fp = path
        self.content = self.obtain_config(self.conf_fp, default_text)


class RecklessConfig(Config):
    """Reckless config (by default, specific to the bitcoin network only.)
    This is inherited by the lightningd config and contains all reckless
    maintained plugins."""

    def enable_plugin(self, plugin_path):
        """Handle persistent plugin loading via config"""
        self.editConfigFile(f'plugin={plugin_path}',
                            f'disable-plugin={plugin_path}')

    def disable_plugin(self, plugin_path):
        """Handle persistent plugin disabling via config"""
        self.editConfigFile(f'disable-plugin={plugin_path}',
                            f'plugin={plugin_path}')

    def __init__(self, path=None, default_text=None):
        if path is None:
            path = os.path.join(LIGHTNING_DIR, 'reckless',
                                'bitcoin-reckless.conf')
        if default_text is None:
            default_text = ('# This configuration file is managed by '
                            'reckless to activate and disable\n'
                            '# reckless-installed plugins\n\n')
        Config.__init__(self, path=str(path), default_text=default_text)
        # Plugins are installed next to the reckless config file.
        self.reckless_dir = os.path.split(path)[0]


class LightningBitcoinConfig(Config):
    """lightningd config specific to the bitcoin network. This is inherited by
    the main lightningd config and in turn, inherits bitcoin-reckless.conf."""

    def __init__(self, path=None, default_text=None):
        if path is None:
            path = os.path.join(LIGHTNING_DIR, 'bitcoin', 'config')
        if default_text is None:
            default_text = "# This config was autopopulated by reckless\n\n"
        Config.__init__(self, path=str(path), default_text=default_text)


class InferInstall():
    """Once a plugin is installed, we may need its directory and entrypoint"""
    def __init__(self, name):
        reck_contents = os.listdir(RECKLESS_CONFIG.reckless_dir)
        # Accept either the plugin name or its '.py' entrypoint name.
        if name.endswith('.py'):
            name = name[:-3]
        if name not in reck_contents:
            raise Exception(f"Could not find a reckless directory for {name}")
        self.dir = os.path.join(RECKLESS_CONFIG.reckless_dir, name)
        plug_contents = os.listdir(self.dir)
        for n in py_entry_guesses(name):
            if n in plug_contents:
                self.entry = os.path.join(self.dir, n)
                self.name = n
                return
        raise Exception(f'plugin entrypoint not found in {self.dir}')


# Command: print the docstring of the named command, or the argparse usage
# when no command is named.  Command docstrings double as help text, which is
# why they are kept short and user-facing.
def help(target):
    if not target:
        parser.print_help(sys.stdout)
        return
    topic = globals().get(target[0])
    if topic is None:
        print(f'{target[0]}: command unrecognized')
        return
    print(topic.__doc__)


def verbose(*args):
    """Print `args` only when the --verbose flag was given."""
    if not IS_VERBOSE:
        return
    print(*args)


def _first_target(targets):
    """Return the first command-line target, or None if none was given."""
    return targets[0] if targets else None


def _run(cmd, cwd=None, capture_stderr=True, timeout=None):
    """Run `cmd` to completion and return (returncode, stdout, stderr).

    Output is returned as text; stderr is '' when it is not captured (it
    then goes straight to the terminal).  communicate() drains the pipes
    while waiting, so a chatty child cannot block on a full pipe the way it
    can with wait().  A child exceeding `timeout` seconds is killed.
    """
    proc = Popen(cmd, cwd=cwd, stdout=PIPE,
                 stderr=PIPE if capture_stderr else None,
                 universal_newlines=True)
    try:
        out, err = proc.communicate(timeout=timeout)
    except TimeoutExpired:
        proc.kill()
        out, err = proc.communicate()
    return proc.returncode, out, err or ''


def _parse_cli_json(text):
    """Decode a JSON object printed by lightning-cli; return {} if `text`
    is not a JSON object."""
    try:
        reply = json.loads(text)
    except ValueError:
        return {}
    return reply if isinstance(reply, dict) else {}


def _search_repo(name, url):
    """look in given repo and, if found, populate InstInfo

    Returns the populated InstInfo, or False when `url` is not a usable
    github url, the repository cannot be listed, or the plugin (or its
    installation details) cannot be found there.
    """
    # Remove api subdomain, subdirectories, etc.
    repo = [part for part in url.split('/') if part]
    repo_user = None
    repo_name = None
    for i, part in enumerate(repo):
        if 'github.com' in part:
            # Extract user and repo name
            start = i + 1
            if start < len(repo) and repo[start] in ('repo', 'repos'):
                # Maybe we were passed an api.github.com/repos/ url
                start = start + 1
            if start + 1 < len(repo):
                repo_user = repo[start]
                repo_name = repo[start+1]
            break
    # FIXME: Handle non-github repos.
    if repo_name is None:
        return False

    # Get details from the github API.
    api_url = (f'https://api.github.com/repos/{repo_user}/'
               f'{repo_name}/contents/')
    r = requests.get(api_url, timeout=5)
    if r.status_code != 200:
        print("Plugin repository unavailable")
        return False

    # Repo is for this plugin
    if repo_name == name:
        MyPlugin = InstInfo(name,
                            f'https://github.com/{repo_user}/{repo_name}',
                            api_url)
        if not MyPlugin.get_inst_details():
            return False
        return MyPlugin

    # Repo contains multiple plugins?
    for x in r.json():
        if x["name"] != name:
            continue
        # Look for the rest of the install details
        if 'lightningd/plugins/' in x['html_url']:
            # These are in lightningd/plugins directly
            MyPlugin = InstInfo(name,
                                'https://github.com/lightningd/plugins',
                                x['git_url'])
            MyPlugin.subdir = x['name']
        else:
            # submodules from another github repo
            MyPlugin = InstInfo(name, x['html_url'], x['git_url'])
            # Submodule URLs are appended with /tree/<commit hash>
            if MyPlugin.repo.split('/')[-2] == 'tree':
                MyPlugin.commit = MyPlugin.repo.split('/')[-1]
                MyPlugin.repo = MyPlugin.repo.split('/tree/')[0]
                verbose(f'repo using commit: {MyPlugin.commit}')
        if not MyPlugin.get_inst_details():
            return False
        return MyPlugin
    return False


def _clone_and_install(src, clone_path, inst_path):
    """Clone `src` into `clone_path`, install its dependencies, test-run the
    entrypoint and copy the plugin to `inst_path`.  Return True on success."""
    # clone git repository to /tmp/reckless-...
    if not (src.repo.startswith('http') or 'github.com' in src.repo):
        print(f'Error: unsupported plugin source {src.repo}')
        return False
    # Ugly, but interactively handling stderr gets hairy: git's progress is
    # shown live when verbose and only printed after a failure otherwise.
    returncode, _, err = _run(['git', 'clone', src.repo, clone_path],
                              capture_stderr=not IS_VERBOSE)
    if returncode != 0:
        if err:
            print(err)
        print('Error: Failed to clone repo')
        return False

    plugin_path = clone_path
    if src.subdir is not None:
        plugin_path = os.path.join(clone_path, src.subdir)
    if not os.path.isdir(plugin_path):
        print(f'Error: plugin directory not found in repo: {plugin_path}')
        return False

    # Child processes get cwd=plugin_path rather than changing this
    # process's working directory to a clone that is deleted afterwards.
    if src.commit:
        verbose(f"Checking out commit {src.commit}")
        returncode, _, _ = _run(['git', 'checkout', src.commit],
                                cwd=plugin_path)
        if returncode != 0:
            print(f'failed to checkout referenced commit {src.commit}')
            return False

    # Install dependencies via requirements.txt or pyproject.toml
    # NOTE(review): '-e' ties the install to the temporary clone, which is
    # removed once installation finishes - confirm this is intended.
    install_methods = {
        'requirements.txt': ['pip', 'install', '-r', 'requirements.txt'],
        'pyproject.toml': ['pip', 'install', '-e', '.']
    }
    if src.deps is not None:
        verbose(f'installing dependencies using {src.deps}')
        returncode, out, _ = _run(install_methods[src.deps],
                                  cwd=plugin_path, capture_stderr=False)
        if returncode == 0:
            print('dependencies installed successfully')
        else:
            print('error encountered installing dependencies')
            verbose(out)
            return False

    # Test-run the entrypoint; a non-zero exit status fails the install.
    # FIXME: add noexec test/warning.  Maybe try chmod entrypoint.
    try:
        returncode, _, err = _run([os.path.join(plugin_path, src.entry)],
                                  cwd=plugin_path)
    except OSError as e:
        # The entrypoint could not be executed at all.
        returncode, err = 1, str(e)
    if returncode != 0:
        verbose("plugin testing error:")
        for line in err.splitlines():
            verbose(f' {line}')
        print('plugin testing failed')
        return False

    # Find this cute little plugin a forever home
    shutil.copytree(plugin_path, inst_path)
    print(f'plugin installed: {inst_path}')
    return True


def _install_plugin(src):
    """make sure the repo exists and clone it.

    `src` is a populated InstInfo.  Returns True once the plugin has been
    copied into the reckless directory and False if any step fails; exits
    if the reckless directory or the source repository is unavailable.
    """
    verbose(f'Install requested from {src}.')
    if RECKLESS_CONFIG is None:
        print('error: reckless install directory unavailable')
        sys.exit(2)
    req = requests.get(src.repo, timeout=20)
    if not req.status_code == 200:
        print('plugin source repository unavailable')
        sys.exit(1)

    inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir, src.name)
    if os.path.exists(inst_path):
        # Checked up front: copying over an existing directory would fail
        # only after cloning and installing dependencies.
        print(f'{src.name} is already installed at {inst_path}')
        return False

    # Use a unique directory for each cloned repo.
    clone_path = tempfile.mkdtemp(prefix='reckless-')
    try:
        return _clone_and_install(src, clone_path, inst_path)
    finally:
        # The clone is scratch space whatever the outcome.
        if os.path.exists(clone_path):
            remove_dir(clone_path)


# Command: locate the plugin, install it and enable it.  Exits with status 1
# if the installation itself fails.
def install(plugin_name):
    """reckless install
    downloads plugin from source repos, installs and activates plugin"""
    assert isinstance(plugin_name, list)
    plugin_name = _first_target(plugin_name)
    if plugin_name is None:
        print('missing argument: plugin_name')
        return
    src = search([plugin_name])
    if src:
        verbose(f'Retrieving {plugin_name} from {src.repo}')
        if not _install_plugin(src):
            print('installation aborted')
            sys.exit(1)
        inst_path = os.path.join(RECKLESS_CONFIG.reckless_dir,
                                 src.name,
                                 src.entry)
        RECKLESS_CONFIG.enable_plugin(inst_path)
        enable([plugin_name])


# Command: disable the plugin, then delete its directory under the reckless
# directory.
def uninstall(plugin_name):
    """reckless uninstall
    disables plugin and deletes the plugin's reckless dir"""
    assert isinstance(plugin_name, list)
    plugin_name = _first_target(plugin_name)
    if plugin_name is None:
        print('missing argument: plugin_name')
        return
    print('Uninstalling plugin {}'.format(plugin_name))
    disable([plugin_name])
    plugin_dir = os.path.join(RECKLESS_CONFIG.reckless_dir, plugin_name)
    verbose("looking for {}".format(plugin_dir))
    if remove_dir(plugin_dir):
        print(f"{plugin_name} uninstalled successfully.")


# Command: return the InstInfo of the first source repo providing the
# plugin, or None if no source has it.
def search(plugin_name):
    """reckless search
    searches plugin index for plugin"""
    plugin_name = _first_target(plugin_name)
    if plugin_name is None:
        print('plugin name required')
        return None
    wanted = plugin_name.lower()
    # Try repos named after the plugin first.  sorted() is stable and works
    # on a copy, so RECKLESS_SOURCES itself is left untouched.
    ordered_repos = sorted(
        RECKLESS_SOURCES,
        key=lambda r: r.rstrip('/').split('/')[-1].lower() != wanted)
    for r in ordered_repos:
        p = _search_repo(plugin_name, r)
        if p:
            print(f"found {p.name} in repo: {p.repo}")
            verbose(f"entry: {p.entry}")
            if p.subdir:
                verbose(f'sub-directory: {p.subdir}')
            return p
    print(f'Unable to locate source for plugin {plugin_name}')
    return None


def lightning_cli_available():
    """Return True if running `lightning-cli` exits with status 0, and False
    if it is not installed, fails, or does not finish within a second."""
    try:
        returncode, _, _ = _run(['lightning-cli'], timeout=1)
    except OSError:
        # lightning-cli is missing or cannot be executed.
        return False
    return returncode == 0


# Command: start the plugin through lightning-cli when it is available and
# record it in the reckless config.  Exits non-zero if the plugin is not
# installed or fails to start.
def enable(plugin_name):
    """reckless enable
    dynamically activates plugin and adds to config (persistent)"""
    assert isinstance(plugin_name, list)
    plugin_name = _first_target(plugin_name)
    if plugin_name is None:
        sys.stderr.write('Plugin name required.')
        sys.exit(1)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not os.path.exists(path):
        print('cannot find installed plugin at expected path {}'
              .format(path))
        sys.exit(1)
    verbose('activating {}'.format(plugin_name))

    if not lightning_cli_available():
        # Config update should not be dependent upon lightningd running
        RECKLESS_CONFIG.enable_plugin(path)
        return

    returncode, out, cli_stderr = _run(
        ['lightning-cli', 'plugin', 'start', path], timeout=3)
    if returncode == 0:
        RECKLESS_CONFIG.enable_plugin(path)
        print('{} enabled'.format(plugin_name))
        return

    # The reply is JSON, so it is parsed with json rather than eval():
    # eval() fails on true/false/null and would execute whatever the
    # external process printed.
    err = _parse_cli_json(out).get('message', '')
    if ': already registered' in err:
        RECKLESS_CONFIG.enable_plugin(path)
        verbose(f'{inst.name} already registered with lightningd')
        print('{} enabled'.format(plugin_name))
    else:
        print(f'reckless: {inst.name} failed to start!')
        print(err)
        if cli_stderr:
            sys.stderr.write(cli_stderr)
        sys.exit(returncode)


# Command: stop the plugin through lightning-cli when it is available and
# mark it disabled in the reckless config.
def disable(plugin_name):
    """reckless disable
    deactivates an installed plugin"""
    assert isinstance(plugin_name, list)
    plugin_name = _first_target(plugin_name)
    if plugin_name is None:
        sys.stderr.write('Plugin name required.')
        sys.exit(1)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not os.path.exists(path):
        sys.stderr.write(f'Could not find plugin at {path}\n')
        sys.exit(1)

    if not lightning_cli_available():
        RECKLESS_CONFIG.disable_plugin(path)
        print(f'{plugin_name} disabled')
        return

    returncode, out, cli_stderr = _run(
        ['lightning-cli', 'plugin', 'stop', path], timeout=3)
    output = _parse_cli_json(out)
    if output.get('code') == -32602:
        # Not an error for our purposes: the config is still updated below.
        print('plugin not currently running')
    elif returncode != 0:
        print('lightning-cli plugin stop failed')
        sys.stderr.write(cli_stderr)
        sys.exit(returncode)
    RECKLESS_CONFIG.disable_plugin(path)
    print(f'{plugin_name} disabled')


def load_config(reckless_dir=None, network='bitcoin'):
    """Initial directory discovery and config file creation.

    Loads (creating if necessary) the reckless config for `network`
    ('bitcoin' or 'regtest') and the matching lightningd network config,
    adds an `include` line for the former to the latter, and returns the
    RecklessConfig.  Exits with status 1 if either file is unavailable.
    """
    if reckless_dir is None:
        reckless_dir = str(os.path.join(LIGHTNING_DIR, 'reckless'))
    elif not os.path.isabs(reckless_dir):
        reckless_dir = os.path.join(os.getcwd(), reckless_dir)

    # Reckless applies to the bitcoin network configuration by default.
    if network == 'bitcoin':
        reck_conf_path = os.path.join(reckless_dir, 'bitcoin-reckless.conf')
        # This config file inherits the RecklessConfig.
        net_conf = LightningBitcoinConfig()
    elif network == 'regtest':
        reck_conf_path = os.path.join(reckless_dir, 'regtest-reckless.conf')
        regtest_path = os.path.join(LIGHTNING_DIR, 'regtest', 'config')
        # Actually the regtest network config
        net_conf = LightningBitcoinConfig(path=regtest_path)
    else:
        print(f'Error: unsupported network: {network}')
        sys.exit(1)

    # Reckless manages plugins here.
    reckless_conf = RecklessConfig(path=reck_conf_path)
    # `content` is None only when the file could neither be read nor
    # created; the config objects themselves are always truthy.
    if reckless_conf.content is None:
        print('Error: reckless config file could not be written')
        sys.exit(1)
    if net_conf.content is None:
        print('Error: could not load or create the network specific lightningd'
              ' config (default .lightning/bitcoin)')
        sys.exit(1)
    net_conf.editConfigFile(f'include {reckless_conf.conf_fp}', None)
    return reckless_conf


def get_sources_file():
    """Return the path of the file listing the plugin source repos."""
    return os.path.join(RECKLESS_DIR, '.sources')


def sources_from_file():
    """Return the non-blank lines of the sources file, stripped."""
    with open(get_sources_file(), 'r') as f:
        return [src.strip() for src in f if src.strip()]


def loadSources():
    """Look for the repo sources file"""
    sources_file = get_sources_file()
    # This would have been created if possible
    if not os.path.exists(sources_file):
        print('Warning: Reckless requires write access')
        Config(path=sources_file,
               default_text='https://github.com/lightningd/plugins')
        return ['https://github.com/lightningd/plugins']
    return sources_from_file()


def add_source(src):
    """Additional git repositories, directories, etc. are passed here singly.

    A github url is appended to the sources file.  An existing local
    directory is only reported, not recorded.
    """
    assert isinstance(src, str)
    # Is it a file?
    maybe_path = os.path.realpath(src)
    if os.path.exists(maybe_path):
        # FIXME: This should handle either a directory or a git repo
        if os.path.isdir(maybe_path):
            print(f'Plugin source directory found: {maybe_path}')
    elif 'github.com' in src:
        my_file = Config(path=str(get_sources_file()),
                         default_text='https://github.com/lightningd/plugins')
        my_file.editConfigFile(src, None)


def remove_source(src):
    """Remove `src` from the sources file, or report that it is not there."""
    assert isinstance(src, str)
    if src in sources_from_file():
        my_file = Config(path=get_sources_file(),
                         default_text='https://github.com/lightningd/plugins')
        my_file.editConfigFile(None, src)
        print('plugin source removed')
    else:
        print(f'source not found: {src}')


def list_source():
    """Print every configured plugin source, one per line."""
    for src in sources_from_file():
        print(src)


# Command: dispatch the 'source' subcommands.  cmd[0] is the subcommand and
# the remaining items are the sources it applies to.
def source(cmd):
    """reckless source
    reckless source add
        adds a source to search
    reckless source remove
        removes a source
    reckless source list
        lists all sources
    """
    assert isinstance(cmd, list)
    if not cmd:
        # No subcommand given: show the usage instead of failing on cmd[0].
        print(source.__doc__)
        sys.exit(1)
    if cmd[0] == 'add':
        for src in cmd[1:]:
            add_source(src)
    elif cmd[0] in ('remove', 'rem', 'rm'):
        for src in cmd[1:]:
            remove_source(src)
    elif cmd[0] == 'list':
        list_source()
    else:
        print(f'unrecognized argument to reckless source: {cmd[0]}')
        sys.exit(1)


def sources(cmd):
    # alias to improve ux
    source(cmd)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(dest='command',
                        help='install/uninstall/search/enable'
                             '/disable/source/help',
                        type=str)
    # Here we pass a list of argument in as target.  This is useful for
    # subcommands (i.e., reckless source add <src>) as well as for passing
    # lists of plugins to handle in sequence (to be implemented.)
    parser.add_argument(dest='target', nargs='*', default=None,
                        help='target', type=str)
    # This default depends on the .lightning directory
    parser.add_argument('-d', '--reckless-dir',
                        help='specify a data directory for reckless to use',
                        type=str, default=None)
    parser.add_argument('-l', '--lightning',
                        help='lightning data directory (default:~/.lightning)',
                        type=str,
                        default=Path.home().joinpath('.lightning'))
    parser.add_argument('-r', '--regtest', action='store_true')
    parser.add_argument('-v', '--verbose', action='store_true')
    args = parser.parse_args()

    if args.command not in ['install', 'uninstall', 'search', 'enable',
                            'disable', 'help', 'source', 'sources']:
        print(f'{args.command}: command unrecognized')
        sys.exit(1)

    # The command functions read these module-level settings.
    IS_VERBOSE = bool(args.verbose)
    NETWORK = 'regtest' if args.regtest else 'bitcoin'
    LIGHTNING_DIR = Path(args.lightning)
    if args.reckless_dir:
        RECKLESS_DIR = args.reckless_dir
    else:
        RECKLESS_DIR = os.path.join(LIGHTNING_DIR, 'reckless')
    RECKLESS_CONFIG = load_config(reckless_dir=RECKLESS_DIR,
                                  network=NETWORK)
    RECKLESS_SOURCES = loadSources()
    # Each command is the module-level function of the same name.
    globals()[args.command](args.target)
    sys.exit(0)