fix: package

This commit is contained in:
Florian Hönicke
2023-04-10 02:24:06 +02:00
parent c191d53ba1
commit 0188f4354c
9 changed files with 1030 additions and 0 deletions

2
src/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# Package version and public CLI entrypoint re-export.
__version__ = '0.18.3'
from src.main import main

32
src/constants.py Normal file
View File

@@ -0,0 +1,32 @@
# File names of the artifacts generated for each executor package.
EXECUTOR_FILE_NAME = 'executor.py'
TEST_EXECUTOR_FILE_NAME = 'test_executor.py'
REQUIREMENTS_FILE_NAME = 'requirements.txt'
DOCKER_FILE_NAME = 'Dockerfile'
CLIENT_FILE_NAME = 'client.py'
STREAMLIT_FILE_NAME = 'streamlit.py'
# Markdown code-fence language tags used when wrapping each file in prompts.
EXECUTOR_FILE_TAG = 'python'
TEST_EXECUTOR_FILE_TAG = 'python'
REQUIREMENTS_FILE_TAG = ''
DOCKER_FILE_TAG = 'dockerfile'
CLIENT_FILE_TAG = 'python'
STREAMLIT_FILE_TAG = 'python'
# (file name, fence tag) pairs in the order files are rendered into prompts.
FILE_AND_TAG_PAIRS = [
    (EXECUTOR_FILE_NAME, EXECUTOR_FILE_TAG),
    (TEST_EXECUTOR_FILE_NAME, TEST_EXECUTOR_FILE_TAG),
    (REQUIREMENTS_FILE_NAME, REQUIREMENTS_FILE_TAG),
    (DOCKER_FILE_NAME, DOCKER_FILE_TAG),
    (CLIENT_FILE_NAME, CLIENT_FILE_TAG),
    (STREAMLIT_FILE_NAME, STREAMLIT_FILE_TAG)
]
# Folder names for the first two executor iterations.
EXECUTOR_FOLDER_v1 = 'executor_v1'
EXECUTOR_FOLDER_v2 = 'executor_v2'
FLOW_URL_PLACEHOLDER = 'jcloud.jina.ai'
# OpenAI pricing in USD per 1000 tokens, split by prompt vs. generated tokens.
PRICING_GPT4_PROMPT = 0.03
PRICING_GPT4_GENERATION = 0.06
PRICING_GPT3_5_TURBO_PROMPT = 0.002
PRICING_GPT3_5_TURBO_GENERATION = 0.002

117
src/gpt.py Normal file
View File

@@ -0,0 +1,117 @@
import os
from time import sleep
from typing import List, Tuple
import openai
from openai.error import RateLimitError, Timeout
from src.constants import PRICING_GPT4_PROMPT, PRICING_GPT4_GENERATION, PRICING_GPT3_5_TURBO_PROMPT, \
PRICING_GPT3_5_TURBO_GENERATION
from src.prompt_system import system_base_definition
from src.utils.io import timeout_generator_wrapper, GenerationTimeoutError
from src.utils.string_tools import print_colored
class GPTSession:
    """Configures the OpenAI client and tracks an estimate of money spent."""

    def __init__(self):
        # fail fast when no API key is configured
        self.get_openai_api_key()
        # prefer gpt-4 when the account can use it, otherwise gpt-3.5-turbo
        if self.is_gpt4_available():
            self.supported_model = 'gpt-4'
            self.pricing_prompt = PRICING_GPT4_PROMPT
            self.pricing_generation = PRICING_GPT4_GENERATION
        else:
            self.supported_model = 'gpt-3.5-turbo'
            self.pricing_prompt = PRICING_GPT3_5_TURBO_PROMPT
            self.pricing_generation = PRICING_GPT3_5_TURBO_GENERATION
        # running character totals fed by cost_callback
        self.chars_prompt_so_far = 0
        self.chars_generation_so_far = 0

    def get_openai_api_key(self):
        """Copy OPENAI_API_KEY from the environment into the openai client."""
        if 'OPENAI_API_KEY' not in os.environ:
            raise Exception('You need to set OPENAI_API_KEY in your environment')
        openai.api_key = os.environ['OPENAI_API_KEY']

    def is_gpt4_available(self):
        """Return True when the account has gpt-4 access.

        NOTE(review): this issues a real (billed) completion request every time
        a session is constructed — confirm this probe cost is acceptable.
        """
        try:
            openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{
                    "role": 'system',
                    "content": 'test'
                }]
            )
            return True
        except openai.error.InvalidRequestError:
            return False

    def cost_callback(self, chars_prompt, chars_generation):
        """Accumulate character counts and print a rough cost estimate.

        The 3.4 divisor presumably approximates characters per token — TODO confirm.
        """
        self.chars_prompt_so_far += chars_prompt
        self.chars_generation_so_far += chars_generation
        print('\n')
        money_prompt = round(self.chars_prompt_so_far / 3.4 * self.pricing_prompt / 1000, 2)
        money_generation = round(self.chars_generation_so_far / 3.4 * self.pricing_generation / 1000, 2)
        print('money prompt:', f'${money_prompt}')
        print('money generation:', f'${money_generation}')
        print('total money:', f'${money_prompt + money_generation}')
        print('\n')

    def get_conversation(self):
        """Start a fresh conversation that reports its cost back to this session."""
        return _GPTConversation(self.supported_model, self.cost_callback)
