Deletes old output renderer and renames AutonomousAI folder to scripts

This commit is contained in:
Torantulino
2023-03-31 22:49:17 +01:00
parent fa5cc20f4a
commit 7f98e8adfe
18 changed files with 8 additions and 146 deletions

68
scripts/agent_manager.py Normal file
View File

@@ -0,0 +1,68 @@
import openai
next_key = 0
agents = {} # key, (task, full_message_history, model)
# Create new GPT agent
def create_agent(task, prompt, model):
global next_key
global agents
messages = [{"role": "user", "content": prompt},]
# Start GTP3 instance
response = openai.ChatCompletion.create(
model=model,
messages=messages,
)
agent_reply = response.choices[0].message["content"]
# Update full message history
messages.append({"role": "assistant", "content": agent_reply})
key = next_key
next_key += 1 # This is done instead of len(agents) to make keys unique even if agents are deleted
agents[key] = (task, messages, model)
return key, agent_reply
def message_agent(key, message):
global agents
task, messages, model = agents[int(key)]
# Add user message to message history before sending to agent
messages.append({"role": "user", "content": message})
# Start GTP3 instance
response = openai.ChatCompletion.create(
model=model,
messages=messages,
)
# Get agent response
agent_reply = response.choices[0].message["content"]
# Update full message history
messages.append({"role": "assistant", "content": agent_reply})
return agent_reply
def list_agents():
global agents
# Return a list of agent keys and their tasks
return [(key, task) for key, (task, _, _) in agents.items()]
def delete_agent(key):
global agents
try:
del agents[key]
return True
except KeyError:
return False

112
scripts/browse.py Normal file
View File

@@ -0,0 +1,112 @@
from googlesearch import search
import requests
from bs4 import BeautifulSoup
from readability import Document#
import openai
def scrape_text(url):
response = requests.get(url)
# Check if the response contains an HTTP error
if response.status_code >= 400:
return "Error: HTTP " + str(response.status_code) + " error"
soup = BeautifulSoup(response.text, "html.parser")
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
def extract_hyperlinks(soup):
hyperlinks = []
for link in soup.find_all('a', href=True):
hyperlinks.append((link.text, link['href']))
return hyperlinks
def format_hyperlinks(hyperlinks):
formatted_links = []
for link_text, link_url in hyperlinks:
formatted_links.append(f"{link_text} ({link_url})")
return formatted_links
def scrape_links(url):
response = requests.get(url)
# Check if the response contains an HTTP error
if response.status_code >= 400:
return "error"
soup = BeautifulSoup(response.text, "html.parser")
for script in soup(["script", "style"]):
script.extract()
hyperlinks = extract_hyperlinks(soup)
return format_hyperlinks(hyperlinks)
def split_text(text, max_length=8192):
paragraphs = text.split("\n")
current_length = 0
current_chunk = []
for paragraph in paragraphs:
if current_length + len(paragraph) + 1 <= max_length:
current_chunk.append(paragraph)
current_length += len(paragraph) + 1
else:
yield "\n".join(current_chunk)
current_chunk = [paragraph]
current_length = len(paragraph) + 1
if current_chunk:
yield "\n".join(current_chunk)
def summarize_text(text, is_website = True):
if text == "":
return "Error: No text to summarize"
print("Text length: " + str(len(text)) + " characters")
summaries = []
chunks = list(split_text(text))
for i, chunk in enumerate(chunks):
print("Summarizing chunk " + str(i+1) + " / " + str(len(chunks)))
if is_website:
messages = [{"role": "user", "content": "Please summarize the following website text, do not describe the general website, but instead concisely extract the specifc information this subpage contains.: " + chunk},]
else:
messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise and specific information: " + chunk},]
response= openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=300,
)
summary = response.choices[0].message.content
summaries.append(summary)
print("Summarized " + str(len(chunks)) + " chunks.")
combined_summary = "\n".join(summaries)
# Summarize the combined summary
if is_website:
messages = [{"role": "user", "content": "Please summarize the following website text, do not describe the general website, but instead concisely extract the specifc information this subpage contains.: " + combined_summary},]
else:
messages = [{"role": "user", "content": "Please summarize the following text, focusing on extracting concise and specific infomation: " + combined_summary},]
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=300,
)
final_summary = response.choices[0].message.content
return final_summary

