improvements and added initial spotify plugin

This commit is contained in:
ned
2023-06-19 19:53:34 +02:00
parent d86cd38527
commit c1bfda47b5
13 changed files with 473 additions and 127 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ __pycache__
.DS_Store
/usage_logs
venv
/.cache

View File

@@ -106,14 +106,18 @@ Check out the [Budget Manual](https://github.com/n3d1117/chatgpt-telegram-bot/di
| `PLUGINS` | List of plugins to enable (see below for a full list), e.g: `PLUGINS=wolfram,weather` | `-` |
| `SHOW_PLUGINS_USED` | Whether to show which plugins were used for a response | `false` |
| `WOLFRAM_APP_ID` | Wolfram Alpha APP ID (required for the `wolfram` plugin, you can get one [here](https://products.wolframalpha.com/simple-api/documentation)) | `-` |
| `SPOTIFY_CLIENT_ID` | Spotify app Client ID (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` |
| `SPOTIFY_CLIENT_SECRET` | Spotify app Client Secret (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` |
| `SPOTIFY_REDIRECT_URI` | Spotify app Redirect URI (required for the `spotify` plugin, you can find it on the [dashboard](https://developer.spotify.com/dashboard/)) | `-` |
#### Available plugins
| Name | Description | Required API key |
|--------------|---------------------------------------------------------------------------------------------------------------------|------------------|
| Name | Description | Required API key(s) |
|--------------|-------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------|
| `weather` | Daily weather and 7-day forecast for any location (powered by [Open-Meteo](https://open-meteo.com)) | `-` |
| `wolfram` | WolframAlpha queries (powered by [WolframAlpha](https://www.wolframalpha.com)) | `WOLFRAM_APP_ID` |
| `web_search` | Web search (powered by [DuckDuckGo](https://duckduckgo.com)) | `-` |
| `crypto` | Live cryptocurrencies rate (powered by [CoinCap](https://coincap.io)) - by [@stumpyfr](https://github.com/stumpyfr) | `-` |
| `spotify` | Spotify top tracks/artists and currently playing song (powered by [Spotify](https://spotify.com)). Requires one-time auth approval. | `SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET`, `SPOTIFY_REDIRECT_URI` |
Check out the [official API reference](https://platform.openai.com/docs/api-reference/chat) for more details.

View File

@@ -3,7 +3,7 @@ import os
from dotenv import load_dotenv
from functions import PluginManager
from plugin_manager import PluginManager
from openai_helper import OpenAIHelper, default_max_tokens, are_functions_available
from telegram_bot import ChatGPTTelegramBot

View File

@@ -14,7 +14,7 @@ from calendar import monthrange
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from functions import PluginManager
from plugin_manager import PluginManager
# Models can be found here: https://platform.openai.com/docs/models/overview
GPT_3_MODELS = ("gpt-3.5-turbo", "gpt-3.5-turbo-0301", "gpt-3.5-turbo-0613")

View File

@@ -1,5 +1,7 @@
import json
from plugins.python import PythonPlugin
from plugins.spotify import SpotifyPlugin
from plugins.crypto import CryptoPlugin
from plugins.weather import WeatherPlugin
from plugins.web_search import WebSearchPlugin
@@ -12,19 +14,20 @@ class PluginManager:
"""
def __init__(self, config):
enabled_plugins = config.get('plugins', [])
plugins = [
WolframAlphaPlugin() if 'wolfram' in enabled_plugins else None,
WeatherPlugin() if 'weather' in enabled_plugins else None,
CryptoPlugin() if 'crypto' in enabled_plugins else None,
WebSearchPlugin() if 'web_search' in enabled_plugins else None,
]
self.plugins = [plugin for plugin in plugins if plugin is not None]
plugin_mapping = {
'wolfram': WolframAlphaPlugin(),
'weather': WeatherPlugin(),
'crypto': CryptoPlugin(),
'web_search': WebSearchPlugin(),
'spotify': SpotifyPlugin(),
}
self.plugins = [plugin_mapping[plugin] for plugin in enabled_plugins]
def get_functions_specs(self):
"""
Return the list of function specs that can be called by the model
"""
return [plugin.get_spec() for plugin in self.plugins]
return [spec for specs in map(lambda plugin: plugin.get_spec(), self.plugins) for spec in specs]
async def call_function(self, function_name, arguments):
"""
@@ -45,4 +48,5 @@ class PluginManager:
return plugin.get_source_name()
def __get_plugin_by_function_name(self, function_name):
return next((plugin for plugin in self.plugins if plugin.get_spec().get('name') == function_name), None)
return next((plugin for plugin in self.plugins
if function_name in map(lambda spec: spec.get('name'), plugin.get_spec())), None)

View File

@@ -13,8 +13,8 @@ class CryptoPlugin(Plugin):
def get_source_name(self) -> str:
return "CoinCap"
def get_spec(self) -> Dict:
return {
def get_spec(self) -> [Dict]:
return [{
"name": "get_crypto_rate",
"description": "Get the current rate of various crypto currencies",
"parameters": {
@@ -24,7 +24,7 @@ class CryptoPlugin(Plugin):
},
"required": ["asset"],
},
}
}]
async def execute(self, **kwargs) -> Dict:
async def execute(self, function_name, **kwargs) -> Dict:
return requests.get(f"https://api.coincap.io/v2/rates/{kwargs['asset']}").json()

View File

@@ -15,15 +15,15 @@ class Plugin(ABC):
pass
@abstractmethod
def get_spec(self) -> Dict:
def get_spec(self) -> [Dict]:
"""
Function spec in the form of JSON schema as specified in the OpenAI documentation:
Function specs in the form of JSON schema as specified in the OpenAI documentation:
https://platform.openai.com/docs/api-reference/chat/create#chat/create-functions
"""
pass
@abstractmethod
async def execute(self, **kwargs) -> Dict:
async def execute(self, function_name, **kwargs) -> Dict:
"""
Execute the plugin and return a JSON serializable response
"""

302
bot/plugins/spotify.py Normal file
View File

@@ -0,0 +1,302 @@
import os
from typing import Dict
import spotipy
from spotipy import SpotifyOAuth
from .plugin import Plugin
class SpotifyPlugin(Plugin):
"""
A plugin to fetch information from Spotify
"""
def __init__(self):
spotify_client_id = os.getenv('SPOTIFY_CLIENT_ID')
spotify_client_secret = os.getenv('SPOTIFY_CLIENT_SECRET')
spotify_redirect_uri = os.getenv('SPOTIFY_REDIRECT_URI')
if not spotify_client_id or not spotify_client_secret or not spotify_redirect_uri:
raise ValueError('SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET and SPOTIFY_REDIRECT_URI environment variables'
' are required to use SpotifyPlugin')
self.spotify = spotipy.Spotify(
auth_manager=SpotifyOAuth(
client_id=spotify_client_id,
client_secret=spotify_client_secret,
redirect_uri=spotify_redirect_uri,
scope="user-top-read,user-read-currently-playing",
open_browser=False
)
)
def get_source_name(self) -> str:
return "Spotify"
def get_spec(self) -> [Dict]:
time_range_param = {
"type": "string",
"enum": ["short_term", "medium_term", "long_term"],
"description": "The time range of the data to be returned. Short term is the last 4 weeks, "
"medium term is last 6 months, long term is last several years. "
"Ignore if action is currently_playing",
}
limit_param = {
"type": "integer",
"description": "The number of results to return. Max is 50. Default to 10 if not specified."
"Ignore if action is currently_playing",
}
type_param = {
"type": "string",
"enum": ["album", "artist", "track"],
"description": "Type of content to search",
}
return [
{
"name": "spotify_get_currently_playing_song",
"description": "Get the user's currently playing song",
"parameters": {
"type": "object",
"properties": {}
}
},
{
"name": "spotify_get_users_top_artists",
"description": "Get the user's top artists",
"parameters": {
"type": "object",
"properties": {
"time_range": time_range_param,
"limit": limit_param
}
}
},
{
"name": "spotify_get_users_top_tracks",
"description": "Get the user's top tracks",
"parameters": {
"type": "object",
"properties": {
"time_range": time_range_param,
"limit": limit_param
}
}
},
{
"name": "spotify_search_by_query",
"description": "Search spotify content by query",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query",
},
"type": type_param
},
"required": ["query", "type"]
}
},
{
"name": "spotify_lookup_by_id",
"description": "Lookup spotify content by id",
"parameters": {
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "The exact id to lookup. Can be a track id, an artist id or an album id",
},
"type": type_param
},
"required": ["id", "type"]
}
}
]
async def execute(self, function_name, **kwargs) -> Dict:
time_range = kwargs.get('time_range', 'short_term')
limit = kwargs.get('limit', 10)
if function_name == 'spotify_get_currently_playing_song':
return self.fetch_currently_playing()
elif function_name == 'spotify_get_users_top_artists':
return self.fetch_top_artists(time_range, limit)
elif function_name == 'spotify_get_users_top_tracks':
return self.fetch_top_tracks(time_range, limit)
elif function_name == 'spotify_search_by_query':
query = kwargs.get('query', '')
search_type = kwargs.get('type', 'track')
return self.search_by_query(query, search_type, limit)
elif function_name == 'spotify_lookup_by_id':
content_id = kwargs.get('id')
search_type = kwargs.get('type', 'track')
return self.search_by_id(content_id, search_type)
def fetch_currently_playing(self) -> Dict:
"""
Fetch user's currently playing song from Spotify
"""
currently_playing = self.spotify.current_user_playing_track()
result = {
'name': currently_playing['item']['name'],
'artist': currently_playing['item']['artists'][0]['name'],
'album': currently_playing['item']['album']['name'],
'url': currently_playing['item']['external_urls']['spotify'],
'__album_id': currently_playing['item']['album']['id'],
'__artist_id': currently_playing['item']['artists'][0]['id'],
'__track_id': currently_playing['item']['id'],
}
return {"result": result}
def fetch_top_tracks(self, time_range='short_term', limit=5) -> Dict:
"""
Fetch user's top tracks from Spotify
"""
results = []
top_tracks = self.spotify.current_user_top_tracks(limit=limit, time_range=time_range)
for item in top_tracks['items']:
results.append({
'name': item['name'],
'artist': item['artists'][0]['name'],
'album': item['album']['name'],
'album_release_date': item['album']['release_date'],
'url': item['external_urls']['spotify'],
'album_url': item['album']['external_urls']['spotify'],
'artist_url': item['artists'][0]['external_urls']['spotify'],
'__track_id': item['id'],
'__album_id': item['album']['id'],
'__artist_id': item['artists'][0]['id'],
})
return {'results': results}
def fetch_top_artists(self, time_range='short_term', limit=5) -> Dict:
"""
Fetch user's top artists from Spotify
"""
results = []
top_artists = self.spotify.current_user_top_artists(limit=limit, time_range=time_range)
for item in top_artists['items']:
results.append({
'name': item['name'],
'url': item['external_urls']['spotify'],
'__artist_id': item['id']
})
return {'results': results}
def search_by_query(self, query, search_type, limit=5) -> Dict:
"""
Search content by query on Spotify
"""
results = {}
search_response = self.spotify.search(q=query, limit=limit, type=search_type)
if 'tracks' in search_response:
results['tracks'] = []
for item in search_response['tracks']['items']:
results['tracks'].append({
'name': item['name'],
'artist': item['artists'][0]['name'],
'album': item['album']['name'],
'album_release_date': item['album']['release_date'],
'url': item['external_urls']['spotify'],
'album_url': item['album']['external_urls']['spotify'],
'artist_url': item['artists'][0]['external_urls']['spotify'],
'__artist_id': item['artists'][0]['id'],
'__album_id': item['album']['id'],
'__track_id': item['id'],
})
if 'artists' in search_response:
results['artists'] = []
for item in search_response['artists']['items']:
results['artists'].append({
'name': item['name'],
'url': item['external_urls']['spotify'],
'__artist_id': item['id'],
})
if 'albums' in search_response:
results['albums'] = []
for item in search_response['albums']['items']:
results['albums'].append({
'name': item['name'],
'artist': item['artists'][0]['name'],
'url': item['external_urls']['spotify'],
'artist_url': item['artists'][0]['external_urls']['spotify'],
'release_date': item['release_date'],
'__artist_id': item['artists'][0]['id'],
'__album_id': item['id'],
})
return {'results': results}
def search_by_id(self, content_id, search_type) -> Dict:
"""
Search content by exact id on Spotify
"""
if search_type == 'track':
search_response = self.spotify.track(content_id)
return {'result': self._get_track(search_response)}
elif search_type == 'artist':
search_response = self.spotify.artist(content_id)
albums_response = self.spotify.artist_albums(artist_id=content_id, limit=3)
return {'result': self._get_artist(search_response, albums_response)}
elif search_type == 'album':
search_response = self.spotify.album(content_id)
return {'result': self._get_album(search_response)}
else:
return {'error': 'Invalid search type. Must be track, artist or album'}
def _get_artist(self, response, albums):
return {
'name': response['name'],
'url': response['external_urls']['spotify'],
'__artist_id': response['id'],
'followers': response['followers']['total'],
'genres': response['genres'],
'albums': [
{
'name': album['name'],
'__album_id': album['id'],
'url': album['external_urls']['spotify'],
'release_date': album['release_date'],
'total_tracks': album['total_tracks'],
}
for album in albums['items']
],
}
def _get_track(self, response):
return {
'name': response['name'],
'artist': response['artists'][0]['name'],
'__artist_id': response['artists'][0]['id'],
'album': response['album']['name'],
'__album_id': response['album']['id'],
'url': response['external_urls']['spotify'],
'__track_id': response['id'],
'duration_ms': response['duration_ms'],
'track_number': response['track_number'],
'explicit': response['explicit'],
}
def _get_album(self, response):
return {
'name': response['name'],
'artist': response['artists'][0]['name'],
'__artist_id': response['artists'][0]['id'],
'url': response['external_urls']['spotify'],
'release_date': response['release_date'],
'total_tracks': response['total_tracks'],
'__album_id': response['id'],
'label': response['label'],
'tracks': [
{
'name': track['name'],
'url': track['external_urls']['spotify'],
'__track_id': track['id'],
'duration_ms': track['duration_ms'],
'track_number': track['track_number'],
'explicit': track['explicit'],
}
for track in response['tracks']['items']
]
}

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Dict
import requests
@@ -9,24 +10,20 @@ class WeatherPlugin(Plugin):
"""
A plugin to get the current weather and 7-day daily forecast for a location
"""
def get_source_name(self) -> str:
return "OpenMeteo"
def get_spec(self) -> Dict:
return {
def get_spec(self) -> [Dict]:
return [
{
"name": "get_current_weather",
"description": "Get the current and 7-day daily weather forecast for a location using Open Meteo APIs.",
"description": "Get the current weather for a location using Open Meteo APIs.",
"parameters": {
"type": "object",
"properties": {
"latitude": {
"type": "string",
"description": "Latitude of the location"
},
"longitude": {
"type": "string",
"description": "Longitude of the location"
},
"latitude": {"type": "string", "description": "Latitude of the location"},
"longitude": {"type": "string", "description": "Longitude of the location"},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
@@ -34,16 +31,53 @@ class WeatherPlugin(Plugin):
},
},
"required": ["latitude", "longitude", "unit"],
},
},
{
"name": "get_forecast_weather",
"description": "Get daily weather forecast for a location using Open Meteo APIs."
f"Today is {datetime.today().strftime('%A, %B %d, %Y')}",
"parameters": {
"type": "object",
"properties": {
"latitude": {"type": "string", "description": "Latitude of the location"},
"longitude": {"type": "string", "description": "Longitude of the location"},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the provided location.",
},
"forecast_days": {
"type": "integer",
"description": "The number of days to forecast, including today. Default is 7. Max 14. "
"Use 1 for today, 2 for today and tomorrow, and so on.",
},
},
"required": ["latitude", "longitude", "unit", "forecast_days"],
},
}
}
]
async def execute(self, **kwargs) -> Dict:
url = f'https://api.open-meteo.com/v1/forecast'\
f'?latitude={kwargs["latitude"]}'\
f'&longitude={kwargs["longitude"]}'\
f'&temperature_unit={kwargs["unit"]}' \
'&current_weather=true' \
'&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_mean,' \
'&forecast_days=7' \
'&timezone=auto'
async def execute(self, function_name, **kwargs) -> Dict:
url = f'https://api.open-meteo.com/v1/forecast' \
f'?latitude={kwargs["latitude"]}' \
f'&longitude={kwargs["longitude"]}' \
f'&temperature_unit={kwargs["unit"]}'
if function_name == 'get_current_weather':
url += '&current_weather=true'
return requests.get(url).json()
elif function_name == 'get_forecast_weather':
url += '&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_mean,'
url += f'&forecast_days={kwargs["forecast_days"]}'
url += '&timezone=auto'
response = requests.get(url).json()
results = {}
for i, time in enumerate(response["daily"]["time"]):
results[datetime.strptime(time, "%Y-%m-%d").strftime("%A, %B %d, %Y")] = {
"weathercode": response["daily"]["weathercode"][i],
"temperature_2m_max": response["daily"]["temperature_2m_max"][i],
"temperature_2m_min": response["daily"]["temperature_2m_min"][i],
"precipitation_probability_mean": response["daily"]["precipitation_probability_mean"][i]
}
return {"today": datetime.today().strftime("%A, %B %d, %Y"), "forecast": results}

View File

@@ -14,8 +14,8 @@ class WebSearchPlugin(Plugin):
def get_source_name(self) -> str:
return "DuckDuckGo"
def get_spec(self) -> Dict:
return {
def get_spec(self) -> [Dict]:
return [{
"name": "web_search",
"description": "Execute a web search for the given query and return a list of results",
"parameters": {
@@ -28,9 +28,9 @@ class WebSearchPlugin(Plugin):
},
"required": ["query"],
},
}
}]
async def execute(self, **kwargs) -> Dict:
async def execute(self, function_name, **kwargs) -> Dict:
with DDGS() as ddgs:
ddgs_gen = ddgs.text(
kwargs['query'],

View File

@@ -19,8 +19,8 @@ class WolframAlphaPlugin(Plugin):
def get_source_name(self) -> str:
return "WolframAlpha"
def get_spec(self) -> Dict:
return {
def get_spec(self) -> [Dict]:
return [{
"name": "answer_with_wolfram_alpha",
"description": "Get an answer to a question using Wolfram Alpha. Input should the the query in English.",
"parameters": {
@@ -30,9 +30,9 @@ class WolframAlphaPlugin(Plugin):
},
"required": ["query"]
}
}
}]
async def execute(self, **kwargs) -> Dict:
async def execute(self, function_name, **kwargs) -> Dict:
client = wolframalpha.Client(self.app_id)
res = client.query(kwargs['query'])
try:

View File

@@ -388,8 +388,10 @@ class ChatGPTTelegramBot:
total_tokens = 0
if self.config['stream']:
async def _reply():
nonlocal total_tokens
await update.effective_message.reply_chat_action(
action=constants.ChatAction.TYPING,
message_thread_id=get_thread_id(update)
)
stream_response = self.openai.get_chat_response_stream(chat_id=chat_id, query=prompt)
i = 0
@@ -466,8 +468,6 @@ class ChatGPTTelegramBot:
if tokens != 'not_finished':
total_tokens = int(tokens)
await wrap_with_indicator(update, context, _reply, constants.ChatAction.TYPING)
else:
async def _reply():
nonlocal total_tokens

View File

@@ -7,3 +7,4 @@ requests~=2.31.0
tenacity==8.2.2
wolframalpha==5.0.0
duckduckgo_search==3.8.3
spotipy==2.23.0