Switch RSS feed storage to MongoDB

This commit is contained in:
2025-06-02 13:46:05 +02:00
parent 8ad98c4cb4
commit 933971c2e0
7 changed files with 64 additions and 56 deletions

View File

@@ -48,7 +48,12 @@ def main():
if os.path.exists(rss_feeds_file):
with open(rss_feeds_file, 'r') as f:
rss_feeds = json.load(f)
upload_rss_feeds(rss_feeds, os.getenv('DYNAMODB_TABLE_NAME'))
upload_rss_feeds(
rss_feeds,
os.getenv('MONGODB_URL'),
os.getenv('MONGODB_DB_NAME'),
os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
)
else:
print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")

View File

@@ -1,4 +1,5 @@
boto3==1.35.*
pymongo==4.*
python-dotenv==1.0.*
requests==2.32.*
constructs==10.2.69

View File

@@ -1,58 +1,43 @@
import json
import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
import os
from pymongo import MongoClient
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def upload_rss_feeds(rss_feeds, table_name):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name):
client = MongoClient(mongo_url)
collection = client[db_name][collection_name]
logger.info(f"Uploading RSS feeds to table: {table_name}")
try:
# Get the table's key schema
key_schema = table.key_schema
partition_key = next(key['AttributeName'] for key in key_schema if key['KeyType'] == 'HASH')
except ClientError as e:
logger.error(f"Error getting table schema: {e.response['Error']['Message']}")
return
logger.info(f"Uploading RSS feeds to MongoDB collection: {collection_name}")
new_items = 0
existing_items = 0
for feed in rss_feeds:
# Check if the item already exists
try:
response = table.get_item(Key={partition_key: feed['u']})
except ClientError as e:
logger.error(f"Error checking for existing item: {e.response['Error']['Message']}")
continue
if 'Item' not in response:
# Item doesn't exist, insert new item
item = {partition_key: feed['u'], 'dt': 0}
feed['dt'] = int(feed['dt'])
item.update()
try:
table.put_item(Item=item)
url = feed.get('u')
dt = int(feed.get('dt', 0))
result = collection.update_one(
{'url': url},
{'$setOnInsert': {'url': url, 'dt': dt}},
upsert=True
)
if result.upserted_id:
new_items += 1
except ClientError as e:
logger.error(f"Error inserting new item: {e.response['Error']['Message']}")
else:
existing_items += 1
logger.info(f"Upload complete. {new_items} new items inserted. {existing_items} items already existed.")
logger.info(
f"Upload complete. {new_items} new items inserted. {existing_items} items already existed."
)
if __name__ == "__main__":
table_name = 'rss-feeds-table'
rss_feed_path = 'rss_feeds.json'
with open(rss_feed_path) as f:
mongo_url = os.getenv('MONGODB_URL', 'mongodb://localhost:27017')
db_name = os.getenv('MONGODB_DB_NAME', 'ingestrss')
collection_name = os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
with open('rss_feeds.json') as f:
rss_feeds = json.load(f)
logger.info(f"Loaded RSS feeds: {rss_feeds}")
upload_rss_feeds(rss_feeds, table_name)
upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name)

View File

@@ -4,18 +4,24 @@ import os
import logging
from random import randint
from datetime import datetime
from pymongo import MongoClient
from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
logger = logging.getLogger()
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')
MONGODB_URL = os.getenv("MONGODB_URL")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds")
mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
##### Article Storage #####
def save_article(article:dict, strategy:str):
if strategy == "s3":
@@ -96,12 +102,13 @@ def s3_save_article(article:dict):
###### Feed Storage ######
def update_rss_feed(feed: dict, last_pub_dt: int):
try:
table = dynamodb.Table(DYNAMODB_TABLE)
table.update_item(
Key={'url': feed['u']},
UpdateExpression='SET dt = :val',
ExpressionAttributeValues={':val': last_pub_dt}
feeds_collection.update_one(
{"url": feed["u"]},
{"$set": {"dt": last_pub_dt}},
upsert=True,
)
logger.info(
f"Updated RSS feed in MongoDB: {feed['u']} with dt: {last_pub_dt}"
)
logger.info(f"Updated RSS feed in DynamoDB: {feed['u']} with dt: {feed['dt']}")
except Exception as e:
logger.error(f"Failed to update RSS feed: {str(e)}")

View File

@@ -4,15 +4,20 @@ import boto3
from decimal import Decimal
from datetime import datetime
import logging
from pymongo import MongoClient
logger = logging.getLogger()
logger.setLevel("INFO")
dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME']
MONGODB_URL = os.environ['MONGODB_URL']
MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')
mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
@@ -21,13 +26,10 @@ class DecimalEncoder(json.JSONEncoder):
return super(DecimalEncoder, self).default(obj)
def handler(event, context):
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
messages_sent = 0
# Scan the DynamoDB table
response = table.scan()
for item in response['Items']:
# Iterate over all feeds in MongoDB
for item in feeds_collection.find({}):
rss_url = item.get('url')
rss_dt = item.get('dt')

View File

@@ -34,6 +34,9 @@ def check_env() -> None:
"LAMBDA_RUNTIME",
"LAMBDA_TIMEOUT",
"LAMBDA_MEMORY",
"MONGODB_URL",
"MONGODB_DB_NAME",
"MONGODB_COLLECTION_NAME",
"QUEUE_FILLER_LAMBDA_NAME",
"QUEUE_FILLER_LAMBDA_S3_KEY",
"LOG_LEVEL",

View File

@@ -34,6 +34,11 @@ LAMBDA_RUNTIME=python${PYTHON_VERSION}
LAMBDA_TIMEOUT=300
LAMBDA_MEMORY=512
# MongoDB settings
MONGODB_URL=mongodb://localhost:27017
MONGODB_DB_NAME=ingestrss
MONGODB_COLLECTION_NAME=rss_feeds
QUEUE_FILLER_LAMBDA_NAME=RSSQueueFiller
QUEUE_FILLER_LAMBDA_S3_KEY=RSSQueueFiller.zip