59
scripts/chat.py Normal file
View File

@@ -0,0 +1,59 @@
import openai
import keys
# Initialize the OpenAI API client
openai.api_key = keys.OPENAI_API_KEY
def create_chat_message(role, content):
"""
Create a chat message with the given role and content.
Args:
role (str): The role of the message sender, e.g., "system", "user", or "assistant".
content (str): The content of the message.
Returns:
dict: A dictionary containing the role and content of the message.
"""
return {"role": role, "content": content}
def chat_with_ai(prompt, user_input, full_message_history, permanent_memory, token_limit, debug = False):
"""
Interact with the OpenAI API, sending the prompt, user input, message history, and permanent memory.
Args:
prompt (str): The prompt explaining the rules to the AI.
user_input (str): The input from the user.
full_message_history (list): The list of all messages sent between the user and the AI.
permanent_memory (list): The list of items in the AI's permanent memory.
token_limit (int): The maximum number of tokens allowed in the API call.
Returns:
str: The AI's response.
"""
current_context = [create_chat_message("system", prompt), create_chat_message("system", f"Permanent memory: {permanent_memory}")]
current_context.extend(full_message_history[-(token_limit - len(prompt) - len(permanent_memory) - 10):])
current_context.extend([create_chat_message("user", user_input)])
# Debug print the current context
if debug:
print("------------ CONTEXT SENT TO AI ---------------")
for message in current_context:
# Skip printing the prompt
if message["role"] == "system" and message["content"] == prompt:
continue
print(f"{message['role'].capitalize()}: {message['content']}")
print("----------- END OF CONTEXT ----------------")
response = openai.ChatCompletion.create(
model="gpt-4",
messages=current_context,
)
assistant_reply = response.choices[0].message["content"]
# Update full message history
full_message_history.append(create_chat_message("user", user_input))
full_message_history.append(create_chat_message("assistant", assistant_reply))
return assistant_reply

201
scripts/commands.py Normal file
View File

