added weaviate to the supported vector memory providers

This commit is contained in:
cs0lar
2023-04-11 11:14:13 +01:00
43 changed files with 1318 additions and 374 deletions

View File

@@ -0,0 +1,56 @@
from memory.local import LocalCache
try:
from memory.redismem import RedisMemory
except ImportError:
print("Redis not installed. Skipping import.")
RedisMemory = None
try:
from memory.pinecone import PineconeMemory
except ImportError:
print("Pinecone not installed. Skipping import.")
PineconeMemory = None
try:
from memory.weaviate import WeaviateMemory
except ImportError:
print("Weaviate not installed. Skipping import.")
WeaviateMemory = None
def get_memory(cfg, init=False):
memory = None
if cfg.memory_backend == "pinecone":
if not PineconeMemory:
print("Error: Pinecone is not installed. Please install pinecone"
" to use Pinecone as a memory backend.")
else:
memory = PineconeMemory(cfg)
if init:
memory.clear()
elif cfg.memory_backend == "redis":
if not RedisMemory:
print("Error: Redis is not installed. Please install redis-py to"
" use Redis as a memory backend.")
else:
memory = RedisMemory(cfg)
elif cfg.memory_backend == "weaviate":
if not WeaviateMemory:
print("Error: Weaviate is not installed. Please install weaviate-client to"
" use Weaviate as a memory backend.")
else:
memory = WeaviateMemory(cfg)
if memory is None:
memory = LocalCache(cfg)
if init:
memory.clear()
return memory
__all__ = [
"get_memory",
"LocalCache",
"RedisMemory",
"PineconeMemory",
"WeaviateMemory"
]

31
scripts/memory/base.py Normal file
View File

@@ -0,0 +1,31 @@
"""Base class for memory providers."""
import abc
from config import AbstractSingleton
import openai
def get_ada_embedding(text):
text = text.replace("\n", " ")
return openai.Embedding.create(input=[text], model="text-embedding-ada-002")["data"][0]["embedding"]
class MemoryProviderSingleton(AbstractSingleton):
@abc.abstractmethod
def add(self, data):
pass
@abc.abstractmethod
def get(self, data):
pass
@abc.abstractmethod
def clear(self):
pass
@abc.abstractmethod
def get_relevant(self, data, num_relevant=5):
pass
@abc.abstractmethod
def get_stats(self):
pass

114
scripts/memory/local.py Normal file
View File

@@ -0,0 +1,114 @@
import dataclasses
import orjson
from typing import Any, List, Optional
import numpy as np
import os
from memory.base import MemoryProviderSingleton, get_ada_embedding
EMBED_DIM = 1536
SAVE_OPTIONS = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_SERIALIZE_DATACLASS
def create_default_embeddings():
return np.zeros((0, EMBED_DIM)).astype(np.float32)
@dataclasses.dataclass
class CacheContent:
texts: List[str] = dataclasses.field(default_factory=list)
embeddings: np.ndarray = dataclasses.field(
default_factory=create_default_embeddings
)
class LocalCache(MemoryProviderSingleton):
# on load, load our database
def __init__(self, cfg) -> None:
self.filename = f"{cfg.memory_index}.json"
if os.path.exists(self.filename):
with open(self.filename, 'rb') as f:
loaded = orjson.loads(f.read())
self.data = CacheContent(**loaded)
else:
self.data = CacheContent()
def add(self, text: str):
"""
Add text to our list of texts, add embedding as row to our
embeddings-matrix
Args:
text: str
Returns: None
"""
if 'Command Error:' in text:
return ""
self.data.texts.append(text)
embedding = get_ada_embedding(text)
vector = np.array(embedding).astype(np.float32)
vector = vector[np.newaxis, :]
self.data.embeddings = np.concatenate(
[
vector,
self.data.embeddings,
],
axis=0,
)
with open(self.filename, 'wb') as f:
out = orjson.dumps(
self.data,
option=SAVE_OPTIONS
)
f.write(out)
return text
def clear(self) -> str:
"""
Clears the redis server.
Returns: A message indicating that the memory has been cleared.
"""
self.data = CacheContent()
return "Obliviated"
def get(self, data: str) -> Optional[List[Any]]:
"""
Gets the data from the memory that is most relevant to the given data.
Args:
data: The data to compare to.
Returns: The most relevant data.
"""
return self.get_relevant(data, 1)
def get_relevant(self, text: str, k: int) -> List[Any]:
""""
matrix-vector mult to find score-for-each-row-of-matrix
get indices for top-k winning scores
return texts for those indices
Args:
text: str
k: int
Returns: List[str]
"""
embedding = get_ada_embedding(text)
scores = np.dot(self.data.embeddings, embedding)
top_k_indices = np.argsort(scores)[-k:][::-1]
return [self.data.texts[i] for i in top_k_indices]
def get_stats(self):
"""
Returns: The stats of the local cache.
"""
return len(self.data.texts), self.data.embeddings.shape

