mirror of
https://github.com/aljazceru/chatgpt-telegram-bot.git
synced 2025-12-20 06:05:12 +01:00
added youtube audio extractor, dice, and direct response for images
This commit is contained in:
69
bot/utils.py
69
bot/utils.py
@@ -2,7 +2,9 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
import telegram
|
||||
from telegram import Message, MessageEntity, Update, ChatMember, constants
|
||||
@@ -173,6 +175,7 @@ async def is_allowed(config, update: Update, context: CallbackContext, is_inline
|
||||
f'(id: {user_id}) are not allowed')
|
||||
return False
|
||||
|
||||
|
||||
def is_admin(config, user_id: int, log_no_admin=False) -> bool:
|
||||
"""
|
||||
Checks if the user is the admin of the bot.
|
||||
@@ -284,6 +287,9 @@ def add_chat_request_to_usage_tracker(usage, config, user_id, used_tokens):
|
||||
:param used_tokens: The number of tokens used
|
||||
"""
|
||||
try:
|
||||
if int(used_tokens) == 0:
|
||||
logging.warning('No tokens used. Not adding chat request to usage tracker.')
|
||||
return
|
||||
# add chat request to users usage tracker
|
||||
usage[user_id].add_chat_tokens(used_tokens, config['token_price'])
|
||||
# add guest chat request to guest usage tracker
|
||||
@@ -305,3 +311,66 @@ def get_reply_to_message_id(config, update: Update):
|
||||
if config['enable_quoting'] or is_group_chat(update):
|
||||
return update.message.message_id
|
||||
return None
|
||||
|
||||
|
||||
def is_direct_result(response: any) -> bool:
|
||||
"""
|
||||
Checks if the dict contains a direct result that can be sent directly to the user
|
||||
:param response: The response value
|
||||
:return: Boolean indicating if the result is a direct result
|
||||
"""
|
||||
if type(response) is not dict:
|
||||
try:
|
||||
json_response = json.loads(response)
|
||||
return json_response.get('direct_result', False)
|
||||
except:
|
||||
return False
|
||||
else:
|
||||
return response.get('direct_result', False)
|
||||
|
||||
|
||||
async def handle_direct_result(config, update: Update, response: any):
|
||||
"""
|
||||
Handles a direct result from a plugin
|
||||
"""
|
||||
if type(response) is not dict:
|
||||
response = json.loads(response)
|
||||
|
||||
result = response['direct_result']
|
||||
kind = result['kind']
|
||||
format = result['format']
|
||||
value = result['value']
|
||||
|
||||
common_args = {
|
||||
'message_thread_id': get_thread_id(update),
|
||||
'reply_to_message_id': get_reply_to_message_id(config, update),
|
||||
}
|
||||
|
||||
if kind == 'photo':
|
||||
if format == 'url':
|
||||
await update.effective_message.reply_photo(**common_args, photo=value)
|
||||
elif kind == 'gif':
|
||||
if format == 'url':
|
||||
await update.effective_message.reply_document(**common_args, document=value)
|
||||
elif kind == 'file':
|
||||
if format == 'path':
|
||||
await update.effective_message.reply_document(**common_args, document=open(value, 'rb'))
|
||||
cleanup_intermediate_files(response)
|
||||
elif kind == 'dice':
|
||||
await update.effective_message.reply_dice(**common_args, emoji=value)
|
||||
|
||||
|
||||
def cleanup_intermediate_files(response: any):
|
||||
"""
|
||||
Deletes intermediate files created by plugins
|
||||
"""
|
||||
if type(response) is not dict:
|
||||
response = json.loads(response)
|
||||
|
||||
result = response['direct_result']
|
||||
kind = result['kind']
|
||||
format = result['format']
|
||||
value = result['value']
|
||||
|
||||
if kind == 'file' and format == 'path':
|
||||
os.remove(value)
|
||||
Reference in New Issue
Block a user