Merge pull request #178 from AlexHTW/add-budget-types

Add budget periods and improvements
This commit is contained in:
ned
2023-04-10 17:13:02 +02:00
committed by GitHub
5 changed files with 146 additions and 90 deletions

View File

@@ -1,3 +1,4 @@
from __future__ import annotations
import logging
import os
import itertools
@@ -34,6 +35,18 @@ class ChatGPTTelegramBot:
"""
Class representing a ChatGPT Telegram Bot.
"""
# Mapping of budget period to cost period
budget_cost_map = {
"monthly":"cost_month",
"daily":"cost_today",
"all-time":"cost_all_time"
}
# Mapping of budget period to a print output
budget_print_map = {
"monthly": " this month",
"daily": " today",
"all-time": ""
}
def __init__(self, config: dict, openai: OpenAIHelper):
"""
@@ -56,7 +69,7 @@ class ChatGPTTelegramBot:
] + self.commands
self.disallowed_message = "Sorry, you are not allowed to use this bot. You can check out the source code at " \
"https://github.com/n3d1117/chatgpt-telegram-bot"
self.budget_limit_message = "Sorry, you have reached your monthly usage limit."
self.budget_limit_message = f"Sorry, you have reached your usage limit{self.budget_print_map[config['budget_period']]}."
self.usage = {}
self.last_message = {}
@@ -95,12 +108,13 @@ class ChatGPTTelegramBot:
tokens_today, tokens_month = self.usage[user_id].get_current_token_usage()
images_today, images_month = self.usage[user_id].get_current_image_count()
transcribe_durations = self.usage[user_id].get_current_transcription_duration()
cost_today, cost_month = self.usage[user_id].get_current_cost()
(transcribe_minutes_today, transcribe_seconds_today, transcribe_minutes_month,
transcribe_seconds_month) = self.usage[user_id].get_current_transcription_duration()
current_cost = self.usage[user_id].get_current_cost()
chat_id = update.effective_chat.id
chat_messages, chat_token_length = self.openai.get_conversation_stats(chat_id)
budget = await self.get_remaining_budget(update)
remaining_budget = self.get_remaining_budget(update)
text_current_conversation = f"*Current conversation:*\n"+\
f"{chat_messages} chat messages in history.\n"+\
@@ -109,18 +123,19 @@ class ChatGPTTelegramBot:
text_today = f"*Usage today:*\n"+\
f"{tokens_today} chat tokens used.\n"+\
f"{images_today} images generated.\n"+\
f"{transcribe_durations[0]} minutes and {transcribe_durations[1]} seconds transcribed.\n"+\
f"💰 For a total amount of ${cost_today:.2f}\n"+\
f"{transcribe_minutes_today} minutes and {transcribe_seconds_today} seconds transcribed.\n"+\
f"💰 For a total amount of ${current_cost['cost_today']:.2f}\n"+\
f"----------------------------\n"
text_month = f"*Usage this month:*\n"+\
f"{tokens_month} chat tokens used.\n"+\
f"{images_month} images generated.\n"+\
f"{transcribe_durations[2]} minutes and {transcribe_durations[3]} seconds transcribed.\n"+\
f"💰 For a total amount of ${cost_month:.2f}"
f"{transcribe_minutes_month} minutes and {transcribe_seconds_month} seconds transcribed.\n"+\
f"💰 For a total amount of ${current_cost['cost_month']:.2f}"
# text_budget filled with conditional content
text_budget = "\n\n"
if budget < float('inf'):
text_budget += f"You have a remaining budget of ${budget:.2f} this month.\n"
budget_period =self.config['budget_period']
if remaining_budget < float('inf'):
text_budget += f"You have a remaining budget of ${remaining_budget:.2f}{self.budget_print_map[budget_period]}.\n"
# add OpenAI account information for admin request
if self.is_admin(update):
text_budget += f"Your OpenAI account was billed ${self.openai.get_billing_current_month():.2f} this month."
@@ -631,13 +646,14 @@ class ChatGPTTelegramBot:
return False
def is_admin(self, update: Update) -> bool:
def is_admin(self, update: Update, log_no_admin=False) -> bool:
"""
Checks if the user is the admin of the bot.
The first user in the user list is the admin.
"""
if self.config['admin_user_ids'] == '-':
logging.info('No admin user defined.')
if log_no_admin:
logging.info('No admin user defined.')
return False
admin_user_ids = self.config['admin_user_ids'].split(',')
@@ -648,78 +664,72 @@ class ChatGPTTelegramBot:
return False
async def get_remaining_budget(self, update: Update) -> float:
def get_user_budget(self, update: Update) -> float | None:
"""
Get the user's budget based on their user ID and the bot configuration.
:param update: Telegram update object
:return: The user's budget as a float, or None if the user is not found in the allowed user list
"""
# no budget restrictions for admins and '*'-budget lists
if self.is_admin(update) or self.config['user_budgets'] == '*':
return float('inf')
user_budgets = self.config['user_budgets'].split(',')
if self.config['allowed_user_ids'] == '*':
# same budget for all users, use value in first position of budget list
if len(user_budgets) > 1:
logging.warning('multiple values for budgets set with unrestricted user list '
'only the first value is used as budget for everyone.')
return float(user_budgets[0])
user_id = update.message.from_user.id
if user_id not in self.usage:
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
if self.is_admin(update):
return float('inf')
if self.config['monthly_user_budgets'] == '*':
return float('inf')
allowed_user_ids = self.config['allowed_user_ids'].split(',')
if str(user_id) in allowed_user_ids:
# find budget for allowed user
user_index = allowed_user_ids.index(str(user_id))
user_budgets = self.config['monthly_user_budgets'].split(',')
# check if user is included in budgets list
if len(user_budgets) <= user_index:
logging.warning(f'No budget set for user: {update.message.from_user.name} ({user_id}).')
logging.warning(f'No budget set for user id: {user_id}. Budget list shorter than user list.')
return 0.0
user_budget = float(user_budgets[user_index])
cost_month = self.usage[user_id].get_current_cost()[1]
remaining_budget = user_budget - cost_month
return remaining_budget
else:
return 0.0
return float(user_budgets[user_index])
return None
async def is_within_budget(self, update: Update, context: CallbackContext) -> bool:
def get_remaining_budget(self, update: Update) -> float:
"""
Checks if the user reached their monthly usage limit.
Calculate the remaining budget for a user based on their current usage.
:param update: Telegram update object
:return: The remaining budget for the user as a float
"""
user_id = update.message.from_user.id
if user_id not in self.usage:
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
# Get budget for users
user_budget = self.get_user_budget(update)
budget_period = self.config['budget_period']
if user_budget is not None:
cost = self.usage[user_id].get_current_cost()[self.budget_cost_map[budget_period]]
return user_budget - cost
# Get budget for guests
if 'guests' not in self.usage:
self.usage['guests'] = UsageTracker('guests', 'all guest users in group chats')
cost = self.usage['guests'].get_current_cost()[self.budget_cost_map[budget_period]]
return self.config['guest_budget'] - cost
def is_within_budget(self, update: Update) -> bool:
"""
Checks if the user reached their usage limit.
Initializes UsageTracker for user and guest when needed.
:param update: Telegram update object
:return: Boolean indicating if the user has a positive budget
"""
user_id = update.message.from_user.id
if user_id not in self.usage:
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
if self.is_admin(update):
return True
remaining_budget = self.get_remaining_budget(update)
if self.config['monthly_user_budgets'] == '*':
return True
allowed_user_ids = self.config['allowed_user_ids'].split(',')
if str(user_id) in allowed_user_ids:
# find budget for allowed user
user_index = allowed_user_ids.index(str(user_id))
user_budgets = self.config['monthly_user_budgets'].split(',')
# check if user is included in budgets list
if len(user_budgets) <= user_index:
logging.warning(f'No budget set for user: {update.message.from_user.name} ({user_id}).')
return False
user_budget = float(user_budgets[user_index])
cost_month = self.usage[user_id].get_current_cost()[1]
# Check if allowed user is within budget
return user_budget > cost_month
# Check if group member is within budget
if self.is_group_chat(update):
admin_user_ids = self.config['admin_user_ids'].split(',')
for user in itertools.chain(allowed_user_ids, admin_user_ids):
if not user.strip():
continue
if await self.is_user_in_group(update, context, user):
if 'guests' not in self.usage:
self.usage['guests'] = UsageTracker('guests', 'all guest users in group chats')
if self.config['monthly_guest_budget'] >= self.usage['guests'].get_current_cost()[1]:
return True
logging.warning('Monthly guest budget for group chats used up.')
return False
logging.info(f'Group chat messages from user {update.message.from_user.name} '
f'(id: {update.message.from_user.id}) are not allowed')
return False
return remaining_budget > 0
async def check_allowed_and_within_budget(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""
@@ -734,7 +744,7 @@ class ChatGPTTelegramBot:
await self.send_disallowed_message(update, context)
return False
if not await self.is_within_budget(update, context):
if not self.is_within_budget(update):
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
f'reached their usage limit')
await self.send_budget_reached_message(update, context)