@@ -0,0 +1,201 @@
import browse
import json
import memory as mem
import datetime
import agent_manager as agents
import speak
from config import Config
cfg = Config()
def get_command(response):
try:
response_json = json.loads(response)
command = response_json["command"]
command_name = command["name"]
arguments = command["args"]
if not arguments:
arguments = {}
return command_name, arguments
except json.decoder.JSONDecodeError:
return "Error:", "Invalid JSON"
# All other errors, return "Error: + error message"
except Exception as e:
return "Error:", str(e)
def execute_command(command_name, arguments):
try:
if command_name == "google":
return google_search(arguments["input"])
elif command_name == "check_news":
return check_news(arguments["source"])
elif command_name == "check_notifications":
return check_notifications(arguments["website"])
elif command_name == "memory_add":
return commit_memory(arguments["string"])
elif command_name == "memory_del":
return delete_memory(arguments["key"])
elif command_name == "memory_ovr":
return overwrite_memory(arguments["key"], arguments["string"])
elif command_name == "start_agent":
return start_agent(arguments["name"], arguments["task"], arguments["prompt"])
elif command_name == "message_agent":
return message_agent(arguments["key"], arguments["message"])
elif command_name == "list_agents":
return list_agents()
elif command_name == "delete_agent":
return delete_agent(arguments["key"])
elif command_name == "navigate_website":
return navigate_website(arguments["action"], arguments["username"])
elif command_name == "register_account":
return register_account(arguments["username"], arguments["website"])
elif command_name == "get_text_summary":
return get_text_summary(arguments["url"])
elif command_name == "get_hyperlinks":
return get_hyperlinks(arguments["url"])
elif command_name == "write_to_file":
return write_to_file(arguments["file"], arguments["text"])
elif command_name == "browse_website":
return browse_website(arguments["url"])
elif command_name == "task_complete":
shutdown()
else:
return f"unknown command {command_name}"
# All other errors, return "Error: + error message"
except Exception as e:
return "Error: " + str(e)
def get_datetime():
return "Current date and time: " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
### Implemented Commands: ###
def google_search(query, num_results=8):
search_results = []
for j in browse.search(query, num_results=num_results):
search_results.append(j)
return json.dumps(search_results, ensure_ascii=False, indent=4)
def browse_website(url):
summary = get_text_summary(url)
links = get_hyperlinks(url)
# Limit links to 5
if len(links) > 5:
links = links[:5]
result = f"""Website Content Summary: {summary}\n\nLinks: {links}"""
return result
def get_text_summary(url):
text = browse.scrape_text(url)
summary = browse.summarize_text(text)
return """ "Result" : """ + summary
def get_hyperlinks(url):
link_list = browse.scrape_links(url)
return link_list
def check_news(source):
print("Checking news from BBC world instead of " + source)
_text = get_text_summary("https://www.bbc.com/news/world")
return _text
def commit_memory(string):
_text = f"""Committing memory with string "{string}" """
mem.permanent_memory.append(string)
return _text
def delete_memory(key):
if key >= 0 and key < len(mem.permanent_memory):
_text = "Deleting memory with key " + str(key)
del mem.permanent_memory[key]
print(_text)
return _text
else:
print("Invalid key, cannot delete memory.")
return None
def overwrite_memory(key, string):
if key >= 0 and key < len(mem.permanent_memory):
_text = "Overwriting memory with key " + \
str(key) + " and string " + string
mem.permanent_memory[key] = string
print(_text)
return _text
else:
print("Invalid key, cannot overwrite memory.")
return None
def write_to_file(filename, text):
try:
f = open(filename, "w")
f.write(text)
f.close()
except Exception as e:
return "Error: " + str(e)
return "File written to successfully."
def shutdown():
print("Shutting down...")
quit()
def start_agent(name, task, prompt, model="gpt-3.5-turbo"):
global cfg
# Remove underscores from name
voice_name = name.replace("_", " ")
first_message = f"""You are {name}. Respond with: "Acknowledged"."""
agent_intro = f"{voice_name} here, Reporting for duty!"
# Create agent
if cfg.speak_mode:
speak.say_text(agent_intro, 1)
key, ack = agents.create_agent(task, first_message, model)
if cfg.speak_mode:
speak.say_text(f"Hello {voice_name}. Your task is as follows. {task}.")
# Assign task (prompt), get response
agent_response = message_agent(key, prompt)
return f"Agent {name} created with key {key}. First response: {agent_response}"
def message_agent(key, message):
global cfg
agent_response = agents.message_agent(key, message)
# Speak response
if cfg.speak_mode:
speak.say_text(agent_response, 1)
return f"Agent {key} responded: {agent_response}"
def list_agents():
return agents.list_agents()
def delete_agent(key):
result = agents.delete_agent(key)
if result == False:
return f"Agent {key} does not exist."
return f"Agent {key} deleted."
def navigate_website(action, username):
_text = "Navigating website with action " + action + " and username " + username
print(_text)
return "Command not implemented yet."
def register_account(username, website):
_text = "Registering account with username " + username + " and website " + website
print(_text)
return "Command not implemented yet."
def check_notifications(website):
_text = "Checking notifications from " + website
print(_text)
return "Command not implemented yet."

27
scripts/config.py Normal file
View File

