Merge branch 'main' into codex/migrate-rss-feed-storage-to-mongodb

Committed by GitHub on 2025-06-02 13:51:22 +02:00
35 changed files with 161 additions and 281 deletions

View File

@@ -37,7 +37,7 @@ def main():
logging.info("Finished Deploying Lambda") logging.info("Finished Deploying Lambda")
deploy_sqs_filler() deploy_sqs_filler()
logging.info("Finished Deploying SQS Filler Lambda") logging.info("Finished Deploying Queue Filler Lambda")
# Update Lambda environment variables # Update Lambda environment variables
update_env_vars(os.getenv("LAMBDA_FUNCTION_NAME")) update_env_vars(os.getenv("LAMBDA_FUNCTION_NAME"))
@@ -54,6 +54,7 @@ def main():
os.getenv('MONGODB_DB_NAME'), os.getenv('MONGODB_DB_NAME'),
os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds') os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
) )
else: else:
print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.") print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")

View File

@@ -7,3 +7,5 @@ constructs==10.2.69
 pinecone
 openai
 tqdm
+redis
+minio

View File

@@ -1,17 +1,20 @@
-import boto3
+from minio import Minio
+import os
 import matplotlib.pyplot as plt
 from datetime import datetime, timedelta
 from collections import defaultdict
 def get_s3_object_creation_dates(bucket_name):
-    s3 = boto3.client('s3')
+    client = Minio(
+        os.getenv("MINIO_ENDPOINT"),
+        access_key=os.getenv("MINIO_ACCESS_KEY"),
+        secret_key=os.getenv("MINIO_SECRET_KEY"),
+        secure=False
+    )
     creation_dates = []
-    # List all objects in the bucket
-    paginator = s3.get_paginator('list_objects_v2')
-    for page in paginator.paginate(Bucket=bucket_name):
-        for obj in page.get('Contents', []):
-            creation_dates.append(obj['LastModified'].date())
+    for obj in client.list_objects(bucket_name, recursive=True):
+        creation_dates.append(obj.last_modified.date())
     return creation_dates
@@ -47,7 +50,7 @@ def plot_creation_dates(dates):
     print("Graph saved as 's3_object_creation_dates.png'")
 def main():
-    bucket_name = 'open-rss-articles-us-east-1'
+    bucket_name = os.getenv('MINIO_BUCKET')
     dates = get_s3_object_creation_dates(bucket_name)
     plot_creation_dates(dates)
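
Note: the rewritten script now needs MinIO connection settings at runtime. A minimal sketch of how it might be exercised locally, assuming a reachable MinIO instance and the module saved as plot_creation_dates.py (the module name and the default credentials below are illustrative, not taken from this diff):

import os

# Hypothetical connection settings for a local MinIO instance; adjust to your deployment.
os.environ.setdefault("MINIO_ENDPOINT", "localhost:9000")
os.environ.setdefault("MINIO_ACCESS_KEY", "minioadmin")
os.environ.setdefault("MINIO_SECRET_KEY", "minioadmin")
os.environ.setdefault("MINIO_BUCKET", "open-rss-articles")

from plot_creation_dates import get_s3_object_creation_dates, plot_creation_dates  # assumed module name

dates = get_s3_object_creation_dates(os.environ["MINIO_BUCKET"])
plot_creation_dates(dates)

MinIO exposes last_modified rather than a true creation timestamp, so the plot reflects creation dates only for objects that are never rewritten.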

View File

@@ -3,7 +3,6 @@ import os
 from pymongo import MongoClient
 import logging
-# Set up logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

View File

@@ -1,27 +0,0 @@
-AWSTemplateFormatVersion: '2010-09-09'
-Description: 'CloudFormation template for RSS Feed Processor DynamoDB Table'
-Parameters:
-  DynamoDBName:
-    Type: String
-    Description: ""
-Resources:
-  RSSFeedsTable:
-    Type: AWS::DynamoDB::Table
-    Properties:
-      TableName: !Ref 'DynamoDBName'
-      AttributeDefinitions:
-        - AttributeName: url
-          AttributeType: S
-      KeySchema:
-        - AttributeName: url
-          KeyType: HASH
-      BillingMode: PAY_PER_REQUEST
-Outputs:
-  TableName:
-    Description: 'Name of the DynamoDB table for RSS feeds'
-    Value: !Ref RSSFeedsTable
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedsTableName'

View File

@@ -37,7 +37,6 @@ Resources:
           - Effect: Allow
             Action:
               - 'sqs:*'
-              - 'dynamodb:*'
               - 's3:*'
               - 'lambda:*'
               - 'logs:*'

View File

@@ -1,22 +1,16 @@
 AWSTemplateFormatVersion: '2010-09-09'
-Description: SQS Filler Lambda Stack
+Description: Redis Queue Filler Lambda Stack
 Parameters:
   QueueFillerLambdaName:
     Type: String
     Description: Name of the Lambda function
-  SqsQueueUrl:
+  RedisUrl:
     Type: String
-    Description: URL of the SQS queue
+    Description: URL of the Redis instance
-  DynamoDbTableName:
+  RedisQueueName:
     Type: String
-    Description: Name of the DynamoDB table
+    Description: Name of the Redis queue
-  DynamoDbTableArn:
-    Type: String
-    Description: ARN of the DynamoDB table
-  SqsQueueArn:
-    Type: String
-    Description: ARN of the SQS queue
   LambdaCodeS3Bucket:
     Type: String
     Description: S3 bucket containing the Lambda function code
@@ -45,8 +39,8 @@ Resources:
       Timeout: !Ref LambdaTimeout
       Environment:
         Variables:
-          SQS_QUEUE_URL: !Ref SqsQueueUrl
+          REDIS_URL: !Ref RedisUrl
-          DYNAMODB_TABLE_NAME: !Ref DynamoDbTableName
+          REDIS_QUEUE_NAME: !Ref RedisQueueName
       Role: !GetAtt SqsFillerFunctionRole.Arn
   SqsFillerFunctionRole:
@@ -70,14 +64,6 @@ Resources:
               - logs:CreateLogStream
               - logs:PutLogEvents
             Resource: arn:aws:logs:*:*:*
-          - Effect: Allow
-            Action:
-              - dynamodb:Scan
-            Resource: !Ref DynamoDbTableArn
-          - Effect: Allow
-            Action:
-              - sqs:SendMessage
-            Resource: !Ref SqsQueueArn
           - Effect: Allow
             Action:
               - s3:GetObject
@@ -85,8 +71,8 @@ Resources:
 Outputs:
   SqsFillerFunctionArn:
-    Description: ARN of the SQS Filler Lambda Function
+    Description: ARN of the Queue Filler Lambda Function
     Value: !GetAtt SqsFillerFunction.Arn
   SqsFillerFunctionRoleArn:
-    Description: ARN of the IAM Role for SQS Filler Lambda Function
+    Description: ARN of the IAM Role for Queue Filler Lambda Function
     Value: !GetAtt SqsFillerFunctionRole.Arn

View File

@@ -1,36 +0,0 @@
-AWSTemplateFormatVersion: '2010-09-09'
-Description: 'CloudFormation template for RSS Feed Processor SQS Queue'
-Parameters:
-  SQSQueueName:
-    Type: String
-    Description: ""
-Resources:
-  RSSFeedQueue:
-    Type: AWS::SQS::Queue
-    Properties:
-      QueueName: !Ref SQSQueueName
-      VisibilityTimeout: 900 # Should be set to the 3rd standard deviation of your lambda runtime distribution.
-      ReceiveMessageWaitTimeSeconds: 20
-      RedrivePolicy:
-        deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn
-        maxReceiveCount: 3
-  RSSFeedDLQ:
-    Type: AWS::SQS::Queue
-    Properties:
-      QueueName: !Sub '${AWS::StackName}-rss-feed-dlq'
-Outputs:
-  QueueURL:
-    Description: 'URL of the SQS queue for RSS feeds'
-    Value: !Ref RSSFeedQueue
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedQueueURL'
-  DLQueueURL:
-    Description: 'URL of the Dead Letter Queue for RSS feeds'
-    Value: !Ref RSSFeedDLQ
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedDLQueueURL'

View File

@@ -143,13 +143,6 @@ def deploy_infrastructure():
     key_info = kms_client.describe_key(KeyId=kms_key_id)
     kms_key_arn = key_info['KeyMetadata']['Arn']
-    deploy_cloudformation('dynamo.yaml', 'DynamoDB',
-        parameters=[
-            {
-                'ParameterKey': 'DynamoDBName',
-                'ParameterValue': os.environ.get('DYNAMODB_TABLE_NAME', 'default-table-name')
-            }
-        ])
     deploy_cloudformation('s3.yaml', 'S3',
@@ -166,13 +159,6 @@ def deploy_infrastructure():
                 'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
             }
         ])
-    deploy_cloudformation('sqs.yaml', 'SQS',
-        parameters=[
-            {
-                'ParameterKey': 'SQSQueueName',
-                'ParameterValue': os.environ.get('SQS_QUEUE_NAME', 'default-queue-name')
-            }
-        ])
     deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
         parameters=[
             {

View File

@@ -80,26 +80,8 @@ def update_function_configuration(lambda_client, function_name, handler, role, t
 @retry_with_backoff()
 def configure_sqs_trigger(lambda_client, function_name, queue_arn):
-    event_source_mapping = {
-        'FunctionName': function_name,
-        'EventSourceArn': queue_arn,
-        'BatchSize': 1,
-        'MaximumBatchingWindowInSeconds': 0,
-        'ScalingConfig': {
-            'MaximumConcurrency': 50
-        }
-    }
-    try:
-        response = lambda_client.create_event_source_mapping(**event_source_mapping)
-        print(f"SQS trigger configured successfully for {function_name}")
-    except ClientError as e:
-        if e.response['Error']['Code'] == 'ResourceConflictException':
-            print(f"SQS trigger already exists for {function_name}. Updating configuration...")
-            # If you want to update existing trigger, you'd need to list existing mappings and update them
-            # This is left as an exercise as it requires additional error handling and logic
-        else:
-            raise e
+    """Placeholder for backward compatibility. Redis deployment uses no SQS trigger."""
+    return
 @retry_with_backoff()
 def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy):
@@ -219,13 +201,6 @@ def deploy_lambda():
         policy = get_lambda_policy()
         create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy)
-        # Configure SQS trigger
-        queue_arn = os.getenv('SQS_QUEUE_ARN')  # Make sure to set this environment variable
-        if queue_arn:
-            configure_sqs_trigger(lambda_client, LAMBDA_NAME, queue_arn)
-        else:
-            print("Warning: SQS_QUEUE_ARN not set. Skipping SQS trigger configuration.")
         print("Lambda deployment completed successfully!")
     except Exception as e:

View File

@@ -1,15 +1,15 @@
 import os
-# SQS Configuration
-SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
+# Redis Configuration
+REDIS_URL = os.environ["REDIS_URL"]
+REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
 # Logging Configuration
-LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
+LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
 # RSS Feed Processing Configuration
-MAX_ARTICLES_PER_FEED = int(os.environ.get('MAX_ARTICLES_PER_FEED', '10'))
-FEED_PROCESSING_TIMEOUT = int(os.environ.get('FEED_PROCESSING_TIMEOUT', '90'))
+MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
+FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))
 # Article Extraction Configuration
-ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get('ARTICLE_EXTRACTION_TIMEOUT', '30'))
+ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))

View File

@@ -1,8 +1,8 @@
 import boto3
+from minio import Minio
 import json
 import os
 import logging
-from random import randint
 from datetime import datetime
 from pymongo import MongoClient
@@ -13,6 +13,15 @@ logger = logging.getLogger()
 s3 = boto3.client('s3')
 CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
+minio_client = Minio(
+    os.getenv("MINIO_ENDPOINT"),
+    access_key=os.getenv("MINIO_ACCESS_KEY"),
+    secret_key=os.getenv("MINIO_SECRET_KEY"),
+    secure=False
+)
+CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")))
-DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
 storage_strategy = os.environ.get('STORAGE_STRATEGY')
 MONGODB_URL = os.getenv("MONGODB_URL")
@@ -57,11 +66,9 @@ def pinecone_save_article(article:dict):
     logger.info("Upserting article to Pinecone")
     upsert_vectors(index, data, namespace)
-def dynamodb_save_article(article:dict):
-    pass
 def s3_save_article(article:dict):
-    logger.info("Saving article to S3")
+    logger.info("Saving article to MinIO")
     now = datetime.now()
     article_id = article['article_id']
@@ -78,37 +85,42 @@ def s3_save_article(article:dict):
         json.dump(article, f)
     try:
-        s3.upload_file(file_path,
-            CONTENT_BUCKET,
-            file_key,
-            ExtraArgs={
-                "Metadata":
-                {
-                    "rss": article.get("rss", ""),
-                    "title": article.get("title", ""),
-                    "unixTime": str(article.get("unixTime", "")),
-                    "article_id": article.get("article_id", ""),
-                    "link": article.get("link", ""),
-                    "rss_id": article.get("rss_id", "")
-                }
-            }
-        )
-        logger.info(f"Saved article {article_id} to S3 bucket {CONTENT_BUCKET}")
+        metadata = {
+            "rss": article.get("rss", ""),
+            "title": article.get("title", ""),
+            "unixTime": str(article.get("unixTime", "")),
+            "article_id": article.get("article_id", ""),
+            "link": article.get("link", ""),
+            "rss_id": article.get("rss_id", "")
+        }
+        minio_client.fput_object(
+            CONTENT_BUCKET,
+            file_key,
+            file_path,
+            content_type="application/json",
+            metadata=metadata
+        )
+        logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}")
     except Exception as e:
         logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
 ###### Feed Storage ######
+RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")
 def update_rss_feed(feed: dict, last_pub_dt: int):
     try:
-        feeds_collection.update_one(
-            {"url": feed["u"]},
-            {"$set": {"dt": last_pub_dt}},
-            upsert=True,
-        )
-        logger.info(
-            f"Updated RSS feed in MongoDB: {feed['u']} with dt: {last_pub_dt}"
-        )
+        if not os.path.exists(RSS_FEEDS_FILE):
+            return
+        with open(RSS_FEEDS_FILE, "r") as f:
+            feeds = json.load(f)
+        for item in feeds:
+            if item.get("u") == feed["u"]:
+                item["dt"] = int(last_pub_dt)
+        with open(RSS_FEEDS_FILE, "w") as f:
+            json.dump(feeds, f)
+        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
     except Exception as e:
         logger.error(f"Failed to update RSS feed: {str(e)}")

View File

@@ -1,75 +1,60 @@
 import json
 import time
-from feed_processor import process_feed
+import os
+import redis
+from feed_processor import extract_feed
 from data_storage import save_article, update_rss_feed
 from utils import setup_logging
-from config import SQS_QUEUE_URL
+from config import REDIS_URL, REDIS_QUEUE_NAME
-from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageError
+from exceptions import RSSProcessingError, DataStorageError
-from metrics import record_processed_articles, record_processing_time, record_extraction_errors
-import boto3
-import os
-from feed_processor import extract_feed
+from metrics import (
+    record_processed_articles,
+    record_processing_time,
+    record_extraction_errors,
+)
-# Set up logging
 logger = setup_logging()
-storage_strategy = os.environ.get('STORAGE_STRATEGY')
-# Initialize AWS clients
-sqs = boto3.client('sqs')
+storage_strategy = os.environ.get("STORAGE_STRATEGY")
+redis_client = redis.Redis.from_url(REDIS_URL)
 def lambda_handler(event, context):
     logger.info("Starting RSS feed processing")
     start_time = time.time()
     try:
-        # Receive message from SQS
-        event_source = event["Records"][0]["eventSource"]
-        if event_source == "aws:sqs":
-            feed = event["Records"][0]["body"]
-            logger.info(f"Received message from SQS: {feed}")
-            feed = json.loads(feed)
-        receipt_handle = event["Records"][0]['receiptHandle']
-        # Process the feed
+        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
+        if not feed_data:
+            logger.info("No messages in queue")
+            return {"statusCode": 200, "body": json.dumps("No feeds to process")}
+        feed = json.loads(feed_data)
         result = extract_feed(feed)
         logger.info(f"Process Feed Result Dictionary: {result}")
-        last_pub_dt = result['max_date']
+        last_pub_dt = result["max_date"]
         if result:
-            # Save articles and update feed
-            for article in result['articles']:
+            for article in result["articles"]:
                 try:
                     save_article(article, storage_strategy)
                 except DataStorageError as e:
                     logger.error(f"Failed to save article: {str(e)}")
                     record_extraction_errors(1)
-            update_rss_feed(result['feed'], last_pub_dt)
-            # Delete the message from the queue
-            logger.info("Deleting sqs queue message")
-            try:
-                sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
-            except Exception as e:
-                logger.error(f"Error deleting message from SQS: {str(e)}")
-                logger.info("We can skip this but delete this block of code if it fails. This means the queue is already deleted when it triggers.")
+            update_rss_feed(result["feed"], last_pub_dt)
             logger.info(f"Processed feed: {feed['u']}")
-            # Record metrics
-            record_processed_articles(len(result['articles']))
+            record_processed_articles(len(result["articles"]))
         else:
             logger.warning(f"Failed to process feed: {feed['u']}")
             record_extraction_errors(1)
     except RSSProcessingError as e:
         logger.error(f"RSS Processing Error: {str(e)}")
-        return {'statusCode': 500, 'body': json.dumps('RSS processing failed')}
+        return {"statusCode": 500, "body": json.dumps("RSS processing failed")}
     except Exception as e:
         logger.error(f"Unexpected error: {str(e)}")
-        return {'statusCode': 500, 'body': json.dumps('An unexpected error occurred')}
+        return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")}
     finally:
         end_time = time.time()
@@ -77,7 +62,4 @@ def lambda_handler(event, context):
         record_processing_time(processing_time)
         logger.info(f"Lambda execution time: {processing_time:.2f} seconds")
-    return {
-        'statusCode': 200,
-        'body': json.dumps('RSS feed processed successfully')
-    }
+    return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")}
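
Note: with the SQS event gone, the handler now pulls work itself via RPOP, so the producer side has to LPUSH JSON-encoded feed dicts to get FIFO ordering. A minimal local smoke test, assuming a Redis instance at REDIS_URL, a handler module named lambda_function.py, and a feed shape like {"u": ..., "dt": ...} (the exact schema comes from feed_processor, not this hunk):

import json
import os
import redis

os.environ.setdefault("REDIS_URL", "redis://localhost:6379")
os.environ.setdefault("REDIS_QUEUE_NAME", "rss-feed-queue")

r = redis.Redis.from_url(os.environ["REDIS_URL"])

# Producer side: LPUSH pairs with the handler's RPOP, so the oldest feed is consumed first.
r.lpush(os.environ["REDIS_QUEUE_NAME"], json.dumps({"u": "https://example.com/feed.xml", "dt": 0}))

# Consumer side: invoking the handler locally should pop and process that feed.
from lambda_function import lambda_handler  # assumes the module and its imports resolve locally
print(lambda_handler({}, None))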

View File

@@ -49,20 +49,12 @@ def deploy_sqs_filler():
             'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME')
         },
         {
-            'ParameterKey': 'SqsQueueUrl',
-            'ParameterValue': os.getenv('SQS_QUEUE_URL')
+            'ParameterKey': 'RedisUrl',
+            'ParameterValue': os.getenv('REDIS_URL')
         },
         {
-            'ParameterKey': 'DynamoDbTableName',
-            'ParameterValue': os.getenv('DYNAMODB_TABLE_NAME')
-        },
-        {
-            'ParameterKey': 'DynamoDbTableArn',
-            'ParameterValue': os.getenv('DYNAMODB_TABLE_ARN')
-        },
-        {
-            'ParameterKey': 'SqsQueueArn',
-            'ParameterValue': os.getenv('SQS_QUEUE_ARN')
+            'ParameterKey': 'RedisQueueName',
+            'ParameterValue': os.getenv('REDIS_QUEUE_NAME')
         },
         {
             'ParameterKey': 'LambdaCodeS3Bucket',

View File

@@ -1,10 +1,9 @@
 import json
 import os
-import boto3
-from decimal import Decimal
-from datetime import datetime
 import logging
 from pymongo import MongoClient
+from datetime import datetime
+import redis
 logger = logging.getLogger()
 logger.setLevel("INFO")
@@ -54,6 +53,6 @@ def handler(event, context):
     logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")
     return {
-        'statusCode': 200,
-        'body': json.dumps(f'Sent {messages_sent} RSS URLs to SQS')
+        "statusCode": 200,
+        "body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"),
     }
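
Note: this hunk shows only the imports and the response; the enqueue loop itself isn't visible. A hedged sketch of what the push side presumably looks like, assuming feeds live in a MongoDB collection with "u"/"dt" fields and that REDIS_URL, REDIS_QUEUE_NAME, MONGODB_URL, and MONGODB_DB_NAME are set (the collection name and query are guesses, not taken from the diff):

import json
import os
import redis
from pymongo import MongoClient

redis_client = redis.Redis.from_url(os.environ["REDIS_URL"])
queue_name = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")

# Hypothetical MongoDB source of feeds; 'rss_feeds' matches the default collection name used elsewhere in this change.
mongo = MongoClient(os.environ["MONGODB_URL"])
feeds = mongo[os.environ["MONGODB_DB_NAME"]]["rss_feeds"]

messages_sent = 0
for feed in feeds.find({}, {"_id": 0, "u": 1, "dt": 1}):
    # LPUSH here, RPOP in the processor: the oldest feed is handled first.
    redis_client.lpush(queue_name, json.dumps(feed))
    messages_sent += 1
print(f"Sent {messages_sent} RSS URLs to Redis")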

View File

@@ -29,14 +29,9 @@ def update_env_vars(function_name):
         'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'),
         'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'),
-        # DynamoDB Configuration
-        'DYNAMODB_TABLE_NAME': os.environ.get('DYNAMODB_TABLE_NAME'),
-        'DYNAMODB_TABLE_ARN': os.environ.get('DYNAMODB_TABLE_ARN'),
-        # SQS Configuration
-        'SQS_QUEUE_NAME': os.environ.get('SQS_QUEUE_NAME'),
-        'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
-        'SQS_QUEUE_ARN': os.environ.get('SQS_QUEUE_ARN'),
+        # Redis Configuration
+        'REDIS_URL': os.environ.get('REDIS_URL'),
+        'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'),
         # Queue Filler Lambda Configuration
         'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'),

View File

@@ -62,8 +62,8 @@ def main():
env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}" env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}"
env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}" env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}"
env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}" env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}"
env_vars["DYNAMODB_TABLE_NAME"] = get_env_value("DYNAMODB_TABLE_NAME", "Enter DynamoDB Table Name:", options=["rss-feeds-table", "custom-rss-table"], advanced=advanced_mode) env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode)
env_vars["SQS_QUEUE_NAME"] = get_env_value("SQS_QUEUE_NAME", "Enter SQS Queue Name:", options=["rss-feed-queue", "custom-rss-queue"], advanced=advanced_mode) env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], advanced=advanced_mode)
# Advanced Configuration # Advanced Configuration
env_vars["LAMBDA_LAYER_VERSION"] = 3 env_vars["LAMBDA_LAYER_VERSION"] = 3
@@ -72,9 +72,6 @@ def main():
env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}" env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}"
env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}" env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}"
env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode) env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode)
env_vars["SQS_QUEUE_URL"] = f"https://sqs.{env_vars['AWS_REGION']}.amazonaws.com/{env_vars['AWS_ACCOUNT_ID']}/{env_vars['SQS_QUEUE_NAME']}"
env_vars["SQS_QUEUE_ARN"] = f"arn:aws:sqs:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:{env_vars['SQS_QUEUE_NAME']}"
env_vars["DYNAMODB_TABLE_ARN"] = f"arn:aws:dynamodb:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:table/{env_vars['DYNAMODB_TABLE_NAME']}"
env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode) env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode)
env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}" env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}"

View File

@@ -1,4 +1,4 @@
-import boto3
+from minio import Minio
 import pandas as pd
 from typing import Optional, List, Dict, Union, Any
 import json
@@ -10,7 +10,7 @@ from string import Template
 from tqdm import tqdm
 class S3BatchDownloader:
-    """Class for batch downloading RSS articles from S3"""
+    """Class for batch downloading RSS articles from a MinIO bucket"""
     DEFAULT_CONFIG = {
         "region": "${AWS_REGION}",
@@ -30,8 +30,15 @@ class S3BatchDownloader:
         self.config = self._load_config(config_path)
         self._validate_config()
-        self.s3 = boto3.client('s3', region_name=self.config['region'])
-        self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}")
+        self.s3 = Minio(
+            os.getenv('MINIO_ENDPOINT'),
+            access_key=os.getenv('MINIO_ACCESS_KEY'),
+            secret_key=os.getenv('MINIO_SECRET_KEY'),
+            secure=False
+        )
+        self.logger.info(
+            f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}"
+        )
     def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
         """Load and process configuration"""
@@ -43,7 +50,7 @@ class S3BatchDownloader:
         env_vars = {
             'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'),
-            'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME')
+            'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET')
         }
         config_str = template.safe_substitute(env_vars)
@@ -68,7 +75,7 @@ class S3BatchDownloader:
                  start_date: Optional[str] = None,
                  end_date: Optional[str] = None) -> str:
         """
-        Download articles from S3 to a consolidated file
+        Download articles from MinIO to a consolidated file
         Args:
             output_path: Path to save the output file.
@@ -112,25 +119,31 @@ class S3BatchDownloader:
         return output_path
     def _list_objects(self) -> List[Dict]:
-        """List objects in S3 bucket"""
+        """List objects in bucket"""
         objects = []
-        paginator = self.s3.get_paginator('list_objects')
         try:
-            for page in paginator.paginate(Bucket=self.config['bucket']):
-                if 'Contents' in page:
-                    objects.extend(page['Contents'])
+            for obj in self.s3.list_objects(
+                self.config['bucket'],
+                prefix=self.config['prefix'],
+                recursive=True
+            ):
+                objects.append({
+                    'Key': obj.object_name,
+                    'LastModified': obj.last_modified
+                })
             return objects
         except Exception as e:
             self.logger.error(f"Error listing objects: {str(e)}")
             raise
     def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
-        """Download and parse single S3 object"""
+        """Download and parse single object"""
         try:
-            response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key'])
-            content = response['Body'].read().decode('utf-8')
+            response = self.s3.get_object(self.config['bucket'], obj['Key'])
+            content = response.read().decode('utf-8')
             data = json.loads(content)
-            metadata = response.get('Metadata', {})
+            stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
+            metadata = stat.metadata
             if isinstance(data, dict):
                 data.update(metadata)
                 return [data]
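
Note: minio's get_object returns a urllib3 response, so when _download_object runs over many objects it may be worth releasing each HTTP connection explicitly. A small sketch of that pattern, not part of this diff:

import json

def read_json_object(client, bucket, key):
    """Read one JSON object and release the HTTP connection promptly."""
    response = client.get_object(bucket, key)
    try:
        return json.loads(response.read().decode("utf-8"))
    finally:
        response.close()
        response.release_conn()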

View File

@@ -9,7 +9,11 @@ def check_env() -> None:
"AWS_REGION", "AWS_REGION",
"AWS_ACCOUNT_ID", "AWS_ACCOUNT_ID",
"AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY" "AWS_SECRET_ACCESS_KEY",
"MINIO_ENDPOINT",
"MINIO_ACCESS_KEY",
"MINIO_SECRET_KEY",
"MINIO_BUCKET"
] ]
# Variables that are derived or have default values # Variables that are derived or have default values
@@ -20,16 +24,13 @@ def check_env() -> None:
"LAMBDA_EXECUTION_ROLE_NAME", "LAMBDA_EXECUTION_ROLE_NAME",
"LAMBDA_ROLE_ARN", "LAMBDA_ROLE_ARN",
"S3_BUCKET_NAME", "S3_BUCKET_NAME",
"DYNAMODB_TABLE_NAME", "REDIS_URL",
"SQS_QUEUE_NAME", "REDIS_QUEUE_NAME",
"LAMBDA_LAYER_VERSION", "LAMBDA_LAYER_VERSION",
"LAMBDA_LAYER_NAME", "LAMBDA_LAYER_NAME",
"LAMBDA_LAYER_ARN", "LAMBDA_LAYER_ARN",
"S3_LAYER_BUCKET_NAME", "S3_LAYER_BUCKET_NAME",
"S3_LAYER_KEY_NAME", "S3_LAYER_KEY_NAME",
"SQS_QUEUE_URL",
"SQS_QUEUE_ARN",
"DYNAMODB_TABLE_ARN",
"PYTHON_VERSION", "PYTHON_VERSION",
"LAMBDA_RUNTIME", "LAMBDA_RUNTIME",
"LAMBDA_TIMEOUT", "LAMBDA_TIMEOUT",

View File

@@ -13,8 +13,14 @@ STACK_BASE=${LAMBDA_FUNCTION_NAME}
 LAMBDA_EXECUTION_ROLE_NAME=rss-feed-processor-role-${AWS_REGION}
 LAMBDA_ROLE_ARN=arn:aws:iam::${AWS_ACCOUNT_ID}:role/${LAMBDA_EXECUTION_ROLE_NAME}
 S3_BUCKET_NAME=open-rss-articles-${AWS_REGION}
-DYNAMODB_TABLE_NAME=rss-feeds-table
-SQS_QUEUE_NAME=rss-feed-queue
+REDIS_URL=redis://localhost:6379
+REDIS_QUEUE_NAME=rss-feed-queue
+# MinIO configuration
+MINIO_ENDPOINT=***
+MINIO_ACCESS_KEY=***
+MINIO_SECRET_KEY=***
+MINIO_BUCKET=***
 LAMBDA_LAYER_VERSION=6 # This is fixed.
@@ -25,10 +31,6 @@ S3_LAMBDA_ZIPPED_BUCKET_NAME=open-rss-lambda-${AWS_REGION}
 S3_LAYER_BUCKET_NAME=rss-feed-processor-layers-${AWS_REGION}
 S3_LAYER_KEY_NAME= RSSFeedProcessorDependencies
-SQS_QUEUE_URL=https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME}
-SQS_QUEUE_ARN=arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SQS_QUEUE_NAME}
-DYNAMODB_TABLE_ARN=arn:aws:dynamodb:${AWS_REGION}:${AWS_ACCOUNT_ID}:table/${DYNAMODB_TABLE_NAME}
 PYTHON_VERSION=3.12
 LAMBDA_RUNTIME=python${PYTHON_VERSION}
 LAMBDA_TIMEOUT=300

View File

@@ -17,7 +17,6 @@
 │   │   ├── __pycache__
 │   │   │   └── deploy_infrastructure.cpython-312.pyc
 │   │   ├── cloudformation
-│   │   │   ├── dynamo.yaml
 │   │   │   ├── lambda_role.yaml
 │   │   │   ├── rss_lambda_stack.yaml
 │   │   │   ├── s3.yaml