mirror of
https://github.com/aljazceru/chatgpt-telegram-bot.git
synced 2025-12-22 07:04:59 +01:00
Apply PEP8 styling standard & reformat
This commit is contained in:
@@ -50,10 +50,10 @@ def main():
|
||||
# remove support for old budget names at some point in the future
|
||||
if os.environ.get('MONTHLY_USER_BUDGETS') is not None:
|
||||
logging.warning('The environment variable MONTHLY_USER_BUDGETS is deprecated. '
|
||||
'Please use USER_BUDGETS with BUDGET_PERIOD instead.')
|
||||
'Please use USER_BUDGETS with BUDGET_PERIOD instead.')
|
||||
if os.environ.get('MONTHLY_GUEST_BUDGET') is not None:
|
||||
logging.warning('The environment variable MONTHLY_GUEST_BUDGET is deprecated. '
|
||||
'Please use GUEST_BUDGET with BUDGET_PERIOD instead.')
|
||||
'Please use GUEST_BUDGET with BUDGET_PERIOD instead.')
|
||||
|
||||
telegram_config = {
|
||||
'token': os.environ['TELEGRAM_BOT_TOKEN'],
|
||||
@@ -71,7 +71,7 @@ def main():
|
||||
'ignore_group_transcriptions': os.environ.get('IGNORE_GROUP_TRANSCRIPTIONS', 'true').lower() == 'true',
|
||||
'group_trigger_keyword': os.environ.get('GROUP_TRIGGER_KEYWORD', ''),
|
||||
'token_price': float(os.environ.get('TOKEN_PRICE', 0.002)),
|
||||
'image_prices': [float(i) for i in os.environ.get('IMAGE_PRICES',"0.016,0.018,0.02").split(",")],
|
||||
'image_prices': [float(i) for i in os.environ.get('IMAGE_PRICES', "0.016,0.018,0.02").split(",")],
|
||||
'transcription_price': float(os.environ.get('TOKEN_PRICE', 0.006)),
|
||||
'bot_language': os.environ.get('BOT_LANGUAGE', 'en'),
|
||||
}
|
||||
|
||||
@@ -26,9 +26,11 @@ def default_max_tokens(model: str) -> int:
|
||||
"""
|
||||
return 1200 if model in GPT_3_MODELS else 2400
|
||||
|
||||
|
||||
with open('translations.json', 'r', encoding='utf-8') as f:
|
||||
translations = json.load(f)
|
||||
|
||||
|
||||
def localized_text(key, bot_language):
|
||||
"""
|
||||
Return translated text for a key in specified bot_language.
|
||||
@@ -46,6 +48,7 @@ def localized_text(key, bot_language):
|
||||
# return key as text
|
||||
return key
|
||||
|
||||
|
||||
class OpenAIHelper:
|
||||
"""
|
||||
ChatGPT helper class.
|
||||
@@ -172,7 +175,7 @@ class OpenAIHelper:
|
||||
frequency_penalty=self.config['frequency_penalty'],
|
||||
stream=stream
|
||||
)
|
||||
|
||||
|
||||
except openai.error.RateLimitError as e:
|
||||
raise Exception(f"⚠️ _{localized_text('openai_rate_limit', bot_language)}._ ⚠️\n{str(e)}") from e
|
||||
|
||||
@@ -198,7 +201,10 @@ class OpenAIHelper:
|
||||
|
||||
if 'data' not in response or len(response['data']) == 0:
|
||||
logging.error(f'No response from GPT: {str(response)}')
|
||||
raise Exception(f"⚠️ _{localized_text('error', bot_language)}._ ⚠️\n{localized_text('try_again', bot_language)}.")
|
||||
raise Exception(
|
||||
f"⚠️ _{localized_text('error', bot_language)}._ "
|
||||
f"⚠️\n{localized_text('try_again', bot_language)}."
|
||||
)
|
||||
|
||||
return response['data'][0]['url'], self.config['image_size']
|
||||
except Exception as e:
|
||||
@@ -253,8 +259,8 @@ class OpenAIHelper:
|
||||
:return: The summary
|
||||
"""
|
||||
messages = [
|
||||
{ "role": "assistant", "content": "Summarize this conversation in 700 characters or less" },
|
||||
{ "role": "user", "content": str(conversation) }
|
||||
{"role": "assistant", "content": "Summarize this conversation in 700 characters or less"},
|
||||
{"role": "user", "content": str(conversation)}
|
||||
]
|
||||
response = await openai.ChatCompletion.acreate(
|
||||
model=self.config['model'],
|
||||
@@ -304,7 +310,7 @@ class OpenAIHelper:
|
||||
num_tokens += tokens_per_name
|
||||
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
|
||||
return num_tokens
|
||||
|
||||
|
||||
def get_billing_current_month(self):
|
||||
"""Gets billed usage for current month from OpenAI API.
|
||||
|
||||
@@ -324,5 +330,5 @@ class OpenAIHelper:
|
||||
}
|
||||
response = requests.get("https://api.openai.com/dashboard/billing/usage", headers=headers, params=params)
|
||||
billing_data = json.loads(response.text)
|
||||
usage_month = billing_data["total_usage"] / 100 # convert cent amount to dollars
|
||||
return usage_month
|
||||
usage_month = billing_data["total_usage"] / 100 # convert cent amount to dollars
|
||||
return usage_month
|
||||
|
||||
@@ -7,8 +7,8 @@ import asyncio
|
||||
import telegram
|
||||
from uuid import uuid4
|
||||
from telegram import constants, BotCommandScopeAllGroupChats
|
||||
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
|
||||
from telegram import Message, MessageEntity, Update, InlineQueryResultArticle, InputTextMessageContent, BotCommand, ChatMember
|
||||
from telegram import InlineKeyboardMarkup, InlineKeyboardButton, InlineQueryResultArticle
|
||||
from telegram import Message, MessageEntity, Update, InputTextMessageContent, BotCommand, ChatMember
|
||||
from telegram.error import RetryAfter, TimedOut
|
||||
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, \
|
||||
filters, InlineQueryHandler, CallbackQueryHandler, Application, CallbackContext
|
||||
@@ -17,6 +17,7 @@ from pydub import AudioSegment
|
||||
from openai_helper import OpenAIHelper, localized_text
|
||||
from usage_tracker import UsageTracker
|
||||
|
||||
|
||||
def message_text(message: Message) -> str:
|
||||
"""
|
||||
Returns the text of a message, excluding any bot commands.
|
||||
@@ -25,21 +26,23 @@ def message_text(message: Message) -> str:
|
||||
if message_text is None:
|
||||
return ''
|
||||
|
||||
for _, text in sorted(message.parse_entities([MessageEntity.BOT_COMMAND]).items(), key=(lambda item: item[0].offset)):
|
||||
for _, text in sorted(message.parse_entities([MessageEntity.BOT_COMMAND]).items(),
|
||||
key=(lambda item: item[0].offset)):
|
||||
message_text = message_text.replace(text, '').strip()
|
||||
|
||||
return message_text if len(message_text) > 0 else ''
|
||||
|
||||
|
||||
class ChatGPTTelegramBot:
|
||||
"""
|
||||
Class representing a ChatGPT Telegram Bot.
|
||||
"""
|
||||
# Mapping of budget period to cost period
|
||||
budget_cost_map = {
|
||||
"monthly":"cost_month",
|
||||
"daily":"cost_today",
|
||||
"all-time":"cost_all_time"
|
||||
}
|
||||
"monthly": "cost_month",
|
||||
"daily": "cost_today",
|
||||
"all-time": "cost_all_time"
|
||||
}
|
||||
|
||||
def __init__(self, config: dict, openai: OpenAIHelper):
|
||||
"""
|
||||
@@ -58,8 +61,9 @@ class ChatGPTTelegramBot:
|
||||
BotCommand(command='resend', description=localized_text('resend_description', bot_language))
|
||||
]
|
||||
self.group_commands = [
|
||||
BotCommand(command='chat', description=localized_text('chat_description', bot_language))
|
||||
] + self.commands
|
||||
BotCommand(command='chat',
|
||||
description=localized_text('chat_description', bot_language))
|
||||
] + self.commands
|
||||
self.disallowed_message = localized_text('disallowed', bot_language)
|
||||
self.budget_limit_message = localized_text('budget_limit', bot_language)
|
||||
self.usage = {}
|
||||
@@ -74,40 +78,39 @@ class ChatGPTTelegramBot:
|
||||
commands_description = [f'/{command.command} - {command.description}' for command in commands]
|
||||
bot_language = self.config['bot_language']
|
||||
help_text = (
|
||||
localized_text('help_text', bot_language)[0] +
|
||||
'\n\n' +
|
||||
'\n'.join(commands_description) +
|
||||
'\n\n' +
|
||||
localized_text('help_text', bot_language)[1] +
|
||||
'\n\n' +
|
||||
localized_text('help_text', bot_language)[2]
|
||||
localized_text('help_text', bot_language)[0] +
|
||||
'\n\n' +
|
||||
'\n'.join(commands_description) +
|
||||
'\n\n' +
|
||||
localized_text('help_text', bot_language)[1] +
|
||||
'\n\n' +
|
||||
localized_text('help_text', bot_language)[2]
|
||||
)
|
||||
await update.message.reply_text(help_text, disable_web_page_preview=True)
|
||||
|
||||
|
||||
async def stats(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
"""
|
||||
Returns token usage statistics for current day and month.
|
||||
"""
|
||||
if not await self.is_allowed(update, context):
|
||||
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
|
||||
f'is not allowed to request their usage statistics')
|
||||
f'is not allowed to request their usage statistics')
|
||||
await self.send_disallowed_message(update, context)
|
||||
return
|
||||
|
||||
logging.info(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
|
||||
f'requested their usage statistics')
|
||||
|
||||
f'requested their usage statistics')
|
||||
|
||||
user_id = update.message.from_user.id
|
||||
if user_id not in self.usage:
|
||||
self.usage[user_id] = UsageTracker(user_id, update.message.from_user.name)
|
||||
|
||||
tokens_today, tokens_month = self.usage[user_id].get_current_token_usage()
|
||||
images_today, images_month = self.usage[user_id].get_current_image_count()
|
||||
(transcribe_minutes_today, transcribe_seconds_today, transcribe_minutes_month,
|
||||
transcribe_seconds_month) = self.usage[user_id].get_current_transcription_duration()
|
||||
(transcribe_minutes_today, transcribe_seconds_today, transcribe_minutes_month,
|
||||
transcribe_seconds_month) = self.usage[user_id].get_current_transcription_duration()
|
||||
current_cost = self.usage[user_id].get_current_cost()
|
||||
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
chat_messages, chat_token_length = self.openai.get_conversation_stats(chat_id)
|
||||
remaining_budget = self.get_remaining_budget(update)
|
||||
@@ -137,13 +140,20 @@ class ChatGPTTelegramBot:
|
||||
)
|
||||
# text_budget filled with conditional content
|
||||
text_budget = "\n\n"
|
||||
budget_period =self.config['budget_period']
|
||||
budget_period = self.config['budget_period']
|
||||
if remaining_budget < float('inf'):
|
||||
text_budget += f"{localized_text('stats_budget', bot_language)}{localized_text(budget_period, bot_language)}: ${remaining_budget:.2f}.\n"
|
||||
text_budget += (
|
||||
f"{localized_text('stats_budget', bot_language)}"
|
||||
f"{localized_text(budget_period, bot_language)}: "
|
||||
f"${remaining_budget:.2f}.\n"
|
||||
)
|
||||
# add OpenAI account information for admin request
|
||||
if self.is_admin(update):
|
||||
text_budget += f"{localized_text('stats_openai', bot_language)}{self.openai.get_billing_current_month():.2f}"
|
||||
|
||||
text_budget += (
|
||||
f"{localized_text('stats_openai', bot_language)}"
|
||||
f"{self.openai.get_billing_current_month():.2f}"
|
||||
)
|
||||
|
||||
usage_text = text_current_conversation + text_today + text_month + text_budget
|
||||
await update.message.reply_text(usage_text, parse_mode=constants.ParseMode.MARKDOWN)
|
||||
|
||||
@@ -161,7 +171,8 @@ class ChatGPTTelegramBot:
|
||||
if chat_id not in self.last_message:
|
||||
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id})'
|
||||
f' does not have anything to resend')
|
||||
await context.bot.send_message(chat_id=chat_id, text=localized_text('resend_failed', self.config['bot_language']))
|
||||
await context.bot.send_message(chat_id=chat_id,
|
||||
text=localized_text('resend_failed', self.config['bot_language']))
|
||||
return
|
||||
|
||||
# Update message text, clear self.last_message and send the request to prompt
|
||||
@@ -178,12 +189,12 @@ class ChatGPTTelegramBot:
|
||||
"""
|
||||
if not await self.is_allowed(update, context):
|
||||
logging.warning(f'User {update.message.from_user.name} (id: {update.message.from_user.id}) '
|
||||
f'is not allowed to reset the conversation')
|
||||
f'is not allowed to reset the conversation')
|
||||
await self.send_disallowed_message(update, context)
|
||||
return
|
||||
|
||||
logging.info(f'Resetting the conversation for user {update.message.from_user.name} '
|
||||
f'(id: {update.message.from_user.id})...')
|
||||
f'(id: {update.message.from_user.id})...')
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
reset_content = message_text(update.message)
|
||||
@@ -194,17 +205,19 @@ class ChatGPTTelegramBot:
|
||||
"""
|
||||
Generates an image for the given prompt using DALL·E APIs
|
||||
"""
|
||||
if not self.config['enable_image_generation'] or not await self.check_allowed_and_within_budget(update, context):
|
||||
if not self.config['enable_image_generation'] or not await self.check_allowed_and_within_budget(update,
|
||||
context):
|
||||
return
|
||||
|
||||
chat_id = update.effective_chat.id
|
||||
image_query = message_text(update.message)
|
||||
if image_query == '':
|
||||
await context.bot.send_message(chat_id=chat_id, text=localized_text('image_no_prompt', self.config['bot_language']))
|
||||
await context.bot.send_message(chat_id=chat_id,
|
||||
text=localized_text('image_no_prompt', self.config['bot_language']))
|
||||
return
|
||||
|
||||
logging.info(f'New image generation request received from user {update.message.from_user.name} '
|
||||
f'(id: {update.message.from_user.id})')
|
||||
f'(id: {update.message.from_user.id})')
|
||||
|
||||
async def _generate():
|
||||
try:
|
||||
@@ -257,7 +270,10 @@ class ChatGPTTelegramBot:
|
||||
await context.bot.send_message(
|
||||
chat_id=chat_id,
|
||||
reply_to_message_id=self.get_reply_to_message_id(update),
|
||||
text=f"{localized_text('media_download_fail', bot_language)[0]}: {str(e)}. {localized_text('media_download_fail', bot_language)[1]}",
|
||||
text=(
|
||||
f"{localized_text('media_download_fail', bot_language)[0]}: "
|
||||
f"{str(e)}. {localized_text('media_download_fail', bot_language)[1]}"
|
||||
),
|
||||
parse_mode=constants.ParseMode.MARKDOWN
|
||||
)
|
||||
return
|
||||
@@ -267,7 +283,7 @@ class ChatGPTTelegramBot:
|
||||
audio_track = AudioSegment.from_file(filename)
|
||||
audio_track.export(filename_mp3, format="mp3")
|
||||
logging.info(f'New transcribe request received from user {update.message.from_user.name} '
|
||||
f'(id: {update.message.from_user.id})')
|
||||
f'(id: {update.message.from_user.id})')
|
||||
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
@@ -323,7 +339,10 @@ class ChatGPTTelegramBot:
|
||||
self.usage["guests"].add_chat_tokens(total_tokens, self.config['token_price'])
|
||||
|
||||
# Split into chunks of 4096 characters (Telegram's message limit)
|
||||
transcript_output = f"_{localized_text('transcript', bot_language)}:_\n\"{transcript}\"\n\n_{localized_text('answer', bot_language)}:_\n{response}"
|
||||
transcript_output = (
|
||||
f"_{localized_text('transcript', bot_language)}:_\n\"{transcript}\"\n\n"
|
||||
f"_{localized_text('answer', bot_language)}:_\n{response}"
|
||||
)
|
||||
chunks = self.split_into_chunks(transcript_output)
|
||||
|
||||
for index, transcript_chunk in enumerate(chunks):
|
||||
@@ -360,8 +379,9 @@ class ChatGPTTelegramBot:
|
||||
|
||||
if not await self.check_allowed_and_within_budget(update, context):
|
||||
return
|
||||
|
||||
logging.info(f'New message received from user {update.message.from_user.name} (id: {update.message.from_user.id})')
|
||||
|
||||
logging.info(
|
||||
f'New message received from user {update.message.from_user.name} (id: {update.message.from_user.id})')
|
||||
chat_id = update.effective_chat.id
|
||||
user_id = update.message.from_user.id
|
||||
prompt = message_text(update.message)
|
||||
@@ -400,7 +420,8 @@ class ChatGPTTelegramBot:
|
||||
if chunk != len(chunks) - 1:
|
||||
chunk += 1
|
||||
try:
|
||||
await self.edit_message_with_retry(context, chat_id, str(sent_message.message_id), chunks[-2])
|
||||
await self.edit_message_with_retry(context, chat_id, str(sent_message.message_id),
|
||||
chunks[-2])
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
@@ -414,9 +435,11 @@ class ChatGPTTelegramBot:
|
||||
|
||||
if is_group_chat:
|
||||
# group chats have stricter flood limits
|
||||
cutoff = 180 if len(content) > 1000 else 120 if len(content) > 200 else 90 if len(content) > 50 else 50
|
||||
cutoff = 180 if len(content) > 1000 else 120 if len(content) > 200 else 90 if len(
|
||||
content) > 50 else 50
|
||||
else:
|
||||
cutoff = 90 if len(content) > 1000 else 45 if len(content) > 200 else 25 if len(content) > 50 else 15
|
||||
cutoff = 90 if len(content) > 1000 else 45 if len(content) > 200 else 25 if len(
|
||||
content) > 50 else 15
|
||||
|
||||
cutoff += backoff
|
||||
|
||||
@@ -463,6 +486,7 @@ class ChatGPTTelegramBot:
|
||||
|
||||
else:
|
||||
total_tokens = 0
|
||||
|
||||
async def _reply():
|
||||
nonlocal total_tokens
|
||||
response, total_tokens = await self.openai.get_chat_response(chat_id=chat_id, query=prompt)
|
||||
@@ -571,7 +595,10 @@ class ChatGPTTelegramBot:
|
||||
if query:
|
||||
self.inline_queries_cache.pop(unique_id)
|
||||
else:
|
||||
error_message = f'{localized_text("error", bot_language)}. {localized_text("try_again", bot_language)}'
|
||||
error_message = (
|
||||
f'{localized_text("error", bot_language)}. '
|
||||
f'{localized_text("try_again", bot_language)}'
|
||||
)
|
||||
await self.edit_message_with_retry(context,
|
||||
chat_id=None,
|
||||
message_id=inline_message_id,
|
||||
@@ -644,7 +671,8 @@ class ChatGPTTelegramBot:
|
||||
logging.warning(str(e))
|
||||
raise e
|
||||
|
||||
async def wrap_with_indicator(self, update: Update, context: CallbackContext, chat_action: constants.ChatAction, coroutine):
|
||||
async def wrap_with_indicator(self, update: Update, context: CallbackContext, chat_action: constants.ChatAction,
|
||||
coroutine):
|
||||
"""
|
||||
Wraps a coroutine while repeatedly sending a chat action to the user.
|
||||
"""
|
||||
@@ -738,7 +766,7 @@ class ChatGPTTelegramBot:
|
||||
logging.info(f'{user} is a member. Allowing group chat message...')
|
||||
return True
|
||||
logging.info(f'Group chat messages from user {name} '
|
||||
f'(id: {user_id}) are not allowed')
|
||||
f'(id: {user_id}) are not allowed')
|
||||
return False
|
||||
|
||||
def is_admin(self, user_id, log_no_admin=False) -> bool:
|
||||
@@ -765,11 +793,11 @@ class ChatGPTTelegramBot:
|
||||
:param user_id: User id
|
||||
:return: The user's budget as a float, or None if the user is not found in the allowed user list
|
||||
"""
|
||||
|
||||
|
||||
# no budget restrictions for admins and '*'-budget lists
|
||||
if self.is_admin(user_id) or self.config['user_budgets'] == '*':
|
||||
return float('inf')
|
||||
|
||||
|
||||
user_budgets = self.config['user_budgets'].split(',')
|
||||
if self.config['allowed_user_ids'] == '*':
|
||||
# same budget for all users, use value in first position of budget list
|
||||
@@ -798,7 +826,7 @@ class ChatGPTTelegramBot:
|
||||
name = update.inline_query.from_user.name if is_inline else update.message.from_user.name
|
||||
if user_id not in self.usage:
|
||||
self.usage[user_id] = UsageTracker(user_id, name)
|
||||
|
||||
|
||||
# Get budget for users
|
||||
user_budget = self.get_user_budget(user_id)
|
||||
budget_period = self.config['budget_period']
|
||||
@@ -829,7 +857,8 @@ class ChatGPTTelegramBot:
|
||||
|
||||
return remaining_budget > 0
|
||||
|
||||
async def check_allowed_and_within_budget(self, update: Update, context: ContextTypes.DEFAULT_TYPE, is_inline=False) -> bool:
|
||||
async def check_allowed_and_within_budget(self, update: Update, context: ContextTypes.DEFAULT_TYPE,
|
||||
is_inline=False) -> bool:
|
||||
"""
|
||||
Checks if the user is allowed to use the bot and if they are within their budget
|
||||
:param update: Telegram update object
|
||||
@@ -842,12 +871,12 @@ class ChatGPTTelegramBot:
|
||||
|
||||
if not await self.is_allowed(update, context, is_inline=is_inline):
|
||||
logging.warning(f'User {name} (id: {user_id}) '
|
||||
f'is not allowed to use the bot')
|
||||
f'is not allowed to use the bot')
|
||||
await self.send_disallowed_message(update, context, is_inline)
|
||||
return False
|
||||
if not self.is_within_budget(update, is_inline=is_inline):
|
||||
logging.warning(f'User {name} (id: {user_id}) '
|
||||
f'reached their usage limit')
|
||||
f'reached their usage limit')
|
||||
await self.send_budget_reached_message(update, context, is_inline)
|
||||
return False
|
||||
|
||||
|
||||
@@ -3,10 +3,12 @@ import pathlib
|
||||
import json
|
||||
from datetime import date
|
||||
|
||||
|
||||
def year_month(date):
|
||||
# extract string of year-month from date, eg: '2023-03'
|
||||
return str(date)[:7]
|
||||
|
||||
|
||||
class UsageTracker:
|
||||
"""
|
||||
UsageTracker class
|
||||
@@ -75,7 +77,8 @@ class UsageTracker:
|
||||
last_update = date.fromisoformat(self.usage["current_cost"]["last_update"])
|
||||
token_cost = round(tokens * tokens_price / 1000, 6)
|
||||
# add to all_time cost, initialize with calculation of total_cost if key doesn't exist
|
||||
self.usage["current_cost"]["all_time"] = self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + token_cost
|
||||
self.usage["current_cost"]["all_time"] = \
|
||||
self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + token_cost
|
||||
# add current cost, update new day
|
||||
if today == last_update:
|
||||
self.usage["current_cost"]["day"] += token_cost
|
||||
@@ -94,7 +97,7 @@ class UsageTracker:
|
||||
else:
|
||||
# create new entry for current date
|
||||
self.usage["usage_history"]["chat_tokens"][str(today)] = tokens
|
||||
|
||||
|
||||
# write updated token usage to user file
|
||||
with open(self.user_file, "w") as outfile:
|
||||
json.dump(self.usage, outfile)
|
||||
@@ -109,7 +112,7 @@ class UsageTracker:
|
||||
usage_day = self.usage["usage_history"]["chat_tokens"][str(today)]
|
||||
else:
|
||||
usage_day = 0
|
||||
month = str(today)[:7] # year-month as string
|
||||
month = str(today)[:7] # year-month as string
|
||||
usage_month = 0
|
||||
for today, tokens in self.usage["usage_history"]["chat_tokens"].items():
|
||||
if today.startswith(month):
|
||||
@@ -132,7 +135,8 @@ class UsageTracker:
|
||||
today = date.today()
|
||||
last_update = date.fromisoformat(self.usage["current_cost"]["last_update"])
|
||||
# add to all_time cost, initialize with calculation of total_cost if key doesn't exist
|
||||
self.usage["current_cost"]["all_time"] = self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + image_cost
|
||||
self.usage["current_cost"]["all_time"] = \
|
||||
self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + image_cost
|
||||
# add current cost, update new day
|
||||
if today == last_update:
|
||||
self.usage["current_cost"]["day"] += image_cost
|
||||
@@ -153,7 +157,7 @@ class UsageTracker:
|
||||
# create new entry for current date
|
||||
self.usage["usage_history"]["number_images"][str(today)] = [0, 0, 0]
|
||||
self.usage["usage_history"]["number_images"][str(today)][requested_size] += 1
|
||||
|
||||
|
||||
# write updated image number to user file
|
||||
with open(self.user_file, "w") as outfile:
|
||||
json.dump(self.usage, outfile)
|
||||
@@ -163,12 +167,12 @@ class UsageTracker:
|
||||
|
||||
:return: total number of images requested per day and per month
|
||||
"""
|
||||
today=date.today()
|
||||
today = date.today()
|
||||
if str(today) in self.usage["usage_history"]["number_images"]:
|
||||
usage_day = sum(self.usage["usage_history"]["number_images"][str(today)])
|
||||
else:
|
||||
usage_day = 0
|
||||
month = str(today)[:7] # year-month as string
|
||||
month = str(today)[:7] # year-month as string
|
||||
usage_month = 0
|
||||
for today, images in self.usage["usage_history"]["number_images"].items():
|
||||
if today.startswith(month):
|
||||
@@ -186,7 +190,8 @@ class UsageTracker:
|
||||
last_update = date.fromisoformat(self.usage["current_cost"]["last_update"])
|
||||
transcription_price = round(seconds * minute_price / 60, 2)
|
||||
# add to all_time cost, initialize with calculation of total_cost if key doesn't exist
|
||||
self.usage["current_cost"]["all_time"] = self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + transcription_price
|
||||
self.usage["current_cost"]["all_time"] = \
|
||||
self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + transcription_price
|
||||
# add current cost, update new day
|
||||
if today == last_update:
|
||||
self.usage["current_cost"]["day"] += transcription_price
|
||||
@@ -206,7 +211,7 @@ class UsageTracker:
|
||||
else:
|
||||
# create new entry for current date
|
||||
self.usage["usage_history"]["transcription_seconds"][str(today)] = seconds
|
||||
|
||||
|
||||
# write updated token usage to user file
|
||||
with open(self.user_file, "w") as outfile:
|
||||
json.dump(self.usage, outfile)
|
||||
@@ -221,7 +226,7 @@ class UsageTracker:
|
||||
seconds_day = self.usage["usage_history"]["transcription_seconds"][str(today)]
|
||||
else:
|
||||
seconds_day = 0
|
||||
month = str(today)[:7] # year-month as string
|
||||
month = str(today)[:7] # year-month as string
|
||||
seconds_month = 0
|
||||
for today, seconds in self.usage["usage_history"]["transcription_seconds"].items():
|
||||
if today.startswith(month):
|
||||
@@ -229,7 +234,7 @@ class UsageTracker:
|
||||
minutes_day, seconds_day = divmod(seconds_day, 60)
|
||||
minutes_month, seconds_month = divmod(seconds_month, 60)
|
||||
return int(minutes_day), round(seconds_day, 2), int(minutes_month), round(seconds_month, 2)
|
||||
|
||||
|
||||
# general functions
|
||||
def get_current_cost(self):
|
||||
"""Get total USD amount of all requests of the current day and month
|
||||
@@ -262,13 +267,13 @@ class UsageTracker:
|
||||
"""
|
||||
total_tokens = sum(self.usage['usage_history']['chat_tokens'].values())
|
||||
token_cost = round(total_tokens * tokens_price / 1000, 6)
|
||||
|
||||
|
||||
total_images = [sum(values) for values in zip(*self.usage['usage_history']['number_images'].values())]
|
||||
image_prices_list = [float(x) for x in image_prices.split(',')]
|
||||
image_cost = sum([count * price for count, price in zip(total_images, image_prices_list)])
|
||||
|
||||
|
||||
total_transcription_seconds = sum(self.usage['usage_history']['transcription_seconds'].values())
|
||||
transcription_cost = round(total_transcription_seconds * minute_price / 60, 2)
|
||||
|
||||
all_time_cost = token_cost + transcription_cost + image_cost
|
||||
return all_time_cost
|
||||
return all_time_cost
|
||||
|
||||
Reference in New Issue
Block a user