Revert "♻ refactor: rename package"

This reverts commit 55796828dd.
Florian Hönicke
2023-04-24 23:20:09 +02:00
parent 9ee674817f
commit 0339b24353
33 changed files with 1940 additions and 7 deletions

src/apis/__init__.py (new, empty file)

src/apis/gpt.py (new file, 149 lines)

@@ -0,0 +1,149 @@
import os
from time import sleep
from typing import List, Any

import openai
from langchain import PromptTemplate
from langchain.callbacks import CallbackManager
from langchain.chat_models import ChatOpenAI
from openai.error import RateLimitError
from langchain.schema import HumanMessage, SystemMessage, BaseMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from requests.exceptions import ConnectionError
from urllib3.exceptions import InvalidChunkLength

from src.constants import PRICING_GPT4_PROMPT, PRICING_GPT4_GENERATION, PRICING_GPT3_5_TURBO_PROMPT, \
    PRICING_GPT3_5_TURBO_GENERATION, CHARS_PER_TOKEN
from src.options.generate.templates_system import template_system_message_base, executor_example, docarray_example, \
    client_example, gpt_example
from src.utils.string_tools import print_colored

def configure_openai_api_key():
    if 'OPENAI_API_KEY' not in os.environ:
        print_colored('You need to set OPENAI_API_KEY in your environment.', '''
Run:
gptdeploy configure --key <your_openai_api_key>
If you have updated it already, please restart your terminal.
''', 'red')
        exit(1)
    openai.api_key = os.environ['OPENAI_API_KEY']

class GPTSession:
    def __init__(self, task_description, test_description, model: str = 'gpt-4'):
        self.task_description = task_description
        self.test_description = test_description
        if model == 'gpt-4' and self.is_gpt4_available():
            self.pricing_prompt = PRICING_GPT4_PROMPT
            self.pricing_generation = PRICING_GPT4_GENERATION
        else:
            if model == 'gpt-4':
                print_colored('GPT version info', 'GPT-4 is not available. Using GPT-3.5-turbo instead.', 'yellow')
            model = 'gpt-3.5-turbo'
            self.pricing_prompt = PRICING_GPT3_5_TURBO_PROMPT
            self.pricing_generation = PRICING_GPT3_5_TURBO_GENERATION
        self.model_name = model
        self.chars_prompt_so_far = 0
        self.chars_generation_so_far = 0

    def get_conversation(self, system_definition_examples: List[str] = ['gpt', 'executor', 'docarray', 'client']):
        return _GPTConversation(
            self.model_name, self.cost_callback, self.task_description, self.test_description, system_definition_examples
        )

    @staticmethod
    def is_gpt4_available():
        # Send a minimal probe request, retrying up to 5 times on rate limits.
        # An InvalidRequestError means the account has no GPT-4 access.
        try:
            for i in range(5):
                try:
                    openai.ChatCompletion.create(
                        model="gpt-4",
                        messages=[{
                            "role": 'system',
                            "content": 'you respond nothing'
                        }]
                    )
                    break
                except RateLimitError:
                    sleep(1)
                    continue
            return True
        except openai.error.InvalidRequestError:
            return False

    def cost_callback(self, chars_prompt, chars_generation):
        self.chars_prompt_so_far += chars_prompt
        self.chars_generation_so_far += chars_generation
        print('\n')
        money_prompt = self._calculate_money_spent(self.chars_prompt_so_far, self.pricing_prompt)
        money_generation = self._calculate_money_spent(self.chars_generation_so_far, self.pricing_generation)
        print('Total money spent so far on openai.com:', f'${money_prompt + money_generation:.3f}')
        print('\n')

    @staticmethod
    def _calculate_money_spent(num_chars, price):
        # Convert characters to tokens, then apply the per-1k-token price.
        return round(num_chars / CHARS_PER_TOKEN * price / 1000, 3)

class AssistantStreamingStdOutCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        print_colored('', token, 'green', end='')

class _GPTConversation:
    def __init__(self, model: str, cost_callback, task_description, test_description, system_definition_examples: List[str] = ['executor', 'docarray', 'client']):
        self._chat = ChatOpenAI(
            model_name=model,
            streaming=True,
            callback_manager=CallbackManager([AssistantStreamingStdOutCallbackHandler()]),
            verbose=True,
            temperature=0,
        )
        self.cost_callback = cost_callback
        self.messages: List[BaseMessage] = []
        self.system_message = self._create_system_message(task_description, test_description, system_definition_examples)
        if os.environ.get('VERBOSE', '').lower() == 'true':
            print_colored('system', self.system_message.content, 'magenta')

    def chat(self, prompt: str):
        chat_message = HumanMessage(content=prompt)
        self.messages.append(chat_message)
        if os.environ.get('VERBOSE', '').lower() == 'true':
            print_colored('user', prompt, 'blue')
        print_colored('assistant', '', 'green', end='')
        # Retry up to 10 times on transient connection errors.
        for i in range(10):
            try:
                response = self._chat([self.system_message] + self.messages)
                break
            except (ConnectionError, InvalidChunkLength) as e:
                print('There was a connection error. Retrying...')
                if i == 9:
                    raise e
                sleep(10)
        if os.environ.get('VERBOSE', '').lower() == 'true':
            print()
        self.cost_callback(sum([len(m.content) for m in self.messages]), len(response.content))
        self.messages.append(response)
        return response.content

    @staticmethod
    def _create_system_message(task_description, test_description, system_definition_examples: List[str] = []) -> SystemMessage:
        system_message = PromptTemplate.from_template(template_system_message_base).format(
            task_description=task_description,
            test_description=test_description,
        )
        if 'gpt' in system_definition_examples:
            system_message += f'\n{gpt_example}'
        if 'executor' in system_definition_examples:
            system_message += f'\n{executor_example}'
        if 'docarray' in system_definition_examples:
            system_message += f'\n{docarray_example}'
        if 'client' in system_definition_examples:
            system_message += f'\n{client_example}'
        return SystemMessage(content=system_message)
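
For orientation, a minimal usage sketch of the session class above (not part of this commit; the task and test strings are made-up placeholders, and it assumes OPENAI_API_KEY is exported and the src package is importable):

from src.apis.gpt import GPTSession, configure_openai_api_key

configure_openai_api_key()
session = GPTSession(
    task_description='Build a microservice that extracts keywords from text.',  # made-up example task
    test_description='Given "hello world", it returns a non-empty keyword list.',  # made-up example test
)  # the constructor probes GPT-4 availability and falls back to gpt-3.5-turbo
conversation = session.get_conversation()
reply = conversation.chat('Generate the executor code.')  # streams tokens to stdout, tracks cost
print(reply)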

src/apis/jina_cloud.py (new file, 323 lines)

@@ -0,0 +1,323 @@
import hashlib
import json
import os
import re
import subprocess
import threading
import time
import webbrowser
from pathlib import Path
from typing import Dict

import click
import hubble
import requests
from hubble.executor.helper import upload_file, archive_package, get_full_version
from jcloud.flow import CloudFlow
from jina import Flow

from src.constants import DEMO_TOKEN
from src.utils.io import suppress_stdout, is_docker_running
from src.utils.string_tools import print_colored

def wait_until_app_is_ready(url):
    is_app_ready = False
    while not is_app_ready:
        try:
            response = requests.get(url)
            print('waiting for app to be ready...')
            if response.status_code == 200:
                is_app_ready = True
        except requests.exceptions.RequestException:
            pass
        time.sleep(0.5)


def open_streamlit_app(host: str):
    url = f"{host}/playground"
    wait_until_app_is_ready(url)
    webbrowser.open(url, new=2)


def redirect_callback(href):
    print(
        f'You need to log in to Jina first to use GPTDeploy\n'
        f'Please open this link if it does not open automatically in your browser: {href}'
    )
    webbrowser.open(href, new=0, autoraise=True)


def jina_auth_login():
    try:
        hubble.Client(jsonify=True).get_user_info(log_error=False)
    except hubble.AuthenticationRequiredError:
        print('You need to log in to Jina first to use GPTDeploy')
        print_colored('', '''
If you just created an account, it can happen that the login callback is not working.
In this case, please cancel this run, rerun your gptdeploy command and log in to your account again.
''', 'green'
        )
        hubble.login(prompt='login', redirect_callback=redirect_callback)

def push_executor(dir_path):
    # Retry up to 3 times on transient errors before giving up.
    for i in range(3):
        try:
            return _push_executor(dir_path)
        except Exception as e:
            if i == 2:
                raise e
            print('Connection error - retrying in 5 seconds...')
            time.sleep(5)


def get_request_header() -> Dict:
    """Return the header of request with an authorization token.

    :return: request header
    """
    metas, envs = get_full_version()
    headers = {
        **{f'jinameta-{k}': str(v) for k, v in metas.items()},
        **envs,
    }
    headers['Authorization'] = f'token {DEMO_TOKEN}'
    return headers

def _push_executor(dir_path):
    dir_path = Path(dir_path)
    md5_hash = hashlib.md5()
    bytesio = archive_package(dir_path)
    content = bytesio.getvalue()
    md5_hash.update(content)
    md5_digest = md5_hash.hexdigest()
    form_data = {
        'public': 'True',
        'private': 'False',
        'verbose': 'True',
        'buildEnv': f'{{"OPENAI_API_KEY": "{os.environ["OPENAI_API_KEY"]}"}}',
        'md5sum': md5_digest,
    }
    with suppress_stdout():
        headers = get_request_header()
        resp = upload_file(
            'https://api.hubble.jina.ai/v2/rpc/executor.push',
            'filename',
            content,
            dict_data=form_data,
            headers=headers,
            stream=False,
            method='post',
        )
    json_lines_str = resp.content.decode('utf-8')
    if 'AuthenticationRequiredWithBearerChallengeError' in json_lines_str:
        raise Exception('The executor is not authorized to be pushed to Jina Cloud.')
    # An empty string signals a successful build; otherwise collect the build log up to the exit code.
    if 'exited on non-zero code' not in json_lines_str:
        return ''
    responses = []
    for json_line in json_lines_str.splitlines():
        if 'exit code:' in json_line:
            break
        d = json.loads(json_line)
        if 'payload' in d and type(d['payload']) == str:
            responses.append(d['payload'])
        elif type(d) == str:
            responses.append(d)
    return '\n'.join(responses)

def get_user_name(token=None):
    client = hubble.Client(max_retries=None, jsonify=True, token=token)
    response = client.get_user_info()
    return response['data']['name']


def _deploy_on_jcloud(flow_yaml):
    cloud_flow = CloudFlow(path=flow_yaml)
    return cloud_flow.__enter__().endpoints['gateway']


def deploy_on_jcloud(executor_name, microservice_path):
    print('Deploy a jina flow')
    full_flow_path = create_flow_yaml(microservice_path, executor_name, use_docker=True, use_custom_gateway=True)
    for i in range(3):
        try:
            host = _deploy_on_jcloud(flow_yaml=full_flow_path)
            break
        except Exception as e:
            print(f'Could not deploy on Jina Cloud. Trying again in 5 seconds. Error: {e}')
            time.sleep(5)
        except SystemExit as e:
            raise SystemExit('''
Looks like your free credits ran out.
Please add payment information to your account and try again.
Visit https://cloud.jina.ai/
''') from e
        if i == 2:
            raise Exception('''
Could not deploy on Jina Cloud.
This can happen when the microservice is buggy, if it requires too much memory or if Jina Cloud is overloaded.
Please try again later.
''')
    print(f'''
Your microservice is deployed at {host} and the playground is available at {host}/playground
We will now open the playground in your browser.
''')
    open_streamlit_app(host)
    return host

def run_streamlit_app(app_path):
    subprocess.run(['streamlit', 'run', app_path, '--server.address', '0.0.0.0', '--server.port', '8081'])


def run_locally(executor_name, microservice_version_path):
    if is_docker_running():
        use_docker = True
    else:
        click.echo('''
Docker daemon doesn't seem to be running (possible reasons: incorrect docker installation, docker command isn't in the system path, insufficient permissions, docker is running but unresponsive).
It might be important to run your microservice within a docker container.
Your machine might not have all the dependencies installed.
You have 3 options:
a) start the docker daemon
b) run gptdeploy deploy... to deploy your microservice on Jina Cloud. All dependencies will be installed there.
c) try to run your microservice locally without docker. It is worth a try but might fail.
''')
        user_input = click.prompt('Do you want to run your microservice locally without docker? (Y/n)', type=str, default='y')
        if user_input.lower() != 'y':
            exit(1)
        use_docker = False

    print('Run a jina flow locally')
    full_flow_path = create_flow_yaml(microservice_version_path, executor_name, use_docker, False)
    flow = Flow.load_config(full_flow_path)
    with flow:
        print('''
Your microservice started locally.
We now start the playground for you.
''')
        app_path = os.path.join(microservice_version_path, 'gateway', 'app.py')
        # Run the Streamlit app in a separate thread
        streamlit_thread = threading.Thread(target=run_streamlit_app, args=(app_path,))
        streamlit_thread.start()
        # Open the Streamlit app in the user's default web browser
        open_streamlit_app(host='http://localhost:8081')
        flow.block()

def create_flow_yaml(dest_folder, executor_name, use_docker, use_custom_gateway):
    if use_docker:
        prefix = 'jinaai+docker'
    else:
        prefix = 'jinaai'
    flow = f'''jtype: Flow
with:
  port: 8080
  protocol: http
jcloud:
  version: 3.15.1.dev14
  labels:
    creator: microchain
  name: gptdeploy
gateway:
  {f"uses: {prefix}://{get_user_name(DEMO_TOKEN)}/Gateway{executor_name}:latest" if use_custom_gateway else ""}
  {"" if use_docker else "install-requirements: True"}
executors:
  - name: {executor_name.lower()}
    uses: {prefix}://{get_user_name(DEMO_TOKEN)}/{executor_name}:latest
    {"" if use_docker else "install-requirements: True"}
    env:
      OPENAI_API_KEY: {os.environ['OPENAI_API_KEY']}
    jcloud:
      resources:
        instance: C2
        capacity: spot
'''
    full_flow_path = os.path.join(dest_folder, 'flow.yml')
    with open(full_flow_path, 'w', encoding='utf-8') as f:
        f.write(flow)
    return full_flow_path

def replace_client_line(file_content: str, replacement: str) -> str:
    lines = file_content.split('\n')
    for index, line in enumerate(lines):
        if 'Client(' in line:
            lines[index] = replacement
            break
    return '\n'.join(lines)


def update_client_line_in_file(file_path, host):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    replaced_content = replace_client_line(content, f"client = Client(host='{host}')")
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(replaced_content)


def shorten_logs(relevant_lines):
    # handle duplicate error messages
    for index, line in enumerate(relevant_lines):
        if '--- Captured stderr call ----' in line:
            relevant_lines = relevant_lines[:index]
    # filter pip install logs
    relevant_lines = [line for line in relevant_lines if ' Requirement already satisfied: ' not in line]
    # filter version-not-found logs
    for index, line in enumerate(relevant_lines):
        if 'ERROR: Could not find a version that satisfies the requirement ' in line:
            start_and_end = line[:150] + '...' + line[-150:]
            relevant_lines[index] = start_and_end
    return relevant_lines


def clean_color_codes(response):
    response = re.sub(r'\x1b\[[0-9;]*m', '', response)
    return response

def process_error_message(error_message):
    lines = error_message.split('\n')
    relevant_lines = []

    pattern = re.compile(r"^#\d+ \[[ \d]+/[ \d]+\]")  # Pattern to match docker build step lines like "#11 [7/8]"
    last_matching_line_index = None

    for index, line in enumerate(lines):
        if pattern.match(line):
            last_matching_line_index = index

    if last_matching_line_index is not None:
        relevant_lines = lines[last_matching_line_index:]

    relevant_lines = shorten_logs(relevant_lines)
    response = '\n'.join(relevant_lines[-100:]).strip()
    response = clean_color_codes(response)

    # Handle the case where the Dockerfile is corrupted and cannot be parsed:
    # the logic above yields no relevant error message then, but the last line
    # of the error message starts with "error".
    last_line = lines[-1]
    if not response and last_line.startswith('error: '):
        return last_line
    return response
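
For orientation, a minimal sketch of how the log-processing helpers above could be exercised (not part of this commit; the sample log lines are fabricated):

from src.apis.jina_cloud import process_error_message

# A fabricated docker build log: one step marker, one pip noise line (dropped
# by shorten_logs), and one oversized error line (truncated to 150+150 chars).
raw_log = '\n'.join([
    '#11 [7/8] RUN pip install -r requirements.txt',
    '  Requirement already satisfied: requests in /usr/local/lib/python3.10',
    'ERROR: Could not find a version that satisfies the requirement foo==99.0 ' + 'x' * 400,
])
print(process_error_message(raw_log))  # prints the step marker plus the truncated error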