diff --git a/launch.py b/launch.py
index 07a24f3..253b2ee 100644
--- a/launch.py
+++ b/launch.py
@@ -48,7 +48,12 @@ def main():
     if os.path.exists(rss_feeds_file):
         with open(rss_feeds_file, 'r') as f:
             rss_feeds = json.load(f)
-        upload_rss_feeds(rss_feeds, os.getenv('DYNAMODB_TABLE_NAME'))
+        upload_rss_feeds(
+            rss_feeds,
+            os.getenv('MONGODB_URL'),
+            os.getenv('MONGODB_DB_NAME'),
+            os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
+        )
     else:
         print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")
 
diff --git a/requirements.txt b/requirements.txt
index 4aeda36..7368e03 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 boto3==1.35.*
+pymongo==4.*
 python-dotenv==1.0.*
 requests==2.32.*
 constructs==10.2.69
diff --git a/src/feed_management/upload_rss_feeds.py b/src/feed_management/upload_rss_feeds.py
index 018b767..bc66794 100644
--- a/src/feed_management/upload_rss_feeds.py
+++ b/src/feed_management/upload_rss_feeds.py
@@ -1,58 +1,43 @@
 import json
-import boto3
-from boto3.dynamodb.conditions import Key
-from botocore.exceptions import ClientError
+import os
+from pymongo import MongoClient
 import logging
 
 # Set up logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-def upload_rss_feeds(rss_feeds, table_name):
-    dynamodb = boto3.resource('dynamodb')
-    table = dynamodb.Table(table_name)
+def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name):
+    client = MongoClient(mongo_url)
+    collection = client[db_name][collection_name]
 
-    logger.info(f"Uploading RSS feeds to table: {table_name}")
-
-    try:
-        # Get the table's key schema
-        key_schema = table.key_schema
-        partition_key = next(key['AttributeName'] for key in key_schema if key['KeyType'] == 'HASH')
-    except ClientError as e:
-        logger.error(f"Error getting table schema: {e.response['Error']['Message']}")
-        return
+    logger.info(f"Uploading RSS feeds to MongoDB collection: {collection_name}")
 
     new_items = 0
     existing_items = 0
 
     for feed in rss_feeds:
-        # Check if the item already exists
-        try:
-            response = table.get_item(Key={partition_key: feed['u']})
-        except ClientError as e:
-            logger.error(f"Error checking for existing item: {e.response['Error']['Message']}")
-            continue
-
-        if 'Item' not in response:
-            # Item doesn't exist, insert new item
-            item = {partition_key: feed['u'], 'dt': 0}
-            feed['dt'] = int(feed['dt'])
-            item.update()
-
-            try:
-                table.put_item(Item=item)
-                new_items += 1
-            except ClientError as e:
-                logger.error(f"Error inserting new item: {e.response['Error']['Message']}")
+        url = feed.get('u')
+        dt = int(feed.get('dt', 0))
+        result = collection.update_one(
+            {'url': url},
+            {'$setOnInsert': {'url': url, 'dt': dt}},
+            upsert=True
+        )
+        if result.upserted_id:
+            new_items += 1
         else:
             existing_items += 1
 
-    logger.info(f"Upload complete. {new_items} new items inserted. {existing_items} items already existed.")
+    logger.info(
+        f"Upload complete. {new_items} new items inserted. {existing_items} items already existed."
+    )
 
 if __name__ == "__main__":
-    table_name = 'rss-feeds-table'
-    rss_feed_path = 'rss_feeds.json'
-    with open(rss_feed_path) as f:
+    mongo_url = os.getenv('MONGODB_URL', 'mongodb://localhost:27017')
+    db_name = os.getenv('MONGODB_DB_NAME', 'ingestrss')
+    collection_name = os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
+    with open('rss_feeds.json') as f:
         rss_feeds = json.load(f)
     logger.info(f"Loaded RSS feeds: {rss_feeds}")
-    upload_rss_feeds(rss_feeds, table_name)
\ No newline at end of file
+    upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name)
diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
index 2c4e034..7fea1c0 100644
--- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
+++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py
@@ -4,18 +4,24 @@ import os
 import logging
 from random import randint
 from datetime import datetime
