fixes claude

2025-06-02 14:37:51 +02:00
parent b37f4470fa
commit 404c6dd2ce
8 changed files with 156 additions and 83 deletions

View File

@@ -16,7 +16,7 @@ services:
     volumes:
       - mongo-data:/data/db
     healthcheck:
-      test: ["CMD", "mongo", "--eval", "db.adminCommand('ping')"]
+      test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
       interval: 10s
       timeout: 5s
       retries: 5
@@ -24,8 +24,8 @@ services:
     image: minio/minio
     command: server /data --console-address ":9001"
     environment:
-      MINIO_ACCESS_KEY: minioadmin
-      MINIO_SECRET_KEY: minioadmin
+      MINIO_ROOT_USER: minioadmin
+      MINIO_ROOT_PASSWORD: minioadmin
     ports:
       - "9000:9000"
       - "9001:9001"
@@ -53,7 +53,7 @@ services:
       MONGODB_URL: mongodb://mongodb:27017
       MONGODB_DB_NAME: ingestrss
       MONGODB_COLLECTION_NAME: rss_feeds
-      MINIO_ENDPOINT: http://minio:9000
+      MINIO_ENDPOINT: minio:9000
       MINIO_ACCESS_KEY: minioadmin
       MINIO_SECRET_KEY: minioadmin
       MINIO_BUCKET: ingestrss
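
Two of these compose fixes deserve a note: MinIO deprecated the MINIO_ACCESS_KEY/MINIO_SECRET_KEY server variables in favor of MINIO_ROOT_USER/MINIO_ROOT_PASSWORD, and MINIO_ENDPOINT loses its http:// scheme because the Python minio SDK expects a bare host:port plus a separate secure flag. A minimal client sketch matching these settings (secure=False is an assumption for a plain-HTTP local stack):

    import os
    from minio import Minio

    # The SDK rejects endpoints with a scheme; "minio:9000" is the expected form.
    client = Minio(
        os.getenv("MINIO_ENDPOINT", "minio:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        secure=False,  # assumption: the local compose stack serves plain HTTP
    )
    bucket = os.getenv("MINIO_BUCKET", "ingestrss")
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)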

View File

@@ -6,10 +6,6 @@ import boto3
 from dotenv import load_dotenv
 import logging
 import argparse
-import subprocess
-from src.infra.lambdas.RSSQueueFiller.deploy_sqs_filler_lambda import deploy_sqs_filler
-from src.utils.check_env import check_env

 def check_local_env() -> None:
@@ -45,7 +41,7 @@ def start_docker_containers() -> None:
         raise

-print("🗞️ 💵 ⚖️ IngestRSS⚖ 💵 🗞️".center(100, "-"))
+print("RSS Feed Processor".center(100, "-"))
 parser = argparse.ArgumentParser(description="Launch IngestRSS")
 parser.add_argument(
@@ -59,7 +55,41 @@ load_dotenv(override=True)
 if args.local:
     check_local_env()
+    # Upload RSS feeds to MongoDB for local deployment
+    from src.feed_management.upload_rss_feeds import upload_rss_feeds
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    rss_feeds_file = os.path.join(current_dir, "rss_feeds.json")
+    if os.path.exists(rss_feeds_file):
+        with open(rss_feeds_file, 'r') as f:
+            rss_feeds = json.load(f)
+        upload_rss_feeds(
+            rss_feeds,
+            os.getenv('MONGODB_URL'),
+            os.getenv('MONGODB_DB_NAME'),
+            os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
+        )
+        print("RSS feeds uploaded to MongoDB")
+    start_docker_containers()
+    print("Local RSS Feed Processor started successfully!")
+    print("Services running:")
+    print("- MongoDB: localhost:27017")
+    print("- Redis: localhost:6379")
+    print("- MinIO: localhost:9000 (console: localhost:9001)")
+    print("- Worker and Scheduler containers are processing feeds")
+    sys.exit(0)
 else:
+    # Only import AWS modules for cloud deployment
+    from src.utils.check_env import check_env
+    from src.infra.deploy_infrastructure import deploy_infrastructure
+    from src.infra.lambdas.RSSFeedProcessorLambda.deploy_rss_feed_lambda import deploy_lambda
+    from src.infra.lambdas.lambda_utils.update_lambda_env_vars import update_env_vars
+    from src.feed_management.upload_rss_feeds import upload_rss_feeds
+    from src.infra.lambdas.RSSQueueFiller.deploy_sqs_filler_lambda import deploy_sqs_filler
     check_env()

 # Set up logging
@@ -71,15 +101,9 @@ lambda_client = boto3.client("lambda")
 current_dir = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(current_dir)

-from src.infra.deploy_infrastructure import deploy_infrastructure
-from src.infra.lambdas.RSSFeedProcessorLambda.deploy_rss_feed_lambda import deploy_lambda
-from src.infra.lambdas.lambda_utils.update_lambda_env_vars import update_env_vars
-from src.feed_management.upload_rss_feeds import upload_rss_feeds

-def main():
-    if "--local" in sys.argv:
-        subprocess.run(["docker", "compose", "up", "-d"], check=False)
-        return
+def main(local_mode=False):
+    if local_mode:
+        return  # Already handled above

 # Deploy infrastructure
 deploy_infrastructure()
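
upload_rss_feeds itself is not part of this diff; judging from the call site it takes the parsed JSON plus the Mongo connection settings. A hypothetical sketch of what it presumably does (the 'u' and 'dt' key names are inferred from the queue filler later in this commit):

    from pymongo import MongoClient

    def upload_rss_feeds(rss_feeds, mongodb_url, db_name, collection_name):
        # Hypothetical: upsert each feed by URL so repeated launches stay idempotent.
        collection = MongoClient(mongodb_url)[db_name][collection_name]
        for feed in rss_feeds:
            url = feed.get("u") or feed.get("url")  # assumed key names
            collection.update_one(
                {"u": url},
                {"$setOnInsert": {"u": url, "dt": feed.get("dt", 0)}},
                upsert=True,
            )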

View File

@@ -7,7 +7,11 @@ constructs==10.2.69
 qdrant-client
 ollama
 tqdm
-prometheus-clien
+prometheus-client
 redis
 minio
 schedule==1.*
+feedparser
+newspaper3k
+python-dateutil
+lxml
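
The typo fix matters on its own (pip cannot resolve prometheus-clien), and the four additions back imports that already exist in the code: feedparser and python-dateutil are used directly by the feed extractor below, while newspaper3k presumably powers article_extractor, with lxml as its parsing backend.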

View File

@@ -3,7 +3,13 @@ import requests
 from qdrant_client import QdrantClient, models
-from utils import setup_logging
+try:
+    from ...utils import setup_logging
+except ImportError:
+    # Fallback for when running standalone
+    import logging
+    def setup_logging():
+        return logging.getLogger(__name__)

 logger = setup_logging()
@@ -26,6 +32,7 @@ def get_index():
     return client

 def vectorize(article: str) -> list[float]:
+    try:
         response = requests.post(
             f"{ollama_host}/api/embeddings",
             json={"model": ollama_embedding_model, "prompt": article},
@@ -33,6 +40,11 @@ def vectorize(article: str) -> list[float]:
         )
         response.raise_for_status()
         return response.json().get("embedding", [])
+    except requests.RequestException as e:
+        logger.error(f"Error generating embedding: {e}")
+        # Return a zero vector of the expected dimension as fallback
+        dim = int(embedding_dim) if embedding_dim else 384  # Default dimension
+        return [0.0] * dim

 def upsert_vectors(index: QdrantClient, data: list[dict]):
@@ -44,7 +56,7 @@ def upsert_vectors(index: QdrantClient, data: list[dict]):
 def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None):
-    if len(vector) != int(embedding_dim):
+    if embedding_dim and len(vector) != int(embedding_dim):
         raise ValueError("Length of vector does not match the embedding dimension")
     return index.search(
         collection_name=collection_name,
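
One consequence of the zero-vector fallback in vectorize: a failed embedding no longer raises, so callers can silently index vectors that carry no signal. A caller-side guard, sketched under the assumption that skipping is preferable (the payload shape passed to upsert_vectors is also an assumption):

    vec = vectorize(article_text)
    if not any(vec):
        # an all-zero vector means the embedding call failed; skip it
        logger.warning("Embedding failed, skipping article")
    else:
        upsert_vectors(index, [{"id": article_id, "vector": vec}])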

View File

@@ -6,10 +6,16 @@ import logging
 from datetime import datetime
 from pymongo import MongoClient
-from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize

 logger = logging.getLogger()

+# Try to import vector DB components, but make them optional
+try:
+    from .analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
+    VECTOR_DB_AVAILABLE = True
+except ImportError:
+    VECTOR_DB_AVAILABLE = False
+    logger.warning("Vector DB components not available. Qdrant storage will not work.")

 s3 = boto3.client('s3')
 CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
@@ -36,8 +42,13 @@ def save_article(article: dict, strategy: str):
     if strategy == "s3":
         s3_save_article(article)
     elif strategy == "qdrant":
+        if VECTOR_DB_AVAILABLE:
             qdrant_save_article(article)
+        else:
+            logger.error("Qdrant storage requested but vector DB components not available")
+            raise ValueError("Vector DB components not available for Qdrant storage")
     elif strategy == "both":
+        if VECTOR_DB_AVAILABLE:
             qdrant_save_article(article)
             s3_save_article(article)
         else:
@@ -62,7 +73,7 @@ def s3_save_article(article:dict):
     now = datetime.now()
     article_id = article['article_id']
-    logger.info(f"Content ")
     if not article_id:
         logger.error(f"Missing rss_id or article_id in article: {article}")
         return
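
For reference, a usage sketch of the dispatch above; the article fields here are illustrative assumptions drawn from the extractor later in this commit:

    article = {
        "article_id": "abc123",             # hypothetical id
        "link": "https://example.com/post",
        "title": "Example title",
        "text": "Cleaned article body...",
    }
    save_article(article, strategy="s3")    # S3 only; works without Qdrant
    save_article(article, strategy="both")  # also vectorizes, if the optional import succeeded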

View File

@@ -4,9 +4,9 @@ from dateutil import parser
 import queue
 import threading
 import logging
-from utils import generate_key
-from article_extractor import extract_article
-from article_cleaning import clean_text
+from .utils import generate_key
+from .article_extractor import extract_article
+from .article_cleaning import clean_text

 logger = logging.getLogger()
@@ -38,6 +38,7 @@ def extract_feed_threading(rss: dict, output_queue, stop_thread):
     feed_url = rss['u']
     last_date = rss['dt']
     max_date = last_date
+    entry = None  # Initialize entry variable
     try:
         feed = feedparser.parse(feed_url)
@@ -45,11 +46,11 @@ def extract_feed_threading(rss: dict, output_queue, stop_thread):
             if stop_thread.is_set():
                 break
-            pub_date = parse_pub_date(entry['published'])
+            pub_date = parse_pub_date(entry.get('published', ''))
             if pub_date > last_date:
                 title, text = extract_article(entry.link)
-                title, text = clean_text(title), clean_text(text)
+                title, text = clean_text(title or ''), clean_text(text or '')
                 article = {
                     'link': entry.link,
                     'rss': feed_url,
@@ -71,7 +72,9 @@ def extract_feed_threading(rss: dict, output_queue, stop_thread):
         }
         output_queue.put(output)
     except Exception as e:
-        logger.error(f"Feed: {entry}")
+        logger.error(f"Feed URL: {feed_url}")
+        if entry:
+            logger.error(f"Current entry: {entry.get('link', 'unknown')}")
         logger.error(f"Feed failed due to error: {e}")

 def extract_feed(rss: dict):
@@ -83,7 +86,7 @@ def extract_feed(rss: dict):
     try:
         feed = feedparser.parse(feed_url)
         for entry in feed['entries']:
-            pub_date = parse_pub_date(entry['published'])
+            pub_date = parse_pub_date(entry.get('published', ''))
             if pub_date > last_date:
                 title, text = extract_article(entry.link)
@@ -106,16 +109,15 @@ def extract_feed(rss: dict):
             'max_date': max_date,
             'feed': rss
         }
-        print(output)
         return output
     except Exception as e:
-        logger.error(f"Feed: {entry}")
+        logger.error(f"Feed URL: {feed_url}")
         logger.error(f"Feed failed due to error: {e}")

-def parse_pub_date(entry:dict):
-    if 'published' in entry:
-        date_string = entry['published']
+def parse_pub_date(date_string: str) -> int:
+    """Parse publication date from various formats"""
+    if not date_string:
+        return int(datetime.now().timestamp())
     try:
         return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
@@ -125,7 +127,7 @@ def parse_pub_date(entry:dict):
         except ValueError:
             try:
                 return int(parser.parse(date_string).timestamp())
-            except ValueError:
+            except (ValueError, TypeError):
                 pass
     return int(datetime.now().timestamp())  # Return current time if no date is found
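
The reworked parse_pub_date takes the raw string and degrades through three stages; a quick illustration (example dates only):

    # RFC 822, the usual RSS format: handled by the first strptime
    parse_pub_date("Mon, 02 Jun 2025 14:37:51 +0200")  # -> 1748867871
    # ISO 8601 falls through to dateutil.parser
    parse_pub_date("2025-06-02T14:37:51+02:00")        # -> 1748867871
    # empty or unparseable strings return the current time instead of raising
    parse_pub_date("")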

View File

@@ -1,16 +1,28 @@
 import json
 import os
 import logging
+import boto3
+from decimal import Decimal
 from pymongo import MongoClient
 from datetime import datetime
 import redis

 logger = logging.getLogger()
-logger.setLevel("INFO")
+logger.setLevel(logging.INFO)

-sqs = boto3.client('sqs')
-SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
+# For AWS deployment - SQS
+try:
+    sqs = boto3.client('sqs')
+    SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL', '')
+    AWS_DEPLOYMENT = bool(SQS_QUEUE_URL)
+except Exception:
+    AWS_DEPLOYMENT = False
+
+# For local deployment - Redis
+if not AWS_DEPLOYMENT:
+    redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
+    REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue')

 MONGODB_URL = os.environ['MONGODB_URL']
 MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
 MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')
@@ -40,19 +52,25 @@ def handler(event, context):
             'u': rss_url,
             'dt': rss_dt
         }
-        logger.debug("message", message)
+        logger.debug(f"Message: {message}")
         try:
-            sqs.send_message(
-                QueueUrl=SQS_QUEUE_URL,
-                MessageBody=json.dumps(message, cls=DecimalEncoder)
-            )
+            if AWS_DEPLOYMENT:
+                # Send to SQS for AWS deployment
+                sqs.send_message(
+                    QueueUrl=SQS_QUEUE_URL,
+                    MessageBody=json.dumps(message, cls=DecimalEncoder)
+                )
+            else:
+                # Send to Redis for local deployment
+                redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message, cls=DecimalEncoder))
             messages_sent += 1
         except Exception as e:
-            logger.error(f"Error sending message to SQS: {str(e)}")
+            logger.error(f"Error sending message to queue: {str(e)}")

-    logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")
+    logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}")
     return {
         "statusCode": 200,
-        "body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"),
+        "body": json.dumps(f"Sent {messages_sent} RSS URLs to queue"),
     }
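
On the consuming side a matching pop is needed; since this producer uses lpush, a blocking right-pop drains the list oldest-first. A minimal consumer sketch, assuming the worker shares the producer's REDIS_URL and queue name:

    import json
    import os
    import redis

    r = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
    queue_name = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue')

    # brpop blocks for up to `timeout` seconds and returns (key, value) or None
    item = r.brpop(queue_name, timeout=5)
    if item:
        _key, payload = item
        message = json.loads(payload)  # {'u': feed_url, 'dt': last_seen_timestamp}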

View File

@@ -3,28 +3,30 @@ import sys
 import time
 import logging

-# Ensure project root is in the Python path so imports work when executed
-CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
-PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, "..", ".."))
+# Add the project root to Python path
+PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
 if PROJECT_ROOT not in sys.path:
     sys.path.insert(0, PROJECT_ROOT)

 from src.infra.lambdas.RSSFeedProcessorLambda.src.lambda_function import lambda_handler

-logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))
+logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
 logger = logging.getLogger(__name__)

+SLEEP_SECONDS = int(os.getenv("WORKER_SLEEP_SECONDS", "5"))
+
-def main() -> None:
-    """Continuously run the existing Lambda handler as a local worker."""
-    logger.info("Starting local RSS worker")
+def main():
+    logger.info("Starting worker loop")
     while True:
         try:
-            lambda_handler({}, None)
+            result = lambda_handler({}, None)
+            if result.get('statusCode') == 200:
+                logger.debug("Worker iteration completed successfully")
+            else:
+                logger.warning(f"Worker iteration returned non-200 status: {result}")
         except Exception as exc:
-            logger.error("Worker iteration failed", exc_info=exc)
+            logger.exception("Worker iteration failed: %s", exc)
-        time.sleep(1)
+        time.sleep(SLEEP_SECONDS)

 if __name__ == "__main__":
     main()
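
Operationally the worker launches the same way; the one new knob is WORKER_SLEEP_SECONDS (default 5 seconds between iterations), settable through the container environment.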