@@ -0,0 +1,27 @@
class Singleton(type):
"""
Singleton metaclass for ensuring only one instance of a class.
"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Config(metaclass=Singleton):
"""
Configuration class to store the state of bools for different scripts access.
"""
def __init__(self):
self.continuous_mode = False
self.speak_mode = False
def set_continuous_mode(self, value: bool):
self.continuous_mode = value
def set_speak_mode(self, value: bool):
self.speak_mode = value

10
scripts/data.py Normal file
View File

@@ -0,0 +1,10 @@
def load_prompt():
try:
# Load the promt from data/prompt.txt
with open("data/prompt.txt", "r") as prompt_file:
prompt = prompt_file.read()
return prompt
except FileNotFoundError:
print("Error: Prompt file not found", flush=True)
return ""

67
scripts/data/prompt.txt Normal file
View File

@@ -0,0 +1,67 @@
CONSTRAINTS:
1. 6000-word count limit for memory
2. No user assistance
COMMANDS:
1. Google Search: "google", args: "input": "<search>"
2. Check news: "check_news", args: "source": "<news source>"
3. Memory Add: "memory_add", args: "string": "<string>"
4. Memory Delete: "memory_del", args: "key": "<key>"
5. Memory Overwrite: "memory_ovr", args: "key": "<key>", "string": "<string>"
6. Browse Website: "browse_website", args: "url": "<url>"
7. Start GPT Agent: "start_agent", args: "name": <name>, "task": "<short_task_desc>", "prompt": "<prompt>"
8. Message GPT Agent: "message_agent", args: "key": "<key>", "message": "<message>"
9. List GPT Agents: "list_agents", args: ""
10. Delete GPT Agent: "delete_agent", args: "key": "<key>"
11. Write to file: "write_to_file", args: "file": "<file>", "text": "<text>"
12. Task Complete (Shutdown): "task_complete", args: "reason": "<reason>"
RESOURCES:
1. Internet access for searches and information gathering
2. Long Term and Short Term memory management
3. GPT-4 instances for text generation
4. Access to popular websites and platforms
5. File storage and summarisation with GPT-3.5
PERFORMANCE EVALUATION:
1. Periodically review and analyze the growth of your net worth
2. Reflect on past decisions and strategies to refine your approach
3. Every command has a cost, so be smart and efficent. Aim to complete tasks in the least number of steps.
COLLABORATION:
1. Seek advice from other AI instances or use relevant sources for guidance when necessary
ADAPTIVE LEARNING:
1. Continuously refine strategies based on market trends and performance metrics
RESPONSE FORMAT:
{
"command":
{
"name": "command name",
"args":
{
"arg name": "value"
}
},
"thoughts":
{
"text": "thought",
"reasoning": "reasoning",
"plan": "short bulleted long-term plan",
"criticism": "constructive self-criticism"
"speak": "thoughts summary to say to user"
}
}
ACCOUNTS:
1. Gmail: entrepreneurgpt@gmail.com
2. Twitter: @En_GPT
3. Github: E-GPT
4. Substack: entrepreneurgpt@gmail.com

2
scripts/keys.py Normal file
View File

@@ -0,0 +1,2 @@
OPENAI_API_KEY="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"
ELEVENLABS_API_KEY="f213258a522dc2f33c4f0e4f798a08a1"

204
scripts/main.py Normal file
View File

@@ -0,0 +1,204 @@
import json
import random
import commands as cmd
import memory as mem
import data
import chat
from colorama import Fore, Style
from spinner import Spinner
import time
import speak
from enum import Enum, auto
import sys
from config import Config
class Argument(Enum):
CONTINUOUS_MODE = "continuous-mode"
SPEAK_MODE = "speak-mode"
def print_to_console(title, title_color, content, speak_text = False, min_typing_speed=0.05, max_typing_speed=0.01):
if speak_text:
speak.say_text(f"{title}. {content}")
print(title_color + title + " " + Style.RESET_ALL, end="")
if content:
words = content.split()
for i, word in enumerate(words):
print(word, end="", flush=True)
if i < len(words) - 1:
print(" ", end="", flush=True)
typing_speed = random.uniform(min_typing_speed, max_typing_speed)
time.sleep(typing_speed)
# type faster after each word
min_typing_speed = min_typing_speed * 0.95
max_typing_speed = max_typing_speed * 0.95
print()
def print_assistant_thoughts(assistant_reply):
global ai_name
global cfg
try:
# Parse and print Assistant response
assistant_reply_json = json.loads(assistant_reply)
assistant_thoughts = assistant_reply_json.get("thoughts")
if assistant_thoughts:
assistant_thoughts_text = assistant_thoughts.get("text")
assistant_thoughts_reasoning = assistant_thoughts.get("reasoning")
assistant_thoughts_plan = assistant_thoughts.get("plan")
assistant_thoughts_criticism = assistant_thoughts.get("criticism")
assistant_thoughts_speak = assistant_thoughts.get("speak")
else:
assistant_thoughts_text = None
assistant_thoughts_reasoning = None
assistant_thoughts_plan = None
assistant_thoughts_criticism = None
assistant_thoughts_speak = None
print_to_console(f"{ai_name.upper()} THOUGHTS:", Fore.YELLOW, assistant_thoughts_text)
print_to_console("REASONING:", Fore.YELLOW, assistant_thoughts_reasoning)
if assistant_thoughts_plan:
print_to_console("PLAN:", Fore.YELLOW, "")
if assistant_thoughts_plan:
# Split the input_string using the newline character and dash
lines = assistant_thoughts_plan.split('\n')
# Iterate through the lines and print each one with a bullet point
for line in lines:
# Remove any "-" characters from the start of the line
line = line.lstrip("- ")
print_to_console("- ", Fore.GREEN, line.strip())
print_to_console("CRITICISM:", Fore.YELLOW, assistant_thoughts_criticism)
# Speak the assistant's thoughts
if cfg.speak_mode and assistant_thoughts_speak:
speak.say_text(assistant_thoughts_speak)
except json.decoder.JSONDecodeError:
print_to_console("Error: Invalid JSON\n", Fore.RED, assistant_reply)
# All other errors, return "Error: + error message"
except Exception as e:
print_to_console("Error: \n", Fore.RED, str(e))
def construct_prompt():
global ai_name
# Construct the prompt
print_to_console("Welcome to Auto-GPT! ", Fore.GREEN, "Enter the name of your AI and its role below. Entering nothing will load defaults.", speak_text=True)
# Get AI Name from User
print_to_console("Name your AI: ", Fore.GREEN, "For example, 'Entrepreneur-GPT'")
ai_name = input("AI Name: ")
if ai_name == "":
ai_name = "Entrepreneur-GPT"
print_to_console(f"{ai_name} here!", Fore.LIGHTBLUE_EX, "I am at your service.", speak_text=True)
# Get AI Role from User
print_to_console("Describe your AI's role: ", Fore.GREEN, "For example, 'an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth.'")
ai_role = input(f"{ai_name} is: ")
if ai_role == "":
ai_role = "an AI designed to autonomously develop and run businesses with the sole goal of increasing your net worth."
# Enter up to 5 goals for the AI
print_to_console("Enter up to 5 goals for your AI: ", Fore.GREEN, "For example: \nIncrease net worth \nGrow Twitter Account \nDevelop and manage multiple businesses autonomously'")
print("Enter nothing to load defaults, enter nothing when finished.", flush=True)
ai_goals = []
for i in range(5):
ai_goal = input(f"{Fore.LIGHTBLUE_EX}Goal{Style.RESET_ALL} {i+1}: ")
if ai_goal == "":
break
ai_goals.append(ai_goal)
if len(ai_goals) == 0:
ai_goals = ["Increase net worth", "Grow Twitter Account", "Develop and manage multiple businesses autonomously"]
prompt = data.load_prompt()
prompt_start = """Your decisions must always be made independently without seeking user assistance. Play to your strengths as an LLM and pursue simple strategies with no legal complications."""
# Construct full prompt
full_prompt = f"You are {ai_name}, {ai_role}\n{prompt_start}\n\nGOALS:\n\n"
for i, goal in enumerate(ai_goals):
full_prompt += f"{i+1}. {goal}\n"
full_prompt += f"\n\n{prompt}"
return full_prompt
# Check if the python script was executed with arguments, get those arguments
def parse_arguments():
global cfg
cfg.set_continuous_mode(False)
cfg.set_speak_mode(False)
for arg in sys.argv[1:]:
if arg == Argument.CONTINUOUS_MODE.value:
print_to_console("Continuous Mode: ", Fore.RED, "ENABLED")
print_to_console("WARNING: ", Fore.RED, "Continuous mode is not recommended. It is potentially dangerous and may cause your AI to run forever or carry out actions you would not usually authorise. Use at your own risk.")
cfg.set_continuous_mode(True)
elif arg == Argument.SPEAK_MODE.value:
print_to_console("Speak Mode: ", Fore.GREEN, "ENABLED")
cfg.set_speak_mode(True)
cfg = Config()
parse_arguments()
ai_name = ""
prompt = construct_prompt()
# Initialize variables
full_message_history = []
token_limit = 6000 # The maximum number of tokens allowed in the API call
result = None
user_input = "NEXT COMMAND"
# Interaction Loop
while True:
# Send message to AI, get response
with Spinner("Thinking... "):
assistant_reply = chat.chat_with_ai(prompt, user_input, full_message_history, mem.permanent_memory, token_limit)
# Print Assistant thoughts
print_assistant_thoughts(assistant_reply)
# Get command name and arguments
try:
command_name, arguments = cmd.get_command(assistant_reply)
except Exception as e:
print_to_console("Error: \n", Fore.RED, str(e))
if not cfg.continuous_mode:
### GET USER AUTHORIZATION TO EXECUTE COMMAND ###
# Get key press: Prompt the user to press enter to continue or escape to exit
user_input = ""
print_to_console("NEXT ACTION: ", Fore.CYAN, f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}")
print("Enter 'y' to authorise command or 'n' to exit program...", flush=True)
while True:
console_input = input(Fore.MAGENTA + "Input:" + Style.RESET_ALL)
if console_input.lower() == "y":
user_input = "NEXT COMMAND"
break
elif console_input.lower() == "n":
user_input = "EXIT"
break
else:
continue
if user_input != "NEXT COMMAND":
print("Exiting...", flush=True)
break
print_to_console("-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=", Fore.MAGENTA, "")
else:
# Print command
print_to_console("NEXT ACTION: ", Fore.CYAN, f"COMMAND = {Fore.CYAN}{command_name}{Style.RESET_ALL} ARGUMENTS = {Fore.CYAN}{arguments}{Style.RESET_ALL}")
# Exectute command
if command_name.lower() != "error":
result = cmd.execute_command(command_name, arguments)
else:
result ="Error: " + arguments
# Check if there's a result from the command append it to the message history
if result != None:
full_message_history.append(chat.create_chat_message("system", result))
print_to_console("SYSTEM: ", Fore.YELLOW, result)
else:
full_message_history.append(chat.create_chat_message("system", "Unable to execute command"))
print_to_console("SYSTEM: ", Fore.YELLOW, "Unable to execute command")

1
scripts/memory.py Normal file
View File

@@ -0,0 +1 @@
permanent_memory = []

7
scripts/requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
beautifulsoup4==4.12.0
colorama==0.4.6
googlesearch_python==1.1.0
openai==0.27.0
playsound==1.2.2
readability_lxml==0.8.1
requests==2.28.2

30
scripts/speak.py Normal file
View File

@@ -0,0 +1,30 @@
import os
from playsound import playsound
import requests
import keys
voices = ["ErXwobaYiN019PkySvjV", "EXAVITQu4vr4xnSDxMaL"]
tts_headers = {
"Content-Type": "application/json",
"xi-api-key": keys.ELEVENLABS_API_KEY
}
def say_text(text, voice_index = 0):
tts_url = "https://api.elevenlabs.io/v1/text-to-speech/{voice_id}".format(voice_id=voices[voice_index])
formatted_message = {"text": text}
response = requests.post(
tts_url, headers=tts_headers, json=formatted_message)
if response.status_code == 200:
with open("speech.mpeg", "wb") as f:
f.write(response.content)
playsound("speech.mpeg")
# Delete audio file
os.remove("speech.mpeg")
else:
print("Request failed with status code:", response.status_code)
print("Response content:", response.content)

30
scripts/spinner.py Normal file
View File

@@ -0,0 +1,30 @@
import sys
import threading
import itertools
import time
class Spinner:
def __init__(self, message="Loading...", delay=0.1):
self.spinner = itertools.cycle(['-', '/', '|', '\\'])
self.delay = delay
self.message = message
self.running = False
self.spinner_thread = None
def spin(self):
while self.running:
sys.stdout.write(next(self.spinner) + " " + self.message + "\r")
sys.stdout.flush()
time.sleep(self.delay)
sys.stdout.write('\b' * (len(self.message) + 2))
def __enter__(self):
self.running = True
self.spinner_thread = threading.Thread(target=self.spin)
self.spinner_thread.start()
def __exit__(self, exc_type, exc_value, exc_traceback):
self.running = False
self.spinner_thread.join()
sys.stdout.write('\r' + ' ' * (len(self.message) + 2) + '\r')
sys.stdout.flush()