From d8915d7241d528081307ceeb158f3bf96261dc6b Mon Sep 17 00:00:00 2001 From: Aljaz Date: Mon, 2 Jun 2025 14:16:15 +0200 Subject: [PATCH] Replace Pinecone/OpenAI with Qdrant/Ollama --- local.env.template | 3 + requirements.txt | 6 +- src/infra/deploy_infrastructure.py | 29 +++--- .../src/analytics/embeddings/vector_db.py | 93 ++++++++----------- .../src/data_storage.py | 38 +++----- .../lambda_layer/lambda_layer_cloud9.sh | 2 +- .../lambda_utils/update_lambda_env_vars.py | 15 ++- src/launch/launch_env.py | 14 +-- src/utils/check_env.py | 15 +-- template.env | 15 ++- 10 files changed, 101 insertions(+), 129 deletions(-) diff --git a/local.env.template b/local.env.template index 6e1b88c..7ab2d0c 100644 --- a/local.env.template +++ b/local.env.template @@ -23,3 +23,6 @@ APP_NAME=RSS Feed Processor VERSION=1.0.0 STORAGE_STRATEGY=s3 +QDRANT_URL=http://localhost:6333 +QDRANT_COLLECTION_NAME=open-rss-articles +OLLAMA_HOST=http://localhost:11434 diff --git a/requirements.txt b/requirements.txt index 8520887..9c08ae6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,9 +3,9 @@ pymongo==4.* python-dotenv==1.0.* requests==2.32.* constructs==10.2.69 -# Optional, yet necessary for the Pinecone SDK functionality. -pinecone -openai +# Vector database and embedding libraries +qdrant-client +ollama tqdm prometheus-clien redis diff --git a/src/infra/deploy_infrastructure.py b/src/infra/deploy_infrastructure.py index e843947..4f40247 100644 --- a/src/infra/deploy_infrastructure.py +++ b/src/infra/deploy_infrastructure.py @@ -4,8 +4,7 @@ import sys import json from src.utils.retry_logic import retry_with_backoff from botocore.exceptions import ClientError -from pinecone import Pinecone -from pinecone import ServerlessSpec +from qdrant_client import QdrantClient, models @@ -180,21 +179,17 @@ def deploy_infrastructure(): } ]) - if os.getenv("STORAGE_STRATEGY") == 'pinecone': - pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY")) - index_name = os.getenv("PINECONE_DB_NAME") - embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM") - vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC") - - if index_name not in pc.list_indexes().names(): - pc.create_index( - name=index_name, - dimension=int(embedding_dim), - metric=vector_search_metric, - spec = ServerlessSpec( - cloud="aws", - region="us-east-1", - ), + if os.getenv("STORAGE_STRATEGY") == 'qdrant': + client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY")) + collection = os.getenv("QDRANT_COLLECTION_NAME") + embedding_dim = int(os.getenv("VECTOR_EMBEDDING_DIM")) + metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine").upper() + + existing = [c.name for c in client.get_collections().collections] + if collection not in existing: + client.create_collection( + collection_name=collection, + vectors_config=models.VectorParams(size=embedding_dim, distance=getattr(models.Distance, metric)) ) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py index 6ce5dc5..ee47bcd 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py @@ -1,75 +1,60 @@ import os +import requests -from pinecone import Pinecone -from openai import OpenAI +from qdrant_client import QdrantClient, models from utils import setup_logging logger = setup_logging() +qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333") +qdrant_api_key = os.getenv("QDRANT_API_KEY") +collection_name = os.getenv("QDRANT_COLLECTION_NAME") -# Set up Pinecone client -api_key = os.getenv("PINCEONE_API_KEY") -shards = os.getenv("PINECONE_SHARDS") -embedding_model = os.getenv("VECTOR_EMBEDDING_MODEL") embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM") -vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC") -index_name = os.getenv("PINECONE_DB_NAME") +vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine") -client = OpenAI() # For Embedding Models, Not LLMs -pc = Pinecone(api_key=api_key) +ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434") +ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text") + +client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key) def get_index(): - if index_name not in pc.list_indexes().names(): - return KeyError(f"Index {index_name} not found") + collections = client.get_collections().collections + if collection_name not in [c.name for c in collections]: + raise KeyError(f"Collection {collection_name} not found") + return client - index = pc.Index(index_name) - return index - -def vectorize(article:str) -> list[float]: - response = client.embeddings.create( - input=article, # FIXME: This fails when article is something else, find what the 'something else' is and implement fix. - model=os.getenv('OPENAI_EMBEDDING_MODEL', 'text-') +def vectorize(article: str) -> list[float]: + response = requests.post( + f"{ollama_host}/api/embeddings", + json={"model": ollama_embedding_model, "prompt": article}, + timeout=30, ) - - return response.data[0].embedding + response.raise_for_status() + return response.json().get("embedding", []) -def upsert_vectors(index:Pinecone.Index, data:list[dict], namespace:str): # [ ] Check if the data is being upserted. - response = index.upsert( - vectors=data, - namespace=namespace - ) - logger.info(f'Upserted Vector Response : {response.to_dict()}') - logger.info(f'Upserted Vector Length : {len(data[0]["values"])}') - logger.info(f'Upserted Vector Response Type : {type(response)}') - logger.info(f'Upserted Vector Response - status : {response.status_code}') - -def query_vectors(index:Pinecone.Index, namespace:str, vector:list[float], top_k:int, filter_query:dict=None): # [ ]: Make sure this is working. - +def upsert_vectors(index: QdrantClient, data: list[dict]): + points = [ + models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload")) + for item in data + ] + index.upsert(collection_name=collection_name, points=points) + + +def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None): if len(vector) != int(embedding_dim): raise ValueError("Length of vector does not match the embedding dimension") - - if filter_query: - query = index.query( - namespace=namespace, - vector=vector, - filter_query=filter_query, - top_k=top_k, - include_metadata=True - ) - - else: - query = index.query( - namespace=namespace, - vector=vector, - top_k=top_k - ) - - return query + return index.search( + collection_name=collection_name, + query_vector=vector, + limit=top_k, + with_payload=True, + query_filter=filter_query, + ) if __name__ == "__main__": - # Create a large paragraph - paragraph = '''This is a test.''' - vectorize("This is a test string") \ No newline at end of file + paragraph = "This is a test." + vectorize(paragraph) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py index b860982..516df60 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py @@ -32,39 +32,29 @@ mongo_client = MongoClient(MONGODB_URL) feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME] ##### Article Storage ##### -def save_article(article:dict, strategy:str): +def save_article(article: dict, strategy: str): if strategy == "s3": s3_save_article(article) - elif strategy == "pinecone": - pinecone_save_article(article) - elif strategy == 'both': - pinecone_save_article(article) + elif strategy == "qdrant": + qdrant_save_article(article) + elif strategy == "both": + qdrant_save_article(article) s3_save_article(article) else: raise ValueError(f"Invalid storage strategy: {strategy}") -def pinecone_save_article(article:dict): - logger.info("Saving article to Pinecone") +def qdrant_save_article(article: dict): + logger.info("Saving article to Qdrant") index = get_index() - # Expected Keys from Pinecone *MUST* include 'id' and 'values' - data = dict() - logging.info(f"Article ID into Pinecone") - data["id"] = article["article_id"] - logging.info(f"Article content into Pinecone") - data["values"] = vectorize(article=article["content"]) - - print(type(data["values"])) - print(data["id"]) - - data = [data] - - - namespace = os.getenv('PINECONE_NAMESPACE') - - logger.info("Upserting article to Pinecone") - upsert_vectors(index, data, namespace) + data = { + "id": article["article_id"], + "vector": vectorize(article["content"]), + "payload": {"rss": article.get("rss"), "title": article.get("title")}, + } + + upsert_vectors(index, [data]) def s3_save_article(article:dict): diff --git a/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh b/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh index 14b7e13..c6cbf99 100644 --- a/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh +++ b/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh @@ -16,7 +16,7 @@ echo "Python 3.12 found. Proceeding..." echo "Section 2: Installing Dependencies" # Install dependencies -python3.12 -m pip install --upgrade Pillow feedfinder2==0.0.4 python-dateutil newspaper3k==0.2.8 feedparser lxml[html5lib] lxml_html_clean lxml[html_clean] openai pinecone -t python/ +python3.12 -m pip install --upgrade Pillow feedfinder2==0.0.4 python-dateutil newspaper3k==0.2.8 feedparser lxml[html5lib] lxml_html_clean lxml[html_clean] qdrant-client ollama -t python/ echo "Dependencies installed successfully." ####### Section 3: Creating ZIP File ######## diff --git a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py index 4aa7b62..9d50ec0 100644 --- a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py +++ b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py @@ -48,20 +48,19 @@ def update_env_vars(function_name): # Storage Configuration 'STORAGE_STRATEGY': os.environ.get('STORAGE_STRATEGY', 's3'), # Default to s3 storage - # Pinecone Configuration (only used if STORAGE_STRATEGY is 'pinecone') - 'PINECONE_API_KEY': os.environ.get('PINECONE_API_KEY'), - 'PINECONE_DB_NAME': os.environ.get('PINECONE_DB_NAME'), - 'PINECONE_SHARDS': os.environ.get('PINECONE_SHARDS'), - 'PINECONE_NAMESPACE': os.environ.get('PINECONE_NAMESPACE'), + # Qdrant Configuration (only used if STORAGE_STRATEGY is 'qdrant') + 'QDRANT_URL': os.environ.get('QDRANT_URL'), + 'QDRANT_API_KEY': os.environ.get('QDRANT_API_KEY'), + 'QDRANT_COLLECTION_NAME': os.environ.get('QDRANT_COLLECTION_NAME'), # Vector Configuration 'VECTOR_EMBEDDING_MODEL': os.environ.get('VECTOR_EMBEDDING_MODEL'), 'VECTOR_EMBEDDING_DIM': os.environ.get('VECTOR_EMBEDDING_DIM'), 'VECTOR_SEARCH_METRIC': os.environ.get('VECTOR_SEARCH_METRIC'), - # OpenAI Configuration - 'OPENAI_API_KEY': os.environ.get('OPENAI_API_KEY'), - "OPENAI_EMBEDDING_MODEL": os.environ.get('OPENAI_EMBEDDING_MODEL'), + # Ollama Configuration + 'OLLAMA_HOST': os.environ.get('OLLAMA_HOST'), + 'OLLAMA_EMBEDDING_MODEL': os.environ.get('OLLAMA_EMBEDDING_MODEL'), } return lambda_client.update_function_configuration( diff --git a/src/launch/launch_env.py b/src/launch/launch_env.py index cdc85cb..dd9e899 100644 --- a/src/launch/launch_env.py +++ b/src/launch/launch_env.py @@ -90,13 +90,13 @@ def main(): env_vars["TEST"] = get_env_value("TEST", "Enter Test Value:", options=["0", "1"], advanced=advanced_mode) # Storage Strategy - env_vars["STORAGE_STRATEGY"] = get_env_value("STORAGE_STRATEGY", "Choose Storage Strategy:", options=["s3", "pinecone"], advanced=advanced_mode) - - # Pinecone Configuration (only if pinecone is selected) - if env_vars["STORAGE_STRATEGY"] == "pinecone": - env_vars["PINECONE_API_KEY"] = get_env_value("PINECONE_API_KEY", "Enter Pinecone API Key:", advanced=advanced_mode) - env_vars["PINECONE_DB_NAME"] = get_env_value("PINECONE_DB_NAME", "Enter Pinecone DB Name:", options=["open-rss-articles", "custom-rss-db"], advanced=advanced_mode) - + env_vars["STORAGE_STRATEGY"] = get_env_value("STORAGE_STRATEGY", "Choose Storage Strategy:", options=["s3", "qdrant"], advanced=advanced_mode) + + # Qdrant Configuration (only if qdrant is selected) + if env_vars["STORAGE_STRATEGY"] == "qdrant": + env_vars["QDRANT_URL"] = get_env_value("QDRANT_URL", "Enter Qdrant URL:", options=["http://localhost:6333"], advanced=advanced_mode) + env_vars["QDRANT_COLLECTION_NAME"] = get_env_value("QDRANT_COLLECTION_NAME", "Enter Qdrant Collection Name:", options=["open-rss-articles"], advanced=advanced_mode) + # Display summary display_summary(env_vars) diff --git a/src/utils/check_env.py b/src/utils/check_env.py index 863efba..521f26c 100644 --- a/src/utils/check_env.py +++ b/src/utils/check_env.py @@ -48,13 +48,14 @@ def check_env() -> None: # Variables that are optional depending on the storage strategy optional_vars = { - "PINECONE_API_KEY": "pinecone", - "PINECONE_DB_NAME": "pinecone", - "OPENAI_API_KEY": "all", - "PINECONE_SHARDS": "pinecone", - "VECTOR_EMBEDDING_MODEL": "pinecone", - "VECTOR_EMBEDDING_DIM": "pinecone", - "VECTOR_SEARCH_METRIC": "pinecone" + "QDRANT_URL": "qdrant", + "QDRANT_API_KEY": "qdrant", + "QDRANT_COLLECTION_NAME": "qdrant", + "OLLAMA_HOST": "all", + "OLLAMA_EMBEDDING_MODEL": "all", + "VECTOR_EMBEDDING_MODEL": "qdrant", + "VECTOR_EMBEDDING_DIM": "qdrant", + "VECTOR_SEARCH_METRIC": "qdrant" } missing_vars: List[str] = [] diff --git a/template.env b/template.env index eed5510..8fd44f6 100644 --- a/template.env +++ b/template.env @@ -52,17 +52,16 @@ APP_NAME=RSS Feed Processor VERSION=1.0.0 -STORAGE_STRATEGY=s3 # 's3' or 'pinecone' will support others in the future. +STORAGE_STRATEGY=s3 # 's3' or 'qdrant' -# Only need to fill out this if your storage strategy is pinecone [ Not currently supported. ] -PINECONE_API_KEY=*** -PINECONE_DB_NAME=open-rss-articles -PINECONE_SHARDS=*** -PINECONE_NAMESPACE=IngestRSS-Articles +# Only need to fill out this if your storage strategy is qdrant +QDRANT_URL=http://localhost:6333 +QDRANT_API_KEY=*** +QDRANT_COLLECTION_NAME=open-rss-articles VECTOR_EMBEDDING_MODEL=*** VECTOR_EMBEDDING_DIM=*** VECTOR_SEARCH_METRIC=*** -OPENAI_API_KEY=sk** -OPENAI_EMBEDDING_MODEL=text-embedding-3-large +OLLAMA_HOST=http://localhost:11434 +OLLAMA_EMBEDDING_MODEL=nomic-embed-text