various improvements and new plugins

This commit is contained in:
ned
2023-06-17 23:39:12 +02:00
parent c9d9c75d6c
commit 302cdc035d
11 changed files with 298 additions and 78 deletions

View File

@@ -35,6 +35,9 @@ A [Telegram bot](https://core.telegram.org/bots/api) that integrates with OpenAI
- [x] (NEW!) Support *functions* (plugins) to extend the bot's functionality with 3rd party services
- Currently available functions:
- Daily weather and 7-day forecast for any location (powered by [Open-Meteo](https://open-meteo.com))
- Live cryptocurrencies rate (powered by [CoinCap](https://coincap.io)) - by [@stumpyfr](https://github.com/stumpyfr)
- WolframAlpha queries (powered by [WolframAlpha](https://www.wolframalpha.com)) - requires a WolframAlpha API key
- Web search (powered by [DuckDuckGo](https://duckduckgo.com))
## Additional features - help needed!
If you'd like to help, check out the [issues](https://github.com/n3d1117/chatgpt-telegram-bot/issues) section and contribute!
@@ -101,9 +104,12 @@ Check out the [Budget Manual](https://github.com/n3d1117/chatgpt-telegram-bot/di
#### Functions
| Parameter | Description | Default value |
|-----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------|
| `ENABLE_FUNCTIONS` | Whether to use functions (aka plugins). You can read more about functions [here](https://openai.com/blog/function-calling-and-other-api-updates) | `true` (if available for the model) |
| `FUNCTIONS_MAX_CONSECUTIVE_CALLS` | Maximum number of back-to-back function calls to be made by the model in a single response before displaying a user-facing message | `10` |
| `PLUGINS` | List of plugins to enable (`wolfram`, `weather`, `crypto`, `web_search`), e.g: PLUGINS=wolfram,weather | `-` |
| `SHOW_PLUGINS_USED` | Whether to show which plugins were used for a response | `false` |
| `WOLFRAM_APP_ID` | Wolfram Alpha APP ID (required for the `wolfram` plugin, you can get one [here](https://products.wolframalpha.com/simple-api/documentation)) | `-` |
Check out the [official API reference](https://platform.openai.com/docs/api-reference/chat) for more details.

View File

@@ -1,23 +1,48 @@
import json
from plugins.weather import weather_function_spec, get_current_weather
from bot.plugins.crypto import CryptoPlugin
from bot.plugins.weather import WeatherPlugin
from bot.plugins.web_search import WebSearchPlugin
from bot.plugins.wolfram_alpha import WolframAlphaPlugin
class PluginManager:
    """
    Manages the enabled plugins and dispatches model function calls to them.

    :param config: dict with a 'plugins' key listing enabled plugin ids
                   ('wolfram', 'weather', 'crypto', 'web_search')
    """

    def __init__(self, config):
        enabled_plugins = config.get('plugins', [])
        # Conditional expressions are lazy: a plugin class is only instantiated
        # (and e.g. its required env vars checked) when it is actually enabled.
        plugins = [
            WolframAlphaPlugin() if 'wolfram' in enabled_plugins else None,
            WeatherPlugin() if 'weather' in enabled_plugins else None,
            CryptoPlugin() if 'crypto' in enabled_plugins else None,
            WebSearchPlugin() if 'web_search' in enabled_plugins else None,
        ]
        self.plugins = [plugin for plugin in plugins if plugin is not None]

    def get_functions_specs(self):
        """
        Return the list of function specs that can be called by the model.
        """
        return [plugin.get_spec() for plugin in self.plugins]

    async def call_function(self, function_name, arguments):
        """
        Call a plugin's function by name with JSON-encoded arguments.

        :param function_name: the function name as declared in the plugin spec
        :param arguments: JSON string of keyword arguments supplied by the model
        :return: JSON string with the plugin result, or an error payload if the
                 function is unknown (fed back to the model, so never raises here)
        """
        plugin = self.__get_plugin_by_function_name(function_name)
        if not plugin:
            return json.dumps({'error': f'Function {function_name} not found'})
        return json.dumps(await plugin.execute(**json.loads(arguments)))

    def get_plugin_source_name(self, function_name) -> str:
        """
        Return the source name of the plugin backing the given function,
        or an empty string if no enabled plugin declares that function.
        """
        plugin = self.__get_plugin_by_function_name(function_name)
        if not plugin:
            return ''
        return plugin.get_source_name()

    def __get_plugin_by_function_name(self, function_name):
        # Linear scan is fine: the plugin list is tiny and fixed at startup.
        return next(
            (plugin for plugin in self.plugins if plugin.get_spec().get('name') == function_name),
            None,
        )

View File

@@ -3,6 +3,7 @@ import os
from dotenv import load_dotenv
from bot.functions import PluginManager
from openai_helper import OpenAIHelper, default_max_tokens, are_functions_available
from telegram_bot import ChatGPTTelegramBot
@@ -47,6 +48,7 @@ def main():
'presence_penalty': float(os.environ.get('PRESENCE_PENALTY', 0.0)),
'frequency_penalty': float(os.environ.get('FREQUENCY_PENALTY', 0.0)),
'bot_language': os.environ.get('BOT_LANGUAGE', 'en'),
'show_plugins_used': os.environ.get('SHOW_PLUGINS_USED', 'false').lower() == 'true',
}
if openai_config['enable_functions'] and not functions_available:
@@ -82,8 +84,13 @@ def main():
'bot_language': os.environ.get('BOT_LANGUAGE', 'en'),
}
plugin_config = {
'plugins': os.environ.get('PLUGINS', '').split(',')
}
# Setup and run ChatGPT and Telegram bot
openai_helper = OpenAIHelper(config=openai_config)
plugin_manager = PluginManager(config=plugin_config)
openai_helper = OpenAIHelper(config=openai_config, plugin_manager=plugin_manager)
telegram_bot = ChatGPTTelegramBot(config=telegram_config, openai=openai_helper)
telegram_bot.run()

View File

@@ -14,7 +14,7 @@ from calendar import monthrange
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from functions import get_functions_specs, call_function
from bot.functions import PluginManager
# Models can be found here: https://platform.openai.com/docs/models/overview
GPT_3_MODELS = ("gpt-3.5-turbo", "gpt-3.5-turbo-0301", "gpt-3.5-turbo-0613")
@@ -84,14 +84,16 @@ class OpenAIHelper:
ChatGPT helper class.
"""
def __init__(self, config: dict):
def __init__(self, config: dict, plugin_manager: PluginManager):
"""
Initializes the OpenAI helper class with the given configuration.
:param config: A dictionary containing the GPT configuration
:param plugin_manager: The plugin manager
"""
openai.api_key = config['api_key']
openai.proxy = config['proxy']
self.config = config
self.plugin_manager = plugin_manager
self.conversations: dict[int: list] = {} # {chat_id: history}
self.last_updated: dict[int: datetime] = {} # {chat_id: last_update_timestamp}
@@ -112,9 +114,10 @@ class OpenAIHelper:
:param query: The query to send to the model
:return: The answer from the model and the number of tokens used
"""
plugins_used = ()
response = await self.__common_get_chat_response(chat_id, query)
if self.config['enable_functions']:
response = await self.__handle_function_call(chat_id, response)
response, plugins_used = await self.__handle_function_call(chat_id, response)
answer = ''
if len(response.choices) > 1 and self.config['n_choices'] > 1:
@@ -130,11 +133,17 @@ class OpenAIHelper:
self.__add_to_history(chat_id, role="assistant", content=answer)
bot_language = self.config['bot_language']
show_plugins_used = len(plugins_used) > 0 and self.config['show_plugins_used']
plugin_names = tuple(self.plugin_manager.get_plugin_source_name(plugin) for plugin in plugins_used)
if self.config['show_usage']:
answer += "\n\n---\n" \
f"💰 {str(response.usage['total_tokens'])} {localized_text('stats_tokens', bot_language)}" \
f" ({str(response.usage['prompt_tokens'])} {localized_text('prompt', bot_language)}," \
f" {str(response.usage['completion_tokens'])} {localized_text('completion', bot_language)})"
if show_plugins_used:
answer += f"\n🔌 {', '.join(plugin_names)}"
elif show_plugins_used:
answer += f"\n\n---\n🔌 {', '.join(plugin_names)}"
return answer, response.usage['total_tokens']
@@ -145,9 +154,10 @@ class OpenAIHelper:
:param query: The query to send to the model
:return: The answer from the model and the number of tokens used, or 'not_finished'
"""
plugins_used = ()
response = await self.__common_get_chat_response(chat_id, query, stream=True)
if self.config['enable_functions']:
response = await self.__handle_function_call(chat_id, response, stream=True)
response, plugins_used = await self.__handle_function_call(chat_id, response, stream=True)
answer = ''
async for item in response:
@@ -161,8 +171,14 @@ class OpenAIHelper:
self.__add_to_history(chat_id, role="assistant", content=answer)
tokens_used = str(self.__count_tokens(self.conversations[chat_id]))
show_plugins_used = len(plugins_used) > 0 and self.config['show_plugins_used']
plugin_names = tuple(self.plugin_manager.get_plugin_source_name(plugin) for plugin in plugins_used)
if self.config['show_usage']:
answer += f"\n\n---\n💰 {tokens_used} {localized_text('stats_tokens', self.config['bot_language'])}"
if show_plugins_used:
answer += f"\n🔌 {', '.join(plugin_names)}"
elif show_plugins_used:
answer += f"\n\n---\n🔌 {', '.join(plugin_names)}"
yield answer, tokens_used
@@ -217,7 +233,9 @@ class OpenAIHelper:
}
if self.config['enable_functions']:
common_args['functions'] = get_functions_specs()
functions = self.plugin_manager.get_functions_specs()
if len(functions) > 0:
common_args['functions'] = self.plugin_manager.get_functions_specs()
common_args['function_call'] = 'auto'
return await openai.ChatCompletion.acreate(**common_args)
@@ -231,7 +249,7 @@ class OpenAIHelper:
except Exception as e:
raise Exception(f"⚠️ _{localized_text('error', bot_language)}._ ⚠️\n{str(e)}") from e
async def __handle_function_call(self, chat_id, response, stream=False, times=0):
async def __handle_function_call(self, chat_id, response, stream=False, times=0, plugins_used=()):
function_name = ''
arguments = ''
if stream:
@@ -247,9 +265,9 @@ class OpenAIHelper:
elif 'finish_reason' in first_choice and first_choice.finish_reason == 'function_call':
break
else:
return response
return response, plugins_used
else:
return response
return response, plugins_used
else:
if 'choices' in response and len(response.choices) > 0:
first_choice = response.choices[0]
@@ -259,21 +277,24 @@ class OpenAIHelper:
if 'arguments' in first_choice.message.function_call:
arguments += str(first_choice.message.function_call.arguments)
else:
return response
return response, plugins_used
else:
return response
return response, plugins_used
logging.info(f'Calling function {function_name}...')
function_response = await call_function(function_name, arguments)
logging.info(f'Calling function {function_name} with arguments {arguments}')
function_response = await self.plugin_manager.call_function(function_name, arguments)
logging.info(f'Got response {function_response}')
self.__add_function_call_to_history(chat_id=chat_id, function_name=function_name, content=function_response)
response = await openai.ChatCompletion.acreate(
model=self.config['model'],
messages=self.conversations[chat_id],
functions=get_functions_specs(),
functions=self.plugin_manager.get_functions_specs(),
function_call='auto' if times < self.config['functions_max_consecutive_calls'] else 'none',
stream=stream
)
return await self.__handle_function_call(chat_id, response, stream, times+1)
if function_name not in plugins_used:
plugins_used += (function_name,)
return await self.__handle_function_call(chat_id, response, stream, times + 1, plugins_used)
async def generate_image(self, prompt: str) -> tuple[str, str]:
"""

30
bot/plugins/crypto.py Normal file
View File

@@ -0,0 +1,30 @@
from typing import Dict
import requests
from bot.plugins.plugin import Plugin
# Author: https://github.com/stumpyfr
# Author: https://github.com/stumpyfr
class CryptoPlugin(Plugin):
    """
    A plugin to fetch the current rate of various cryptocurrencies
    via the CoinCap v2 REST API.
    """

    def get_source_name(self) -> str:
        return "CoinCap"

    def get_spec(self) -> Dict:
        return {
            "name": "get_crypto_rate",
            "description": "Get the current rate of various crypto currencies",
            "parameters": {
                "type": "object",
                "properties": {
                    "asset": {"type": "string", "description": "Asset of the crypto"}
                },
                "required": ["asset"],
            },
        }

    async def execute(self, **kwargs) -> Dict:
        """
        Look up the rate for the model-supplied asset and return CoinCap's
        raw JSON payload to be fed back to the model.
        """
        # Bound the network call so a stalled CoinCap API cannot hang the bot
        # indefinitely (requests has no default timeout).
        return requests.get(
            f"https://api.coincap.io/v2/rates/{kwargs['asset']}",
            timeout=10,
        ).json()

30
bot/plugins/plugin.py Normal file
View File

@@ -0,0 +1,30 @@
from abc import abstractmethod, ABC
from typing import Dict
class Plugin(ABC):
    """
    Base interface that every bot plugin implements.

    A plugin advertises one callable function to the model via its spec and
    executes it when the model requests a function call.
    """

    @abstractmethod
    def get_source_name(self) -> str:
        """Human-readable name of the service backing this plugin."""

    @abstractmethod
    def get_spec(self) -> Dict:
        """
        Function spec as a JSON schema, per the OpenAI chat API:
        https://platform.openai.com/docs/api-reference/chat/create#chat/create-functions
        """

    @abstractmethod
    async def execute(self, **kwargs) -> Dict:
        """Run the plugin with the model-supplied arguments and return a JSON-serializable response."""

View File

@@ -1,9 +1,18 @@
import json
from typing import Dict
import requests
from bot.plugins.plugin import Plugin
def weather_function_spec():
class WeatherPlugin(Plugin):
"""
A plugin to get the current weather and 7-day daily forecast for a location
"""
def get_source_name(self) -> str:
return "OpenMeteo"
def get_spec(self) -> Dict:
return {
"name": "get_current_weather",
"description": "Get the current and 7-day daily weather forecast for a location using Open Meteo APIs.",
@@ -28,22 +37,13 @@ def weather_function_spec():
}
}
async def get_current_weather(latitude, longitude, unit):
"""
Get the current weather in a given location using the Open Meteo API
Source: https://open-meteo.com/en/docs
:param latitude: The latitude of the location to get the weather for
:param longitude: The longitude of the location to get the weather for
:param unit: The unit to use for the temperature (`celsius` or `fahrenheit`)
:return: The JSON response to be fed back to the model
"""
request = requests.get(f'https://api.open-meteo.com/v1/forecast'
f'?latitude={latitude}'
f'&longitude={longitude}'
f'&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_mean,'
f'&forecast_days=7'
f'&timezone=auto'
f'&temperature_unit={unit}'
f'&current_weather=true')
return json.dumps(request.json())
async def execute(self, **kwargs) -> Dict:
    """
    Fetch the current weather and a 7-day daily forecast from the Open-Meteo API.

    Expected kwargs (presumably as declared in this plugin's spec — confirm against
    get_spec): 'latitude', 'longitude', and 'unit' (temperature unit passed straight
    through to the API).
    Returns Open-Meteo's raw JSON payload to be fed back to the model.
    """
    # NOTE(review): requests.get has no timeout here — a stalled API response
    # blocks the caller indefinitely; consider adding timeout=.
    url = f'https://api.open-meteo.com/v1/forecast'\
          f'?latitude={kwargs["latitude"]}'\
          f'&longitude={kwargs["longitude"]}'\
          f'&temperature_unit={kwargs["unit"]}' \
          '&current_weather=true' \
          '&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_mean,' \
          '&forecast_days=7' \
          '&timezone=auto'
    return requests.get(url).json()

51
bot/plugins/web_search.py Normal file
View File

@@ -0,0 +1,51 @@
from itertools import islice
from typing import Dict
from duckduckgo_search import DDGS
from bot.plugins.plugin import Plugin
class WebSearchPlugin(Plugin):
    """
    A plugin to search the web for a given query, using DuckDuckGo
    """

    def get_source_name(self) -> str:
        return "DuckDuckGo"

    def get_spec(self) -> Dict:
        return {
            "name": "web_search",
            "description": "Execute a web search for the given query and return a list of results",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "the user query"
                    }
                },
                "required": ["query"],
            },
        }

    async def execute(self, **kwargs) -> Dict:
        """
        Run the model-supplied query through DuckDuckGo and return up to 8
        results as JSON-serializable metadata (snippet, title, link).
        """
        with DDGS() as ddgs:
            ddgs_gen = ddgs.text(
                kwargs['query'],
                region='wt-wt',
                safesearch='off'
            )
            # Cap the lazily-generated results at 8 to keep the payload small.
            results = list(islice(ddgs_gen, 8))

        # list() never returns None, so a simple emptiness check suffices
        # (the original `results is None` branch was unreachable).
        if not results:
            return {"Result": "No good DuckDuckGo Search Result was found"}

        def to_metadata(result: Dict) -> Dict[str, str]:
            # Keep only the fields useful to the model.
            return {
                "snippet": result["body"],
                "title": result["title"],
                "link": result["href"],
            }

        return {"result": [to_metadata(result) for result in results]}

View File

@@ -0,0 +1,48 @@
import os
from typing import Dict
import wolframalpha
from bot.plugins.plugin import Plugin
class WolframAlphaPlugin(Plugin):
    """
    A plugin to answer questions using WolframAlpha.

    :raises ValueError: at construction time if WOLFRAM_APP_ID is not set
    """

    def __init__(self):
        wolfram_app_id = os.getenv('WOLFRAM_APP_ID')
        if not wolfram_app_id:
            raise ValueError('WOLFRAM_APP_ID environment variable must be set to use WolframAlphaPlugin')
        self.app_id = wolfram_app_id

    def get_source_name(self) -> str:
        return "WolframAlpha"

    def get_spec(self) -> Dict:
        return {
            "name": "answer_with_wolfram_alpha",
            # Typo fixed: "Input should the the query" -> "Input should be the query"
            "description": "Get an answer to a question using Wolfram Alpha. Input should be the query in English.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "The search query, in english (translate if necessary)"}
                },
                "required": ["query"]
            }
        }

    async def execute(self, **kwargs) -> Dict:
        """
        Query WolframAlpha and return the first assumption and answer,
        or a fallback message the model can relay when nothing useful comes back.
        """
        client = wolframalpha.Client(self.app_id)
        res = client.query(kwargs['query'])
        try:
            # Empty pod/result iterators raise StopIteration when no answer exists.
            assumption = next(res.pods).text
            answer = next(res.results).text
        except StopIteration:
            return {'answer': 'Wolfram Alpha wasn\'t able to answer it'}

        if answer is None or answer == "":
            return {'answer': 'No good Wolfram Alpha Result was found'}
        else:
            return {'assumption': assumption, 'answer': answer}

View File

@@ -5,3 +5,5 @@ openai==0.27.8
python-telegram-bot==20.3
requests~=2.31.0
tenacity==8.2.2
wolframalpha==5.0.0
duckduckgo_search==3.8.3

View File

@@ -12,7 +12,7 @@
"stats_conversation":["Current conversation", "chat messages in history", "chat tokens in history"],
"usage_today":"Usage today",
"usage_month":"Usage this month",
"stats_tokens":"chat tokens used",
"stats_tokens":"tokens",
"stats_images":"images generated",
"stats_transcribe":["minutes and", "seconds transcribed"],
"stats_total":"💰 For a total amount of $",