mirror of
https://github.com/aljazceru/chatgpt-telegram-bot.git
synced 2025-12-22 07:04:59 +01:00
Merge main into feat/working_inline_queries
Handle disallowed_message & budget_limit_message for inline queries
This commit is contained in:
@@ -8,10 +8,11 @@ import json
|
||||
import telegram
|
||||
from uuid import uuid4
|
||||
from telegram import constants, BotCommandScopeAllGroupChats
|
||||
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
|
||||
from telegram import Message, MessageEntity, Update, InlineQueryResultArticle, InputTextMessageContent, BotCommand, ChatMember
|
||||
from telegram.error import RetryAfter, TimedOut
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, \
|
||||
filters, InlineQueryHandler, Application, CallbackContext
|
||||
filters, InlineQueryHandler, CallbackQueryHandler, Application, CallbackContext
|
||||
|
||||
from pydub import AudioSegment
|
||||
from openai_helper import OpenAIHelper, localized_text
|
||||
@@ -64,6 +65,7 @@ class ChatGPTTelegramBot:
|
||||
self.budget_limit_message = localized_text('budget_limit', bot_language)
|
||||
self.usage = {}
|
||||
self.last_message = {}
|
||||
self.inline_queries_cache = {}
|
||||
|
||||
async def help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""
|
||||
@@ -511,21 +513,89 @@ class ChatGPTTelegramBot:
|
||||
Handle the inline query. This is run when you type: @botusername <query>
|
||||
"""
|
||||
query = update.inline_query.query
|
||||
|
||||
if query == '':
|
||||
if len(query) < 3:
|
||||
return
|
||||
if not await self.check_allowed_and_within_budget(update, context, is_inline=True):
|
||||
return
|
||||
|
||||
results = [
|
||||
InlineQueryResultArticle(
|
||||
id=str(uuid4()),
|
||||
title='Ask ChatGPT',
|
||||
input_message_content=InputTextMessageContent(query),
|
||||
description=query,
|
||||
thumb_url='https://user-images.githubusercontent.com/11541888/223106202-7576ff11-2c8e-408d-94ea-b02a7a32149a.png'
|
||||
)
|
||||
]
|
||||
callback_data_suffix = "gpt:"
|
||||
result_id = str(uuid4())
|
||||
self.inline_queries_cache[result_id] = query
|
||||
callback_data = f'{callback_data_suffix}{result_id}'
|
||||
|
||||
await update.inline_query.answer(results)
|
||||
await self.send_inline_query_result(update, result_id, message_content=query, callback_data=callback_data)
|
||||
|
||||
async def send_inline_query_result(self, update: Update, result_id, message_content, callback_data=""):
|
||||
try:
|
||||
reply_markup = None
|
||||
if callback_data:
|
||||
reply_markup = InlineKeyboardMarkup([[
|
||||
InlineKeyboardButton(text='Answer with ChatGPT',
|
||||
callback_data=callback_data)
|
||||
]])
|
||||
|
||||
inline_query_result = InlineQueryResultArticle(
|
||||
id=result_id,
|
||||
title="Ask ChatGPT",
|
||||
input_message_content=InputTextMessageContent(message_content),
|
||||
description=message_content,
|
||||
thumb_url='https://user-images.githubusercontent.com/11541888/223106202-7576ff11-2c8e-408d-94ea'
|
||||
'-b02a7a32149a.png',
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
|
||||
await update.inline_query.answer([inline_query_result])
|
||||
except Exception as e:
|
||||
logging.error(f'An error occurred while generating the result card for inline query {e}')
|
||||
|
||||
async def handle_callback_inline_query(self, update: Update, context: CallbackContext):
|
||||
callback_data = update.callback_query.data
|
||||
user_id = update.callback_query.from_user.id
|
||||
inline_message_id = update.callback_query.inline_message_id
|
||||
name = update.callback_query.from_user.name
|
||||
callback_data_suffix = "gpt:"
|
||||
query = ""
|
||||
|
||||
try:
|
||||
if callback_data.startswith(callback_data_suffix):
|
||||
unique_id = callback_data.split(':')[1]
|
||||
|
||||
# Retrieve the long text from the cache
|
||||
query = self.inline_queries_cache.get(unique_id)
|
||||
if query:
|
||||
self.inline_queries_cache.pop(unique_id)
|
||||
else:
|
||||
await context.bot.edit_message_text(inline_message_id=inline_message_id,
|
||||
text='An error occurred while reading your prompt. Please try '
|
||||
'again.')
|
||||
return
|
||||
|
||||
# Edit the current message to indicate that the answer is being processed
|
||||
await context.bot.edit_message_text(inline_message_id=inline_message_id,
|
||||
text=f'Getting the answer...\n\n**Prompt:**\n{query}',
|
||||
parse_mode='Markdown')
|
||||
|
||||
logging.info(f'Generating response for inline query by {name}')
|
||||
response, used_tokens = await self.openai.get_chat_response(chat_id=user_id, query=query)
|
||||
|
||||
try:
|
||||
# add chat request to users usage tracker
|
||||
self.usage[user_id].add_chat_tokens(used_tokens, self.config['token_price'])
|
||||
# add guest chat request to guest usage tracker
|
||||
allowed_user_ids = self.config['allowed_user_ids'].split(',')
|
||||
if str(user_id) not in allowed_user_ids and 'guests' in self.usage:
|
||||
self.usage["guests"].add_chat_tokens(used_tokens, self.config['token_price'])
|
||||
except Exception as e:
|
||||
logging.warning(f'Failed to add tokens to usage_logs: {str(e)}')
|
||||
pass
|
||||
|
||||
# Edit the original message with the generated content
|
||||
await context.bot.edit_message_text(inline_message_id=inline_message_id, parse_mode='Markdown',
|
||||
text=f'{query}\n\n**GPT:**\n{response}')
|
||||
except Exception as e:
|
||||
await context.bot.edit_message_text(inline_message_id=inline_message_id,
|
||||
text=f'Failed to generate the answer. Please try again.')
|
||||
logging.error(f'Failed to respond to an inline query via button callback: {e}')
|
||||
|
||||
async def edit_message_with_retry(self, context: ContextTypes.DEFAULT_TYPE, chat_id: int,
|
||||
message_id: int, text: str, markdown: bool = True):
|
||||
@@ -574,24 +644,32 @@ class ChatGPTTelegramBot:
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
|
||||
async def send_disallowed_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
async def send_disallowed_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE, is_inline=False):
|
||||
"""
|
||||
Sends the disallowed message to the user.
|
||||
"""
|
||||
if not is_inline:
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text=self.disallowed_message,
|
||||
disable_web_page_preview=True
|
||||
)
|
||||
else:
|
||||
result_id = str(uuid4())
|
||||
await self.send_inline_query_result(update, result_id, message_content=self.disallowed_message)
|
||||
|
||||
async def send_budget_reached_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
async def send_budget_reached_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE, is_inline=False):
|
||||
"""
|
||||
Sends the budget reached message to the user.
|
||||
"""
|
||||
if not is_inline:
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text=self.budget_limit_message
|
||||
)
|
||||
else:
|
||||
result_id = str(uuid4())
|
||||
await self.send_inline_query_result(update, result_id, message_content=self.budget_limit_message)
|
||||
|
||||
async def error_handler(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""
|
||||
@@ -623,23 +701,23 @@ class ChatGPTTelegramBot:
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
async def is_allowed(self, update: Update, context: CallbackContext) -> bool:
|
||||
async def is_allowed(self, update: Update, context: CallbackContext, is_inline=False) -> bool:
|
||||
"""
|
||||
Checks if the user is allowed to use the bot.
|
||||
"""
|
||||
if self.config['allowed_user_ids'] == '*':
|
||||
return True
|
||||
|
||||
if self.is_admin(update):
|
||||
user_id = update.inline_query.from_user.id if is_inline else update.message.from_user.id
|
||||
if self.is_admin(user_id):
|
||||
return True
|
||||
|
||||
name = update.inline_query.from_user.name if is_inline else update.message.from_user.name
|
||||
allowed_user_ids = self.config['allowed_user_ids'].split(',')
|
||||
# Check if user is allowed
|
||||
if str(update.message.from_user.id) in allowed_user_ids:
|
||||
if str(user_id) in allowed_user_ids:
|
||||
return True
|
||||
|
||||
# Check if it's a group a chat with at least one authorized member
|
||||
if self.is_group_chat(update):
|
||||
if not is_inline and self.is_group_chat(update):
|
||||
admin_user_ids = self.config['admin_user_ids'].split(',')
|
||||
for user in itertools.chain(allowed_user_ids, admin_user_ids):
|
||||
if not user.strip():
|
||||
@@ -647,12 +725,11 @@ class ChatGPTTelegramBot:
|
||||
if await self.is_user_in_group(update, context, user):
|
||||
logging.info(f'{user} is a member. Allowing group chat message...')
|
||||
return True
|
||||
logging.info(f'Group chat messages from user {update.message.from_user.name} '
|
||||
f'(id: {update.message.from_user.id}) are not allowed')
|
||||
|
||||
logging.info(f'Group chat messages from user {name} '
|
||||
f'(id: {user_id}) are not allowed')
|
||||
return False
|
||||
|
||||
def is_admin(self, update: Update, log_no_admin=False) -> bool:
|
||||
def is_admin(self, user_id, log_no_admin=False) -> bool:
|
||||
"""
|
||||
Checks if the user is the admin of the bot.
|
||||
The first user in the user list is the admin.
|
||||
@@ -665,20 +742,20 @@ class ChatGPTTelegramBot:
|
||||
admin_user_ids = self.config['admin_user_ids'].split(',')
|
||||
|
||||
# Check if user is in the admin user list
|
||||
if str(update.message.from_user.id) in admin_user_ids:
|
||||
if str(user_id) in admin_user_ids:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_user_budget(self, update: Update) -> float | None:
|
||||
def get_user_budget(self, user_id) -> float | None:
|
||||
"""
|
||||
Get the user's budget based on their user ID and the bot configuration.
|
||||
:param update: Telegram update object
|
||||
:param user_id: User id
|
||||
:return: The user's budget as a float, or None if the user is not found in the allowed user list
|
||||
"""
|
||||
|
||||
# no budget restrictions for admins and '*'-budget lists
|
||||
if self.is_admin(update) or self.config['user_budgets'] == '*':
|
||||
if self.is_admin(user_id) or self.config['user_budgets'] == '*':
|
||||
return float('inf')
|
||||
|
||||
user_budgets = self.config['user_budgets'].split(',')
|
||||
@@ -689,7 +766,6 @@ class ChatGPTTelegramBot:
|
||||
'only the first value is used as budget for everyone.')
|
||||
return float(user_budgets[0])
|
||||
|
||||
user_id = update.message.from_user.id
|
||||
allowed_user_ids = self.config['allowed_user_ids'].split(',')
|
||||
if str(user_id) in allowed_user_ids:
|
||||
user_index = allowed_user_ids.index(str(user_id))
|
||||
@@ -699,18 +775,20 @@ class ChatGPTTelegramBot:
|
||||
return float(user_budgets[user_index])
|
||||
return None
|
||||
|
||||
def get_remaining_budget(self, update: Update) -> float:
|
||||
def get_remaining_budget(self, update: Update, is_inline=False) -> float:
|
||||
"""
|
||||
Calculate the remaining budget for a user based on their current usage.
|
||||
:param update: Telegram update object
|
||||
:param is_inline: Boolean flag for inline queries
|
||||
:return: The remaining budget for the user as a float
|
||||
"""
|
||||
user_id = update.message.from_user.id
|
||||
user_id = update.inline_query.from_user.id if is_inline else update.message.from_user.id
|
||||
name = update.inline_query.from_user.name if is_inline else update.message.from_user.name
|
||||
if user_id not in self.usage:
|
||||
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
|
||||
self.usage[user_id] = UsageTracker(user_id, name)
|
||||
|
||||
# Get budget for users
|
||||
user_budget = self.get_user_budget(update)
|
||||
user_budget = self.get_user_budget(user_id)
|
||||
budget_period = self.config['budget_period']
|
||||
if user_budget is not None:
|
||||
cost = self.usage[user_id].get_current_cost()[self.budget_cost_map[budget_period]]
|
||||
@@ -722,38 +800,43 @@ class ChatGPTTelegramBot:
|
||||
cost = self.usage['guests'].get_current_cost()[self.budget_cost_map[budget_period]]
|
||||
return self.config['guest_budget'] - cost
|
||||
|
||||
def is_within_budget(self, update: Update) -> bool:
|
||||
def is_within_budget(self, update: Update, is_inline=False) -> bool:
|
||||
"""
|
||||
Checks if the user reached their usage limit.
|
||||
Initializes UsageTracker for user and guest when needed.
|
||||
:param update: Telegram update object
|
||||
:param is_inline: Boolean flag for inline queries
|
||||
:return: Boolean indicating if the user has a positive budget
|
||||
"""
|
||||
user_id = update.message.from_user.id
|
||||
user_id = update.inline_query.from_user.id if is_inline else update.message.from_user.id
|
||||
name = update.inline_query.from_user.name if is_inline else update.message.from_user.name
|
||||
if user_id not in self.usage:
|
||||
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
|
||||
self.usage[user_id] = UsageTracker(user_id, name)
|
||||
|
||||
remaining_budget = self.get_remaining_budget(update)
|
||||
remaining_budget = self.get_remaining_budget(update, is_inline=is_inline)
|
||||
|
||||
return remaining_budget > 0
|
||||
|
||||
async def check_allowed_and_within_budget(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
|
||||
async def check_allowed_and_within_budget(self, update: Update, context: ContextTypes.DEFAULT_TYPE, is_inline=False) -> bool:
|
||||
"""
|
||||
Checks if the user is allowed to use the bot and if they are within their budget
|
||||
:param update: Telegram update object
|
||||
:param context: Telegram context object
|
||||
:param is_inline: Boolean flag for inline queries
|
||||
:return: Boolean indicating if the user is allowed to use the bot
|
||||
"""
|
||||
if not await self.is_allowed(update, context):
|
||||
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
|
||||
f'is not allowed to use the bot')
|
||||
await self.send_disallowed_message(update, context)
|
||||
return False
|
||||
name = update.inline_query.from_user.name if is_inline else update.message.from_user.name
|
||||
user_id = update.inline_query.from_user.id if is_inline else update.message.from_user.id
|
||||
|
||||
if not self.is_within_budget(update):
|
||||
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
|
||||
if not await self.is_allowed(update, context, is_inline=is_inline):
|
||||
logging.warning(f'User {name} (id: {user_id}) '
|
||||
f'is not allowed to use the bot')
|
||||
await self.send_disallowed_message(update, context, is_inline)
|
||||
return False
|
||||
if not self.is_within_budget(update, is_inline=is_inline):
|
||||
logging.warning(f'User {name} (id: {user_id}) '
|
||||
f'reached their usage limit')
|
||||
await self.send_budget_reached_message(update, context)
|
||||
await self.send_budget_reached_message(update, context, is_inline)
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -808,8 +891,9 @@ class ChatGPTTelegramBot:
|
||||
self.transcribe))
|
||||
application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), self.prompt))
|
||||
application.add_handler(InlineQueryHandler(self.inline_query, chat_types=[
|
||||
constants.ChatType.GROUP, constants.ChatType.SUPERGROUP
|
||||
constants.ChatType.GROUP, constants.ChatType.SUPERGROUP, constants.ChatType.PRIVATE
|
||||
]))
|
||||
application.add_handler(CallbackQueryHandler(self.handle_callback_inline_query))
|
||||
|
||||
application.add_error_handler(self.error_handler)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user