Files
chatgpt-telegram-bot/bot/usage_tracker.py
2023-11-18 18:05:27 +01:00

365 lines
16 KiB
Python

import os.path
import pathlib
import json
from datetime import date
def year_month(date_str):
# extract string of year-month from date, eg: '2023-03'
return str(date_str)[:7]
class UsageTracker:
"""
UsageTracker class
Enables tracking of daily/monthly usage per user.
User files are stored as JSON in /usage_logs directory.
JSON example:
{
"user_name": "@user_name",
"current_cost": {
"day": 0.45,
"month": 3.23,
"all_time": 3.23,
"last_update": "2023-03-14"},
"usage_history": {
"chat_tokens": {
"2023-03-13": 520,
"2023-03-14": 1532
},
"transcription_seconds": {
"2023-03-13": 125,
"2023-03-14": 64
},
"number_images": {
"2023-03-12": [0, 2, 3],
"2023-03-13": [1, 2, 3],
"2023-03-14": [0, 1, 2]
}
}
}
"""
def __init__(self, user_id, user_name, logs_dir="usage_logs"):
"""
Initializes UsageTracker for a user with current date.
Loads usage data from usage log file.
:param user_id: Telegram ID of the user
:param user_name: Telegram user name
:param logs_dir: path to directory of usage logs, defaults to "usage_logs"
"""
self.user_id = user_id
self.logs_dir = logs_dir
# path to usage file of given user
self.user_file = f"{logs_dir}/{user_id}.json"
if os.path.isfile(self.user_file):
with open(self.user_file, "r") as file:
self.usage = json.load(file)
if 'vision_tokens' not in self.usage['usage_history']:
self.usage['usage_history']['vision_tokens'] = {}
if 'tts_characters' not in self.usage['usage_history']:
self.usage['usage_history']['tts_characters'] = {}
else:
# ensure directory exists
pathlib.Path(logs_dir).mkdir(exist_ok=True)
# create new dictionary for this user
self.usage = {
"user_name": user_name,
"current_cost": {"day": 0.0, "month": 0.0, "all_time": 0.0, "last_update": str(date.today())},
"usage_history": {"chat_tokens": {}, "transcription_seconds": {}, "number_images": {}, "tts_characters": {}, "vision_tokens":{}}
}
# token usage functions:
def add_chat_tokens(self, tokens, tokens_price=0.002):
"""Adds used tokens from a request to a users usage history and updates current cost
:param tokens: total tokens used in last request
:param tokens_price: price per 1000 tokens, defaults to 0.002
"""
today = date.today()
token_cost = round(float(tokens) * tokens_price / 1000, 6)
self.add_current_costs(token_cost)
# update usage_history
if str(today) in self.usage["usage_history"]["chat_tokens"]:
# add token usage to existing date
self.usage["usage_history"]["chat_tokens"][str(today)] += tokens
else:
# create new entry for current date
self.usage["usage_history"]["chat_tokens"][str(today)] = tokens
# write updated token usage to user file
with open(self.user_file, "w") as outfile:
json.dump(self.usage, outfile)
def get_current_token_usage(self):
"""Get token amounts used for today and this month
:return: total number of tokens used per day and per month
"""
today = date.today()
if str(today) in self.usage["usage_history"]["chat_tokens"]:
usage_day = self.usage["usage_history"]["chat_tokens"][str(today)]
else:
usage_day = 0
month = str(today)[:7] # year-month as string
usage_month = 0
for today, tokens in self.usage["usage_history"]["chat_tokens"].items():
if today.startswith(month):
usage_month += tokens
return usage_day, usage_month
# image usage functions:
def add_image_request(self, image_size, image_prices="0.016,0.018,0.02"):
"""Add image request to users usage history and update current costs.
:param image_size: requested image size
:param image_prices: prices for images of sizes ["256x256", "512x512", "1024x1024"],
defaults to [0.016, 0.018, 0.02]
"""
sizes = ["256x256", "512x512", "1024x1024"]
requested_size = sizes.index(image_size)
image_cost = image_prices[requested_size]
today = date.today()
self.add_current_costs(image_cost)
# update usage_history
if str(today) in self.usage["usage_history"]["number_images"]:
# add token usage to existing date
self.usage["usage_history"]["number_images"][str(today)][requested_size] += 1
else:
# create new entry for current date
self.usage["usage_history"]["number_images"][str(today)] = [0, 0, 0]
self.usage["usage_history"]["number_images"][str(today)][requested_size] += 1
# write updated image number to user file
with open(self.user_file, "w") as outfile:
json.dump(self.usage, outfile)
def get_current_image_count(self):
"""Get number of images requested for today and this month.
:return: total number of images requested per day and per month
"""
today = date.today()
if str(today) in self.usage["usage_history"]["number_images"]:
usage_day = sum(self.usage["usage_history"]["number_images"][str(today)])
else:
usage_day = 0
month = str(today)[:7] # year-month as string
usage_month = 0
for today, images in self.usage["usage_history"]["number_images"].items():
if today.startswith(month):
usage_month += sum(images)
return usage_day, usage_month
# vision usage functions
def add_vision_tokens(self, tokens, vision_token_price=0.01):
"""
Adds requested vision tokens to a users usage history and updates current cost.
:param tokens: total tokens used in last request
:param vision_token_price: price per 1K tokens transcription, defaults to 0.01
"""
today = date.today()
token_price = round(tokens * vision_token_price / 1000, 2)
self.add_current_costs(token_price)
# update usage_history
if str(today) in self.usage["usage_history"]["vision_tokens"]:
# add requested seconds to existing date
self.usage["usage_history"]["vision_tokens"][str(today)] += tokens
else:
# create new entry for current date
self.usage["usage_history"]["vision_tokens"][str(today)] = tokens
# write updated token usage to user file
with open(self.user_file, "w") as outfile:
json.dump(self.usage, outfile)
def get_current_vision_tokens(self):
"""Get vision tokens for today and this month.
:return: total amount of vision tokens per day and per month
"""
today = date.today()
if str(today) in self.usage["usage_history"]["vision_tokens"]:
tokens_day = self.usage["usage_history"]["vision_tokens"][str(today)]
else:
tokens_day = 0
month = str(today)[:7] # year-month as string
tokens_month = 0
for today, tokens in self.usage["usage_history"]["vision_tokens"].items():
if today.startswith(month):
tokens_month += tokens
return tokens_day, tokens_month
# tts usage functions:
def add_tts_request(self, text_length, tts_model, tts_prices):
tts_models = ['tts-1', 'tts-1-hd']
price = tts_prices[tts_models.index(tts_model)]
today = date.today()
tts_price = round(text_length * price / 1000, 2)
self.add_current_costs(tts_price)
if 'tts_characters' not in self.usage['usage_history']:
self.usage['usage_history']['tts_characters'] = {}
if tts_model not in self.usage['usage_history']['tts_characters']:
self.usage['usage_history']['tts_characters'][tts_model] = {}
# update usage_history
if str(today) in self.usage["usage_history"]["tts_characters"][tts_model]:
# add requested text length to existing date
self.usage["usage_history"]["tts_characters"][tts_model][str(today)] += text_length
else:
# create new entry for current date
self.usage["usage_history"]["tts_characters"][tts_model][str(today)] = text_length
# write updated token usage to user file
with open(self.user_file, "w") as outfile:
json.dump(self.usage, outfile)
def get_current_tts_usage(self):
"""Get length of speech generated for today and this month.
:return: total amount of characters converted to speech per day and per month
"""
tts_models = ['tts-1', 'tts-1-hd']
today = date.today()
characters_day = 0
for tts_model in tts_models:
if tts_model in self.usage["usage_history"]["tts_characters"] and \
str(today) in self.usage["usage_history"]["tts_characters"][tts_model]:
characters_day += self.usage["usage_history"]["tts_characters"][tts_model][str(today)]
month = str(today)[:7] # year-month as string
characters_month = 0
for tts_model in tts_models:
if tts_model in self.usage["usage_history"]["tts_characters"]:
for today, characters in self.usage["usage_history"]["tts_characters"][tts_model].items():
if today.startswith(month):
characters_month += characters
return int(characters_day), int(characters_month)
# transcription usage functions:
def add_transcription_seconds(self, seconds, minute_price=0.006):
"""Adds requested transcription seconds to a users usage history and updates current cost.
:param seconds: total seconds used in last request
:param minute_price: price per minute transcription, defaults to 0.006
"""
today = date.today()
transcription_price = round(seconds * minute_price / 60, 2)
self.add_current_costs(transcription_price)
# update usage_history
if str(today) in self.usage["usage_history"]["transcription_seconds"]:
# add requested seconds to existing date
self.usage["usage_history"]["transcription_seconds"][str(today)] += seconds
else:
# create new entry for current date
self.usage["usage_history"]["transcription_seconds"][str(today)] = seconds
# write updated token usage to user file
with open(self.user_file, "w") as outfile:
json.dump(self.usage, outfile)
def add_current_costs(self, request_cost):
"""
Add current cost to all_time, day and month cost and update last_update date.
"""
today = date.today()
last_update = date.fromisoformat(self.usage["current_cost"]["last_update"])
# add to all_time cost, initialize with calculation of total_cost if key doesn't exist
self.usage["current_cost"]["all_time"] = \
self.usage["current_cost"].get("all_time", self.initialize_all_time_cost()) + request_cost
# add current cost, update new day
if today == last_update:
self.usage["current_cost"]["day"] += request_cost
self.usage["current_cost"]["month"] += request_cost
else:
if today.month == last_update.month:
self.usage["current_cost"]["month"] += request_cost
else:
self.usage["current_cost"]["month"] = request_cost
self.usage["current_cost"]["day"] = request_cost
self.usage["current_cost"]["last_update"] = str(today)
def get_current_transcription_duration(self):
"""Get minutes and seconds of audio transcribed for today and this month.
:return: total amount of time transcribed per day and per month (4 values)
"""
today = date.today()
if str(today) in self.usage["usage_history"]["transcription_seconds"]:
seconds_day = self.usage["usage_history"]["transcription_seconds"][str(today)]
else:
seconds_day = 0
month = str(today)[:7] # year-month as string
seconds_month = 0
for today, seconds in self.usage["usage_history"]["transcription_seconds"].items():
if today.startswith(month):
seconds_month += seconds
minutes_day, seconds_day = divmod(seconds_day, 60)
minutes_month, seconds_month = divmod(seconds_month, 60)
return int(minutes_day), round(seconds_day, 2), int(minutes_month), round(seconds_month, 2)
# general functions
def get_current_cost(self):
"""Get total USD amount of all requests of the current day and month
:return: cost of current day and month
"""
today = date.today()
last_update = date.fromisoformat(self.usage["current_cost"]["last_update"])
if today == last_update:
cost_day = self.usage["current_cost"]["day"]
cost_month = self.usage["current_cost"]["month"]
else:
cost_day = 0.0
if today.month == last_update.month:
cost_month = self.usage["current_cost"]["month"]
else:
cost_month = 0.0
# add to all_time cost, initialize with calculation of total_cost if key doesn't exist
cost_all_time = self.usage["current_cost"].get("all_time", self.initialize_all_time_cost())
return {"cost_today": cost_day, "cost_month": cost_month, "cost_all_time": cost_all_time}
def initialize_all_time_cost(self, tokens_price=0.002, image_prices="0.016,0.018,0.02", minute_price=0.006, vision_token_price=0.01, tts_prices='0.015,0.030'):
"""Get total USD amount of all requests in history
:param tokens_price: price per 1000 tokens, defaults to 0.002
:param image_prices: prices for images of sizes ["256x256", "512x512", "1024x1024"],
defaults to [0.016, 0.018, 0.02]
:param minute_price: price per minute transcription, defaults to 0.006
:param vision_token_price: price per 1K vision token interpretation, defaults to 0.01
:param tts_prices: price per 1K characters tts per model ['tts-1', 'tts-1-hd'], defaults to [0.015, 0.030]
:return: total cost of all requests
"""
total_tokens = sum(self.usage['usage_history']['chat_tokens'].values())
token_cost = round(total_tokens * tokens_price / 1000, 6)
total_images = [sum(values) for values in zip(*self.usage['usage_history']['number_images'].values())]
image_prices_list = [float(x) for x in image_prices.split(',')]
image_cost = sum([count * price for count, price in zip(total_images, image_prices_list)])
total_transcription_seconds = sum(self.usage['usage_history']['transcription_seconds'].values())
transcription_cost = round(total_transcription_seconds * minute_price / 60, 2)
total_vision_tokens = sum(self.usage['usage_history']['vision_tokens'].values())
vision_cost = round(total_vision_tokens * vision_token_price / 1000, 2)
total_characters = [sum(tts_model.values()) for tts_model in self.usage['usage_history']['tts_characters'].values()]
tts_prices_list = [float(x) for x in tts_prices.split(',')]
tts_cost = round(sum([count * price / 1000 for count, price in zip(total_characters, tts_prices_list)]), 2)
all_time_cost = token_cost + transcription_cost + image_cost + vision_cost + tts_cost
return all_time_cost