"""Handles loading of plugins.""" import importlib import json import mimetypes import os import zipfile from pathlib import Path from urllib.parse import urlparse from zipimport import zipimporter from typing import List, Optional, Tuple import openapi_python_client import requests from abstract_singleton import AbstractSingleton from openapi_python_client.cli import _process_config, Config as OpenAPIConfig from autogpt.config import Config def inspect_zip_for_module(zip_path: str, debug: bool = False) -> Optional[str]: """ Inspect a zipfile for a module. Args: zip_path (str): Path to the zipfile. debug (bool, optional): Enable debug logging. Defaults to False. Returns: Optional[str]: The name of the module if found, else None. """ with zipfile.ZipFile(zip_path, "r") as zfile: for name in zfile.namelist(): if name.endswith("__init__.py"): if debug: print(f"Found module '{name}' in the zipfile at: {name}") return name if debug: print(f"Module '__init__.py' not found in the zipfile @ {zip_path}.") return None def write_dict_to_json_file(data: dict, file_path: str): """ Write a dictionary to a JSON file. Args: data (dict): Dictionary to write. file_path (str): Path to the file. """ with open(file_path, 'w') as file: json.dump(data, file, indent=4) def fetch_openai_plugins_manifest_and_spec(cfg: Config) -> dict: """ Fetch the manifest for a list of OpenAI plugins. Args: urls (List): List of URLs to fetch. Returns: dict: per url dictionary of manifest and spec. """ # TODO add directory scan manifests = {} for url in cfg.plugins_openai: openai_plugin_client_dir = f"{cfg.plugins_dir}/openai/{urlparse(url).netloc}" create_directory_if_not_exists(openai_plugin_client_dir) if not os.path.exists(f'{openai_plugin_client_dir}/ai-plugin.json'): try: response = requests.get(f"{url}/.well-known/ai-plugin.json") if response.status_code == 200: manifest = response.json() if manifest["schema_version"] != "v1": print(f"Unsupported manifest version: {manifest['schem_version']} for {url}") continue if manifest["api"]["type"] != "openapi": print(f"Unsupported API type: {manifest['api']['type']} for {url}") continue write_dict_to_json_file(manifest, f'{openai_plugin_client_dir}/ai-plugin.json') else: print(f"Failed to fetch manifest for {url}: {response.status_code}") except requests.exceptions.RequestException as e: print(f"Error while requesting manifest from {url}: {e}") else: print(f"Manifest for {url} already exists") manifest = json.load(open(f'{openai_plugin_client_dir}/ai-plugin.json')) if not os.path.exists(f'{openai_plugin_client_dir}/openapi.json'): openapi_spec = openapi_python_client._get_document(url=manifest["api"]["url"], path=None, timeout=5) write_dict_to_json_file(openapi_spec, f'{openai_plugin_client_dir}/openapi.json') else: print(f"OpenAPI spec for {url} already exists") openapi_spec = json.load(open(f'{openai_plugin_client_dir}/openapi.json')) manifests[url] = { 'manifest': manifest, 'openapi_spec': openapi_spec } return manifests def create_directory_if_not_exists(directory_path: str) -> bool: """ Create a directory if it does not exist. Args: directory_path (str): Path to the directory. Returns: bool: True if the directory was created, else False. """ if not os.path.exists(directory_path): try: os.makedirs(directory_path) print(f"Created directory: {directory_path}") return True except OSError as e: print(f"Error creating directory {directory_path}: {e}") return False else: print(f"Directory {directory_path} already exists") return True def initialize_openai_plugins(manifests_specs: dict, cfg: Config, debug: bool = False) -> dict: """ Initialize OpenAI plugins. Args: manifests_specs (dict): per url dictionary of manifest and spec. cfg (Config): Config instance including plugins config debug (bool, optional): Enable debug logging. Defaults to False. Returns: dict: per url dictionary of manifest, spec and client. """ openai_plugins_dir = f'{cfg.plugins_dir}/openai' if create_directory_if_not_exists(openai_plugins_dir): for url, manifest_spec in manifests_specs.items(): openai_plugin_client_dir = f'{openai_plugins_dir}/{urlparse(url).hostname}' _meta_option = openapi_python_client.MetaType.SETUP, _config = OpenAPIConfig(**{ 'project_name_override': 'client', 'package_name_override': 'client', }) prev_cwd = Path.cwd() os.chdir(openai_plugin_client_dir) Path('ai-plugin.json') if not os.path.exists('client'): client_results = openapi_python_client.create_new_client( url=manifest_spec['manifest']['api']['url'], path=None, meta=_meta_option, config=_config, ) if client_results: print(f"Error creating OpenAPI client: {client_results[0].header} \n" f" details: {client_results[0].detail}") continue spec = importlib.util.spec_from_file_location('client', 'client/client/client.py') module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) client = module.Client(base_url=url) os.chdir(prev_cwd) manifest_spec['client'] = client return manifests_specs def scan_plugins(cfg: Config, debug: bool = False) -> List[Tuple[str, Path]]: """Scan the plugins directory for plugins. Args: cfg (Config): Config instance including plugins config debug (bool, optional): Enable debug logging. Defaults to False. Returns: List[Tuple[str, Path]]: List of plugins. """ plugins = [] # Generic plugins plugins_path_path = Path(cfg.plugins_dir) for plugin in plugins_path_path.glob("*.zip"): if module := inspect_zip_for_module(str(plugin), debug): plugins.append((module, plugin)) # OpenAI plugins if cfg.plugins_openai: manifests_specs = fetch_openai_plugins_manifest_and_spec(cfg) if manifests_specs.keys(): manifests_specs_clients = initialize_openai_plugins(manifests_specs, cfg, debug) return plugins def blacklist_whitelist_check(plugins: List[AbstractSingleton], cfg: Config): """Check if the plugin is in the whitelist or blacklist. Args: plugins (List[Tuple[str, Path]]): List of plugins. cfg (Config): Config object. Returns: List[Tuple[str, Path]]: List of plugins. """ loaded_plugins = [] for plugin in plugins: if plugin.__name__ in cfg.plugins_blacklist: continue if plugin.__name__ in cfg.plugins_whitelist: loaded_plugins.append(plugin()) else: ack = input( f"WARNNG Plugin {plugin.__name__} found. But not in the" " whitelist... Load? (y/n): " ) if ack.lower() == "y": loaded_plugins.append(plugin()) if loaded_plugins: print(f"\nPlugins found: {len(loaded_plugins)}\n" "--------------------") for plugin in loaded_plugins: print(f"{plugin._name}: {plugin._version} - {plugin._description}") return loaded_plugins def load_plugins(cfg: Config = Config(), debug: bool = False) -> List[object]: """Load plugins from the plugins directory. Args: cfg (Config): Config instance including plugins config debug (bool, optional): Enable debug logging. Defaults to False. Returns: List[AbstractSingleton]: List of plugins initialized. """ plugins = scan_plugins(cfg) plugin_modules = [] for module, plugin in plugins: plugin = Path(plugin) module = Path(module) if debug: print(f"Plugin: {plugin} Module: {module}") zipped_package = zipimporter(plugin) zipped_module = zipped_package.load_module(str(module.parent)) for key in dir(zipped_module): if key.startswith("__"): continue a_module = getattr(zipped_module, key) a_keys = dir(a_module) if "_abc_impl" in a_keys and a_module.__name__ != "AutoGPTPluginTemplate": plugin_modules.append(a_module) return blacklist_whitelist_check(plugin_modules, cfg)