mirror of
https://github.com/aljazceru/IngestRSS.git
synced 2026-02-23 15:14:23 +01:00
Merge pull request #3 from aljazceru/codex/migrate-rss-feed-storage-to-mongodb
Switch feed storage to MongoDB
This commit is contained in:
@@ -48,7 +48,13 @@ def main():
|
||||
if os.path.exists(rss_feeds_file):
|
||||
with open(rss_feeds_file, 'r') as f:
|
||||
rss_feeds = json.load(f)
|
||||
upload_rss_feeds(rss_feeds, rss_feeds_file)
|
||||
upload_rss_feeds(
|
||||
rss_feeds,
|
||||
os.getenv('MONGODB_URL'),
|
||||
os.getenv('MONGODB_DB_NAME'),
|
||||
os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
|
||||
)
|
||||
|
||||
else:
|
||||
print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
boto3==1.35.*
|
||||
pymongo==4.*
|
||||
python-dotenv==1.0.*
|
||||
requests==2.32.*
|
||||
constructs==10.2.69
|
||||
|
||||
@@ -1,23 +1,42 @@
|
||||
import json
|
||||
import os
|
||||
from pymongo import MongoClient
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name):
|
||||
client = MongoClient(mongo_url)
|
||||
collection = client[db_name][collection_name]
|
||||
|
||||
def upload_rss_feeds(rss_feeds, file_path):
|
||||
"""Persist RSS feed definitions to a local JSON file."""
|
||||
try:
|
||||
with open(file_path, "w") as f:
|
||||
json.dump(rss_feeds, f)
|
||||
logger.info(f"Saved {len(rss_feeds)} feeds to {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save feeds: {e}")
|
||||
logger.info(f"Uploading RSS feeds to MongoDB collection: {collection_name}")
|
||||
|
||||
new_items = 0
|
||||
existing_items = 0
|
||||
|
||||
for feed in rss_feeds:
|
||||
url = feed.get('u')
|
||||
dt = int(feed.get('dt', 0))
|
||||
result = collection.update_one(
|
||||
{'url': url},
|
||||
{'$setOnInsert': {'url': url, 'dt': dt}},
|
||||
upsert=True
|
||||
)
|
||||
if result.upserted_id:
|
||||
new_items += 1
|
||||
else:
|
||||
existing_items += 1
|
||||
|
||||
logger.info(
|
||||
f"Upload complete. {new_items} new items inserted. {existing_items} items already existed."
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
rss_feed_path = "rss_feeds.json"
|
||||
with open(rss_feed_path) as f:
|
||||
mongo_url = os.getenv('MONGODB_URL', 'mongodb://localhost:27017')
|
||||
db_name = os.getenv('MONGODB_DB_NAME', 'ingestrss')
|
||||
collection_name = os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
|
||||
with open('rss_feeds.json') as f:
|
||||
rss_feeds = json.load(f)
|
||||
logger.info(f"Loaded RSS feeds: {rss_feeds}")
|
||||
upload_rss_feeds(rss_feeds, rss_feed_path)
|
||||
upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name)
|
||||
|
||||
@@ -4,6 +4,7 @@ import json
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pymongo import MongoClient
|
||||
|
||||
from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
|
||||
|
||||
@@ -23,6 +24,13 @@ CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv
|
||||
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
|
||||
storage_strategy = os.environ.get('STORAGE_STRATEGY')
|
||||
|
||||
MONGODB_URL = os.getenv("MONGODB_URL")
|
||||
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
|
||||
MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds")
|
||||
|
||||
mongo_client = MongoClient(MONGODB_URL)
|
||||
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
|
||||
|
||||
##### Article Storage #####
|
||||
def save_article(article:dict, strategy:str):
|
||||
if strategy == "s3":
|
||||
|
||||
@@ -1,38 +1,57 @@
|
||||
import json
|
||||
import os
|
||||
import logging
|
||||
from pymongo import MongoClient
|
||||
from datetime import datetime
|
||||
import redis
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel("INFO")
|
||||
|
||||
REDIS_URL = os.environ["REDIS_URL"]
|
||||
REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
|
||||
RSS_FEEDS_FILE = os.environ.get("RSS_FEEDS_FILE", "rss_feeds.json")
|
||||
sqs = boto3.client('sqs')
|
||||
|
||||
redis_client = redis.Redis.from_url(REDIS_URL)
|
||||
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
|
||||
MONGODB_URL = os.environ['MONGODB_URL']
|
||||
MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
|
||||
MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')
|
||||
|
||||
mongo_client = MongoClient(MONGODB_URL)
|
||||
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
|
||||
|
||||
class DecimalEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, Decimal):
|
||||
return int(obj)
|
||||
return super(DecimalEncoder, self).default(obj)
|
||||
|
||||
def handler(event, context):
|
||||
messages_sent = 0
|
||||
try:
|
||||
with open(RSS_FEEDS_FILE, "r") as f:
|
||||
feeds = json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load RSS feed file: {e}")
|
||||
return {"statusCode": 500, "body": "Failed to load feeds"}
|
||||
|
||||
for feed in feeds:
|
||||
message = {"u": feed.get("u"), "dt": feed.get("dt")}
|
||||
try:
|
||||
redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message))
|
||||
messages_sent += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Error pushing message to Redis: {e}")
|
||||
# Iterate over all feeds in MongoDB
|
||||
for item in feeds_collection.find({}):
|
||||
rss_url = item.get('url')
|
||||
rss_dt = item.get('dt')
|
||||
|
||||
logger.debug(f"Processing RSS feed: {rss_url}")
|
||||
logger.debug(f"Last published date: {rss_dt}")
|
||||
|
||||
if rss_url:
|
||||
message = {
|
||||
'u': rss_url,
|
||||
'dt': rss_dt
|
||||
}
|
||||
logger.debug("message", message)
|
||||
try:
|
||||
sqs.send_message(
|
||||
QueueUrl=SQS_QUEUE_URL,
|
||||
MessageBody=json.dumps(message, cls=DecimalEncoder)
|
||||
)
|
||||
messages_sent += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending message to SQS: {str(e)}")
|
||||
|
||||
logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")
|
||||
|
||||
logger.info(
|
||||
f"Sent {messages_sent} messages to Redis at {datetime.now().isoformat()}"
|
||||
)
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"),
|
||||
|
||||
@@ -35,6 +35,9 @@ def check_env() -> None:
|
||||
"LAMBDA_RUNTIME",
|
||||
"LAMBDA_TIMEOUT",
|
||||
"LAMBDA_MEMORY",
|
||||
"MONGODB_URL",
|
||||
"MONGODB_DB_NAME",
|
||||
"MONGODB_COLLECTION_NAME",
|
||||
"QUEUE_FILLER_LAMBDA_NAME",
|
||||
"QUEUE_FILLER_LAMBDA_S3_KEY",
|
||||
"LOG_LEVEL",
|
||||
|
||||
@@ -36,6 +36,11 @@ LAMBDA_RUNTIME=python${PYTHON_VERSION}
|
||||
LAMBDA_TIMEOUT=300
|
||||
LAMBDA_MEMORY=512
|
||||
|
||||
# MongoDB settings
|
||||
MONGODB_URL=mongodb://localhost:27017
|
||||
MONGODB_DB_NAME=ingestrss
|
||||
MONGODB_COLLECTION_NAME=rss_feeds
|
||||
|
||||
QUEUE_FILLER_LAMBDA_NAME=RSSQueueFiller
|
||||
QUEUE_FILLER_LAMBDA_S3_KEY=RSSQueueFiller.zip
|
||||
|
||||
|
||||
Reference in New Issue
Block a user