Add auto_tts plugin

This commit is contained in:
Jipok
2023-11-23 20:48:16 +05:00
parent 904f49e475
commit 371b9597ee
17 changed files with 63 additions and 17 deletions

View File

@@ -294,7 +294,7 @@ class OpenAIHelper:
return response, plugins_used return response, plugins_used
logging.info(f'Calling function {function_name} with arguments {arguments}') logging.info(f'Calling function {function_name} with arguments {arguments}')
function_response = await self.plugin_manager.call_function(function_name, arguments) function_response = await self.plugin_manager.call_function(function_name, self, arguments)
if function_name not in plugins_used: if function_name not in plugins_used:
plugins_used += (function_name,) plugins_used += (function_name,)

View File

@@ -1,6 +1,7 @@
import json import json
from plugins.gtts_text_to_speech import GTTSTextToSpeech from plugins.gtts_text_to_speech import GTTSTextToSpeech
from plugins.auto_tts import AutoTextToSpeech
from plugins.dice import DicePlugin from plugins.dice import DicePlugin
from plugins.youtube_audio_extractor import YouTubeAudioExtractorPlugin from plugins.youtube_audio_extractor import YouTubeAudioExtractorPlugin
from plugins.ddg_image_search import DDGImageSearchPlugin from plugins.ddg_image_search import DDGImageSearchPlugin
@@ -36,6 +37,7 @@ class PluginManager:
'dice': DicePlugin, 'dice': DicePlugin,
'deepl_translate': DeeplTranslatePlugin, 'deepl_translate': DeeplTranslatePlugin,
'gtts_text_to_speech': GTTSTextToSpeech, 'gtts_text_to_speech': GTTSTextToSpeech,
'auto_tts': AutoTextToSpeech,
'whois': WhoisPlugin, 'whois': WhoisPlugin,
'webshot': WebshotPlugin, 'webshot': WebshotPlugin,
} }
@@ -47,14 +49,14 @@ class PluginManager:
""" """
return [spec for specs in map(lambda plugin: plugin.get_spec(), self.plugins) for spec in specs] return [spec for specs in map(lambda plugin: plugin.get_spec(), self.plugins) for spec in specs]
async def call_function(self, function_name, arguments): async def call_function(self, function_name, helper, arguments):
""" """
Call a function based on the name and parameters provided Call a function based on the name and parameters provided
""" """
plugin = self.__get_plugin_by_function_name(function_name) plugin = self.__get_plugin_by_function_name(function_name)
if not plugin: if not plugin:
return json.dumps({'error': f'Function {function_name} not found'}) return json.dumps({'error': f'Function {function_name} not found'})
return json.dumps(await plugin.execute(function_name, **json.loads(arguments)), default=str) return json.dumps(await plugin.execute(function_name, helper, **json.loads(arguments)), default=str)
def get_plugin_source_name(self, function_name) -> str: def get_plugin_source_name(self, function_name) -> str:
""" """

44
bot/plugins/auto_tts.py Normal file
View File

@@ -0,0 +1,44 @@
import datetime
import tempfile
from typing import Dict
from .plugin import Plugin
class AutoTextToSpeech(Plugin):
"""
A plugin to convert text to speech using Openai Speech API
"""
def get_source_name(self) -> str:
return "TTS"
def get_spec(self) -> [Dict]:
return [{
"name": "translate_text_to_speech",
"description": "Translate text to speech using OpenAI API",
"parameters": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "The text to translate to speech"},
},
"required": ["text"],
},
}]
async def execute(self, function_name, helper, **kwargs) -> Dict:
try:
bytes, text_length = await helper.generate_speech(text=kwargs['text'])
with tempfile.NamedTemporaryFile(delete=False, suffix='.opus') as temp_file:
temp_file.write(bytes.getvalue())
temp_file_path = temp_file.name
except Exception as e:
logging.exception(e)
return {"Result": "Exception: " + str(e)}
return {
'direct_result': {
'kind': 'file',
'format': 'path',
'value': temp_file_path
}
}

View File

@@ -26,5 +26,5 @@ class CryptoPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
return requests.get(f"https://api.coincap.io/v2/rates/{kwargs['asset']}").json() return requests.get(f"https://api.coincap.io/v2/rates/{kwargs['asset']}").json()