View File

@@ -0,0 +1,50 @@
import pinecone
from memory.base import MemoryProviderSingleton, get_ada_embedding
class PineconeMemory(MemoryProviderSingleton):
def __init__(self, cfg):
pinecone_api_key = cfg.pinecone_api_key
pinecone_region = cfg.pinecone_region
pinecone.init(api_key=pinecone_api_key, environment=pinecone_region)
dimension = 1536
metric = "cosine"
pod_type = "p1"
table_name = "auto-gpt"
# this assumes we don't start with memory.
# for now this works.
# we'll need a more complicated and robust system if we want to start with memory.
self.vec_num = 0
if table_name not in pinecone.list_indexes():
pinecone.create_index(table_name, dimension=dimension, metric=metric, pod_type=pod_type)
self.index = pinecone.Index(table_name)
def add(self, data):
vector = get_ada_embedding(data)
# no metadata here. We may wish to change that long term.
resp = self.index.upsert([(str(self.vec_num), vector, {"raw_text": data})])
_text = f"Inserting data into memory at index: {self.vec_num}:\n data: {data}"
self.vec_num += 1
return _text
def get(self, data):
return self.get_relevant(data, 1)
def clear(self):
self.index.delete(deleteAll=True)
return "Obliviated"
def get_relevant(self, data, num_relevant=5):
"""
Returns all the data in the memory that is relevant to the given data.
:param data: The data to compare to.
:param num_relevant: The number of relevant data to return. Defaults to 5
"""
query_embedding = get_ada_embedding(data)
results = self.index.query(query_embedding, top_k=num_relevant, include_metadata=True)
sorted_results = sorted(results.matches, key=lambda x: x.score)
return [str(item['metadata']["raw_text"]) for item in sorted_results]
def get_stats(self):
return self.index.describe_index_stats()

143
scripts/memory/redismem.py Normal file
View File

@@ -0,0 +1,143 @@
"""Redis memory provider."""
from typing import Any, List, Optional
import redis
from redis.commands.search.field import VectorField, TextField
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
import numpy as np
from memory.base import MemoryProviderSingleton, get_ada_embedding
SCHEMA = [
TextField("data"),
VectorField(
"embedding",
"HNSW",
{
"TYPE": "FLOAT32",
"DIM": 1536,
"DISTANCE_METRIC": "COSINE"
}
),
]
class RedisMemory(MemoryProviderSingleton):
def __init__(self, cfg):
"""
Initializes the Redis memory provider.
Args:
cfg: The config object.
Returns: None
"""
redis_host = cfg.redis_host
redis_port = cfg.redis_port
redis_password = cfg.redis_password
self.dimension = 1536
self.redis = redis.Redis(
host=redis_host,
port=redis_port,
password=redis_password,
db=0 # Cannot be changed
)
self.cfg = cfg
if cfg.wipe_redis_on_start:
self.redis.flushall()
try:
self.redis.ft(f"{cfg.memory_index}").create_index(
fields=SCHEMA,
definition=IndexDefinition(
prefix=[f"{cfg.memory_index}:"],
index_type=IndexType.HASH
)
)
except Exception as e:
print("Error creating Redis search index: ", e)
existing_vec_num = self.redis.get(f'{cfg.memory_index}-vec_num')
self.vec_num = int(existing_vec_num.decode('utf-8')) if\
existing_vec_num else 0
def add(self, data: str) -> str:
"""
Adds a data point to the memory.
Args:
data: The data to add.
Returns: Message indicating that the data has been added.
"""
if 'Command Error:' in data:
return ""
vector = get_ada_embedding(data)
vector = np.array(vector).astype(np.float32).tobytes()
data_dict = {
b"data": data,
"embedding": vector
}
pipe = self.redis.pipeline()
pipe.hset(f"{self.cfg.memory_index}:{self.vec_num}", mapping=data_dict)
_text = f"Inserting data into memory at index: {self.vec_num}:\n"\
f"data: {data}"
self.vec_num += 1
pipe.set(f'{self.cfg.memory_index}-vec_num', self.vec_num)
pipe.execute()
return _text
def get(self, data: str) -> Optional[List[Any]]:
"""
Gets the data from the memory that is most relevant to the given data.
Args:
data: The data to compare to.
Returns: The most relevant data.
"""
return self.get_relevant(data, 1)
def clear(self) -> str:
"""
Clears the redis server.
Returns: A message indicating that the memory has been cleared.
"""
self.redis.flushall()
return "Obliviated"
def get_relevant(
self,
data: str,
num_relevant: int = 5
) -> Optional[List[Any]]:
"""
Returns all the data in the memory that is relevant to the given data.
Args:
data: The data to compare to.
num_relevant: The number of relevant data to return.
Returns: A list of the most relevant data.
"""
query_embedding = get_ada_embedding(data)
base_query = f"*=>[KNN {num_relevant} @embedding $vector AS vector_score]"
query = Query(base_query).return_fields(
"data",
"vector_score"
).sort_by("vector_score").dialect(2)
query_vector = np.array(query_embedding).astype(np.float32).tobytes()
try:
results = self.redis.ft(f"{self.cfg.memory_index}").search(
query, query_params={"vector": query_vector}
)
except Exception as e:
print("Error calling Redis search: ", e)
return None
return [result.data for result in results.docs]
def get_stats(self):
"""
Returns: The stats of the memory index.
"""
return self.redis.ft(f"{self.cfg.memory_index}").info()