class _GPTConversation:
    """A single chat thread against the OpenAI API that accumulates its history."""

    def __init__(self, model: str, cost_callback, prompt_list: List[Tuple[str, str]] = None):
        self.model = model
        # every conversation starts with the shared system prompt
        if prompt_list is None:
            prompt_list = [('system', system_base_definition)]
        self.prompt_list = prompt_list
        self.cost_callback = cost_callback
        print_colored('system', system_base_definition, 'magenta')

    def query(self, prompt: str):
        """Send *prompt* as the user, record the assistant reply, and return it."""
        print_colored('user', prompt, 'blue')
        self.prompt_list.append(('user', prompt))
        response = self.get_response(self.prompt_list)
        self.prompt_list.append(('assistant', response))
        return response

    def get_response_from_stream(self, response_generator):
        """Consume a streamed completion, echoing chunks, and return the full text."""
        response_generator_with_timeout = timeout_generator_wrapper(response_generator, 10)
        complete_string = ''
        for chunk in response_generator_with_timeout:
            delta = chunk['choices'][0]['delta']
            if 'content' in delta:
                content = delta['content']
                # label only the first chunk; fix: role label was misspelled 'assistent'
                print_colored('' if complete_string else 'assistant', content, 'green', end='')
                complete_string += content
        return complete_string

    def get_response(self, prompt_list: List[Tuple[str, str]]):
        """Request a streamed completion, retrying up to 10 times on transient errors.

        Raises a generic Exception when all retries are exhausted.
        """
        for _ in range(10):
            try:
                response_generator = openai.ChatCompletion.create(
                    temperature=0,
                    max_tokens=2_000,
                    model=self.model,
                    stream=True,
                    messages=[
                        {
                            "role": prompt[0],
                            "content": prompt[1]
                        }
                        for prompt in prompt_list
                    ]
                )
                complete_string = self.get_response_from_stream(response_generator)
            except (RateLimitError, Timeout, ConnectionError, GenerationTimeoutError) as e:
                print(e)
                print('retrying, be aware that this affects the cost calculation')
                sleep(3)
                continue
            # successful round-trip: report character counts for cost estimation
            chars_prompt = sum(len(prompt[1]) for prompt in prompt_list)
            chars_generation = len(complete_string)
            self.cost_callback(chars_prompt, chars_generation)
            return complete_string
        raise Exception('Failed to get response')

168
src/jina_cloud.py Normal file
View File

@@ -0,0 +1,168 @@
import hashlib
import json
import os
import re
import subprocess
import webbrowser
from pathlib import Path
import hubble
from hubble.executor.helper import upload_file, archive_package, get_request_header
from jcloud.flow import CloudFlow
def redirect_callback(href):
    """Print the Jina login URL and open it in the default browser."""
    # fix: message previously read the ungrammatical "You need login to Jina first"
    print(
        f'You need to log in to Jina first to use GPTDeploy\n'
        f'Please open this link if it does not open automatically in your browser: {href}'
    )
    webbrowser.open(href, new=0, autoraise=True)
def jina_auth_login():
    """Ensure the user is logged in to Jina Hubble; trigger a browser login if not."""
    try:
        hubble.Client(jsonify=True).get_user_info(log_error=False)
    except hubble.AuthenticationRequiredError:
        hubble.login(prompt='login', redirect_callback=redirect_callback)
def push_executor(dir_path):
    """Upload the executor package at *dir_path* to Jina Hubble.

    Returns '' when the remote build succeeded, otherwise the build-log
    payload lines collected up to (but excluding) the 'exit code:' line.
    """
    dir_path = Path(dir_path)
    # md5 of the archived package is sent so the hub can deduplicate uploads
    md5_hash = hashlib.md5()
    bytesio = archive_package(dir_path)
    content = bytesio.getvalue()
    md5_hash.update(content)
    md5_digest = md5_hash.hexdigest()
    form_data = {
        'public': 'True',
        'private': 'False',
        'verbose': 'True',
        'md5sum': md5_digest,
    }
    req_header = get_request_header()
    resp = upload_file(
        'https://api.hubble.jina.ai/v2/rpc/executor.push',
        'filename',
        content,
        dict_data=form_data,
        headers=req_header,
        stream=False,
        method='post',
    )
    json_lines_str = resp.content.decode('utf-8')
    # success path: the streamed build log contains no failure marker
    if 'exited on non-zero code' not in json_lines_str:
        return ''
    responses = []
    for json_line in json_lines_str.splitlines():
        if 'exit code:' in json_line:
            break
        d = json.loads(json_line)
        # isinstance instead of `type(...) ==` comparison (idiomatic, subclass-safe)
        if 'payload' in d and isinstance(d['payload'], str):
            responses.append(d['payload'])
        elif isinstance(d, str):
            responses.append(d)
    return '\n'.join(responses)
def get_user_name():
    """Return the name of the currently logged-in Hubble user."""
    hubble_client = hubble.Client(max_retries=None, jsonify=True)
    user_info = hubble_client.get_user_info()
    return user_info['data']['name']
def deploy_on_jcloud(flow_yaml):
    """Deploy the flow described by *flow_yaml* on JCloud and return its gateway URL.

    NOTE(review): __enter__ is called without a matching __exit__, presumably so
    the deployed flow outlives this call — confirm teardown is handled elsewhere.
    """
    cloud_flow = CloudFlow(path=flow_yaml)
    return cloud_flow.__enter__().endpoints['gateway']