View File

@@ -49,7 +49,7 @@ class DDGImageSearchPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
with DDGS() as ddgs: with DDGS() as ddgs:
image_type = kwargs.get('type', 'photo') image_type = kwargs.get('type', 'photo')
ddgs_images_gen = ddgs.images( ddgs_images_gen = ddgs.images(

View File

@@ -26,6 +26,6 @@ class DDGTranslatePlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
with DDGS() as ddgs: with DDGS() as ddgs:
return ddgs.translate(kwargs['text'], to=kwargs['to_language']) return ddgs.translate(kwargs['text'], to=kwargs['to_language'])

View File

@@ -46,7 +46,7 @@ class DDGWebSearchPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
with DDGS() as ddgs: with DDGS() as ddgs:
ddgs_gen = ddgs.text( ddgs_gen = ddgs.text(
kwargs['query'], kwargs['query'],

View File

@@ -33,7 +33,7 @@ class DeeplTranslatePlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
if self.api_key.endswith(':fx'): if self.api_key.endswith(':fx'):
url = "https://api-free.deepl.com/v2/translate" url = "https://api-free.deepl.com/v2/translate"
else: else:

View File

@@ -31,7 +31,7 @@ class GTTSTextToSpeech(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
tts = gTTS(kwargs['text'], lang=kwargs.get('lang', 'en')) tts = gTTS(kwargs['text'], lang=kwargs.get('lang', 'en'))
output = f'gtts_{datetime.datetime.now().timestamp()}.mp3' output = f'gtts_{datetime.datetime.now().timestamp()}.mp3'
tts.save(output) tts.save(output)

View File

@@ -23,7 +23,7 @@ class Plugin(ABC):
pass pass
@abstractmethod @abstractmethod
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
""" """
Execute the plugin and return a JSON serializable response Execute the plugin and return a JSON serializable response
""" """

View File

@@ -111,7 +111,7 @@ class SpotifyPlugin(Plugin):
} }
] ]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
time_range = kwargs.get('time_range', 'short_term') time_range = kwargs.get('time_range', 'short_term')
limit = kwargs.get('limit', 5) limit = kwargs.get('limit', 5)

View File

@@ -57,7 +57,7 @@ class WeatherPlugin(Plugin):
} }
] ]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
url = f'https://api.open-meteo.com/v1/forecast' \ url = f'https://api.open-meteo.com/v1/forecast' \
f'?latitude={kwargs["latitude"]}' \ f'?latitude={kwargs["latitude"]}' \
f'&longitude={kwargs["longitude"]}' \ f'&longitude={kwargs["longitude"]}' \

View File

@@ -26,7 +26,7 @@ class WebshotPlugin(Plugin):
characters = string.ascii_letters + string.digits characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length)) return ''.join(random.choice(characters) for _ in range(length))
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
try: try:
image_url = f'https://image.thum.io/get/maxAge/12/width/720/{kwargs["url"]}' image_url = f'https://image.thum.io/get/maxAge/12/width/720/{kwargs["url"]}'

View File

@@ -24,7 +24,7 @@ class WhoisPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
try: try:
whois_result = whois.query(kwargs['domain']) whois_result = whois.query(kwargs['domain'])
if whois_result is None: if whois_result is None:

View File

@@ -32,7 +32,7 @@ class WolframAlphaPlugin(Plugin):
} }
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
client = wolframalpha.Client(self.app_id) client = wolframalpha.Client(self.app_id)
res = client.query(kwargs['query']) res = client.query(kwargs['query'])
try: try:

View File

@@ -35,7 +35,7 @@ class WorldTimeApiPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
timezone = kwargs.get('timezone', self.default_timezone) timezone = kwargs.get('timezone', self.default_timezone)
url = f'https://worldtimeapi.org/api/timezone/{timezone}' url = f'https://worldtimeapi.org/api/timezone/{timezone}'

View File

@@ -28,7 +28,7 @@ class YouTubeAudioExtractorPlugin(Plugin):
}, },
}] }]
async def execute(self, function_name, **kwargs) -> Dict: async def execute(self, function_name, helper, **kwargs) -> Dict:
link = kwargs['youtube_link'] link = kwargs['youtube_link']
try: try:
video = YouTube(link) video = YouTube(link)