100
scripts/memory/weaviate.py Normal file
View File

@@ -0,0 +1,100 @@
from config import Config
from memory.base import MemoryProviderSingleton, get_ada_embedding
import uuid
import weaviate
from weaviate import Client
from weaviate.util import generate_uuid5
def default_schema(weaviate_index):
return {
"class": weaviate_index,
"properties": [
{
"name": "raw_text",
"dataType": ["text"],
"description": "original text for the embedding"
}
],
}
class WeaviateMemory(MemoryProviderSingleton):
def __init__(self, cfg):
auth_credentials = self._build_auth_credentials(cfg)
url = f'{cfg.weaviate_host}:{cfg.weaviate_port}'
self.client = Client(url, auth_client_secret=auth_credentials)
self.index = cfg.memory_index
self._create_schema()
def _create_schema(self):
schema = default_schema(self.index)
if not self.client.schema.contains(schema):
self.client.schema.create_class(schema)
def _build_auth_credentials(self, cfg):
if cfg.weaviate_username and cfg.weaviate_password:
return weaviate_auth.AuthClientPassword(cfg.weaviate_username, cfg.weaviate_password)
else:
return None
def add(self, data):
vector = get_ada_embedding(data)
doc_uuid = generate_uuid5(data, self.index)
data_object = {
'class': self.index,
'raw_text': data
}
with self.client.batch as batch:
batch.add_data_object(
uuid=doc_uuid,
data_object=data_object,
class_name=self.index,
vector=vector
)
batch.flush()
return f"Inserting data into memory at uuid: {doc_uuid}:\n data: {data}"
def get(self, data):
return self.get_relevant(data, 1)
def clear(self):
self.client.schema.delete_all()
# weaviate does not yet have a neat way to just remove the items in an index
# without removing the entire schema, therefore we need to re-create it
# after a call to delete_all
self._create_schema()
return 'Obliterated'
def get_relevant(self, data, num_relevant=5):
query_embedding = get_ada_embedding(data)
try:
results = self.client.query.get(self.index, ['raw_text']) \
.with_near_vector({'vector': query_embedding, 'certainty': 0.7}) \
.with_limit(num_relevant) \
.do()
if len(results['data']['Get'][self.index]) > 0:
return [str(item['raw_text']) for item in results['data']['Get'][self.index]]
else:
return []
except Exception as err:
print(f'Unexpected error {err=}, {type(err)=}')
return []
def get_stats(self):
result = self.client.query.aggregate(self.index) \
.with_meta_count() \
.do()
class_data = result['data']['Aggregate'][self.index]
return class_data[0]['meta'] if class_data else {}