From 7503a452dc920a0ba1ce8ab19071257ba042b1b3 Mon Sep 17 00:00:00 2001
From: Aljaz
Date: Mon, 2 Jun 2025 13:45:51 +0200
Subject: [PATCH] Switch to MinIO

---
 requirements.txt                            |  4 +-
 src/analysis-toolkit/s3_object_ingestion.py | 19 ++++----
 .../src/data_storage.py                     | 46 +++++++++++--------
 src/search/batch/downloader.py              | 45 +++++++++++-------
 src/utils/check_env.py                      |  6 ++-
 template.env                                |  8 +++-
 6 files changed, 81 insertions(+), 47 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 4aeda36..88fcb0c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,6 @@ constructs==10.2.69
 # Optional, yet necessary for the Pinecone SDK functionality.
 pinecone
 openai
-tqdm
\ No newline at end of file
+tqdm
+minio
+
diff --git a/src/analysis-toolkit/s3_object_ingestion.py b/src/analysis-toolkit/s3_object_ingestion.py
index a3ac1b3..176292d 100644
--- a/src/analysis-toolkit/s3_object_ingestion.py
+++ b/src/analysis-toolkit/s3_object_ingestion.py
@@ -1,17 +1,20 @@
-import boto3
+from minio import Minio
+import os
 import matplotlib.pyplot as plt
 from datetime import datetime, timedelta
 from collections import defaultdict
 
 def get_s3_object_creation_dates(bucket_name):
-    s3 = boto3.client('s3')
+    client = Minio(
+        os.getenv("MINIO_ENDPOINT"),
+        access_key=os.getenv("MINIO_ACCESS_KEY"),
+        secret_key=os.getenv("MINIO_SECRET_KEY"),
+        secure=False
+    )
 
     creation_dates = []
 
-    # List all objects in the bucket
-    paginator = s3.get_paginator('list_objects_v2')
-    for page in paginator.paginate(Bucket=bucket_name):
-        for obj in page.get('Contents', []):
-            creation_dates.append(obj['LastModified'].date())
+    for obj in client.list_objects(bucket_name, recursive=True):
+        creation_dates.append(obj.last_modified.date())
 
     return creation_dates
@@ -47,7 +50,7 @@ def plot_creation_dates(dates):
     print("Graph saved as 's3_object_creation_dates.png'")
 
 def main():
-    bucket_name = 'open-rss-articles-us-east-1'
+    bucket_name = os.getenv('MINIO_BUCKET')
     dates = get_s3_object_creation_dates(bucket_name)
     plot_creation_dates(dates)
 
"rss": article.get("rss", ""), - "title": article.get("title", ""), - "unixTime": str(article.get("unixTime", "")), - "article_id": article.get("article_id", ""), - "link": article.get("link", ""), - "rss_id": article.get("rss_id", "") - } - } - ) - logger.info(f"Saved article {article_id} to S3 bucket {CONTENT_BUCKET}") + metadata = { + "rss": article.get("rss", ""), + "title": article.get("title", ""), + "unixTime": str(article.get("unixTime", "")), + "article_id": article.get("article_id", ""), + "link": article.get("link", ""), + "rss_id": article.get("rss_id", "") + } + minio_client.fput_object( + CONTENT_BUCKET, + file_key, + file_path, + content_type="application/json", + metadata=metadata + ) + logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}") except Exception as e: logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}") diff --git a/src/search/batch/downloader.py b/src/search/batch/downloader.py index e6c7362..c4725fe 100644 --- a/src/search/batch/downloader.py +++ b/src/search/batch/downloader.py @@ -1,4 +1,4 @@ -import boto3 +from minio import Minio import pandas as pd from typing import Optional, List, Dict, Union, Any import json @@ -10,7 +10,7 @@ from string import Template from tqdm import tqdm class S3BatchDownloader: - """Class for batch downloading RSS articles from S3""" + """Class for batch downloading RSS articles from a MinIO bucket""" DEFAULT_CONFIG = { "region": "${AWS_REGION}", @@ -30,8 +30,15 @@ class S3BatchDownloader: self.config = self._load_config(config_path) self._validate_config() - self.s3 = boto3.client('s3', region_name=self.config['region']) - self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}") + self.s3 = Minio( + os.getenv('MINIO_ENDPOINT'), + access_key=os.getenv('MINIO_ACCESS_KEY'), + secret_key=os.getenv('MINIO_SECRET_KEY'), + secure=False + ) + self.logger.info( + f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}" + ) def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]: """Load and process configuration""" @@ -43,7 +50,7 @@ class S3BatchDownloader: env_vars = { 'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'), - 'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME') + 'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET') } config_str = template.safe_substitute(env_vars) @@ -68,7 +75,7 @@ class S3BatchDownloader: start_date: Optional[str] = None, end_date: Optional[str] = None) -> str: """ - Download articles from S3 to a consolidated file + Download articles from MinIO to a consolidated file Args: output_path: Path to save the output file. 
@@ -112,25 +119,31 @@
         return output_path
 
     def _list_objects(self) -> List[Dict]:
-        """List objects in S3 bucket"""
+        """List objects in bucket"""
         objects = []
-        paginator = self.s3.get_paginator('list_objects')
         try:
-            for page in paginator.paginate(Bucket=self.config['bucket']):
-                if 'Contents' in page:
-                    objects.extend(page['Contents'])
+            for obj in self.s3.list_objects(
+                self.config['bucket'],
+                prefix=self.config['prefix'],
+                recursive=True
+            ):
+                objects.append({
+                    'Key': obj.object_name,
+                    'LastModified': obj.last_modified
+                })
             return objects
         except Exception as e:
             self.logger.error(f"Error listing objects: {str(e)}")
             raise
 
     def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
-        """Download and parse single S3 object"""
+        """Download and parse single object"""
         try:
-            response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key'])
-            content = response['Body'].read().decode('utf-8')
+            response = self.s3.get_object(self.config['bucket'], obj['Key'])
+            content = response.read().decode('utf-8')
             data = json.loads(content)
-            metadata = response.get('Metadata', {})
+            stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
+            metadata = stat.metadata
             if isinstance(data, dict):
                 data.update(metadata)
                 return [data]
@@ -154,4 +167,4 @@
         elif file_format == 'json':
             df.to_json(output_path, orient='records', lines=True)
         else:
-            raise ValueError(f"Unsupported file format: {file_format}")
\ No newline at end of file
+            raise ValueError(f"Unsupported file format: {file_format}")
diff --git a/src/utils/check_env.py b/src/utils/check_env.py
index 5592dda..1364e16 100644
--- a/src/utils/check_env.py
+++ b/src/utils/check_env.py
@@ -9,7 +9,11 @@ def check_env() -> None:
         "AWS_REGION",
         "AWS_ACCOUNT_ID",
         "AWS_ACCESS_KEY_ID",
-        "AWS_SECRET_ACCESS_KEY"
+        "AWS_SECRET_ACCESS_KEY",
+        "MINIO_ENDPOINT",
+        "MINIO_ACCESS_KEY",
+        "MINIO_SECRET_KEY",
+        "MINIO_BUCKET"
     ]
 
     # Variables that are derived or have default values
diff --git a/template.env b/template.env
index 4142e73..e84dbf3 100644
--- a/template.env
+++ b/template.env
@@ -16,6 +16,12 @@ S3_BUCKET_NAME=open-rss-articles-${AWS_REGION}
 DYNAMODB_TABLE_NAME=rss-feeds-table
 SQS_QUEUE_NAME=rss-feed-queue
 
+# MinIO configuration
+MINIO_ENDPOINT=***
+MINIO_ACCESS_KEY=***
+MINIO_SECRET_KEY=***
+MINIO_BUCKET=***
+
 LAMBDA_LAYER_VERSION=6 # This is fixed.
 LAMBDA_LAYER_NAME=ingest-rss-lambda-layer-${AWS_REGION}
 
@@ -58,4 +64,4 @@ VECTOR_EMBEDDING_DIM=***
 VECTOR_SEARCH_METRIC=***
 
 OPENAI_API_KEY=sk**
-OPENAI_EMBEDDING_MODEL=text-embedding-3-large
\ No newline at end of file
+OPENAI_EMBEDDING_MODEL=text-embedding-3-large
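
Two behavioral differences from boto3 are worth noting in _download_object.
First, get_object now returns a urllib3 HTTPResponse whose connection should
be closed and released once the body has been read. Second, stat_object
exposes raw response headers, so user metadata comes back prefixed with
x-amz-meta- rather than as the bare keys boto3 returned in
response['Metadata']; data.update(metadata) therefore produces differently
named fields than before the switch. A sketch of a variant that handles both
(an illustration, not part of the patch; download_object is a hypothetical
standalone helper), assuming a client built as above:

    import json

    def download_object(client, bucket: str, key: str) -> dict:
        response = client.get_object(bucket, key)
        try:
            data = json.loads(response.read().decode("utf-8"))
        finally:
            # Return the pooled connection after the body is consumed.
            response.close()
            response.release_conn()

        # Strip x-amz-meta- so keys match the old boto3 Metadata shape.
        prefix = "x-amz-meta-"
        headers = client.stat_object(bucket, key).metadata
        meta = {k[len(prefix):]: v for k, v in headers.items()
                if k.lower().startswith(prefix)}
        data.update(meta)
        return data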