def deploy_flow(executor_name, dest_folder):
    """Write a one-executor JCloud flow definition into *dest_folder* and deploy it.

    Returns the deployed gateway URL.
    NOTE(review): the YAML literal below appears to have lost its nesting
    indentation in this view — confirm against version control.
    """
    flow = f'''
jtype: Flow
with:
name: nowapi
env:
JINA_LOG_LEVEL: DEBUG
jcloud:
version: 3.14.2.dev18
labels:
creator: microchain
name: gptdeploy
executors:
- name: {executor_name.lower()}
uses: jinaai+docker://{get_user_name()}/{executor_name}:latest
env:
JINA_LOG_LEVEL: DEBUG
jcloud:
resources:
instance: C2
capacity: spot
'''
    full_flow_path = os.path.join(dest_folder,
                                  'flow.yml')
    with open(full_flow_path, 'w') as f:
        f.write(flow)
    # print('try local execution')
    # flow = Flow.load_config(full_flow_path)
    # with flow:
    #     pass
    print('deploy flow on jcloud')
    return deploy_on_jcloud(flow_yaml=full_flow_path)
def replace_client_line(file_content: str, replacement: str) -> str:
    """Replace the first line containing 'Client(' with *replacement*.

    Lines after the first match are left untouched; when no line matches,
    the content is returned unchanged.
    """
    updated_lines = []
    replaced = False
    for line in file_content.split('\n'):
        if not replaced and 'Client(' in line:
            updated_lines.append(replacement)
            replaced = True
        else:
            updated_lines.append(line)
    return '\n'.join(updated_lines)
def update_client_line_in_file(file_path, host):
    """Rewrite the Client(...) construction in *file_path* to point at *host*."""
    with open(file_path, 'r') as source:
        original_content = source.read()
    updated_content = replace_client_line(original_content, f"client = Client(host='{host}')")
    with open(file_path, 'w') as target:
        target.write(updated_content)
def process_error_message(error_message):
    """Extract the relevant tail of a docker build log.

    Keeps everything from the last docker build-step marker (e.g. "#11 [7/8]")
    onwards, truncated to the final 25 lines; returns '' when no marker exists.
    """
    step_marker = re.compile(r"^#\d+ \[[ \d]+/[ \d]+\]")
    lines = error_message.split('\n')
    start_index = None
    for idx, line in enumerate(lines):
        if step_marker.match(line):
            start_index = idx
    if start_index is None:
        return ''
    return '\n'.join(lines[start_index:][-25:])
def build_docker(path):
    """Build the docker image at *path* tagged 'micromagic'.

    Returns '' on success, otherwise the relevant tail of the build error log.
    """
    # fix: pass an argument list with shell=False instead of interpolating
    # *path* into a shell string (shell-injection-prone, breaks on spaces)
    cmd = ["docker", "build", "-t", "micromagic", path]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    if process.returncode != 0:
        error_message = stderr.decode("utf-8")
        relevant_error_message = process_error_message(error_message)
        return relevant_error_message
    else:
        print("Docker build completed successfully.")
        return ''

69
src/key_handling.py Normal file
View File

@@ -0,0 +1,69 @@
import os
import platform
import subprocess
import click
try:
import psutil
except ImportError:
psutil = None
def get_shell():
    """Best-effort detection of the user's shell by walking the process tree.

    Returns the shell's process name, or None when psutil is unavailable
    or detection fails.
    """
    if psutil is None:
        return None
    try:
        proc = psutil.Process(os.getpid())
        # climb ancestors until just below init — that process is the shell
        while proc.parent() and proc.parent().name() != "init":
            proc = proc.parent()
        return proc.name()
    except Exception as e:
        click.echo(f"Error detecting shell: {e}")
        return None
def set_env_variable(shell, key):
    """Append an OPENAI_API_KEY export line to the config file of *shell*."""
    shell_config = {
        "bash": {"config_file": "~/.bashrc", "export_line": f"export OPENAI_API_KEY={key}"},
        "zsh": {"config_file": "~/.zshrc", "export_line": f"export OPENAI_API_KEY={key}"},
        "fish": {
            "config_file": "~/.config/fish/config.fish",
            "export_line": f"set -gx OPENAI_API_KEY {key}",
        },
    }
    if shell not in shell_config:
        click.echo("Sorry, your shell is not supported.")
        return
    entry = shell_config[shell]
    config_file = os.path.expanduser(entry["config_file"])
    # append rather than overwrite so existing config is preserved
    with open(config_file, "a") as file:
        file.write(f"\n{entry['export_line']}\n")
    click.echo(f"OPENAI_API_KEY has been set in {config_file}.")
def set_api_key(key):
    """Persist OPENAI_API_KEY for the current user, per platform.

    Windows: `setx` writes it into the user environment.
    Linux/macOS: appends an export line to the detected shell's config file.
    """
    system_platform = platform.system().lower()
    if system_platform == "windows":
        set_env_variable_command = f'setx OPENAI_API_KEY "{key}"'
        subprocess.call(set_env_variable_command, shell=True)
        click.echo("OPENAI_API_KEY has been set.")
    elif system_platform in ["linux", "darwin"]:
        # NOTE(review): only the unix branch asks before overwriting an
        # existing key — confirm the asymmetry with Windows is intended
        if "OPENAI_API_KEY" in os.environ:
            if not click.confirm("OPENAI_API_KEY is already set. Do you want to overwrite it?"):
                click.echo("Aborted.")
                return
        shell = get_shell()
        if shell is None:
            click.echo("Error: Unable to detect your shell or psutil is not available. Please set the environment variable manually.")
            return
        set_env_variable(shell, key)
    else:
        click.echo("Sorry, this platform is not supported.")

337
src/main.py Normal file
View File

