diff --git a/launch.py b/launch.py
index 253b2ee..ba76d12 100644
--- a/launch.py
+++ b/launch.py
@@ -37,7 +37,7 @@ def main():
     logging.info("Finished Deploying Lambda")
 
     deploy_sqs_filler()
-    logging.info("Finished Deploying SQS Filler Lambda")
+    logging.info("Finished Deploying Queue Filler Lambda")
 
     # Update Lambda environment variables
     update_env_vars(os.getenv("LAMBDA_FUNCTION_NAME"))
@@ -54,6 +54,7 @@ def main():
             os.getenv('MONGODB_DB_NAME'),
             os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
         )
+
     else:
         print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")
diff --git a/requirements.txt b/requirements.txt
index 7368e03..16d1f3c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,6 @@ constructs==10.2.69
 # Optional, yet necessary for the Pinecone SDK functionality.
 pinecone
 openai
-tqdm
\ No newline at end of file
+tqdm
+redis
+minio
\ No newline at end of file
diff --git a/src/analysis-toolkit/s3_object_ingestion.py b/src/analysis-toolkit/s3_object_ingestion.py
index a3ac1b3..176292d 100644
--- a/src/analysis-toolkit/s3_object_ingestion.py
+++ b/src/analysis-toolkit/s3_object_ingestion.py
@@ -1,17 +1,20 @@
-import boto3
+from minio import Minio
+import os
 import matplotlib.pyplot as plt
 from datetime import datetime, timedelta
 from collections import defaultdict
 
 def get_s3_object_creation_dates(bucket_name):
-    s3 = boto3.client('s3')
+    client = Minio(
+        os.getenv("MINIO_ENDPOINT"),
+        access_key=os.getenv("MINIO_ACCESS_KEY"),
+        secret_key=os.getenv("MINIO_SECRET_KEY"),
+        secure=False
+    )
     creation_dates = []
 
-    # List all objects in the bucket
-    paginator = s3.get_paginator('list_objects_v2')
-    for page in paginator.paginate(Bucket=bucket_name):
-        for obj in page.get('Contents', []):
-            creation_dates.append(obj['LastModified'].date())
+    for obj in client.list_objects(bucket_name, recursive=True):
+        creation_dates.append(obj.last_modified.date())
 
     return creation_dates
 
@@ -47,7 +50,7 @@ def plot_creation_dates(dates):
     print("Graph saved as 's3_object_creation_dates.png'")
 
 def main():
-    bucket_name = 'open-rss-articles-us-east-1'
+    bucket_name = os.getenv('MINIO_BUCKET')
    dates = get_s3_object_creation_dates(bucket_name)
     plot_creation_dates(dates)
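Note (outside the diff): none of the changed code creates the MinIO bucket it lists from or writes to, so a one-time bootstrap along these lines is assumed. The environment variable names follow the new MINIO_* entries in template.env; the snippet is an illustrative sketch, not part of this change.

    # Illustrative bootstrap, not part of the diff: create the target bucket if it is missing.
    import os
    from minio import Minio

    client = Minio(
        os.environ["MINIO_ENDPOINT"],
        access_key=os.environ["MINIO_ACCESS_KEY"],
        secret_key=os.environ["MINIO_SECRET_KEY"],
        secure=False,
    )
    bucket = os.environ["MINIO_BUCKET"]
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)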
diff --git a/src/feed_management/upload_rss_feeds.py b/src/feed_management/upload_rss_feeds.py
index bc66794..3e53eb1 100644
--- a/src/feed_management/upload_rss_feeds.py
+++ b/src/feed_management/upload_rss_feeds.py
@@ -3,7 +3,6 @@ import os
 from pymongo import MongoClient
 import logging
 
-# Set up logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
diff --git a/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc b/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc
deleted file mode 100644
index 188e56f..0000000
Binary files a/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc and /dev/null differ
diff --git a/src/infra/cloudformation/dynamo.yaml b/src/infra/cloudformation/dynamo.yaml
deleted file mode 100644
index 864c00d..0000000
--- a/src/infra/cloudformation/dynamo.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-AWSTemplateFormatVersion: '2010-09-09'
-Description: 'CloudFormation template for RSS Feed Processor DynamoDB Table'
-
-Parameters:
-  DynamoDBName:
-    Type: String
-    Description: ""
-
-Resources:
-  RSSFeedsTable:
-    Type: AWS::DynamoDB::Table
-    Properties:
-      TableName: !Ref 'DynamoDBName'
-      AttributeDefinitions:
-        - AttributeName: url
-          AttributeType: S
-      KeySchema:
-        - AttributeName: url
-          KeyType: HASH
-      BillingMode: PAY_PER_REQUEST
-
-Outputs:
-  TableName:
-    Description: 'Name of the DynamoDB table for RSS feeds'
-    Value: !Ref RSSFeedsTable
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedsTableName'
\ No newline at end of file
diff --git a/src/infra/cloudformation/lambda_role.yaml b/src/infra/cloudformation/lambda_role.yaml
index bafd81d..58acd32 100644
--- a/src/infra/cloudformation/lambda_role.yaml
+++ b/src/infra/cloudformation/lambda_role.yaml
@@ -37,7 +37,6 @@ Resources:
               - Effect: Allow
                 Action:
                   - 'sqs:*'
-                  - 'dynamodb:*'
                   - 's3:*'
                   - 'lambda:*'
                   - 'logs:*'
diff --git a/src/infra/cloudformation/rss_lambda_stack.yaml b/src/infra/cloudformation/rss_lambda_stack.yaml
index 725d1a0..be04071 100644
--- a/src/infra/cloudformation/rss_lambda_stack.yaml
+++ b/src/infra/cloudformation/rss_lambda_stack.yaml
@@ -1,22 +1,16 @@
 AWSTemplateFormatVersion: '2010-09-09'
-Description: SQS Filler Lambda Stack
+Description: Redis Queue Filler Lambda Stack
 
 Parameters:
   QueueFillerLambdaName:
     Type: String
     Description: Name of the Lambda function
-  SqsQueueUrl:
+  RedisUrl:
     Type: String
-    Description: URL of the SQS queue
-  DynamoDbTableName:
+    Description: URL of the Redis instance
+  RedisQueueName:
     Type: String
-    Description: Name of the DynamoDB table
-  DynamoDbTableArn:
-    Type: String
-    Description: ARN of the DynamoDB table
-  SqsQueueArn:
-    Type: String
-    Description: ARN of the SQS queue
+    Description: Name of the Redis queue
   LambdaCodeS3Bucket:
     Type: String
     Description: S3 bucket containing the Lambda function code
@@ -45,8 +39,8 @@ Resources:
       Timeout: !Ref LambdaTimeout
       Environment:
         Variables:
-          SQS_QUEUE_URL: !Ref SqsQueueUrl
-          DYNAMODB_TABLE_NAME: !Ref DynamoDbTableName
+          REDIS_URL: !Ref RedisUrl
+          REDIS_QUEUE_NAME: !Ref RedisQueueName
       Role: !GetAtt SqsFillerFunctionRole.Arn
 
   SqsFillerFunctionRole:
@@ -70,14 +64,6 @@ Resources:
                   - logs:CreateLogStream
                   - logs:PutLogEvents
                 Resource: arn:aws:logs:*:*:*
-              - Effect: Allow
-                Action:
-                  - dynamodb:Scan
-                Resource: !Ref DynamoDbTableArn
-              - Effect: Allow
-                Action:
-                  - sqs:SendMessage
-                Resource: !Ref SqsQueueArn
               - Effect: Allow
                 Action:
                   - s3:GetObject
@@ -85,8 +71,8 @@ Outputs:
   SqsFillerFunctionArn:
-    Description: ARN of the SQS Filler Lambda Function
+    Description: ARN of the Queue Filler Lambda Function
     Value: !GetAtt SqsFillerFunction.Arn
   SqsFillerFunctionRoleArn:
-    Description: ARN of the IAM Role for SQS Filler Lambda Function
+    Description: ARN of the IAM Role for Queue Filler Lambda Function
     Value: !GetAtt SqsFillerFunctionRole.Arn
\ No newline at end of file
diff --git a/src/infra/cloudformation/sqs.yaml b/src/infra/cloudformation/sqs.yaml
deleted file mode 100644
index 10b8145..0000000
--- a/src/infra/cloudformation/sqs.yaml
+++ /dev/null
@@ -1,36 +0,0 @@
-AWSTemplateFormatVersion: '2010-09-09'
-Description: 'CloudFormation template for RSS Feed Processor SQS Queue'
-
-Parameters:
-  SQSQueueName:
-    Type: String
-    Description: ""
-
-Resources:
-  RSSFeedQueue:
-    Type: AWS::SQS::Queue
-    Properties:
-      QueueName: !Ref SQSQueueName
-      VisibilityTimeout: 900 # Should be set to the 3rd standard deviation of your lambda runtime distribution.
-      ReceiveMessageWaitTimeSeconds: 20
-      RedrivePolicy:
-        deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn
-        maxReceiveCount: 3
-
-  RSSFeedDLQ:
-    Type: AWS::SQS::Queue
-    Properties:
-      QueueName: !Sub '${AWS::StackName}-rss-feed-dlq'
-
-Outputs:
-  QueueURL:
-    Description: 'URL of the SQS queue for RSS feeds'
-    Value: !Ref RSSFeedQueue
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedQueueURL'
-
-  DLQueueURL:
-    Description: 'URL of the Dead Letter Queue for RSS feeds'
-    Value: !Ref RSSFeedDLQ
-    Export:
-      Name: !Sub '${AWS::StackName}-RSSFeedDLQueueURL'
\ No newline at end of file
diff --git a/src/infra/deploy_infrastructure.py b/src/infra/deploy_infrastructure.py
index 25f7575..e843947 100644
--- a/src/infra/deploy_infrastructure.py
+++ b/src/infra/deploy_infrastructure.py
@@ -143,13 +143,6 @@ def deploy_infrastructure():
     key_info = kms_client.describe_key(KeyId=kms_key_id)
     kms_key_arn = key_info['KeyMetadata']['Arn']
 
-    deploy_cloudformation('dynamo.yaml', 'DynamoDB',
-                          parameters=[
-                              {
-                                  'ParameterKey': 'DynamoDBName',
-                                  'ParameterValue': os.environ.get('DYNAMODB_TABLE_NAME', 'default-table-name')
-                              }
-                          ])
 
     deploy_cloudformation('s3.yaml', 'S3',
@@ -166,13 +159,6 @@ def deploy_infrastructure():
                                   'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
                               }
                           ])
-    deploy_cloudformation('sqs.yaml', 'SQS',
-                          parameters=[
-                              {
-                                  'ParameterKey': 'SQSQueueName',
-                                  'ParameterValue': os.environ.get('SQS_QUEUE_NAME', 'default-queue-name')
-                              }
-                          ])
     deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
                           parameters=[
                               {
diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py b/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py
index cc73d84..5ee5953 100644
--- a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py
+++ b/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py
@@ -80,26 +80,8 @@ def update_function_configuration(lambda_client, function_name, handler, role, t
 
 @retry_with_backoff()
 def configure_sqs_trigger(lambda_client, function_name, queue_arn):
-    event_source_mapping = {
-        'FunctionName': function_name,
-        'EventSourceArn': queue_arn,
-        'BatchSize': 1,
-        'MaximumBatchingWindowInSeconds': 0,
-        'ScalingConfig': {
-            'MaximumConcurrency': 50
-        }
-    }
-
-    try:
-        response = lambda_client.create_event_source_mapping(**event_source_mapping)
-        print(f"SQS trigger configured successfully for {function_name}")
-    except ClientError as e:
-        if e.response['Error']['Code'] == 'ResourceConflictException':
-            print(f"SQS trigger already exists for {function_name}. Updating configuration...")
-            # If you want to update existing trigger, you'd need to list existing mappings and update them
-            # This is left as an exercise as it requires additional error handling and logic
-        else:
-            raise e
+    """Placeholder for backward compatibility. Redis deployment uses no SQS trigger."""
+    return
 
 @retry_with_backoff()
 def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy):
@@ -219,13 +201,6 @@ def deploy_lambda():
         policy = get_lambda_policy()
         create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy)
 
-        # Configure SQS trigger
-        queue_arn = os.getenv('SQS_QUEUE_ARN')  # Make sure to set this environment variable
-        if queue_arn:
-            configure_sqs_trigger(lambda_client, LAMBDA_NAME, queue_arn)
-        else:
-            print("Warning: SQS_QUEUE_ARN not set. Skipping SQS trigger configuration.")
-
         print("Lambda deployment completed successfully!")
 
     except Exception as e:
diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py
index f14ef07..6c36b63 100644
--- a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py
+++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py
@@ -1,15 +1,15 @@
 import os
 
-# SQS Configuration
-SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
-
+# Redis Configuration
+REDIS_URL = os.environ["REDIS_URL"]
+REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
 
 # Logging Configuration
-LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
+LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
 
 # RSS Feed Processing Configuration
-MAX_ARTICLES_PER_FEED = int(os.environ.get('MAX_ARTICLES_PER_FEED', '10'))
-FEED_PROCESSING_TIMEOUT = int(os.environ.get('FEED_PROCESSING_TIMEOUT', '90'))
+MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
+FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))
 
 # Article Extraction Configuration
-ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get('ARTICLE_EXTRACTION_TIMEOUT', '30'))
\ No newline at end of file
+ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))
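Note (outside the diff): the "queue" behind REDIS_QUEUE_NAME is a plain Redis list. The processor further down pops with RPOP, so FIFO ordering assumes the producer pushes with LPUSH; a minimal sketch of that convention, with an illustrative feed entry and assuming exactly these config names:

    # Illustrative only: the list convention implied by the new config.
    import json
    import redis
    from config import REDIS_URL, REDIS_QUEUE_NAME

    r = redis.Redis.from_url(REDIS_URL)
    r.lpush(REDIS_QUEUE_NAME, json.dumps({"u": "https://example.com/feed.xml", "dt": 0}))  # producer side
    feed = json.loads(r.rpop(REDIS_QUEUE_NAME))  # consumer side, as in lambda_function.py below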
diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
index 7fea1c0..b860982 100644
--- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
+++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
@@ -1,8 +1,8 @@
 import boto3
+from minio import Minio
 import json
 import os
 import logging
-from random import randint
 from datetime import datetime
 from pymongo import MongoClient
 
@@ -13,6 +13,15 @@ logger = logging.getLogger()
 
 s3 = boto3.client('s3')
 CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
+
+minio_client = Minio(
+    os.getenv("MINIO_ENDPOINT"),
+    access_key=os.getenv("MINIO_ACCESS_KEY"),
+    secret_key=os.getenv("MINIO_SECRET_KEY"),
+    secure=False
+)
+CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")))
+DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
 
 storage_strategy = os.environ.get('STORAGE_STRATEGY')
 MONGODB_URL = os.getenv("MONGODB_URL")
@@ -57,11 +66,9 @@ def pinecone_save_article(article:dict):
     logger.info("Upserting article to Pinecone")
     upsert_vectors(index, data, namespace)
 
-def dynamodb_save_article(article:dict):
-    pass
-
-def s3_save_article(article:dict):
-    logger.info("Saving article to S3")
+
+def s3_save_article(article:dict):
+    logger.info("Saving article to MinIO")
 
     now = datetime.now()
     article_id = article['article_id']
@@ -78,37 +85,42 @@ def s3_save_article(article:dict):
         json.dump(article, f)
 
     try:
-        s3.upload_file(file_path,
-                       CONTENT_BUCKET,
-                       file_key,
-                       ExtraArgs={
-                           "Metadata":
-                           {
-                               "rss": article.get("rss", ""),
-                               "title": article.get("title", ""),
-                               "unixTime": str(article.get("unixTime", "")),
-                               "article_id": article.get("article_id", ""),
-                               "link": article.get("link", ""),
-                               "rss_id": article.get("rss_id", "")
-                           }
-                       }
-                       )
-        logger.info(f"Saved article {article_id} to S3 bucket {CONTENT_BUCKET}")
+        metadata = {
+            "rss": article.get("rss", ""),
+            "title": article.get("title", ""),
+            "unixTime": str(article.get("unixTime", "")),
+            "article_id": article.get("article_id", ""),
+            "link": article.get("link", ""),
+            "rss_id": article.get("rss_id", "")
+        }
+        minio_client.fput_object(
+            CONTENT_BUCKET,
+            file_key,
+            file_path,
+            content_type="application/json",
+            metadata=metadata
+        )
+        logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}")
     except Exception as e:
         logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
 
 
 ###### Feed Storage ######
+RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")
+
+
 def update_rss_feed(feed: dict, last_pub_dt: int):
     try:
-        feeds_collection.update_one(
-            {"url": feed["u"]},
-            {"$set": {"dt": last_pub_dt}},
-            upsert=True,
-        )
-        logger.info(
-            f"Updated RSS feed in MongoDB: {feed['u']} with dt: {last_pub_dt}"
-        )
+        if not os.path.exists(RSS_FEEDS_FILE):
+            return
+        with open(RSS_FEEDS_FILE, "r") as f:
+            feeds = json.load(f)
+        for item in feeds:
+            if item.get("u") == feed["u"]:
+                item["dt"] = int(last_pub_dt)
+        with open(RSS_FEEDS_FILE, "w") as f:
+            json.dump(feeds, f)
+        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
     except Exception as e:
         logger.error(f"Failed to update RSS feed: {str(e)}")
\ No newline at end of file
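Note (outside the diff): the rewritten update_rss_feed treats RSS_FEEDS_FILE as a JSON array of feed entries keyed by "u" with a "dt" timestamp. The exact schema is not shown in this change; the shape below is inferred from the loop above, with illustrative values.

    # Assumed shape of rss_feeds.json, inferred from item.get("u") / item["dt"]; values are illustrative.
    feeds = [
        {"u": "https://example.com/feed.xml", "dt": 1700000000},
        {"u": "https://example.org/rss", "dt": 1700000500},
    ]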
logger.info("Deleting sqs queue message") - try: - sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle) - except Exception as e: - logger.error(f"Error deleting message from SQS: {str(e)}") - logger.info("We can skip this but delete this block of code if it fails. This means the queue is already deleted when it triggers.") + update_rss_feed(result["feed"], last_pub_dt) logger.info(f"Processed feed: {feed['u']}") - - # Record metrics - record_processed_articles(len(result['articles'])) + record_processed_articles(len(result["articles"])) else: logger.warning(f"Failed to process feed: {feed['u']}") record_extraction_errors(1) except RSSProcessingError as e: logger.error(f"RSS Processing Error: {str(e)}") - return {'statusCode': 500, 'body': json.dumps('RSS processing failed')} + return {"statusCode": 500, "body": json.dumps("RSS processing failed")} except Exception as e: logger.error(f"Unexpected error: {str(e)}") - return {'statusCode': 500, 'body': json.dumps('An unexpected error occurred')} + return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")} finally: end_time = time.time() @@ -77,7 +62,4 @@ def lambda_handler(event, context): record_processing_time(processing_time) logger.info(f"Lambda execution time: {processing_time:.2f} seconds") - return { - 'statusCode': 200, - 'body': json.dumps('RSS feed processed successfully') - } \ No newline at end of file + return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")} diff --git a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py b/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py index fe8d781..2897f78 100644 --- a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py +++ b/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py @@ -49,20 +49,12 @@ def deploy_sqs_filler(): 'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME') }, { - 'ParameterKey': 'SqsQueueUrl', - 'ParameterValue': os.getenv('SQS_QUEUE_URL') + 'ParameterKey': 'RedisUrl', + 'ParameterValue': os.getenv('REDIS_URL') }, { - 'ParameterKey': 'DynamoDbTableName', - 'ParameterValue': os.getenv('DYNAMODB_TABLE_NAME') - }, - { - 'ParameterKey': 'DynamoDbTableArn', - 'ParameterValue': os.getenv('DYNAMODB_TABLE_ARN') - }, - { - 'ParameterKey': 'SqsQueueArn', - 'ParameterValue': os.getenv('SQS_QUEUE_ARN') + 'ParameterKey': 'RedisQueueName', + 'ParameterValue': os.getenv('REDIS_QUEUE_NAME') }, { 'ParameterKey': 'LambdaCodeS3Bucket', diff --git a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py index 3b2733e..5765818 100644 --- a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py +++ b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py @@ -1,10 +1,9 @@ import json import os -import boto3 -from decimal import Decimal -from datetime import datetime import logging from pymongo import MongoClient +from datetime import datetime +import redis logger = logging.getLogger() logger.setLevel("INFO") @@ -54,6 +53,6 @@ def handler(event, context): logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}") return { - 'statusCode': 200, - 'body': json.dumps(f'Sent {messages_sent} RSS URLs to SQS') - } \ No newline at end of file + "statusCode": 200, + "body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"), + } diff --git a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py index 8936636..4aa7b62 100644 --- 
diff --git a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py
index 8936636..4aa7b62 100644
--- a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py
+++ b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py
@@ -28,15 +28,10 @@ def update_env_vars(function_name):
         'S3_LAMBDA_ZIPPED_BUCKET_NAME': os.environ.get('S3_LAMBDA_ZIPPED_BUCKET_NAME'),
         'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'),
         'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'),
-
-        # DynamoDB Configuration
-        'DYNAMODB_TABLE_NAME': os.environ.get('DYNAMODB_TABLE_NAME'),
-        'DYNAMODB_TABLE_ARN': os.environ.get('DYNAMODB_TABLE_ARN'),
-
-        # SQS Configuration
-        'SQS_QUEUE_NAME': os.environ.get('SQS_QUEUE_NAME'),
-        'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
-        'SQS_QUEUE_ARN': os.environ.get('SQS_QUEUE_ARN'),
+
+        # Redis Configuration
+        'REDIS_URL': os.environ.get('REDIS_URL'),
+        'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'),
 
         # Queue Filler Lambda Configuration
         'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'),
diff --git a/src/launch/launch_env.py b/src/launch/launch_env.py
index e222dcf..cdc85cb 100644
--- a/src/launch/launch_env.py
+++ b/src/launch/launch_env.py
@@ -62,8 +62,8 @@ def main():
     env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}"
     env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}"
     env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}"
-    env_vars["DYNAMODB_TABLE_NAME"] = get_env_value("DYNAMODB_TABLE_NAME", "Enter DynamoDB Table Name:", options=["rss-feeds-table", "custom-rss-table"], advanced=advanced_mode)
-    env_vars["SQS_QUEUE_NAME"] = get_env_value("SQS_QUEUE_NAME", "Enter SQS Queue Name:", options=["rss-feed-queue", "custom-rss-queue"], advanced=advanced_mode)
+    env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode)
+    env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], advanced=advanced_mode)
 
     # Advanced Configuration
     env_vars["LAMBDA_LAYER_VERSION"] = 3
@@ -72,9 +72,6 @@ def main():
     env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}"
     env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}"
     env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode)
-    env_vars["SQS_QUEUE_URL"] = f"https://sqs.{env_vars['AWS_REGION']}.amazonaws.com/{env_vars['AWS_ACCOUNT_ID']}/{env_vars['SQS_QUEUE_NAME']}"
-    env_vars["SQS_QUEUE_ARN"] = f"arn:aws:sqs:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:{env_vars['SQS_QUEUE_NAME']}"
-    env_vars["DYNAMODB_TABLE_ARN"] = f"arn:aws:dynamodb:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:table/{env_vars['DYNAMODB_TABLE_NAME']}"
     env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode)
     env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}"
diff --git a/src/search/batch/downloader.py b/src/search/batch/downloader.py
index e6c7362..c4725fe 100644
--- a/src/search/batch/downloader.py
+++ b/src/search/batch/downloader.py
@@ -1,4 +1,4 @@
-import boto3
+from minio import Minio
 import pandas as pd
 from typing import Optional, List, Dict, Union, Any
 import json
@@ -10,7 +10,7 @@ from string import Template
 from tqdm import tqdm
 
 class S3BatchDownloader:
-    """Class for batch downloading RSS articles from S3"""
+    """Class for batch downloading RSS articles from a MinIO bucket"""
 
     DEFAULT_CONFIG = {
         "region": "${AWS_REGION}",
@@ -30,8 +30,15 @@ class S3BatchDownloader:
         self.config = self._load_config(config_path)
         self._validate_config()
 
-        self.s3 = boto3.client('s3', region_name=self.config['region'])
-        self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}")
+        self.s3 = Minio(
+            os.getenv('MINIO_ENDPOINT'),
+            access_key=os.getenv('MINIO_ACCESS_KEY'),
+            secret_key=os.getenv('MINIO_SECRET_KEY'),
+            secure=False
+        )
+        self.logger.info(
+            f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}"
+        )
 
     def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
         """Load and process configuration"""
@@ -43,7 +50,7 @@ class S3BatchDownloader:
 
         env_vars = {
             'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'),
-            'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME')
+            'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET')
         }
 
         config_str = template.safe_substitute(env_vars)
@@ -68,7 +75,7 @@ class S3BatchDownloader:
                  start_date: Optional[str] = None,
                  end_date: Optional[str] = None) -> str:
         """
-        Download articles from S3 to a consolidated file
+        Download articles from MinIO to a consolidated file
 
         Args:
             output_path: Path to save the output file.
@@ -112,25 +119,31 @@ class S3BatchDownloader:
         return output_path
 
     def _list_objects(self) -> List[Dict]:
-        """List objects in S3 bucket"""
+        """List objects in bucket"""
         objects = []
-        paginator = self.s3.get_paginator('list_objects')
 
         try:
-            for page in paginator.paginate(Bucket=self.config['bucket']):
-                if 'Contents' in page:
-                    objects.extend(page['Contents'])
+            for obj in self.s3.list_objects(
+                self.config['bucket'],
+                prefix=self.config['prefix'],
+                recursive=True
+            ):
+                objects.append({
+                    'Key': obj.object_name,
+                    'LastModified': obj.last_modified
+                })
             return objects
         except Exception as e:
             self.logger.error(f"Error listing objects: {str(e)}")
             raise
 
     def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
-        """Download and parse single S3 object"""
+        """Download and parse single object"""
         try:
-            response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key'])
-            content = response['Body'].read().decode('utf-8')
+            response = self.s3.get_object(self.config['bucket'], obj['Key'])
+            content = response.read().decode('utf-8')
             data = json.loads(content)
-            metadata = response.get('Metadata', {})
+            stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
+            metadata = stat.metadata
 
             if isinstance(data, dict):
                 data.update(metadata)
                 return [data]
@@ -154,4 +167,4 @@ class S3BatchDownloader:
         elif file_format == 'json':
             df.to_json(output_path, orient='records', lines=True)
         else:
-            raise ValueError(f"Unsupported file format: {file_format}")
\ No newline at end of file
+            raise ValueError(f"Unsupported file format: {file_format}")
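Note (outside the diff): one MinIO detail worth flagging for _download_object above: user metadata written via fput_object(metadata=...) typically comes back from stat_object() as headers named x-amz-meta-<key>, so the keys merged into each record will not match the bare names written in data_storage.py unless they are normalized first. A possible normalization, assuming that header convention:

    # Possible helper, assuming stat_object() exposes user metadata as 'x-amz-meta-<key>' headers.
    def normalize_user_metadata(headers: dict) -> dict:
        prefix = "x-amz-meta-"
        return {
            key[len(prefix):]: value
            for key, value in headers.items()
            if key.lower().startswith(prefix)
        }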
"""Class for batch downloading RSS articles from S3""" + """Class for batch downloading RSS articles from a MinIO bucket""" DEFAULT_CONFIG = { "region": "${AWS_REGION}", @@ -30,8 +30,15 @@ class S3BatchDownloader: self.config = self._load_config(config_path) self._validate_config() - self.s3 = boto3.client('s3', region_name=self.config['region']) - self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}") + self.s3 = Minio( + os.getenv('MINIO_ENDPOINT'), + access_key=os.getenv('MINIO_ACCESS_KEY'), + secret_key=os.getenv('MINIO_SECRET_KEY'), + secure=False + ) + self.logger.info( + f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}" + ) def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]: """Load and process configuration""" @@ -43,7 +50,7 @@ class S3BatchDownloader: env_vars = { 'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'), - 'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME') + 'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET') } config_str = template.safe_substitute(env_vars) @@ -68,7 +75,7 @@ class S3BatchDownloader: start_date: Optional[str] = None, end_date: Optional[str] = None) -> str: """ - Download articles from S3 to a consolidated file + Download articles from MinIO to a consolidated file Args: output_path: Path to save the output file. @@ -112,25 +119,31 @@ class S3BatchDownloader: return output_path def _list_objects(self) -> List[Dict]: - """List objects in S3 bucket""" + """List objects in bucket""" objects = [] - paginator = self.s3.get_paginator('list_objects') try: - for page in paginator.paginate(Bucket=self.config['bucket']): - if 'Contents' in page: - objects.extend(page['Contents']) + for obj in self.s3.list_objects( + self.config['bucket'], + prefix=self.config['prefix'], + recursive=True + ): + objects.append({ + 'Key': obj.object_name, + 'LastModified': obj.last_modified + }) return objects except Exception as e: self.logger.error(f"Error listing objects: {str(e)}") raise def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]: - """Download and parse single S3 object""" + """Download and parse single object""" try: - response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key']) - content = response['Body'].read().decode('utf-8') + response = self.s3.get_object(self.config['bucket'], obj['Key']) + content = response.read().decode('utf-8') data = json.loads(content) - metadata = response.get('Metadata', {}) + stat = self.s3.stat_object(self.config['bucket'], obj['Key']) + metadata = stat.metadata if isinstance(data, dict): data.update(metadata) return [data] @@ -154,4 +167,4 @@ class S3BatchDownloader: elif file_format == 'json': df.to_json(output_path, orient='records', lines=True) else: - raise ValueError(f"Unsupported file format: {file_format}") \ No newline at end of file + raise ValueError(f"Unsupported file format: {file_format}") diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc b/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc deleted file mode 100644 index b300011..0000000 Binary files a/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc b/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc deleted file mode 100644 index 546b0ef..0000000 Binary files a/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc and /dev/null differ diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-312.pyc 
diff --git a/src/utils/check_env.py b/src/utils/check_env.py
index 12d4ad5..863efba 100644
--- a/src/utils/check_env.py
+++ b/src/utils/check_env.py
@@ -9,7 +9,11 @@ def check_env() -> None:
         "AWS_REGION",
         "AWS_ACCOUNT_ID",
         "AWS_ACCESS_KEY_ID",
-        "AWS_SECRET_ACCESS_KEY"
+        "AWS_SECRET_ACCESS_KEY",
+        "MINIO_ENDPOINT",
+        "MINIO_ACCESS_KEY",
+        "MINIO_SECRET_KEY",
+        "MINIO_BUCKET"
     ]
 
     # Variables that are derived or have default values
@@ -20,16 +24,13 @@ def check_env() -> None:
         "LAMBDA_EXECUTION_ROLE_NAME",
         "LAMBDA_ROLE_ARN",
         "S3_BUCKET_NAME",
-        "DYNAMODB_TABLE_NAME",
-        "SQS_QUEUE_NAME",
+        "REDIS_URL",
+        "REDIS_QUEUE_NAME",
         "LAMBDA_LAYER_VERSION",
         "LAMBDA_LAYER_NAME",
         "LAMBDA_LAYER_ARN",
         "S3_LAYER_BUCKET_NAME",
         "S3_LAYER_KEY_NAME",
-        "SQS_QUEUE_URL",
-        "SQS_QUEUE_ARN",
-        "DYNAMODB_TABLE_ARN",
         "PYTHON_VERSION",
         "LAMBDA_RUNTIME",
         "LAMBDA_TIMEOUT",
"PYTHON_VERSION", "LAMBDA_RUNTIME", "LAMBDA_TIMEOUT", diff --git a/template.env b/template.env index 9585302..eed5510 100644 --- a/template.env +++ b/template.env @@ -13,8 +13,14 @@ STACK_BASE=${LAMBDA_FUNCTION_NAME} LAMBDA_EXECUTION_ROLE_NAME=rss-feed-processor-role-${AWS_REGION} LAMBDA_ROLE_ARN=arn:aws:iam::${AWS_ACCOUNT_ID}:role/${LAMBDA_EXECUTION_ROLE_NAME} S3_BUCKET_NAME=open-rss-articles-${AWS_REGION} -DYNAMODB_TABLE_NAME=rss-feeds-table -SQS_QUEUE_NAME=rss-feed-queue +REDIS_URL=redis://localhost:6379 +REDIS_QUEUE_NAME=rss-feed-queue + +# MinIO configuration +MINIO_ENDPOINT=*** +MINIO_ACCESS_KEY=*** +MINIO_SECRET_KEY=*** +MINIO_BUCKET=*** LAMBDA_LAYER_VERSION=6 # This is fixed. @@ -25,10 +31,6 @@ S3_LAMBDA_ZIPPED_BUCKET_NAME=open-rss-lambda-${AWS_REGION} S3_LAYER_BUCKET_NAME=rss-feed-processor-layers-${AWS_REGION} S3_LAYER_KEY_NAME= RSSFeedProcessorDependencies - -SQS_QUEUE_URL=https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME} -SQS_QUEUE_ARN=arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SQS_QUEUE_NAME} -DYNAMODB_TABLE_ARN=arn:aws:dynamodb:${AWS_REGION}:${AWS_ACCOUNT_ID}:table/${DYNAMODB_TABLE_NAME} PYTHON_VERSION=3.12 LAMBDA_RUNTIME=python${PYTHON_VERSION} LAMBDA_TIMEOUT=300 @@ -63,4 +65,4 @@ VECTOR_EMBEDDING_DIM=*** VECTOR_SEARCH_METRIC=*** OPENAI_API_KEY=sk** -OPENAI_EMBEDDING_MODEL=text-embedding-3-large \ No newline at end of file +OPENAI_EMBEDDING_MODEL=text-embedding-3-large diff --git a/tree.md b/tree.md index ff89dad..cca3797 100644 --- a/tree.md +++ b/tree.md @@ -17,7 +17,6 @@ │   │   ├── __pycache__ │   │   │   └── deploy_infrastructure.cpython-312.pyc │   │   ├── cloudformation -│   │   │   ├── dynamo.yaml │   │   │   ├── lambda_role.yaml │   │   │   ├── rss_lambda_stack.yaml │   │   │   ├── s3.yaml