+from pymongo import MongoClient
 
 from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
 
 logger = logging.getLogger()
 
 s3 = boto3.client('s3')
-dynamodb = boto3.resource('dynamodb')
-CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
-DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
+CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
 storage_strategy = os.environ.get('STORAGE_STRATEGY')
 
+MONGODB_URL = os.getenv("MONGODB_URL")
+MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
+MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds")
+
+mongo_client = MongoClient(MONGODB_URL)
+feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
+
 ##### Article Storage #####
 
 def save_article(article:dict, strategy:str):
     if strategy == "s3":
@@ -94,14 +100,15 @@ def s3_save_article(article:dict):
 
 ###### Feed Storage ######
 
-def update_rss_feed(feed:dict, last_pub_dt:int):
+def update_rss_feed(feed: dict, last_pub_dt: int):
     try:
-        table = dynamodb.Table(DYNAMODB_TABLE)
-        table.update_item(
-            Key={'url': feed['u']},
-            UpdateExpression='SET dt = :val',
-            ExpressionAttributeValues={':val': last_pub_dt}
+        feeds_collection.update_one(
+            {"url": feed["u"]},
+            {"$set": {"dt": last_pub_dt}},
+            upsert=True,
+        )
+        logger.info(
+            f"Updated RSS feed in MongoDB: {feed['u']} with dt: {last_pub_dt}"
         )
-        logger.info(f"Updated RSS feed in DynamoDB: {feed['u']} with dt: {feed['dt']}")
     except Exception as e:
         logger.error(f"Failed to update RSS feed: {str(e)}")
\ No newline at end of file
diff --git a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py
index 6d24bdf..3b2733e 100644
--- a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py
+++ b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py
@@ -4,15 +4,20 @@ import boto3
 from decimal import Decimal
 from datetime import datetime
 import logging
+from pymongo import MongoClient
 
 logger = logging.getLogger()
 logger.setLevel("INFO")
 
-dynamodb = boto3.resource('dynamodb')
 sqs = boto3.client('sqs')
 
 SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
-DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME']
+MONGODB_URL = os.environ['MONGODB_URL']
+MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
+MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')
+
+mongo_client = MongoClient(MONGODB_URL)
+feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
 
 class DecimalEncoder(json.JSONEncoder):
     def default(self, obj):
@@ -21,13 +26,10 @@ class DecimalEncoder(json.JSONEncoder):
         return super(DecimalEncoder, self).default(obj)
 
 def handler(event, context):
-    table = dynamodb.Table(DYNAMODB_TABLE_NAME)
     messages_sent = 0
 
-    # Scan the DynamoDB table
-    response = table.scan()
-
-    for item in response['Items']:
+    # Iterate over all feeds in MongoDB
+    for item in feeds_collection.find({}):
         rss_url = item.get('url')
         rss_dt = item.get('dt')
 
diff --git a/src/utils/check_env.py b/src/utils/check_env.py
index 5592dda..12d4ad5 100644
--- a/src/utils/check_env.py
+++ b/src/utils/check_env.py
@@ -34,6 +34,9 @@ def check_env() -> None:
         "LAMBDA_RUNTIME",
         "LAMBDA_TIMEOUT",
         "LAMBDA_MEMORY",
+        "MONGODB_URL",
+        "MONGODB_DB_NAME",
+        "MONGODB_COLLECTION_NAME",
         "QUEUE_FILLER_LAMBDA_NAME",
         "QUEUE_FILLER_LAMBDA_S3_KEY",
         "LOG_LEVEL",
diff --git a/template.env b/template.env
index 4142e73..9585302 100644
--- a/template.env
+++ b/template.env
@@ -34,6 +34,11 @@
 LAMBDA_RUNTIME=python${PYTHON_VERSION}
 LAMBDA_TIMEOUT=300
 LAMBDA_MEMORY=512
 
+# MongoDB settings
+MONGODB_URL=mongodb://localhost:27017
+MONGODB_DB_NAME=ingestrss
+MONGODB_COLLECTION_NAME=rss_feeds
+
 QUEUE_FILLER_LAMBDA_NAME=RSSQueueFiller
 QUEUE_FILLER_LAMBDA_S3_KEY=RSSQueueFiller.zip