@@ -0,0 +1,337 @@
import random
import click
from src import gpt, jina_cloud
from src.jina_cloud import push_executor, process_error_message, jina_auth_login
from src.key_handling import set_api_key
from src.prompt_tasks import general_guidelines, executor_file_task, chain_of_thought_creation, test_executor_file_task, \
chain_of_thought_optimization, requirements_file_task, docker_file_task, not_allowed
from src.utils.io import recreate_folder, persist_file
from src.utils.string_tools import print_colored
import os
import re
from src.constants import FILE_AND_TAG_PAIRS
# Shared GPT session for the whole module.
# NOTE(review): constructing GPTSession here performs a billed OpenAI probe call
# at import time — confirm this side effect is intended.
gpt_session = gpt.GPTSession()
def extract_content_from_result(plain_text, file_name):
    """Extract the fenced code-block content labelled **file_name** from a GPT answer.

    Expects a section of the form:
        **<file_name>**
        ```<optional tag>
        <content>
        ```
    Returns the stripped content, or '' when no such section exists.
    """
    # fix: escape file_name so dots (e.g. in 'executor.py') match literally
    # instead of acting as regex wildcards
    pattern = fr"^\*\*{re.escape(file_name)}\*\*\n```(?:\w+\n)?([\s\S]*?)```"
    match = re.search(pattern, plain_text, re.MULTILINE)
    if match:
        return match.group(1).strip()
    else:
        return ''
def write_config_yml(executor_name, dest_folder):
    """Write the Jina executor config.yml for *executor_name* into *dest_folder*.

    NOTE(review): the YAML nesting below mirrors the original literal, whose
    indentation appears lost in this view — confirm against version control.
    """
    config_content = f'''
jtype: {executor_name}
py_modules:
- executor.py
metas:
name: {executor_name}
'''
    config_path = os.path.join(dest_folder, 'config.yml')
    with open(config_path, 'w') as config_file:
        config_file.write(config_content)
def get_all_executor_files_with_content(folder_path):
    """Map each regular file in *folder_path* (non-recursive) to its text content."""
    contents = {}
    for name in os.listdir(folder_path):
        full_path = os.path.join(folder_path, name)
        if not os.path.isfile(full_path):
            continue  # skip sub-directories
        with open(full_path, 'r', encoding='utf-8') as handle:
            contents[name] = handle.read()
    return contents
def files_to_string(file_name_to_content):
    """Render the known executor files as one markdown string of fenced code blocks.

    Only files listed in FILE_AND_TAG_PAIRS are included, in that fixed order.
    """
    sections = []
    for file_name, tag in FILE_AND_TAG_PAIRS:
        if file_name in file_name_to_content:
            sections.append(f'**{file_name}**\n')
            sections.append(f'```{tag}\n')
            sections.append(file_name_to_content[file_name])
            sections.append('\n```\n\n')
    return ''.join(sections)
def wrap_content_in_code_block(executor_content, file_name, tag):
    """Wrap *executor_content* in the markdown code-block format used in prompts."""
    header = f'**{file_name}**'
    fence_open = f'```{tag}'
    return '\n'.join([header, fence_open, executor_content, '```']) + '\n\n'
def create_executor(
        description,
        test,
        output_path,
        executor_name,
        package,
        is_chain_of_thought=False,
):
    """Generate the first version (v1) of an executor package via GPT.

    Produces executor.py, test_executor.py, requirements, Dockerfile and
    config.yml in the v1 folder for *package*. Each artifact is generated in
    its own conversation, seeded with the previously generated files.
    """
    # local name shadows the module-level constant of the same name on purpose? — TODO confirm
    EXECUTOR_FOLDER_v1 = get_executor_path(output_path, package, 1)
    recreate_folder(EXECUTOR_FOLDER_v1)
    # NOTE(review): '../flow' is relative to the current working directory — confirm intended
    recreate_folder('../flow')
    print_colored('', '############# Executor #############', 'red')
    user_query = (
            general_guidelines()
            + executor_file_task(executor_name, description, test, package)
            + chain_of_thought_creation()
    )
    conversation = gpt_session.get_conversation()
    executor_content_raw = conversation.query(user_query)
    if is_chain_of_thought:
        # optional second pass lets the model review its own output
        executor_content_raw = conversation.query(
            f"General rules: " + not_allowed() + chain_of_thought_optimization('python', 'executor.py'))
    executor_content = extract_content_from_result(executor_content_raw, 'executor.py')
    persist_file(executor_content, os.path.join(EXECUTOR_FOLDER_v1, 'executor.py'))
    print_colored('', '############# Test Executor #############', 'red')
    user_query = (
            general_guidelines()
            + wrap_content_in_code_block(executor_content, 'executor.py', 'python')
            + test_executor_file_task(executor_name, test)
    )
    conversation = gpt_session.get_conversation()
    test_executor_content_raw = conversation.query(user_query)
    if is_chain_of_thought:
        test_executor_content_raw = conversation.query(
            f"General rules: " + not_allowed() +
            chain_of_thought_optimization('python', 'test_executor.py')
            + "Don't add any additional tests. "
        )
    test_executor_content = extract_content_from_result(test_executor_content_raw, 'test_executor.py')
    persist_file(test_executor_content, os.path.join(EXECUTOR_FOLDER_v1, 'test_executor.py'))
    print_colored('', '############# Requirements #############', 'red')
    user_query = (
            general_guidelines()
            + wrap_content_in_code_block(executor_content, 'executor.py', 'python')
            + wrap_content_in_code_block(test_executor_content, 'test_executor.py', 'python')
            + requirements_file_task()
    )
    conversation = gpt_session.get_conversation()
    requirements_content_raw = conversation.query(user_query)
    if is_chain_of_thought:
        # NOTE(review): '../requirements.txt' mismatches REQUIREMENTS_FILE_NAME
        # ('requirements.txt') used in the prompt, and the persist path escapes
        # the v1 folder — looks like an accidental search/replace; TODO confirm
        requirements_content_raw = conversation.query(
            chain_of_thought_optimization('', '../requirements.txt') + "Keep the same version of jina ")
    requirements_content = extract_content_from_result(requirements_content_raw, '../requirements.txt')
    persist_file(requirements_content, os.path.join(EXECUTOR_FOLDER_v1, '../requirements.txt'))
    print_colored('', '############# Dockerfile #############', 'red')
    user_query = (
            general_guidelines()
            + wrap_content_in_code_block(executor_content, 'executor.py', 'python')
            + wrap_content_in_code_block(test_executor_content, 'test_executor.py', 'python')
            + wrap_content_in_code_block(requirements_content, '../requirements.txt', '')
            + docker_file_task()
    )
    conversation = gpt_session.get_conversation()
    dockerfile_content_raw = conversation.query(user_query)
    if is_chain_of_thought:
        dockerfile_content_raw = conversation.query(
            f"General rules: " + not_allowed() + chain_of_thought_optimization('dockerfile', 'Dockerfile'))
    dockerfile_content = extract_content_from_result(dockerfile_content_raw, 'Dockerfile')
    persist_file(dockerfile_content, os.path.join(EXECUTOR_FOLDER_v1, 'Dockerfile'))
    write_config_yml(executor_name, EXECUTOR_FOLDER_v1)
