Merge branch 'feature/support-functions' into feature/support-functions

This commit is contained in:
ned
2023-07-03 22:22:04 +02:00
committed by GitHub
15 changed files with 463 additions and 149 deletions

View File

@@ -100,28 +100,33 @@ Check out the [Budget Manual](https://github.com/n3d1117/chatgpt-telegram-bot/di
#### Functions #### Functions
| Parameter | Description | Default value | | Parameter | Description | Default value |
|-----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------| |-----------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------|
| `ENABLE_FUNCTIONS` | Whether to use functions (aka plugins). You can read more about functions [here](https://openai.com/blog/function-calling-and-other-api-updates) | `true` (if available for the model) | | `ENABLE_FUNCTIONS` | Whether to use functions (aka plugins). You can read more about functions [here](https://openai.com/blog/function-calling-and-other-api-updates) | `true` (if available for the model) |
| `FUNCTIONS_MAX_CONSECUTIVE_CALLS` | Maximum number of back-to-back function calls to be made by the model in a single response before displaying a user-facing message | `10` | | `FUNCTIONS_MAX_CONSECUTIVE_CALLS` | Maximum number of back-to-back function calls to be made by the model in a single response before displaying a user-facing message | `10` |
| `PLUGINS` | List of plugins to enable (see below for a full list), e.g: `PLUGINS=wolfram,weather` | `-` | | `PLUGINS` | List of plugins to enable (see below for a full list), e.g: `PLUGINS=wolfram,weather` | - |
| `SHOW_PLUGINS_USED` | Whether to show which plugins were used for a response | `false` | | `SHOW_PLUGINS_USED` | Whether to show which plugins were used for a response | `false` |
| `WOLFRAM_APP_ID` | Wolfram Alpha APP ID (required for the `wolfram` plugin, you can get one [here](https://products.wolframalpha.com/simple-api/documentation)) | `-` | | `WOLFRAM_APP_ID` | Wolfram Alpha APP ID (required only for the `wolfram` plugin, you can get one [here](https://products.wolframalpha.com/simple-api/documentation)) | - |
| `SPOTIFY_CLIENT_ID` | Spotify app Client ID (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` | | `SPOTIFY_CLIENT_ID` | Spotify app Client ID (required only for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | - |
| `SPOTIFY_CLIENT_SECRET` | Spotify app Client Secret (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` | | `SPOTIFY_CLIENT_SECRET` | Spotify app Client Secret (required only for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | - |
| `SPOTIFY_REDIRECT_URI` | Spotify app Redirect URI (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` | | `SPOTIFY_REDIRECT_URI` | Spotify app Redirect URI (required only for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | - |
| `WORLDTIME_DEFAULT_TIMEZONE` | Default timezone to use, i.e. `Europe/Rome` (required only for the `worldtimeapi` plugin, you can get TZ Identifiers from [here](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)) | - |
| `DUCKDUCKGO_SAFESEARCH` | DuckDuckGo safe search (`on`, `off` or `moderate`) (optional, applies to `ddg_web_search` and `ddg_image_search`) | `moderate` |
| `DEEPL_API_KEY` | DeepL API key (required for the `deepl` plugin, you can get one [here](https://www.deepl.com/pro-api?cta=header-pro-api)) | `-` | | `DEEPL_API_KEY` | DeepL API key (required for the `deepl` plugin, you can get one [here](https://www.deepl.com/pro-api?cta=header-pro-api)) | `-` |
#### Available plugins #### Available plugins
| Name | Description | Required API key(s) | | Name | Description | Required environment variable(s) |
|----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------| |---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------|
| `weather` | Daily weather and 7-day forecast for any location (powered by [Open-Meteo](https://open-meteo.com)) | `-` | | `weather` | Daily weather and 7-day forecast for any location (powered by [Open-Meteo](https://open-meteo.com)) | - |
| `wolfram` | WolframAlpha queries (powered by [WolframAlpha](https://www.wolframalpha.com)) | `WOLFRAM_APP_ID` | | `wolfram` | WolframAlpha queries (powered by [WolframAlpha](https://www.wolframalpha.com)) | `WOLFRAM_APP_ID` |
| `web_search` | Web search (powered by [DuckDuckGo](https://duckduckgo.com)) | `-` | | `ddg_web_search` | Web search (powered by [DuckDuckGo](https://duckduckgo.com)) | - |
| `crypto` | Live cryptocurrencies rate (powered by [CoinCap](https://coincap.io)) - by [@stumpyfr](https://github.com/stumpyfr) | `-` | | `ddg_translate` | Translate text to any language (powered by [DuckDuckGo](https://duckduckgo.com)) | - |
| `ddg_image_search` | Search image or GIF (powered by [DuckDuckGo](https://duckduckgo.com)) | - |
| `crypto` | Live cryptocurrencies rate (powered by [CoinCap](https://coincap.io)) - by [@stumpyfr](https://github.com/stumpyfr) | - |
| `spotify` | Spotify top tracks/artists, currently playing song and content search (powered by [Spotify](https://spotify.com)). Requires one-time authorization. | `SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET`, `SPOTIFY_REDIRECT_URI` | | `spotify` | Spotify top tracks/artists, currently playing song and content search (powered by [Spotify](https://spotify.com)). Requires one-time authorization. | `SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET`, `SPOTIFY_REDIRECT_URI` |
| `translate` | Translate text to any language (powered by [DuckDuckGo](https://duckduckgo.com)) | `-` | | `worldtimeapi` | Get latest world time (powered by [WorldTimeAPI](https://worldtimeapi.org/)) | `WORLDTIME_DEFAULT_TIMEZONE` |
| `deepl` | Translate text to any language (powered by [DeepL](https://deepl.com)) | `DEEPL_API_KEY` | | `dice` | Send a dice in the chat! | - |
| `image_search` | Search image or GIF (powered by [DuckDuckGo](https://duckduckgo.com)) | `-` | | `youtube_audio_extractor` | Extract audio from YouTube videos | - |
| `deepl` | Translate text to any language (powered by [DeepL](https://deepl.com)) - by [@LedyBacer](https://github.com/LedyBacer) | `DEEPL_API_KEY` |
Check out the [official API reference](https://platform.openai.com/docs/api-reference/chat) for more details. Check out the [official API reference](https://platform.openai.com/docs/api-reference/chat) for more details.

View File

@@ -14,6 +14,7 @@ from calendar import monthrange
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from bot.utils import is_direct_result
from plugin_manager import PluginManager from plugin_manager import PluginManager
# Models can be found here: https://platform.openai.com/docs/models/overview # Models can be found here: https://platform.openai.com/docs/models/overview
@@ -118,6 +119,9 @@ class OpenAIHelper:
response = await self.__common_get_chat_response(chat_id, query) response = await self.__common_get_chat_response(chat_id, query)
if self.config['enable_functions']: if self.config['enable_functions']:
response, plugins_used = await self.__handle_function_call(chat_id, response) response, plugins_used = await self.__handle_function_call(chat_id, response)
if is_direct_result(response):
return response, '0'
answer = '' answer = ''
if len(response.choices) > 1 and self.config['n_choices'] > 1: if len(response.choices) > 1 and self.config['n_choices'] > 1:
@@ -158,6 +162,9 @@ class OpenAIHelper:
response = await self.__common_get_chat_response(chat_id, query, stream=True) response = await self.__common_get_chat_response(chat_id, query, stream=True)
if self.config['enable_functions']: if self.config['enable_functions']:
response, plugins_used = await self.__handle_function_call(chat_id, response, stream=True) response, plugins_used = await self.__handle_function_call(chat_id, response, stream=True)
if is_direct_result(response):
yield response, '0'
return
answer = '' answer = ''
async for item in response: async for item in response:
@@ -283,7 +290,16 @@ class OpenAIHelper:
logging.info(f'Calling function {function_name} with arguments {arguments}') logging.info(f'Calling function {function_name} with arguments {arguments}')
function_response = await self.plugin_manager.call_function(function_name, arguments) function_response = await self.plugin_manager.call_function(function_name, arguments)
logging.info(f'Got response {function_response}')
if function_name not in plugins_used:
plugins_used += (function_name,)
if is_direct_result(function_response):
self.__add_function_call_to_history(chat_id=chat_id, function_name=function_name,
content=json.dumps({'result': 'Done, the content has been sent'
'to the user.'}))
return function_response, plugins_used
self.__add_function_call_to_history(chat_id=chat_id, function_name=function_name, content=function_response) self.__add_function_call_to_history(chat_id=chat_id, function_name=function_name, content=function_response)
response = await openai.ChatCompletion.acreate( response = await openai.ChatCompletion.acreate(
model=self.config['model'], model=self.config['model'],
@@ -292,8 +308,6 @@ class OpenAIHelper:
function_call='auto' if times < self.config['functions_max_consecutive_calls'] else 'none', function_call='auto' if times < self.config['functions_max_consecutive_calls'] else 'none',
stream=stream stream=stream
) )
if function_name not in plugins_used:
plugins_used += (function_name,)
return await self.__handle_function_call(chat_id, response, stream, times + 1, plugins_used) return await self.__handle_function_call(chat_id, response, stream, times + 1, plugins_used)
async def generate_image(self, prompt: str) -> tuple[str, str]: async def generate_image(self, prompt: str) -> tuple[str, str]:

View File

@@ -1,32 +1,39 @@
import json import json
from plugins.images import ImageSearchPlugin from bot.plugins.dice import DicePlugin
from plugins.translate import TranslatePlugin from bot.plugins.youtube_audio_extractor import YouTubeAudioExtractorPlugin
from plugins.ddg_image_search import DDGImageSearchPlugin
from plugins.ddg_translate import DDGTranslatePlugin
from plugins.spotify import SpotifyPlugin from plugins.spotify import SpotifyPlugin
from plugins.crypto import CryptoPlugin from plugins.crypto import CryptoPlugin
from plugins.weather import WeatherPlugin from plugins.weather import WeatherPlugin
from plugins.web_search import WebSearchPlugin from plugins.ddg_web_search import DDGWebSearchPlugin
from plugins.wolfram_alpha import WolframAlphaPlugin from plugins.wolfram_alpha import WolframAlphaPlugin
from plugins.deepl import DeeplTranslatePlugin from plugins.deepl import DeeplTranslatePlugin
from plugins.worldtimeapi import WorldTimeApiPlugin
class PluginManager: class PluginManager:
""" """
A class to manage the plugins and call the correct functions A class to manage the plugins and call the correct functions
""" """
def __init__(self, config): def __init__(self, config):
enabled_plugins = config.get('plugins', []) enabled_plugins = config.get('plugins', [])
plugin_mapping = { plugin_mapping = {
'wolfram': WolframAlphaPlugin(), 'wolfram': WolframAlphaPlugin,
'weather': WeatherPlugin(), 'weather': WeatherPlugin,
'crypto': CryptoPlugin(), 'crypto': CryptoPlugin,
'web_search': WebSearchPlugin(), 'ddg_web_search': DDGWebSearchPlugin,
'spotify': SpotifyPlugin(), 'ddg_translate': DDGTranslatePlugin,
'translate': TranslatePlugin(), 'ddg_image_search': DDGImageSearchPlugin,
'image_search': ImageSearchPlugin(), 'spotify': SpotifyPlugin,
'deepl': DeeplTranslatePlugin() 'worldtimeapi': WorldTimeApiPlugin,
'youtube_audio_extractor': YouTubeAudioExtractorPlugin,
'dice': DicePlugin,
'deepl': DeeplTranslatePlugin
} }
self.plugins = [plugin_mapping[plugin] for plugin in enabled_plugins] self.plugins = [plugin_mapping[plugin]() for plugin in enabled_plugins if plugin in plugin_mapping]
def get_functions_specs(self): def get_functions_specs(self):
""" """

View File

@@ -0,0 +1,74 @@
import os
import random
from itertools import islice
from typing import Dict
from duckduckgo_search import DDGS
from .plugin import Plugin
class DDGImageSearchPlugin(Plugin):
"""
A plugin to search images and GIFs for a given query, using DuckDuckGo
"""
def __init__(self):
self.safesearch = os.getenv('DUCKDUCKGO_SAFESEARCH', 'moderate')
def get_source_name(self) -> str:
return "DuckDuckGo Images"
def get_spec(self) -> [Dict]:
return [{
"name": "search_images",
"description": "Search image or GIFs for a given query",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The query to search for"},
"type": {
"type": "string",
"enum": ["photo", "gif"],
"description": "The type of image to search for. Default to `photo` if not specified",
},
"region": {
"type": "string",
"enum": ['xa-ar', 'xa-en', 'ar-es', 'au-en', 'at-de', 'be-fr', 'be-nl', 'br-pt', 'bg-bg',
'ca-en', 'ca-fr', 'ct-ca', 'cl-es', 'cn-zh', 'co-es', 'hr-hr', 'cz-cs', 'dk-da',
'ee-et', 'fi-fi', 'fr-fr', 'de-de', 'gr-el', 'hk-tzh', 'hu-hu', 'in-en', 'id-id',
'id-en', 'ie-en', 'il-he', 'it-it', 'jp-jp', 'kr-kr', 'lv-lv', 'lt-lt', 'xl-es',
'my-ms', 'my-en', 'mx-es', 'nl-nl', 'nz-en', 'no-no', 'pe-es', 'ph-en', 'ph-tl',
'pl-pl', 'pt-pt', 'ro-ro', 'ru-ru', 'sg-en', 'sk-sk', 'sl-sl', 'za-en', 'es-es',
'se-sv', 'ch-de', 'ch-fr', 'ch-it', 'tw-tzh', 'th-th', 'tr-tr', 'ua-uk', 'uk-en',
'us-en', 'ue-es', 've-es', 'vn-vi', 'wt-wt'],
"description": "The region to use for the search. Infer this from the language used for the"
"query. Default to `wt-wt` if not specified",
}
},
"required": ["query", "type", "region"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
with DDGS() as ddgs:
image_type = kwargs.get('type', 'photo')
ddgs_images_gen = ddgs.images(
kwargs['query'],
region=kwargs.get('region', 'wt-wt'),
safesearch=self.safesearch,
type_image=image_type,
)
results = list(islice(ddgs_images_gen, 10))
if not results or len(results) == 0:
return {"result": "No results found"}
# Shuffle the results to avoid always returning the same image
random.shuffle(results)
return {
'direct_result': {
'kind': image_type,
'format': 'url',
'value': results[0]['image']
}
}

View File

@@ -5,7 +5,7 @@ from duckduckgo_search import DDGS
from .plugin import Plugin from .plugin import Plugin
class TranslatePlugin(Plugin): class DDGTranslatePlugin(Plugin):
""" """
A plugin to translate a given text from a language to another, using DuckDuckGo A plugin to translate a given text from a language to another, using DuckDuckGo
""" """

View File

@@ -0,0 +1,67 @@
import os
from itertools import islice
from typing import Dict
from duckduckgo_search import DDGS
from .plugin import Plugin
class DDGWebSearchPlugin(Plugin):
"""
A plugin to search the web for a given query, using DuckDuckGo
"""
def __init__(self):
self.safesearch = os.getenv('DUCKDUCKGO_SAFESEARCH', 'moderate')
def get_source_name(self) -> str:
return "DuckDuckGo"
def get_spec(self) -> [Dict]:
return [{
"name": "web_search",
"description": "Execute a web search for the given query and return a list of results",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "the user query"
},
"region": {
"type": "string",
"enum": ['xa-ar', 'xa-en', 'ar-es', 'au-en', 'at-de', 'be-fr', 'be-nl', 'br-pt', 'bg-bg',
'ca-en', 'ca-fr', 'ct-ca', 'cl-es', 'cn-zh', 'co-es', 'hr-hr', 'cz-cs', 'dk-da',
'ee-et', 'fi-fi', 'fr-fr', 'de-de', 'gr-el', 'hk-tzh', 'hu-hu', 'in-en', 'id-id',
'id-en', 'ie-en', 'il-he', 'it-it', 'jp-jp', 'kr-kr', 'lv-lv', 'lt-lt', 'xl-es',
'my-ms', 'my-en', 'mx-es', 'nl-nl', 'nz-en', 'no-no', 'pe-es', 'ph-en', 'ph-tl',
'pl-pl', 'pt-pt', 'ro-ro', 'ru-ru', 'sg-en', 'sk-sk', 'sl-sl', 'za-en', 'es-es',
'se-sv', 'ch-de', 'ch-fr', 'ch-it', 'tw-tzh', 'th-th', 'tr-tr', 'ua-uk', 'uk-en',
'us-en', 'ue-es', 've-es', 'vn-vi', 'wt-wt'],
"description": "The region to use for the search. Infer this from the language used for the"
"query. Default to `wt-wt` if not specified",
}
},
"required": ["query", "region"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
with DDGS() as ddgs:
ddgs_gen = ddgs.text(
kwargs['query'],
region=kwargs.get('region', 'wt-wt'),
safesearch=self.safesearch
)
results = list(islice(ddgs_gen, 3))
if results is None or len(results) == 0:
return {"Result": "No good DuckDuckGo Search Result was found"}
def to_metadata(result: Dict) -> Dict[str, str]:
return {
"snippet": result["body"],
"title": result["title"],
"link": result["href"],
}
return {"result": [to_metadata(result) for result in results]}

38
bot/plugins/dice.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import Dict
from .plugin import Plugin
class DicePlugin(Plugin):
"""
A plugin to send a die in the chat
"""
def get_source_name(self) -> str:
return "Dice"
def get_spec(self) -> [Dict]:
return [{
"name": "send_dice",
"description": "Send a dice in the chat, with a random number between 1 and 6",
"parameters": {
"type": "object",
"properties": {
"emoji": {
"type": "string",
"enum": ["🎲", "🎯", "🏀", "", "🎳", "🎰"],
"description": "Emoji on which the dice throw animation is based."
"Dice can have values 1-6 for “🎲”, “🎯” and “🎳”, values 1-5 for “🏀” "
"and “⚽”, and values 1-64 for “🎰”. Defaults to “🎲”.",
}
},
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
return {
'direct_result': {
'kind': 'dice',
'format': 'dice',
'value': kwargs.get('emoji', '🎲')
}
}

View File

@@ -1,43 +0,0 @@
from itertools import islice
from typing import Dict
from duckduckgo_search import DDGS
from .plugin import Plugin
class ImageSearchPlugin(Plugin):
"""
A plugin to search images and GIFs for a given query, using DuckDuckGo
"""
def get_source_name(self) -> str:
return "DuckDuckGo Images"
def get_spec(self) -> [Dict]:
return [{
"name": "search_images",
"description": "Search image or GIFs for a given query",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The query to search for"},
"type": {
"type": "string",
"enum": ["photo", "gif"],
"description": "The type of image to search for. Default to photo if not specified",
}
},
"required": ["query", "type"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
with DDGS() as ddgs:
ddgs_images_gen = ddgs.images(
kwargs['query'],
region="wt-wt",
safesearch='off',
type_image=kwargs.get('type', 'photo'),
)
results = list(islice(ddgs_images_gen, 1))
return {"result": results[0]["image"]}

View File

@@ -1,51 +0,0 @@
from itertools import islice
from typing import Dict
from duckduckgo_search import DDGS
from .plugin import Plugin
class WebSearchPlugin(Plugin):
"""
A plugin to search the web for a given query, using DuckDuckGo
"""
def get_source_name(self) -> str:
return "DuckDuckGo"
def get_spec(self) -> [Dict]:
return [{
"name": "web_search",
"description": "Execute a web search for the given query and return a list of results",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "the user query"
}
},
"required": ["query"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
with DDGS() as ddgs:
ddgs_gen = ddgs.text(
kwargs['query'],
region='wt-wt',
safesearch='off'
)
results = list(islice(ddgs_gen, 3))
if results is None or len(results) == 0:
return {"Result": "No good DuckDuckGo Search Result was found"}
def to_metadata(result: Dict) -> Dict[str, str]:
return {
"snippet": result["body"],
"title": result["title"],
"link": result["href"],
}
return {"result": [to_metadata(result) for result in results]}

View File

@@ -0,0 +1,49 @@
import os, requests
from typing import Dict
from datetime import datetime
from .plugin import Plugin
class WorldTimeApiPlugin(Plugin):
"""
A plugin to get the current time from a given timezone, using WorldTimeAPI
"""
def __init__(self):
default_timezone = os.getenv('WORLDTIME_DEFAULT_TIMEZONE')
if not default_timezone:
raise ValueError('WORLDTIME_DEFAULT_TIMEZONE environment variable must be set to use WorldTimeApiPlugin')
self.default_timezone = default_timezone
def get_source_name(self) -> str:
return "WorldTimeAPI"
def get_spec(self) -> [Dict]:
return [{
"name": "worldtimeapi",
"description": f"Get the current time from a given timezone",
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": f"The timezone identifier (e.g: `Europe/Rome`). Infer this from the location."
f"Use {self.default_timezone} if not specified."
}
},
"required": ["timezone"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
timezone = kwargs.get('timezone', self.default_timezone)
url = f'https://worldtimeapi.org/api/timezone/{timezone}'
try:
wtr = requests.get(url).json().get('datetime')
wtr_obj = datetime.strptime(wtr, "%Y-%m-%dT%H:%M:%S.%f%z")
time_24hr = wtr_obj.strftime("%H:%M:%S")
time_12hr = wtr_obj.strftime("%I:%M:%S %p")
return {"24hr": time_24hr, "12hr": time_12hr}
except:
return {"result": "No result was found"}

View File

@@ -0,0 +1,44 @@
from typing import Dict
from pytube import YouTube
from .plugin import Plugin
class YouTubeAudioExtractorPlugin(Plugin):
"""
A plugin to extract audio from a YouTube video
"""
def get_source_name(self) -> str:
return "YouTube Audio Extractor"
def get_spec(self) -> [Dict]:
return [{
"name": "extract_youtube_audio",
"description": "Extract audio from a YouTube video",
"parameters": {
"type": "object",
"properties": {
"youtube_link": {"type": "string", "description": "YouTube video link to extract audio from"}
},
"required": ["youtube_link"],
},
}]
async def execute(self, function_name, **kwargs) -> Dict:
link = kwargs['youtube_link']
try:
video = YouTube(link)
audio = video.streams.filter(only_audio=True, file_extension='mp4').first()
output = video.title + '.mp4'
audio.download(filename=output)
return {
'direct_result': {
'kind': 'file',
'format': 'path',
'value': output
}
}
except:
return {'result': 'Failed to extract audio'}

View File

@@ -16,7 +16,8 @@ from pydub import AudioSegment
from utils import is_group_chat, get_thread_id, message_text, wrap_with_indicator, split_into_chunks, \ from utils import is_group_chat, get_thread_id, message_text, wrap_with_indicator, split_into_chunks, \
edit_message_with_retry, get_stream_cutoff_values, is_allowed, get_remaining_budget, is_admin, is_within_budget, \ edit_message_with_retry, get_stream_cutoff_values, is_allowed, get_remaining_budget, is_admin, is_within_budget, \
get_reply_to_message_id, add_chat_request_to_usage_tracker, error_handler get_reply_to_message_id, add_chat_request_to_usage_tracker, error_handler, is_direct_result, handle_direct_result, \
cleanup_intermediate_files
from openai_helper import OpenAIHelper, localized_text from openai_helper import OpenAIHelper, localized_text
from usage_tracker import UsageTracker from usage_tracker import UsageTracker
@@ -401,6 +402,9 @@ class ChatGPTTelegramBot:
stream_chunk = 0 stream_chunk = 0
async for content, tokens in stream_response: async for content, tokens in stream_response:
if is_direct_result(content):
return await handle_direct_result(self.config, update, content)
if len(content.strip()) == 0: if len(content.strip()) == 0:
continue continue
@@ -472,6 +476,9 @@ class ChatGPTTelegramBot:
nonlocal total_tokens nonlocal total_tokens
response, total_tokens = await self.openai.get_chat_response(chat_id=chat_id, query=prompt) response, total_tokens = await self.openai.get_chat_response(chat_id=chat_id, query=prompt)
if is_direct_result(response):
return await handle_direct_result(self.config, update, response)
# Split into chunks of 4096 characters (Telegram's message limit) # Split into chunks of 4096 characters (Telegram's message limit)
chunks = split_into_chunks(response) chunks = split_into_chunks(response)
@@ -585,12 +592,21 @@ class ChatGPTTelegramBot:
is_inline=True) is_inline=True)
return return
unavailable_message = localized_text("function_unavailable_in_inline_mode", bot_language)
if self.config['stream']: if self.config['stream']:
stream_response = self.openai.get_chat_response_stream(chat_id=user_id, query=query) stream_response = self.openai.get_chat_response_stream(chat_id=user_id, query=query)
i = 0 i = 0
prev = '' prev = ''
backoff = 0 backoff = 0
async for content, tokens in stream_response: async for content, tokens in stream_response:
if is_direct_result(content):
cleanup_intermediate_files(content)
await edit_message_with_retry(context, chat_id=None,
message_id=inline_message_id,
text=f'{query}\n\n_{answer_tr}:_\n{unavailable_message}',
is_inline=True)
return
if len(content.strip()) == 0: if len(content.strip()) == 0:
continue continue
@@ -648,6 +664,14 @@ class ChatGPTTelegramBot:
logging.info(f'Generating response for inline query by {name}') logging.info(f'Generating response for inline query by {name}')
response, total_tokens = await self.openai.get_chat_response(chat_id=user_id, query=query) response, total_tokens = await self.openai.get_chat_response(chat_id=user_id, query=query)
if is_direct_result(response):
cleanup_intermediate_files(response)
await edit_message_with_retry(context, chat_id=None,
message_id=inline_message_id,
text=f'{query}\n\n_{answer_tr}:_\n{unavailable_message}',
is_inline=True)
return
text_content = f'{query}\n\n_{answer_tr}:_\n{response}' text_content = f'{query}\n\n_{answer_tr}:_\n{response}'
# We only want to send the first 4096 characters. No chunking allowed in inline mode. # We only want to send the first 4096 characters. No chunking allowed in inline mode.

View File

@@ -2,7 +2,9 @@ from __future__ import annotations
import asyncio import asyncio
import itertools import itertools
import json
import logging import logging
import os
import telegram import telegram
from telegram import Message, MessageEntity, Update, ChatMember, constants from telegram import Message, MessageEntity, Update, ChatMember, constants
@@ -173,6 +175,7 @@ async def is_allowed(config, update: Update, context: CallbackContext, is_inline
f'(id: {user_id}) are not allowed') f'(id: {user_id}) are not allowed')
return False return False
def is_admin(config, user_id: int, log_no_admin=False) -> bool: def is_admin(config, user_id: int, log_no_admin=False) -> bool:
""" """
Checks if the user is the admin of the bot. Checks if the user is the admin of the bot.
@@ -284,6 +287,9 @@ def add_chat_request_to_usage_tracker(usage, config, user_id, used_tokens):
:param used_tokens: The number of tokens used :param used_tokens: The number of tokens used
""" """
try: try:
if int(used_tokens) == 0:
logging.warning('No tokens used. Not adding chat request to usage tracker.')
return
# add chat request to users usage tracker # add chat request to users usage tracker
usage[user_id].add_chat_tokens(used_tokens, config['token_price']) usage[user_id].add_chat_tokens(used_tokens, config['token_price'])
# add guest chat request to guest usage tracker # add guest chat request to guest usage tracker
@@ -305,3 +311,66 @@ def get_reply_to_message_id(config, update: Update):
if config['enable_quoting'] or is_group_chat(update): if config['enable_quoting'] or is_group_chat(update):
return update.message.message_id return update.message.message_id
return None return None
def is_direct_result(response: any) -> bool:
"""
Checks if the dict contains a direct result that can be sent directly to the user
:param response: The response value
:return: Boolean indicating if the result is a direct result
"""
if type(response) is not dict:
try:
json_response = json.loads(response)
return json_response.get('direct_result', False)
except:
return False
else:
return response.get('direct_result', False)
async def handle_direct_result(config, update: Update, response: any):
"""
Handles a direct result from a plugin
"""
if type(response) is not dict:
response = json.loads(response)
result = response['direct_result']
kind = result['kind']
format = result['format']
value = result['value']
common_args = {
'message_thread_id': get_thread_id(update),
'reply_to_message_id': get_reply_to_message_id(config, update),
}
if kind == 'photo':
if format == 'url':
await update.effective_message.reply_photo(**common_args, photo=value)
elif kind == 'gif':
if format == 'url':
await update.effective_message.reply_document(**common_args, document=value)
elif kind == 'file':
if format == 'path':
await update.effective_message.reply_document(**common_args, document=open(value, 'rb'))
cleanup_intermediate_files(response)
elif kind == 'dice':
await update.effective_message.reply_dice(**common_args, emoji=value)
def cleanup_intermediate_files(response: any):
"""
Deletes intermediate files created by plugins
"""
if type(response) is not dict:
response = json.loads(response)
result = response['direct_result']
kind = result['kind']
format = result['format']
value = result['value']
if kind == 'file' and format == 'path':
os.remove(value)

View File

@@ -8,3 +8,4 @@ tenacity==8.2.2
wolframalpha==5.0.0 wolframalpha==5.0.0
duckduckgo_search==3.8.3 duckduckgo_search==3.8.3
spotipy==2.23.0 spotipy==2.23.0
pytube==15.0.0

View File

@@ -39,7 +39,8 @@
"try_again":"Please try again in a while", "try_again":"Please try again in a while",
"answer_with_chatgpt":"Answer with ChatGPT", "answer_with_chatgpt":"Answer with ChatGPT",
"ask_chatgpt":"Ask ChatGPT", "ask_chatgpt":"Ask ChatGPT",
"loading":"Loading..." "loading":"Loading...",
"function_unavailable_in_inline_mode": "This function is unavailable in inline mode"
}, },
"es": { "es": {
"help_description":"Muestra el mensaje de ayuda", "help_description":"Muestra el mensaje de ayuda",
@@ -81,7 +82,8 @@
"try_again":"Por favor, inténtalo de nuevo más tarde", "try_again":"Por favor, inténtalo de nuevo más tarde",
"answer_with_chatgpt":"Responder con ChatGPT", "answer_with_chatgpt":"Responder con ChatGPT",
"ask_chatgpt":"Preguntar a ChatGPT", "ask_chatgpt":"Preguntar a ChatGPT",
"loading":"Cargando..." "loading":"Cargando...",
"function_unavailable_in_inline_mode": "Esta función no está disponible en el modo inline"
}, },
"pt-br": { "pt-br": {
"help_description": "Mostra a mensagem de ajuda", "help_description": "Mostra a mensagem de ajuda",
@@ -123,7 +125,9 @@
"try_again": "Por favor, tente novamente mais tarde", "try_again": "Por favor, tente novamente mais tarde",
"answer_with_chatgpt": "Responder com ChatGPT", "answer_with_chatgpt": "Responder com ChatGPT",
"ask_chatgpt": "Perguntar ao ChatGPT", "ask_chatgpt": "Perguntar ao ChatGPT",
"loading": "Carregando..."}, "loading": "Carregando...",
"function_unavailable_in_inline_mode": "Esta função não está disponível no modo inline"
},
"de": { "de": {
"help_description":"Zeige die Hilfenachricht", "help_description":"Zeige die Hilfenachricht",
"reset_description":"Setze die Konversation zurück. Optionale Eingabe einer grundlegenden Anweisung (z.B. /reset Du bist ein hilfreicher Assistent)", "reset_description":"Setze die Konversation zurück. Optionale Eingabe einer grundlegenden Anweisung (z.B. /reset Du bist ein hilfreicher Assistent)",
@@ -164,7 +168,8 @@
"try_again":"Bitte versuche es später erneut", "try_again":"Bitte versuche es später erneut",
"answer_with_chatgpt":"Antworte mit ChatGPT", "answer_with_chatgpt":"Antworte mit ChatGPT",
"ask_chatgpt":"Frage ChatGPT", "ask_chatgpt":"Frage ChatGPT",
"loading":"Lade..." "loading":"Lade...",
"function_unavailable_in_inline_mode": "Diese Funktion ist im Inline-Modus nicht verfügbar"
}, },
"fi": { "fi": {
"help_description":"Näytä ohjeet", "help_description":"Näytä ohjeet",
@@ -206,7 +211,8 @@
"try_again":"Yritä myöhemmin uudelleen", "try_again":"Yritä myöhemmin uudelleen",
"answer_with_chatgpt":"Vastaa ChatGPT:n avulla", "answer_with_chatgpt":"Vastaa ChatGPT:n avulla",
"ask_chatgpt":"Kysy ChatGPT:ltä", "ask_chatgpt":"Kysy ChatGPT:ltä",
"loading":"Lataa..." "loading":"Lataa...",
"function_unavailable_in_inline_mode": "Tämä toiminto ei ole käytettävissä sisäisessä tilassa"
}, },
"ru": { "ru": {
"help_description":"Показать справочное сообщение", "help_description":"Показать справочное сообщение",
@@ -248,7 +254,8 @@
"try_again":"Пожалуйста, повторите попытку позже", "try_again":"Пожалуйста, повторите попытку позже",
"answer_with_chatgpt":"Ответить с помощью ChatGPT", "answer_with_chatgpt":"Ответить с помощью ChatGPT",
"ask_chatgpt":"Спросить ChatGPT", "ask_chatgpt":"Спросить ChatGPT",
"loading":"Загрузка..." "loading":"Загрузка...",
"function_unavailable_in_inline_mode": "Эта функция недоступна в режиме inline"
}, },
"tr": { "tr": {
"help_description":"Yardım mesajını göster", "help_description":"Yardım mesajını göster",
@@ -290,7 +297,8 @@
"try_again":"Lütfen birazdan tekrar deneyiniz", "try_again":"Lütfen birazdan tekrar deneyiniz",
"answer_with_chatgpt":"ChatGPT ile cevapla", "answer_with_chatgpt":"ChatGPT ile cevapla",
"ask_chatgpt":"ChatGPT'ye sor", "ask_chatgpt":"ChatGPT'ye sor",
"loading":"Yükleniyor..." "loading":"Yükleniyor...",
"function_unavailable_in_inline_mode": "Bu işlev inline modda kullanılamaz"
}, },
"it": { "it": {
"help_description":"Mostra il messaggio di aiuto", "help_description":"Mostra il messaggio di aiuto",
@@ -332,7 +340,8 @@
"try_again":"Riprova più tardi", "try_again":"Riprova più tardi",
"answer_with_chatgpt":"Rispondi con ChatGPT", "answer_with_chatgpt":"Rispondi con ChatGPT",
"ask_chatgpt":"Chiedi a ChatGPT", "ask_chatgpt":"Chiedi a ChatGPT",
"loading":"Carico..." "loading":"Carico...",
"function_unavailable_in_inline_mode": "Questa funzione non è disponibile in modalità inline"
}, },
"id": { "id": {
"help_description": "Menampilkan pesan bantuan", "help_description": "Menampilkan pesan bantuan",
@@ -374,7 +383,8 @@
"try_again": "Silakan coba lagi nanti", "try_again": "Silakan coba lagi nanti",
"answer_with_chatgpt": "Jawaban dengan ChatGPT", "answer_with_chatgpt": "Jawaban dengan ChatGPT",
"ask_chatgpt": "Tanya ChatGPT", "ask_chatgpt": "Tanya ChatGPT",
"loading": "Sedang memuat..." "loading": "Sedang memuat...",
"function_unavailable_in_inline_mode": "Fungsi ini tidak tersedia dalam mode inline"
}, },
"nl": { "nl": {
"help_description":"Toon uitleg", "help_description":"Toon uitleg",
@@ -416,7 +426,8 @@
"try_again":"Probeer het a.u.b. later opnieuw", "try_again":"Probeer het a.u.b. later opnieuw",
"answer_with_chatgpt":"Antwoord met ChatGPT", "answer_with_chatgpt":"Antwoord met ChatGPT",
"ask_chatgpt":"Vraag ChatGPT", "ask_chatgpt":"Vraag ChatGPT",
"loading":"Laden..." "loading":"Laden...",
"function_unavailable_in_inline_mode": "Deze functie is niet beschikbaar in de inline modus"
}, },
"zh-cn": { "zh-cn": {
"help_description":"显示帮助信息", "help_description":"显示帮助信息",
@@ -458,7 +469,8 @@
"try_again":"请稍后再试", "try_again":"请稍后再试",
"answer_with_chatgpt":"使用ChatGPT回答", "answer_with_chatgpt":"使用ChatGPT回答",
"ask_chatgpt":"询问ChatGPT", "ask_chatgpt":"询问ChatGPT",
"loading":"载入中..." "loading":"载入中...",
"function_unavailable_in_inline_mode": "此功能在内联模式下不可用"
}, },
"zh-tw": { "zh-tw": {
"help_description":"顯示幫助訊息", "help_description":"顯示幫助訊息",
@@ -500,7 +512,8 @@
"try_again":"請稍後重試", "try_again":"請稍後重試",
"answer_with_chatgpt":"使用 ChatGPT 回答", "answer_with_chatgpt":"使用 ChatGPT 回答",
"ask_chatgpt":"詢問 ChatGPT", "ask_chatgpt":"詢問 ChatGPT",
"loading":"載入中…" "loading":"載入中…",
"function_unavailable_in_inline_mode": "此功能在內嵌模式下不可用"
}, },
"vi": { "vi": {
"help_description":"Hiển thị trợ giúp", "help_description":"Hiển thị trợ giúp",
@@ -542,7 +555,8 @@
"try_again":"Vui lòng thử lại sau một lúc", "try_again":"Vui lòng thử lại sau một lúc",
"answer_with_chatgpt":"Trả lời với ChatGPT", "answer_with_chatgpt":"Trả lời với ChatGPT",
"ask_chatgpt":"Hỏi ChatGPT", "ask_chatgpt":"Hỏi ChatGPT",
"loading":"Đang tải..." "loading":"Đang tải...",
"function_unavailable_in_inline_mode": "Chức năng này không khả dụng trong chế độ nội tuyến"
}, },
"fa": { "fa": {
"help_description":"نمایش پیغام راهنما", "help_description":"نمایش پیغام راهنما",
@@ -584,7 +598,8 @@
"try_again":"لطفا بعد از مدتی دوباره امتحان کنید", "try_again":"لطفا بعد از مدتی دوباره امتحان کنید",
"answer_with_chatgpt":"با ChatGPT پاسخ دهید", "answer_with_chatgpt":"با ChatGPT پاسخ دهید",
"ask_chatgpt":"از ChatGPT بپرسید", "ask_chatgpt":"از ChatGPT بپرسید",
"loading":"در حال بارگذاری..." "loading":"در حال بارگذاری...",
"function_unavailable_in_inline_mode": "این عملکرد در حالت آنلاین در دسترس نیست"
}, },
"uk": { "uk": {
"help_description":"Показати повідомлення допомоги", "help_description":"Показати повідомлення допомоги",
@@ -626,6 +641,7 @@
"try_again":"Будь ласка, спробуйте знову через деякий час", "try_again":"Будь ласка, спробуйте знову через деякий час",
"answer_with_chatgpt":"Відповідь за допомогою ChatGPT", "answer_with_chatgpt":"Відповідь за допомогою ChatGPT",
"ask_chatgpt":"Запитати ChatGPT", "ask_chatgpt":"Запитати ChatGPT",
"loading":"Завантаження..." "loading":"Завантаження...",
"function_unavailable_in_inline_mode": "Ця функція недоступна в режимі Inline"
} }
} }