Merge branch 'main' into codex/integrate-redis-py-and-refactor-queue-processing

Committed by GitHub on 2025-06-02 13:48:31 +02:00
6 changed files with 80 additions and 43 deletions

View File

@@ -7,3 +7,4 @@ pinecone
openai
tqdm
redis
minio

View File

@@ -1,17 +1,20 @@
import boto3
from minio import Minio
import os
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from collections import defaultdict
def get_s3_object_creation_dates(bucket_name):
s3 = boto3.client('s3')
client = Minio(
os.getenv("MINIO_ENDPOINT"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=False
)
creation_dates = []
# List all objects in the bucket
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
for obj in page.get('Contents', []):
creation_dates.append(obj['LastModified'].date())
for obj in client.list_objects(bucket_name, recursive=True):
creation_dates.append(obj.last_modified.date())
return creation_dates
@@ -47,7 +50,7 @@ def plot_creation_dates(dates):
print("Graph saved as 's3_object_creation_dates.png'")
def main():
bucket_name = 'open-rss-articles-us-east-1'
bucket_name = os.getenv('MINIO_BUCKET')
dates = get_s3_object_creation_dates(bucket_name)
plot_creation_dates(dates)
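For reference, the migrated listing logic as a self-contained sketch (env var names as in the diff above): minio-py's list_objects returns a lazy iterator, so the boto3 paginator is no longer needed, and the endpoint is expected as a bare host:port with TLS controlled by the secure flag.

import os
from minio import Minio

def get_s3_object_creation_dates(bucket_name):
    # Client built from the same MINIO_* variables introduced in this commit.
    client = Minio(
        os.getenv("MINIO_ENDPOINT"),            # bare host:port, no http:// prefix
        access_key=os.getenv("MINIO_ACCESS_KEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY"),
        secure=False,
    )
    # recursive=True walks the whole bucket instead of stopping at prefix "directories".
    return [obj.last_modified.date()
            for obj in client.list_objects(bucket_name, recursive=True)]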

View File

@@ -1,4 +1,5 @@
import boto3
from minio import Minio
import json
import os
import logging
@@ -11,6 +12,15 @@ logger = logging.getLogger()
s3 = boto3.client('s3')
CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
minio_client = Minio(
os.getenv("MINIO_ENDPOINT"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=False
)
CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')
##### Article Storage #####
@@ -49,8 +59,8 @@ def pinecone_save_article(article:dict):
upsert_vectors(index, data, namespace)
def s3_save_article(article:dict):
logger.info("Saving article to S3")
def s3_save_article(article:dict):
logger.info("Saving article to MinIO")
now = datetime.now()
article_id = article['article_id']
@@ -67,22 +77,22 @@ def s3_save_article(article:dict):
json.dump(article, f)
try:
s3.upload_file(file_path,
CONTENT_BUCKET,
file_key,
ExtraArgs={
"Metadata":
{
"rss": article.get("rss", ""),
"title": article.get("title", ""),
"unixTime": str(article.get("unixTime", "")),
"article_id": article.get("article_id", ""),
"link": article.get("link", ""),
"rss_id": article.get("rss_id", "")
}
}
)
logger.info(f"Saved article {article_id} to S3 bucket {CONTENT_BUCKET}")
metadata = {
"rss": article.get("rss", ""),
"title": article.get("title", ""),
"unixTime": str(article.get("unixTime", "")),
"article_id": article.get("article_id", ""),
"link": article.get("link", ""),
"rss_id": article.get("rss_id", "")
}
minio_client.fput_object(
CONTENT_BUCKET,
file_key,
file_path,
content_type="application/json",
metadata=metadata
)
logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}")
except Exception as e:
logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
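A condensed sketch of the new upload path (the helper name is illustrative, not from the commit): fput_object uploads a local file in one call and attaches the dict as user metadata, which MinIO stores as x-amz-meta-* headers, so the values should already be strings.

import json
import tempfile

def save_article_to_minio(client, bucket, file_key, article: dict):
    # Write the article to a temporary JSON file, then upload it with metadata.
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
        json.dump(article, f)
        file_path = f.name
    metadata = {
        "article_id": str(article.get("article_id", "")),
        "title": article.get("title", ""),
        "unixTime": str(article.get("unixTime", "")),
    }
    client.fput_object(
        bucket,
        file_key,
        file_path,
        content_type="application/json",
        metadata=metadata,
    )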

View File

@@ -1,4 +1,4 @@
import boto3
from minio import Minio
import pandas as pd
from typing import Optional, List, Dict, Union, Any
import json
@@ -10,7 +10,7 @@ from string import Template
from tqdm import tqdm
class S3BatchDownloader:
"""Class for batch downloading RSS articles from S3"""
"""Class for batch downloading RSS articles from a MinIO bucket"""
DEFAULT_CONFIG = {
"region": "${AWS_REGION}",
@@ -30,8 +30,15 @@ class S3BatchDownloader:
self.config = self._load_config(config_path)
self._validate_config()
self.s3 = boto3.client('s3', region_name=self.config['region'])
self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}")
self.s3 = Minio(
os.getenv('MINIO_ENDPOINT'),
access_key=os.getenv('MINIO_ACCESS_KEY'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=False
)
self.logger.info(
f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}"
)
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
"""Load and process configuration"""
@@ -43,7 +50,7 @@ class S3BatchDownloader:
env_vars = {
'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'),
'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME')
'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET')
}
config_str = template.safe_substitute(env_vars)
@@ -68,7 +75,7 @@ class S3BatchDownloader:
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> str:
"""
Download articles from S3 to a consolidated file
Download articles from MinIO to a consolidated file
Args:
output_path: Path to save the output file.
@@ -112,25 +119,31 @@ class S3BatchDownloader:
return output_path
def _list_objects(self) -> List[Dict]:
"""List objects in S3 bucket"""
"""List objects in bucket"""
objects = []
paginator = self.s3.get_paginator('list_objects')
try:
for page in paginator.paginate(Bucket=self.config['bucket']):
if 'Contents' in page:
objects.extend(page['Contents'])
for obj in self.s3.list_objects(
self.config['bucket'],
prefix=self.config['prefix'],
recursive=True
):
objects.append({
'Key': obj.object_name,
'LastModified': obj.last_modified
})
return objects
except Exception as e:
self.logger.error(f"Error listing objects: {str(e)}")
raise
def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
"""Download and parse single S3 object"""
"""Download and parse single object"""
try:
response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key'])
content = response['Body'].read().decode('utf-8')
response = self.s3.get_object(self.config['bucket'], obj['Key'])
content = response.read().decode('utf-8')
data = json.loads(content)
metadata = response.get('Metadata', {})
stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
metadata = stat.metadata
if isinstance(data, dict):
data.update(metadata)
return [data]
@@ -154,4 +167,4 @@ class S3BatchDownloader:
elif file_format == 'json':
df.to_json(output_path, orient='records', lines=True)
else:
raise ValueError(f"Unsupported file format: {file_format}")
raise ValueError(f"Unsupported file format: {file_format}")
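One refinement worth noting on top of the diff (a suggestion, not part of this commit): minio-py's get_object returns a urllib3 response that should be closed and released after reading, and stat_object(...).metadata exposes user metadata under x-amz-meta-* keys, which likely need the prefix stripped before being merged back into the article dict.

def _download_object(self, obj):
    """Sketch of the MinIO download path with explicit connection cleanup."""
    response = self.s3.get_object(self.config['bucket'], obj['Key'])
    try:
        data = json.loads(response.read().decode('utf-8'))
    finally:
        response.close()
        response.release_conn()
    stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
    prefix = "x-amz-meta-"
    # Strip the header prefix so the merged keys match what was written at upload time.
    metadata = {k[len(prefix):]: v for k, v in (stat.metadata or {}).items()
                if k.lower().startswith(prefix)}
    if isinstance(data, dict):
        data.update(metadata)
        return [data]
    return data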

View File

@@ -9,7 +9,11 @@ def check_env() -> None:
"AWS_REGION",
"AWS_ACCOUNT_ID",
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY"
"AWS_SECRET_ACCESS_KEY",
"MINIO_ENDPOINT",
"MINIO_ACCESS_KEY",
"MINIO_SECRET_KEY",
"MINIO_BUCKET"
]
# Variables that are derived or have default values
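The hunk above only extends the required-variable list; the body of check_env is not shown in this diff. A minimal sketch of how such a check typically fails fast, assuming the names listed above:

import os

REQUIRED_VARS = [
    "AWS_REGION", "AWS_ACCOUNT_ID", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
    "MINIO_ENDPOINT", "MINIO_ACCESS_KEY", "MINIO_SECRET_KEY", "MINIO_BUCKET",
]

def check_env() -> None:
    # Report every missing variable in one error instead of failing one at a time.
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing)}")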

View File

@@ -16,6 +16,12 @@ S3_BUCKET_NAME=open-rss-articles-${AWS_REGION}
REDIS_URL=redis://localhost:6379
REDIS_QUEUE_NAME=rss-feed-queue
# MinIO configuration
MINIO_ENDPOINT=***
MINIO_ACCESS_KEY=***
MINIO_SECRET_KEY=***
MINIO_BUCKET=***
LAMBDA_LAYER_VERSION=6 # This is fixed.
LAMBDA_LAYER_NAME=ingest-rss-lambda-layer-${AWS_REGION}
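A short sketch of how these settings are consumed at startup (python-dotenv is an assumption here; the repo may load the file differently). Creating the bucket up front gives a clearer failure than waiting for the first upload:

import os
from dotenv import load_dotenv   # assumption: not necessarily how this repo loads its .env
from minio import Minio

load_dotenv()

client = Minio(
    os.environ["MINIO_ENDPOINT"],
    access_key=os.environ["MINIO_ACCESS_KEY"],
    secret_key=os.environ["MINIO_SECRET_KEY"],
    secure=False,
)

# Ensure the configured bucket exists before any worker tries to write to it.
bucket = os.environ["MINIO_BUCKET"]
if not client.bucket_exists(bucket):
    client.make_bucket(bucket)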