def create_playground(executor_name, executor_path, host):
    """Generate a streamlit playground (app.py) for the deployed executor at *host*."""
    print_colored('', '############# Playground #############', 'red')
    file_name_to_content = get_all_executor_files_with_content(executor_path)
    user_query = (
            general_guidelines()
            + wrap_content_in_code_block(file_name_to_content['executor.py'], 'executor.py', 'python')
            + wrap_content_in_code_block(file_name_to_content['test_executor.py'], 'test_executor.py', 'python')
            + f'''
Create a playground for the executor {executor_name} using streamlit.
The playground must look like it was made by a professional designer.
All the ui elements are well thought out to make them visually appealing and easy to use.
The executor is hosted on {host}.
This is an example how you can connect to the executor assuming the document (d) is already defined:
from jina import Client, Document, DocumentArray
client = Client(host='{host}')
response = client.post('/', inputs=DocumentArray([d])) # always use '/'
print(response[0].text) # can also be blob in case of image/audio..., this should be visualized in the streamlit app
'''
    )
    conversation = gpt_session.get_conversation()
    # the first answer is discarded; the chain-of-thought follow-up yields app.py
    conversation.query(user_query)
    playground_content_raw = conversation.query(
        f"General rules: " + not_allowed() + chain_of_thought_optimization('python', 'app.py'))
    playground_content = extract_content_from_result(playground_content_raw, 'app.py')
    persist_file(playground_content, os.path.join(executor_path, 'app.py'))
def get_executor_path(output_path, package, version):
    """Return the folder path for iteration *version* of the given package subset."""
    package_folder = '_'.join(package)
    version_folder = f'v{version}'
    return os.path.join(output_path, package_folder, version_folder)
def debug_executor(output_path, package, description, test):
    """Iteratively push the executor to Hubble and let GPT fix build errors.

    Each failed build produces a new version folder (v2, v3, ...) whose files
    are the previous version's files with GPT's patched files merged on top.
    Returns the path of the first version that built successfully.
    Raises MaxDebugTimeReachedException when the iteration budget runs out.
    """
    MAX_DEBUGGING_ITERATIONS = 10
    error_before = ''
    # NOTE(review): range stops at MAX_DEBUGGING_ITERATIONS - 1 and the raise
    # below also checks i == MAX_DEBUGGING_ITERATIONS - 1 — effectively 9
    # attempts; confirm the off-by-one is intended
    for i in range(1, MAX_DEBUGGING_ITERATIONS):
        previous_executor_path = get_executor_path(output_path, package, i)
        next_executor_path = get_executor_path(output_path, package, i + 1)
        # push returns the hub build log; an empty error means the build passed
        log_hubble = push_executor(previous_executor_path)
        error = process_error_message(log_hubble)
        if error:
            recreate_folder(next_executor_path)
            file_name_to_content = get_all_executor_files_with_content(previous_executor_path)
            all_files_string = files_to_string(file_name_to_content)
            user_query = (
                    f"General rules: " + not_allowed()
                    + 'Here is the description of the task the executor must solve:\n'
                    + description
                    + '\n\nHere is the test scenario the executor must pass:\n'
                    + test
                    + 'Here are all the files I use:\n'
                    + all_files_string
                    + (('This is an error that is already fixed before:\n'
                        + error_before) if error_before else '')
                    + '\n\nNow, I get the following error:\n'
                    + error + '\n'
                    + 'Think quickly about possible reasons. '
                      'Then output the files that need change. '
                      "Don't output files that don't need change. "
                      "If you output a file, then write the complete file. "
                      "Use the exact same syntax to wrap the code:\n"
                      f"**...**\n"
                      f"```...\n"
                      f"...code...\n"
                      f"```\n\n"
            )
            conversation = gpt_session.get_conversation()
            returned_files_raw = conversation.query(user_query)
            # merge GPT's changed files over the previous version's files
            for file_name, tag in FILE_AND_TAG_PAIRS:
                updated_file = extract_content_from_result(returned_files_raw, file_name)
                if updated_file:
                    file_name_to_content[file_name] = updated_file
            for file_name, content in file_name_to_content.items():
                persist_file(content, os.path.join(next_executor_path, file_name))
            error_before = error
        else:
            break
        if i == MAX_DEBUGGING_ITERATIONS - 1:
            raise MaxDebugTimeReachedException('Could not debug the executor.')
    return get_executor_path(output_path, package, i)
