diff --git a/launch.py b/launch.py index 07a24f3..239d2be 100644 --- a/launch.py +++ b/launch.py @@ -37,7 +37,7 @@ def main(): logging.info("Finished Deploying Lambda") deploy_sqs_filler() - logging.info("Finished Deploying SQS Filler Lambda") + logging.info("Finished Deploying Queue Filler Lambda") # Update Lambda environment variables update_env_vars(os.getenv("LAMBDA_FUNCTION_NAME")) @@ -48,7 +48,7 @@ def main(): if os.path.exists(rss_feeds_file): with open(rss_feeds_file, 'r') as f: rss_feeds = json.load(f) - upload_rss_feeds(rss_feeds, os.getenv('DYNAMODB_TABLE_NAME')) + upload_rss_feeds(rss_feeds, rss_feeds_file) else: print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.") diff --git a/requirements.txt b/requirements.txt index 4aeda36..7220d27 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ constructs==10.2.69 # Optional, yet necessary for the Pinecone SDK functionality. pinecone openai -tqdm \ No newline at end of file +tqdm +redis diff --git a/src/feed_management/upload_rss_feeds.py b/src/feed_management/upload_rss_feeds.py index 018b767..78fb01d 100644 --- a/src/feed_management/upload_rss_feeds.py +++ b/src/feed_management/upload_rss_feeds.py @@ -1,58 +1,23 @@ import json -import boto3 -from boto3.dynamodb.conditions import Key -from botocore.exceptions import ClientError import logging -# Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def upload_rss_feeds(rss_feeds, table_name): - dynamodb = boto3.resource('dynamodb') - table = dynamodb.Table(table_name) - - logger.info(f"Uploading RSS feeds to table: {table_name}") +def upload_rss_feeds(rss_feeds, file_path): + """Persist RSS feed definitions to a local JSON file.""" try: - # Get the table's key schema - key_schema = table.key_schema - partition_key = next(key['AttributeName'] for key in key_schema if key['KeyType'] == 'HASH') - except ClientError as e: - logger.error(f"Error getting table schema: {e.response['Error']['Message']}") - return + with open(file_path, "w") as f: + json.dump(rss_feeds, f) + logger.info(f"Saved {len(rss_feeds)} feeds to {file_path}") + except Exception as e: + logger.error(f"Failed to save feeds: {e}") - new_items = 0 - existing_items = 0 - - for feed in rss_feeds: - # Check if the item already exists - try: - response = table.get_item(Key={partition_key: feed['u']}) - except ClientError as e: - logger.error(f"Error checking for existing item: {e.response['Error']['Message']}") - continue - - if 'Item' not in response: - # Item doesn't exist, insert new item - item = {partition_key: feed['u'], 'dt': 0} - feed['dt'] = int(feed['dt']) - item.update() - - try: - table.put_item(Item=item) - new_items += 1 - except ClientError as e: - logger.error(f"Error inserting new item: {e.response['Error']['Message']}") - else: - existing_items += 1 - - logger.info(f"Upload complete. {new_items} new items inserted. 
{existing_items} items already existed.") if __name__ == "__main__": - table_name = 'rss-feeds-table' - rss_feed_path = 'rss_feeds.json' + rss_feed_path = "rss_feeds.json" with open(rss_feed_path) as f: rss_feeds = json.load(f) logger.info(f"Loaded RSS feeds: {rss_feeds}") - upload_rss_feeds(rss_feeds, table_name) \ No newline at end of file + upload_rss_feeds(rss_feeds, rss_feed_path) diff --git a/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc b/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc deleted file mode 100644 index 188e56f..0000000 Binary files a/src/infra/__pycache__/deploy_infrastructure.cpython-312.pyc and /dev/null differ diff --git a/src/infra/cloudformation/dynamo.yaml b/src/infra/cloudformation/dynamo.yaml deleted file mode 100644 index 864c00d..0000000 --- a/src/infra/cloudformation/dynamo.yaml +++ /dev/null @@ -1,27 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Description: 'CloudFormation template for RSS Feed Processor DynamoDB Table' - -Parameters: - DynamoDBName: - Type: String - Description: "" - -Resources: - RSSFeedsTable: - Type: AWS::DynamoDB::Table - Properties: - TableName: !Ref 'DynamoDBName' - AttributeDefinitions: - - AttributeName: url - AttributeType: S - KeySchema: - - AttributeName: url - KeyType: HASH - BillingMode: PAY_PER_REQUEST - -Outputs: - TableName: - Description: 'Name of the DynamoDB table for RSS feeds' - Value: !Ref RSSFeedsTable - Export: - Name: !Sub '${AWS::StackName}-RSSFeedsTableName' \ No newline at end of file diff --git a/src/infra/cloudformation/lambda_role.yaml b/src/infra/cloudformation/lambda_role.yaml index bafd81d..58acd32 100644 --- a/src/infra/cloudformation/lambda_role.yaml +++ b/src/infra/cloudformation/lambda_role.yaml @@ -37,7 +37,6 @@ Resources: - Effect: Allow Action: - 'sqs:*' - - 'dynamodb:*' - 's3:*' - 'lambda:*' - 'logs:*' diff --git a/src/infra/cloudformation/rss_lambda_stack.yaml b/src/infra/cloudformation/rss_lambda_stack.yaml index 725d1a0..be04071 100644 --- a/src/infra/cloudformation/rss_lambda_stack.yaml +++ b/src/infra/cloudformation/rss_lambda_stack.yaml @@ -1,22 +1,16 @@ AWSTemplateFormatVersion: '2010-09-09' -Description: SQS Filler Lambda Stack +Description: Redis Queue Filler Lambda Stack Parameters: QueueFillerLambdaName: Type: String Description: Name of the Lambda function - SqsQueueUrl: + RedisUrl: Type: String - Description: URL of the SQS queue - DynamoDbTableName: + Description: URL of the Redis instance + RedisQueueName: Type: String - Description: Name of the DynamoDB table - DynamoDbTableArn: - Type: String - Description: ARN of the DynamoDB table - SqsQueueArn: - Type: String - Description: ARN of the SQS queue + Description: Name of the Redis queue LambdaCodeS3Bucket: Type: String Description: S3 bucket containing the Lambda function code @@ -45,8 +39,8 @@ Resources: Timeout: !Ref LambdaTimeout Environment: Variables: - SQS_QUEUE_URL: !Ref SqsQueueUrl - DYNAMODB_TABLE_NAME: !Ref DynamoDbTableName + REDIS_URL: !Ref RedisUrl + REDIS_QUEUE_NAME: !Ref RedisQueueName Role: !GetAtt SqsFillerFunctionRole.Arn SqsFillerFunctionRole: @@ -70,14 +64,6 @@ Resources: - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - - Effect: Allow - Action: - - dynamodb:Scan - Resource: !Ref DynamoDbTableArn - - Effect: Allow - Action: - - sqs:SendMessage - Resource: !Ref SqsQueueArn - Effect: Allow Action: - s3:GetObject @@ -85,8 +71,8 @@ Resources: Outputs: SqsFillerFunctionArn: - Description: ARN of the SQS Filler Lambda Function + Description: ARN of the Queue Filler 
Lambda Function Value: !GetAtt SqsFillerFunction.Arn SqsFillerFunctionRoleArn: - Description: ARN of the IAM Role for SQS Filler Lambda Function + Description: ARN of the IAM Role for Queue Filler Lambda Function Value: !GetAtt SqsFillerFunctionRole.Arn \ No newline at end of file diff --git a/src/infra/cloudformation/sqs.yaml b/src/infra/cloudformation/sqs.yaml deleted file mode 100644 index 10b8145..0000000 --- a/src/infra/cloudformation/sqs.yaml +++ /dev/null @@ -1,36 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Description: 'CloudFormation template for RSS Feed Processor SQS Queue' - -Parameters: - SQSQueueName: - Type: String - Description: "" - -Resources: - RSSFeedQueue: - Type: AWS::SQS::Queue - Properties: - QueueName: !Ref SQSQueueName - VisibilityTimeout: 900 # Should be set to the 3rd standard deviation of your lambda runtime distribution. - ReceiveMessageWaitTimeSeconds: 20 - RedrivePolicy: - deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn - maxReceiveCount: 3 - - RSSFeedDLQ: - Type: AWS::SQS::Queue - Properties: - QueueName: !Sub '${AWS::StackName}-rss-feed-dlq' - -Outputs: - QueueURL: - Description: 'URL of the SQS queue for RSS feeds' - Value: !Ref RSSFeedQueue - Export: - Name: !Sub '${AWS::StackName}-RSSFeedQueueURL' - - DLQueueURL: - Description: 'URL of the Dead Letter Queue for RSS feeds' - Value: !Ref RSSFeedDLQ - Export: - Name: !Sub '${AWS::StackName}-RSSFeedDLQueueURL' \ No newline at end of file diff --git a/src/infra/deploy_infrastructure.py b/src/infra/deploy_infrastructure.py index 25f7575..e843947 100644 --- a/src/infra/deploy_infrastructure.py +++ b/src/infra/deploy_infrastructure.py @@ -143,13 +143,6 @@ def deploy_infrastructure(): key_info = kms_client.describe_key(KeyId=kms_key_id) kms_key_arn = key_info['KeyMetadata']['Arn'] - deploy_cloudformation('dynamo.yaml', 'DynamoDB', - parameters=[ - { - 'ParameterKey': 'DynamoDBName', - 'ParameterValue': os.environ.get('DYNAMODB_TABLE_NAME', 'default-table-name') - } - ]) deploy_cloudformation('s3.yaml', 'S3', @@ -166,13 +159,6 @@ def deploy_infrastructure(): 'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME') } ]) - deploy_cloudformation('sqs.yaml', 'SQS', - parameters=[ - { - 'ParameterKey': 'SQSQueueName', - 'ParameterValue': os.environ.get('SQS_QUEUE_NAME', 'default-queue-name') - } - ]) deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True, parameters=[ { diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py b/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py index cc73d84..5ee5953 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py @@ -80,26 +80,8 @@ def update_function_configuration(lambda_client, function_name, handler, role, t @retry_with_backoff() def configure_sqs_trigger(lambda_client, function_name, queue_arn): - event_source_mapping = { - 'FunctionName': function_name, - 'EventSourceArn': queue_arn, - 'BatchSize': 1, - 'MaximumBatchingWindowInSeconds': 0, - 'ScalingConfig': { - 'MaximumConcurrency': 50 - } - } - - try: - response = lambda_client.create_event_source_mapping(**event_source_mapping) - print(f"SQS trigger configured successfully for {function_name}") - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceConflictException': - print(f"SQS trigger already exists for {function_name}. 
Updating configuration...") - # If you want to update existing trigger, you'd need to list existing mappings and update them - # This is left as an exercise as it requires additional error handling and logic - else: - raise e + """Placeholder for backward compatibility. Redis deployment uses no SQS trigger.""" + return @retry_with_backoff() def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy): @@ -219,13 +201,6 @@ def deploy_lambda(): policy = get_lambda_policy() create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy) - # Configure SQS trigger - queue_arn = os.getenv('SQS_QUEUE_ARN') # Make sure to set this environment variable - if queue_arn: - configure_sqs_trigger(lambda_client, LAMBDA_NAME, queue_arn) - else: - print("Warning: SQS_QUEUE_ARN not set. Skipping SQS trigger configuration.") - print("Lambda deployment completed successfully!") except Exception as e: diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py index f14ef07..6c36b63 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py @@ -1,15 +1,15 @@ import os -# SQS Configuration -SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL'] - +# Redis Configuration +REDIS_URL = os.environ["REDIS_URL"] +REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue") # Logging Configuration -LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO') +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") # RSS Feed Processing Configuration -MAX_ARTICLES_PER_FEED = int(os.environ.get('MAX_ARTICLES_PER_FEED', '10')) -FEED_PROCESSING_TIMEOUT = int(os.environ.get('FEED_PROCESSING_TIMEOUT', '90')) +MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10")) +FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90")) # Article Extraction Configuration -ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get('ARTICLE_EXTRACTION_TIMEOUT', '30')) \ No newline at end of file +ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30")) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py index 2c4e034..081aee7 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py @@ -2,7 +2,6 @@ import boto3 import json import os import logging -from random import randint from datetime import datetime from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize @@ -10,10 +9,8 @@ from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize logger = logging.getLogger() s3 = boto3.client('s3') -dynamodb = boto3.resource('dynamodb') -CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")) -DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME") +CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")) storage_strategy = os.environ.get('STORAGE_STRATEGY') ##### Article Storage ##### @@ -51,8 +48,6 @@ def pinecone_save_article(article:dict): logger.info("Upserting article to Pinecone") upsert_vectors(index, data, namespace) -def dynamodb_save_article(article:dict): - pass def s3_save_article(article:dict): logger.info("Saving article to S3") @@ -94,14 +89,20 @@ def 
s3_save_article(article:dict):
 
 
 ###### Feed Storage ######
 
-def update_rss_feed(feed:dict, last_pub_dt:int):
+RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")
+
+
+def update_rss_feed(feed: dict, last_pub_dt: int):
     try:
-        table = dynamodb.Table(DYNAMODB_TABLE)
-        table.update_item(
-            Key={'url': feed['u']},
-            UpdateExpression='SET dt = :val',
-            ExpressionAttributeValues={':val': last_pub_dt}
-        )
-        logger.info(f"Updated RSS feed in DynamoDB: {feed['u']} with dt: {feed['dt']}")
+        if not os.path.exists(RSS_FEEDS_FILE):
+            return
+        with open(RSS_FEEDS_FILE, "r") as f:
+            feeds = json.load(f)
+        for item in feeds:
+            if item.get("u") == feed["u"]:
+                item["dt"] = int(last_pub_dt)
+        with open(RSS_FEEDS_FILE, "w") as f:
+            json.dump(feeds, f)
+        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
     except Exception as e:
         logger.error(f"Failed to update RSS feed: {str(e)}")
\ No newline at end of file
diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py
index ee9c56e..b8e42ff 100644
--- a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py
+++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py
@@ -1,75 +1,60 @@
 import json
 import time
-from feed_processor import process_feed
+import os
+import redis
+from feed_processor import extract_feed
 from data_storage import save_article, update_rss_feed
 from utils import setup_logging
-from config import SQS_QUEUE_URL
-from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageError
-from metrics import record_processed_articles, record_processing_time, record_extraction_errors
-import boto3
-import os
-from feed_processor import extract_feed
+from config import REDIS_URL, REDIS_QUEUE_NAME
+from exceptions import RSSProcessingError, DataStorageError
+from metrics import (
+    record_processed_articles,
+    record_processing_time,
+    record_extraction_errors,
+)
 
-# Set up logging
 logger = setup_logging()
+storage_strategy = os.environ.get("STORAGE_STRATEGY")
+redis_client = redis.Redis.from_url(REDIS_URL)
 
-storage_strategy = os.environ.get('STORAGE_STRATEGY')
-
-# Initialize AWS clients
-sqs = boto3.client('sqs')
 
 def lambda_handler(event, context):
     logger.info("Starting RSS feed processing")
     start_time = time.time()
-
-    try:
-        # Receive message from SQS
-        event_source = event["Records"][0]["eventSource"]
-        if event_source == "aws:sqs":
-            feed = event["Records"][0]["body"]
-            logger.info(f"Received message from SQS: {feed}")
-            feed = json.loads(feed)
-
-        receipt_handle = event["Records"][0]['receiptHandle']
 
-        # Process the feed
+    try:
+        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
+        if not feed_data:
+            logger.info("No messages in queue")
+            return {"statusCode": 200, "body": json.dumps("No feeds to process")}
+        feed = json.loads(feed_data)
+
         result = extract_feed(feed)
         logger.info(f"Process Feed Result Dictionary: {result}")
-        last_pub_dt = result['max_date']
 
         if result:
-            # Save articles and update feed
-            for article in result['articles']:
+            # Read max_date only after confirming the result is non-empty.
+            last_pub_dt = result["max_date"]
+            for article in result["articles"]:
                 try:
                     save_article(article, storage_strategy)
                 except DataStorageError as e:
                     logger.error(f"Failed to save article: {str(e)}")
                     record_extraction_errors(1)
 
-            update_rss_feed(result['feed'], last_pub_dt)
-
-            # Delete the message from the queue
-            logger.info("Deleting sqs queue message")
-            try:
-                sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
-            except Exception as e:
-                
logger.error(f"Error deleting message from SQS: {str(e)}") - logger.info("We can skip this but delete this block of code if it fails. This means the queue is already deleted when it triggers.") + update_rss_feed(result["feed"], last_pub_dt) logger.info(f"Processed feed: {feed['u']}") - - # Record metrics - record_processed_articles(len(result['articles'])) + record_processed_articles(len(result["articles"])) else: logger.warning(f"Failed to process feed: {feed['u']}") record_extraction_errors(1) except RSSProcessingError as e: logger.error(f"RSS Processing Error: {str(e)}") - return {'statusCode': 500, 'body': json.dumps('RSS processing failed')} + return {"statusCode": 500, "body": json.dumps("RSS processing failed")} except Exception as e: logger.error(f"Unexpected error: {str(e)}") - return {'statusCode': 500, 'body': json.dumps('An unexpected error occurred')} + return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")} finally: end_time = time.time() @@ -77,7 +62,4 @@ def lambda_handler(event, context): record_processing_time(processing_time) logger.info(f"Lambda execution time: {processing_time:.2f} seconds") - return { - 'statusCode': 200, - 'body': json.dumps('RSS feed processed successfully') - } \ No newline at end of file + return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")} diff --git a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py b/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py index fe8d781..2897f78 100644 --- a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py +++ b/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py @@ -49,20 +49,12 @@ def deploy_sqs_filler(): 'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME') }, { - 'ParameterKey': 'SqsQueueUrl', - 'ParameterValue': os.getenv('SQS_QUEUE_URL') + 'ParameterKey': 'RedisUrl', + 'ParameterValue': os.getenv('REDIS_URL') }, { - 'ParameterKey': 'DynamoDbTableName', - 'ParameterValue': os.getenv('DYNAMODB_TABLE_NAME') - }, - { - 'ParameterKey': 'DynamoDbTableArn', - 'ParameterValue': os.getenv('DYNAMODB_TABLE_ARN') - }, - { - 'ParameterKey': 'SqsQueueArn', - 'ParameterValue': os.getenv('SQS_QUEUE_ARN') + 'ParameterKey': 'RedisQueueName', + 'ParameterValue': os.getenv('REDIS_QUEUE_NAME') }, { 'ParameterKey': 'LambdaCodeS3Bucket', diff --git a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py index 6d24bdf..9dc6a18 100644 --- a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py +++ b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py @@ -1,57 +1,39 @@ import json import os -import boto3 -from decimal import Decimal -from datetime import datetime import logging +from datetime import datetime +import redis logger = logging.getLogger() logger.setLevel("INFO") -dynamodb = boto3.resource('dynamodb') -sqs = boto3.client('sqs') +REDIS_URL = os.environ["REDIS_URL"] +REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue") +RSS_FEEDS_FILE = os.environ.get("RSS_FEEDS_FILE", "rss_feeds.json") -SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL'] -DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME'] - -class DecimalEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, Decimal): - return int(obj) - return super(DecimalEncoder, self).default(obj) +redis_client = redis.Redis.from_url(REDIS_URL) def handler(event, context): - table = dynamodb.Table(DYNAMODB_TABLE_NAME) messages_sent = 0 + try: + with open(RSS_FEEDS_FILE, "r") as f: + 
feeds = json.load(f) + except Exception as e: + logger.error(f"Failed to load RSS feed file: {e}") + return {"statusCode": 500, "body": "Failed to load feeds"} - # Scan the DynamoDB table - response = table.scan() - - for item in response['Items']: - rss_url = item.get('url') - rss_dt = item.get('dt') - - logger.debug(f"Processing RSS feed: {rss_url}") - logger.debug(f"Last published date: {rss_dt}") - - if rss_url: - message = { - 'u': rss_url, - 'dt': rss_dt - } - logger.debug("message", message) - try: - sqs.send_message( - QueueUrl=SQS_QUEUE_URL, - MessageBody=json.dumps(message, cls=DecimalEncoder) - ) - messages_sent += 1 - except Exception as e: - logger.error(f"Error sending message to SQS: {str(e)}") - - logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}") + for feed in feeds: + message = {"u": feed.get("u"), "dt": feed.get("dt")} + try: + redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message)) + messages_sent += 1 + except Exception as e: + logger.error(f"Error pushing message to Redis: {e}") + logger.info( + f"Sent {messages_sent} messages to Redis at {datetime.now().isoformat()}" + ) return { - 'statusCode': 200, - 'body': json.dumps(f'Sent {messages_sent} RSS URLs to SQS') - } \ No newline at end of file + "statusCode": 200, + "body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"), + } diff --git a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py index 8936636..4aa7b62 100644 --- a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py +++ b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py @@ -28,15 +28,10 @@ def update_env_vars(function_name): 'S3_LAMBDA_ZIPPED_BUCKET_NAME': os.environ.get('S3_LAMBDA_ZIPPED_BUCKET_NAME'), 'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'), 'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'), - - # DynamoDB Configuration - 'DYNAMODB_TABLE_NAME': os.environ.get('DYNAMODB_TABLE_NAME'), - 'DYNAMODB_TABLE_ARN': os.environ.get('DYNAMODB_TABLE_ARN'), - - # SQS Configuration - 'SQS_QUEUE_NAME': os.environ.get('SQS_QUEUE_NAME'), - 'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'), - 'SQS_QUEUE_ARN': os.environ.get('SQS_QUEUE_ARN'), + + # Redis Configuration + 'REDIS_URL': os.environ.get('REDIS_URL'), + 'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'), # Queue Filler Lambda Configuration 'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'), diff --git a/src/launch/launch_env.py b/src/launch/launch_env.py index e222dcf..cdc85cb 100644 --- a/src/launch/launch_env.py +++ b/src/launch/launch_env.py @@ -62,8 +62,8 @@ def main(): env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}" env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}" env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}" - env_vars["DYNAMODB_TABLE_NAME"] = get_env_value("DYNAMODB_TABLE_NAME", "Enter DynamoDB Table Name:", options=["rss-feeds-table", "custom-rss-table"], advanced=advanced_mode) - env_vars["SQS_QUEUE_NAME"] = get_env_value("SQS_QUEUE_NAME", "Enter SQS Queue Name:", options=["rss-feed-queue", "custom-rss-queue"], advanced=advanced_mode) + env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode) + env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], 
advanced=advanced_mode) # Advanced Configuration env_vars["LAMBDA_LAYER_VERSION"] = 3 @@ -72,9 +72,6 @@ def main(): env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}" env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}" env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode) - env_vars["SQS_QUEUE_URL"] = f"https://sqs.{env_vars['AWS_REGION']}.amazonaws.com/{env_vars['AWS_ACCOUNT_ID']}/{env_vars['SQS_QUEUE_NAME']}" - env_vars["SQS_QUEUE_ARN"] = f"arn:aws:sqs:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:{env_vars['SQS_QUEUE_NAME']}" - env_vars["DYNAMODB_TABLE_ARN"] = f"arn:aws:dynamodb:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:table/{env_vars['DYNAMODB_TABLE_NAME']}" env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode) env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}" diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc b/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc deleted file mode 100644 index b300011..0000000 Binary files a/src/utils/__pycache__/create_lambda_layer.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc b/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc deleted file mode 100644 index 546b0ef..0000000 Binary files a/src/utils/__pycache__/create_lambda_layer.cpython-311.pyc and /dev/null differ diff --git a/src/utils/__pycache__/create_lambda_layer.cpython-312.pyc b/src/utils/__pycache__/create_lambda_layer.cpython-312.pyc deleted file mode 100644 index c2e9aff..0000000 Binary files a/src/utils/__pycache__/create_lambda_layer.cpython-312.pyc and /dev/null differ diff --git a/src/utils/__pycache__/create_s3_bucket.cpython-310.pyc b/src/utils/__pycache__/create_s3_bucket.cpython-310.pyc deleted file mode 100644 index ae2fec8..0000000 Binary files a/src/utils/__pycache__/create_s3_bucket.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/kms_update.cpython-310.pyc b/src/utils/__pycache__/kms_update.cpython-310.pyc deleted file mode 100644 index 4101d85..0000000 Binary files a/src/utils/__pycache__/kms_update.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/kms_update.cpython-311.pyc b/src/utils/__pycache__/kms_update.cpython-311.pyc deleted file mode 100644 index c1d2918..0000000 Binary files a/src/utils/__pycache__/kms_update.cpython-311.pyc and /dev/null differ diff --git a/src/utils/__pycache__/kms_update.cpython-312.pyc b/src/utils/__pycache__/kms_update.cpython-312.pyc deleted file mode 100644 index 8f2e76c..0000000 Binary files a/src/utils/__pycache__/kms_update.cpython-312.pyc and /dev/null differ diff --git a/src/utils/__pycache__/retry_logic.cpython-310.pyc b/src/utils/__pycache__/retry_logic.cpython-310.pyc deleted file mode 100644 index 09cbb7a..0000000 Binary files a/src/utils/__pycache__/retry_logic.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/retry_logic.cpython-311.pyc b/src/utils/__pycache__/retry_logic.cpython-311.pyc deleted file mode 100644 index fbe939c..0000000 Binary files a/src/utils/__pycache__/retry_logic.cpython-311.pyc and /dev/null differ diff --git a/src/utils/__pycache__/retry_logic.cpython-312.pyc b/src/utils/__pycache__/retry_logic.cpython-312.pyc 
deleted file mode 100644 index cf610f9..0000000 Binary files a/src/utils/__pycache__/retry_logic.cpython-312.pyc and /dev/null differ diff --git a/src/utils/__pycache__/upload_rss_feeds.cpython-310.pyc b/src/utils/__pycache__/upload_rss_feeds.cpython-310.pyc deleted file mode 100644 index c2b1e08..0000000 Binary files a/src/utils/__pycache__/upload_rss_feeds.cpython-310.pyc and /dev/null differ diff --git a/src/utils/__pycache__/upload_rss_feeds.cpython-311.pyc b/src/utils/__pycache__/upload_rss_feeds.cpython-311.pyc deleted file mode 100644 index 0583ed2..0000000 Binary files a/src/utils/__pycache__/upload_rss_feeds.cpython-311.pyc and /dev/null differ diff --git a/src/utils/__pycache__/upload_rss_feeds.cpython-312.pyc b/src/utils/__pycache__/upload_rss_feeds.cpython-312.pyc deleted file mode 100644 index 041f12d..0000000 Binary files a/src/utils/__pycache__/upload_rss_feeds.cpython-312.pyc and /dev/null differ diff --git a/src/utils/check_env.py b/src/utils/check_env.py index 5592dda..c085423 100644 --- a/src/utils/check_env.py +++ b/src/utils/check_env.py @@ -20,16 +20,13 @@ def check_env() -> None: "LAMBDA_EXECUTION_ROLE_NAME", "LAMBDA_ROLE_ARN", "S3_BUCKET_NAME", - "DYNAMODB_TABLE_NAME", - "SQS_QUEUE_NAME", + "REDIS_URL", + "REDIS_QUEUE_NAME", "LAMBDA_LAYER_VERSION", "LAMBDA_LAYER_NAME", "LAMBDA_LAYER_ARN", "S3_LAYER_BUCKET_NAME", "S3_LAYER_KEY_NAME", - "SQS_QUEUE_URL", - "SQS_QUEUE_ARN", - "DYNAMODB_TABLE_ARN", "PYTHON_VERSION", "LAMBDA_RUNTIME", "LAMBDA_TIMEOUT", diff --git a/template.env b/template.env index 4142e73..dc169e5 100644 --- a/template.env +++ b/template.env @@ -13,8 +13,8 @@ STACK_BASE=${LAMBDA_FUNCTION_NAME} LAMBDA_EXECUTION_ROLE_NAME=rss-feed-processor-role-${AWS_REGION} LAMBDA_ROLE_ARN=arn:aws:iam::${AWS_ACCOUNT_ID}:role/${LAMBDA_EXECUTION_ROLE_NAME} S3_BUCKET_NAME=open-rss-articles-${AWS_REGION} -DYNAMODB_TABLE_NAME=rss-feeds-table -SQS_QUEUE_NAME=rss-feed-queue +REDIS_URL=redis://localhost:6379 +REDIS_QUEUE_NAME=rss-feed-queue LAMBDA_LAYER_VERSION=6 # This is fixed. @@ -25,10 +25,6 @@ S3_LAMBDA_ZIPPED_BUCKET_NAME=open-rss-lambda-${AWS_REGION} S3_LAYER_BUCKET_NAME=rss-feed-processor-layers-${AWS_REGION} S3_LAYER_KEY_NAME= RSSFeedProcessorDependencies - -SQS_QUEUE_URL=https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME} -SQS_QUEUE_ARN=arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SQS_QUEUE_NAME} -DYNAMODB_TABLE_ARN=arn:aws:dynamodb:${AWS_REGION}:${AWS_ACCOUNT_ID}:table/${DYNAMODB_TABLE_NAME} PYTHON_VERSION=3.12 LAMBDA_RUNTIME=python${PYTHON_VERSION} LAMBDA_TIMEOUT=300 @@ -58,4 +54,4 @@ VECTOR_EMBEDDING_DIM=*** VECTOR_SEARCH_METRIC=*** OPENAI_API_KEY=sk** -OPENAI_EMBEDDING_MODEL=text-embedding-3-large \ No newline at end of file +OPENAI_EMBEDDING_MODEL=text-embedding-3-large diff --git a/tree.md b/tree.md index ff89dad..cca3797 100644 --- a/tree.md +++ b/tree.md @@ -17,7 +17,6 @@ │   │   ├── __pycache__ │   │   │   └── deploy_infrastructure.cpython-312.pyc │   │   ├── cloudformation -│   │   │   ├── dynamo.yaml │   │   │   ├── lambda_role.yaml │   │   │   ├── rss_lambda_stack.yaml │   │   │   ├── s3.yaml
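
Note on the new queue mechanics: the queue-filler Lambda LPUSHes each feed message onto a Redis list and the processor Lambda RPOPs from the other end, so the list behaves as a FIFO queue. A minimal local sketch of that round-trip, assuming a Redis instance at redis://localhost:6379 and the default queue name rss-feed-queue (mirroring the defaults in template.env; the feed URL below is a made-up example):

    import json
    import redis

    # In the deployed stack these come from the REDIS_URL and
    # REDIS_QUEUE_NAME environment variables; hard-coded here for a local test.
    client = redis.Redis.from_url("redis://localhost:6379")
    queue = "rss-feed-queue"

    # Filler side: push a feed message in the same {"u": url, "dt": last_pub} shape
    # the queue-filler Lambda produces.
    client.lpush(queue, json.dumps({"u": "https://example.com/feed.xml", "dt": 0}))

    # Processor side: pop the oldest message. rpop returns None when the list is
    # empty, which is why lambda_function.py exits early on a falsy feed_data.
    raw = client.rpop(queue)
    if raw is not None:
        feed = json.loads(raw)
        print(feed["u"], feed["dt"])

Unlike SQS, a plain Redis list has no visibility timeout, redelivery, or dead-letter queue: a message popped by a worker that then crashes is simply lost. That is the trade-off this change accepts in exchange for dropping the SQS/DynamoDB stacks and their IAM wiring.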