class MaxDebugTimeReachedException(Exception):
    """Raised when the executor could not be fixed within the debugging budget.

    Fix: inherits from Exception instead of BaseException — BaseException is
    reserved for interpreter-level signals (SystemExit, KeyboardInterrupt) and
    would silently bypass generic `except Exception` boundaries.
    """
    pass
def generate_executor_name(description):
    """Ask GPT for a camel-case executor class name matching *description*."""
    conversation = gpt_session.get_conversation()
    # fix: prompt read "The output is a the raw string"
    user_query = f'''
Generate a name for the executor matching the description:
"{description}"
The executor name must fulfill the following criteria:
- camel case
- start with a capital letter
- only consists of lower and upper case characters
- end with Executor.
The output is the raw string wrapped into ``` and starting with **name.txt** like this:
**name.txt**
```
PDFParserExecutor
```
'''
    name_raw = conversation.query(user_query)
    name = extract_content_from_result(name_raw, 'name.txt')
    return name
def get_possible_packages(description, threads):
    """Ask GPT for up to *threads* candidate python-package subsets for the task.

    Returns a list of package-name lists, most promising first.
    """
    print_colored('', '############# What package to use? #############', 'red')
    # fix: prompt typos "problme" -> "problem", "obay" -> "obey"
    user_query = f'''
Here is the task description of the problem you need to solve:
"{description}"
First, write down all the subtasks you need to solve which require python packages.
For each subtask:
Provide a list of 1 to 3 python packages you could use to solve the subtask. Prefer modern packages.
For each package:
Write down some non-obvious thoughts about the challenges you might face for the task and give multiple approaches on how you handle them.
For example, there might be some packages you must not use because they do not obey the rules:
{not_allowed()}
Discuss the pros and cons for all of these packages.
Create a list of package subsets that you could use to solve the task.
The list is sorted in a way that the most promising subset of packages is at the top.
The maximum length of the list is 5.
The output must be a list of lists wrapped into ``` and starting with **packages.csv** like this:
**packages.csv**
```
package1,package2
package2,package3,...
...
```
'''
    conversation = gpt_session.get_conversation()
    packages_raw = conversation.query(user_query)
    packages_csv_string = extract_content_from_result(packages_raw, 'packages.csv')
    packages = [package.split(',') for package in packages_csv_string.split('\n')]
    # keep only as many candidate subsets as we are willing to try
    packages = packages[:threads]
    return packages
@click.group(invoke_without_command=True)
def main():
    """gptdeploy command line interface; see the `create` and `configure` subcommands."""
    pass
@main.command()
@click.option('--description', required=True, help='Description of the executor.')
@click.option('--test', required=True, help='Test scenario for the executor.')
@click.option('--num_approaches', default=3, type=int, help='Number of num_approaches to use to fulfill the task (default: 3).')
@click.option('--output_path', default='executor', help='Path to the output folder (must be empty). ')
def create(
        description,
        test,
        num_approaches=3,
        output_path='executor',
):
    """Generate, debug, deploy and wrap an executor for the given description/test.

    Tries each candidate package subset in order; stops at the first one that
    survives debugging and deployment.
    """
    jina_auth_login()
    generated_name = generate_executor_name(description)
    # random suffix avoids name collisions on the hub
    executor_name = f'{generated_name}{random.randint(0, 1000_000)}'
    packages_list = get_possible_packages(description, num_approaches)
    recreate_folder(output_path)
    # packages_list = [['a']]
    # executor_name = 'ColorPaletteGeneratorExecutor5946'
    # executor_path = '/Users/florianhonicke/jina/gptdeploy/executor/colorsys_colorharmony/v5'
    # host = 'grpcs://gptdeploy-5f6ea44fc8.wolf.jina.ai'
    for packages in packages_list:
        try:
            create_executor(description, test, output_path, executor_name, packages)
            executor_path = debug_executor(output_path, packages, description, test)
            print('Deploy a jina flow')
            host = jina_cloud.deploy_flow(executor_name, executor_path)
            print(f'Flow is deployed create the playground for {host}')
            create_playground(executor_name, executor_path, host)
        except MaxDebugTimeReachedException:
            # this package subset failed; fall through to the next candidate
            print('Could not debug the executor.')
            continue
        print(
            'Executor name:', executor_name, '\n',
            'Executor path:', executor_path, '\n',
            'Host:', host, '\n',
            'Playground:', f'streamlit run {os.path.join(executor_path, "app.py")}', '\n',
        )
        break
@main.command()
@click.option('--key', required=True, help='Your OpenAI API key.')
def configure(key):
    """Persist the given OpenAI API key in the user's environment."""
    set_api_key(key)
# Allow running the CLI directly during development (`python src/main.py`).
if __name__ == '__main__':
    main()

114
src/prompt_system.py Normal file
View File

@@ -0,0 +1,114 @@
from src.constants import FLOW_URL_PLACEHOLDER
# Prompt fragment showing the model what a Jina Executor definition looks like.
# Fixes: the example used the invalid arrow '=>' instead of '->', and the
# leading comment was missing its verb ("this executor binary files...").
executor_example = '''
Using the Jina framework, users can define executors.
Here is an example of how an executor can be defined. It always starts with a comment:
**executor.py**
```python
# this executor takes binary files as input and returns the length of each binary file as output
from jina import Executor, requests, DocumentArray, Document
class MyInfoExecutor(Executor):
    def __init__(self, **kwargs):
        super().__init__()

    @requests() # each executor must have exactly this decorator without parameters
    def foo(self, docs: DocumentArray, **kwargs) -> DocumentArray:
        for d in docs:
            d.load_uri_to_blob()
            d.blob = None
        return docs
```
An executor gets a DocumentArray as input and returns a DocumentArray as output.
'''
# Prompt fragment explaining Document/DocumentArray to the model. The f-string
# exists only so the doubled braces render as literal braces in the protobuf
# example; no variables are interpolated.
docarray_example = f'''
A DocumentArray is a python class that can be seen as a list of Documents.
A Document is a python class that represents a single document.
Here is the protobuf definition of a Document:
message DocumentProto {{
// A hexdigest that represents a unique document ID
string id = 1;
oneof content {{
// the raw binary content of this document, which often represents the original document when comes into jina
bytes blob = 2;
// the ndarray of the image/audio/video document
NdArrayProto tensor = 3;
// a text document
string text = 4;
}}
// a uri of the document is a remote url starts with http or https or data URI scheme
string uri = 5;
// list of the sub-documents of this document (recursive structure)
repeated DocumentProto chunks = 6;
// the matched documents on the same level (recursive structure)
repeated DocumentProto matches = 7;
// the embedding of this document
NdArrayProto embedding = 8;
}}
Here is an example of how a DocumentArray can be defined:
from jina import DocumentArray, Document
d1 = Document(text='hello')
# you can load binary data into a document
url = 'https://...'
response = requests.get(url)
obj_data = response.content
d2 = Document(blob=obj_data) # blob is bytes like b'\\x89PNG\\r\\n\\x1a\\n\
d3 = Document(tensor=numpy.array([1, 2, 3]), chunks=[Document(uri=/local/path/to/file)]
d4 = Document(
uri='https://docs.docarray.org/img/logo.png',
)
d5 = Document()
d5.tensor = np.ones((2,4))
d5.uri = 'https://audio.com/audio.mp3'
d6 = Document()
d6.blob # like b'RIFF\\x00\\x00\\x00\\x00WAVEfmt \\x10\\x00...'
docs = DocumentArray([
d1, d2, d3, d4
])
d7 = Document()
d7.text = 'test string'
d8 = Document()
d8.text = json.dumps([{{"id": "1", "text": ["hello", 'test']}}, {{"id": "2", "text": "world"}}])
# the document has a helper function load_uri_to_blob:
# For instance, d4.load_uri_to_blob() downloads the file from d4.uri and stores it in d4.blob.
# If d4.uri was something like 'https://website.web/img.jpg', then d4.blob would be something like b'\\xff\\xd8\\xff\\xe0\\x00\\x10JFIF\\x00\\x01\\x01...
'''
# Prompt fragment showing how a client connects to the deployed flow; the real
# flow URL replaces FLOW_URL_PLACEHOLDER later in the generated client file.
client_example = f'''
After the executor is deployed, it can be called via Jina Client.
Here is an example of a client file:
**client.py**
```python
from jina import Client, Document, DocumentArray
client = Client(host='{FLOW_URL_PLACEHOLDER}')
d = Document(uri='...')
d.load_uri_to_blob()
response = client.post('/', inputs=DocumentArray([d])) # the client must be called on '/'
print(response[0].text)
```
'''
# System prompt combining the persona line with the framework examples above.
# Fix: removed a stray unbalanced double quote after "open source company."
system_base_definition = f'''
You are a principal engineer working at Jina - an open source company.
{executor_example}
{docarray_example}
{client_example}
'''

138
src/prompt_tasks.py Normal file
View File

@@ -0,0 +1,138 @@
from src.constants import EXECUTOR_FILE_NAME, REQUIREMENTS_FILE_NAME, TEST_EXECUTOR_FILE_NAME, DOCKER_FILE_NAME, \
DOCKER_FILE_TAG, CLIENT_FILE_TAG, CLIENT_FILE_NAME, STREAMLIT_FILE_TAG, STREAMLIT_FILE_NAME, EXECUTOR_FILE_TAG, \
REQUIREMENTS_FILE_TAG, TEST_EXECUTOR_FILE_TAG
def general_guidelines():
    """Boilerplate coding guidelines prepended to every generation prompt."""
    rules = [
        "The code you write is production ready. ",
        "Every file starts with comments describing what the code is doing before the first import. ",
        "Comments can only be written within code blocks. ",
        "Then all imports are listed. ",
        "It is important to import all modules that could be needed in the executor code. ",
        "Always import: ",
        "from jina import Executor, DocumentArray, Document, requests ",
        "Start from top-level and then fully implement all methods. ",
        "\n",
    ]
    return ''.join(rules)
def _task(task, tag_name, file_name):
return (
task + f"The code will go into {file_name}. Wrap the code into:\n"
f"**{file_name}**\n"
f"```{tag_name}\n"
f"...code...\n"
f"```\n\n"
)
def executor_file_task(executor_name, executor_description, test_scenario, package):
    """Build the prompt for generating executor.py."""
    description_block = f'''
Write the executor called '{executor_name}'.
It matches the following description: '{executor_description}'.
It will be tested with the following scenario: '{test_scenario}'.
For the implementation use the following package: '{package}'.
Have in mind that d.uri is never a path to a local file. It is always a url.
'''
    return _task(description_block + not_allowed(), EXECUTOR_FILE_TAG, EXECUTOR_FILE_NAME)
def test_executor_file_task(executor_name, test_scenario):
    """Build the prompt for generating test_executor.py."""
    parts = [
        "Write a small unit test for the executor. ",
        "Start the test with an extensive comment about the test case. ",
    ]
    # only constrain the test when a concrete scenario was provided
    if test_scenario:
        parts.append(
            f"Write a single test case that tests the following scenario: '{test_scenario}'. "
            f"In case the test scenario is not precise enough, test the most general case without any assumptions."
        )
    parts.append("Use the following import to import the executor: ")
    parts.append(f"from executor import {executor_name} ")
    parts.append(not_allowed())
    parts.append("The test must not open local files. ")
    parts.append("The test must not mock a function of the executor. ")
    parts.append("The test must not use other data than the one provided in the test scenario. ")
    return _task(''.join(parts), TEST_EXECUTOR_FILE_TAG, TEST_EXECUTOR_FILE_NAME)
def requirements_file_task():
    """Build the prompt for generating requirements.txt."""
    instructions = (
        "Write the content of the requirements.txt file. "
        "Make sure to include pytest. "
        "Make sure that jina==3.14.1. "
        "All versions are fixed using ~=, ==, <, >, <=, >=. The package versions should not have conflicts. "
    )
    return _task(instructions, REQUIREMENTS_FILE_TAG, REQUIREMENTS_FILE_NAME)
def docker_file_task():
    """Build the prompt for generating the Dockerfile.

    Fixes: removed the duplicated sentence "The Dockerfile runs the test during
    the build process." (it appeared twice) and the "the all files" grammar slip.
    """
    return _task(
        "Write the Dockerfile that defines the environment with all necessary dependencies that the executor uses. "
        "The Dockerfile runs the test during the build process. "
        "It is important to make sure that all libs are installed that are required by the python packages. "
        "Usually libraries are installed with apt-get. "
        "Be aware that the machine the docker container is running on does not have a GPU - only CPU. "
        "Add the config.yml file to the Dockerfile. "
        "The base image of the Dockerfile is FROM jinaai/jina:3.14.1-py39-standard. "
        'The entrypoint is ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]. '
        'Make sure that all files are in the /workdir. ' + not_allowed(),
        DOCKER_FILE_TAG,
        DOCKER_FILE_NAME
    )
def client_file_task():
    """Build the prompt for generating client.py."""
    instructions = "Write the client file. "
    return _task(instructions, CLIENT_FILE_TAG, CLIENT_FILE_NAME)
def streamlit_file_task():
    """Build the prompt for generating the streamlit playground file."""
    instructions = "Write the streamlit file allowing to make requests . "
    return _task(instructions, STREAMLIT_FILE_TAG, STREAMLIT_FILE_NAME)
def chain_of_thought_creation():
    """Chain-of-thought preamble appended when first creating a file.

    Fixes: "obay" -> "obey" and the ungrammatical "the given package you could
    used in different ways".
    """
    return (
        "First, write down some non-obvious thoughts about the challenges of the task and give multiple approaches on how you handle them. "
        "For example, you could use the given package in different ways and not all of them obey the rules: "
        + "Discuss the pros and cons for all of these approaches and then decide for one of the approaches. "
        "Then write as I told you. "
    )
def chain_of_thought_optimization(tag_name, file_name):
    """Build the self-review prompt asking the model to reconsider *file_name*.

    Fix: "not allowed disregard" -> "not allowed to disregard".
    """
    return _task(
        f'First, write down an extensive list of obvious and non-obvious observations about {file_name} that could need an adjustment. Explain why. '
        f"Think if all the changes are required and finally decide for the changes you want to make, "
        f"but you are not allowed to disregard the instructions in the previous message. "
        f"Be very hesitant to change the code. Only make a change if you are sure that it is necessary. "
        f"Output only {file_name} "
        f"Write the whole content of {file_name} - even if you decided to change only a small thing or even nothing. ",
        tag_name,
        file_name
    )
def not_allowed():
    """Hard constraints appended to every generation prompt.

    Fix: "except unless it is explicitly mentioned" was redundant — "unless" alone.
    """
    return '''
The executor must not use the GPU.
The executor must not access a database.
The executor must not access a display.
The executor must not access external apis unless it is explicitly mentioned in the description or test case (e.g. by mentioning the api that should be used or by providing a URL to access the data).
The executor must not load data from the local file system unless it was created by the executor itself.
The executor must not use a pre-trained model unless it is explicitly mentioned in the description.
The executor must not train a model.
The executor must not use Document.tags.
'''

53
src/server.py Normal file
View File

@@ -0,0 +1,53 @@
# from fastapi import FastAPI
# from fastapi.exceptions import RequestValidationError
# from pydantic import BaseModel
# from typing import Optional, Dict
#
# from starlette.middleware.cors import CORSMiddleware
# from starlette.requests import Request
# from starlette.responses import JSONResponse
# from main import main
#
# app = FastAPI()
#
# # Define the request model
# class CreateRequest(BaseModel):
# test_scenario: str
# executor_description: str
#
# # Define the response model
# class CreateResponse(BaseModel):
# result: Dict[str, str]
# success: bool
# message: Optional[str]
#
# @app.post("/create", response_model=CreateResponse)
# def create_endpoint(request: CreateRequest):
#
# result = main(
# executor_description=request.executor_description,
# test_scenario=request.test_scenario,
# )
# return CreateResponse(result=result, success=True, message=None)
#
#
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
#
# # Add a custom exception handler for RequestValidationError
# @app.exception_handler(RequestValidationError)
# def validation_exception_handler(request: Request, exc: RequestValidationError):
# return JSONResponse(
# status_code=422,
# content={"detail": exc.errors()},
# )
#
#
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run("server:app", host="0.0.0.0", port=8000, log_level="info")