From 785d2733f9b20b24395e1e99f4e4196d3fbbfa6d Mon Sep 17 00:00:00 2001 From: Aljaz Ceru Date: Tue, 3 Jun 2025 13:37:17 +0200 Subject: [PATCH] dashboard --- src/dashboard/Dockerfile | 11 ++ src/dashboard/app.py | 47 ++++++ src/dashboard/static/index.html | 23 +++ src/dashboard/static/main.js | 35 +++++ src/feed_management/scheduler_main.py | 57 ++++++++ .../analytics/embeddings/vector_db.py | 72 ++++++++++ .../analytics/genai/summarization.py | 6 + src/feed_processing/article_cleaning.py | 14 ++ src/feed_processing/article_extractor.py | 31 ++++ src/feed_processing/config.py | 15 ++ src/feed_processing/data_storage.py | 50 +++++++ src/feed_processing/exceptions.py | 11 ++ src/feed_processing/feed_processor.py | 135 ++++++++++++++++++ src/feed_processing/feed_processor.py.orig | 131 +++++++++++++++++ src/feed_processing/metrics.py | 46 ++++++ src/feed_processing/utils.py | 18 +++ src/feed_processing/worker_main.py | 63 ++++++++ 17 files changed, 765 insertions(+) create mode 100644 src/dashboard/Dockerfile create mode 100644 src/dashboard/app.py create mode 100644 src/dashboard/static/index.html create mode 100644 src/dashboard/static/main.js create mode 100644 src/feed_management/scheduler_main.py create mode 100644 src/feed_processing/analytics/embeddings/vector_db.py create mode 100644 src/feed_processing/analytics/genai/summarization.py create mode 100644 src/feed_processing/article_cleaning.py create mode 100644 src/feed_processing/article_extractor.py create mode 100644 src/feed_processing/config.py create mode 100644 src/feed_processing/data_storage.py create mode 100644 src/feed_processing/exceptions.py create mode 100644 src/feed_processing/feed_processor.py create mode 100644 src/feed_processing/feed_processor.py.orig create mode 100644 src/feed_processing/metrics.py create mode 100644 src/feed_processing/utils.py create mode 100644 src/feed_processing/worker_main.py diff --git a/src/dashboard/Dockerfile b/src/dashboard/Dockerfile new file mode 100644 index 0000000..e9c8906 --- /dev/null +++ b/src/dashboard/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY . 
/app
+
+RUN pip install fastapi uvicorn pymongo
+
+EXPOSE 8000
+
+CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
new file mode 100644
index 0000000..8fdfffd
--- /dev/null
+++ b/src/dashboard/app.py
@@ -0,0 +1,47 @@
+from fastapi import FastAPI, Request, Response
+from fastapi.staticfiles import StaticFiles
+from fastapi.responses import FileResponse, JSONResponse
+from pymongo import MongoClient
+import os
+import json
+
+app = FastAPI()
+
+# Serve static files (HTML/JS)
+app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static")
+
+# MongoDB setup
+MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
+MONGODB_FEEDS_DB_NAME = os.environ.get("MONGODB_FEEDS_DB_NAME", "feeds_db")
+MONGODB_FEEDS_COLLECTION_NAME = os.environ.get("MONGODB_FEEDS_COLLECTION_NAME", "rss_feeds")
+mongo_client = MongoClient(MONGODB_URL)
+feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME]
+
+@app.get("/")
+def root():
+    return FileResponse(os.path.join(os.path.dirname(__file__), "static", "index.html"))
+
+@app.get("/api/feeds")
+def get_feeds():
+    feeds = list(feeds_collection.find({}, {"_id": 0}))
+    return JSONResponse(content=feeds)
+
+@app.post("/api/feeds")
+async def add_feed(request: Request):
+    data = await request.json()
+    url = data.get("url")
+    if not url:
+        return JSONResponse(content={"error": "Missing URL"}, status_code=400)
+    feeds_collection.update_one({"url": url}, {"$setOnInsert": {"url": url, "dt": 0}}, upsert=True)
+    return JSONResponse(content={"status": "ok"})
+
+@app.delete("/api/feeds")
+async def delete_feed(request: Request):
+    data = await request.json()
+    url = data.get("url")
+    if not url:
+        return JSONResponse(content={"error": "Missing URL"}, status_code=400)
+    feeds_collection.delete_one({"url": url})
+    return JSONResponse(content={"status": "ok"})
+
+# Add similar endpoints for schedules and job status as needed
diff --git a/src/dashboard/static/index.html b/src/dashboard/static/index.html
new file mode 100644
index 0000000..3bbfe65
--- /dev/null
+++ b/src/dashboard/static/index.html
@@ -0,0 +1,23 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <title>IngestRSS Dashboard</title>
+    <script src="/static/main.js"></script>
+</head>
+<body>
+

+    <h1>IngestRSS Dashboard</h1>
+
+    <h2>Feeds</h2>
+    <input type="text" id="feed-url" placeholder="Feed URL">
+    <button onclick="addFeed()">Add Feed</button>
+    <table id="feeds-table">
+        <thead>
+            <tr><th>URL</th><th>Actions</th></tr>
+        </thead>
+        <tbody></tbody>
+    </table>
+ + + diff --git a/src/dashboard/static/main.js b/src/dashboard/static/main.js new file mode 100644 index 0000000..732fc55 --- /dev/null +++ b/src/dashboard/static/main.js @@ -0,0 +1,35 @@ +async function fetchFeeds() { + const res = await fetch('/api/feeds'); + const feeds = await res.json(); + const tbody = document.querySelector('#feeds-table tbody'); + tbody.innerHTML = ''; + feeds.forEach(feed => { + const tr = document.createElement('tr'); + tr.innerHTML = `${feed.url} + `; + tbody.appendChild(tr); + }); +} + +async function addFeed() { + const url = document.getElementById('feed-url').value; + if (!url) return alert('Enter a URL'); + await fetch('/api/feeds', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({url}) + }); + document.getElementById('feed-url').value = ''; + fetchFeeds(); +} + +async function deleteFeed(url) { + await fetch('/api/feeds', { + method: 'DELETE', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({url}) + }); + fetchFeeds(); +} + +window.onload = fetchFeeds; diff --git a/src/feed_management/scheduler_main.py b/src/feed_management/scheduler_main.py new file mode 100644 index 0000000..0e6b988 --- /dev/null +++ b/src/feed_management/scheduler_main.py @@ -0,0 +1,57 @@ +import json +import os +import logging +import time + +from pymongo import MongoClient +from datetime import datetime +import redis + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +# Local deployment - Redis only +redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379')) +REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue') + +MONGODB_URL = os.environ['MONGODB_URL'] +MONGODB_FEEDS_DB_NAME = os.environ.get('MONGODB_FEEDS_DB_NAME', 'feeds_db') +MONGODB_FEEDS_COLLECTION_NAME = os.environ.get('MONGODB_FEEDS_COLLECTION_NAME', 'rss_feeds') + +mongo_client = MongoClient(MONGODB_URL) +feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME] + +# Calculate timestamp for 48 hours ago +dt_48h_ago = int(time.time()) - 48 * 3600 + +# Update all feeds +feeds_collection.update_many({}, {"$set": {"dt": dt_48h_ago}}) + +def scheduler_main(): + messages_sent = 0 + + # Iterate over all feeds in MongoDB + for item in feeds_collection.find({}): + rss_url = item.get('url') + rss_dt = item.get('dt') + + logger.debug(f"Processing RSS feed: {rss_url}") + logger.debug(f"Last published date: {rss_dt}") + + if rss_url: + message = { + 'u': rss_url, + 'dt': rss_dt + } + logger.debug(f"Message: {message}") + + try: + # Send to Redis for local deployment + redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message)) + messages_sent += 1 + except Exception as e: + logger.error(f"Error sending message to queue: {str(e)}") + + logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}") + + return \ No newline at end of file diff --git a/src/feed_processing/analytics/embeddings/vector_db.py b/src/feed_processing/analytics/embeddings/vector_db.py new file mode 100644 index 0000000..9bd4ee3 --- /dev/null +++ b/src/feed_processing/analytics/embeddings/vector_db.py @@ -0,0 +1,72 @@ +import os +import requests + +from qdrant_client import QdrantClient, models + +try: + from ...utils import setup_logging +except ImportError: + # Fallback for when running standalone + import logging + def setup_logging(): + return logging.getLogger(__name__) + +logger = setup_logging() + +qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333") +qdrant_api_key = 
os.getenv("QDRANT_API_KEY") +collection_name = os.getenv("QDRANT_COLLECTION_NAME") + +embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM") +vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine") + +ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434") +ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text") + +client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key) + +def get_index(): + collections = client.get_collections().collections + if collection_name not in [c.name for c in collections]: + raise KeyError(f"Collection {collection_name} not found") + return client + +def vectorize(article: str) -> list[float]: + try: + response = requests.post( + f"{ollama_host}/api/embeddings", + json={"model": ollama_embedding_model, "prompt": article}, + timeout=30, + ) + response.raise_for_status() + return response.json().get("embedding", []) + except requests.RequestException as e: + logger.error(f"Error generating embedding: {e}") + # Return a zero vector of the expected dimension as fallback + dim = int(embedding_dim) if embedding_dim else 384 # Default dimension + return [0.0] * dim + + +def upsert_vectors(index: QdrantClient, data: list[dict]): + points = [ + models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload")) + for item in data + ] + index.upsert(collection_name=collection_name, points=points) + + +def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None): + if embedding_dim and len(vector) != int(embedding_dim): + raise ValueError("Length of vector does not match the embedding dimension") + return index.search( + collection_name=collection_name, + query_vector=vector, + limit=top_k, + with_payload=True, + query_filter=filter_query, + ) + + +if __name__ == "__main__": + paragraph = "This is a test." + vectorize(paragraph) \ No newline at end of file diff --git a/src/feed_processing/analytics/genai/summarization.py b/src/feed_processing/analytics/genai/summarization.py new file mode 100644 index 0000000..3e109e9 --- /dev/null +++ b/src/feed_processing/analytics/genai/summarization.py @@ -0,0 +1,6 @@ + + +def summarize(text:str): + sub_prompt = "Summarize the following passage" + + \ No newline at end of file diff --git a/src/feed_processing/article_cleaning.py b/src/feed_processing/article_cleaning.py new file mode 100644 index 0000000..a03a743 --- /dev/null +++ b/src/feed_processing/article_cleaning.py @@ -0,0 +1,14 @@ +import re + +def remove_newlines(text: str) -> str: + return text.replace('\n', '') + +def remove_urls(text: str) -> str: + url_pattern = re.compile(r'http\S+|www\S+') + return url_pattern.sub('', text) + + +def clean_text(text: str) -> str: + text = remove_newlines(text) + text = remove_urls(text) + return text \ No newline at end of file diff --git a/src/feed_processing/article_extractor.py b/src/feed_processing/article_extractor.py new file mode 100644 index 0000000..2a26d90 --- /dev/null +++ b/src/feed_processing/article_extractor.py @@ -0,0 +1,31 @@ +import newspaper +import logging + + + +logger = logging.getLogger() + +def extract_article(url): + """ + Extracts the title and text of an article from the given URL. + + Args: + url (str): The URL of the article. + Returns: + A tuple containing the title and text of the article, respectively. 
+ """ + logger.debug(f"Starting Newspaper Article Extraction {url}") + config = newspaper.Config() + config.request_timeout = 60 + article = newspaper.Article(url) + + try: + article.download() + logger.debug(f"Downloaded Article {url}") + article.parse() + logger.debug(f"Parsed Article {url}") + + return article.title, article.text + except Exception as e: + logger.error(f"Failed to extract article {url}: {str(e)}") + return None, None \ No newline at end of file diff --git a/src/feed_processing/config.py b/src/feed_processing/config.py new file mode 100644 index 0000000..6c36b63 --- /dev/null +++ b/src/feed_processing/config.py @@ -0,0 +1,15 @@ +import os + +# Redis Configuration +REDIS_URL = os.environ["REDIS_URL"] +REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue") + +# Logging Configuration +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") + +# RSS Feed Processing Configuration +MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10")) +FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90")) + +# Article Extraction Configuration +ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30")) diff --git a/src/feed_processing/data_storage.py b/src/feed_processing/data_storage.py new file mode 100644 index 0000000..abd33aa --- /dev/null +++ b/src/feed_processing/data_storage.py @@ -0,0 +1,50 @@ +import json +import os +import logging +from pymongo import MongoClient + +logger = logging.getLogger() + +# Try to import vector DB components, but make them optional +try: + from .analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize + VECTOR_DB_AVAILABLE = True +except ImportError: + VECTOR_DB_AVAILABLE = False + logger.warning("Vector DB components not available. Qdrant storage will not work.") + +MONGODB_URL = os.getenv("MONGODB_URL") +MONGODB_ARTICLES_DB_NAME = os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db") +MONGODB_ARTICLES_COLLECTION_NAME = os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles") + +mongo_client = MongoClient(MONGODB_URL) +articles_collection = mongo_client[MONGODB_ARTICLES_DB_NAME][MONGODB_ARTICLES_COLLECTION_NAME] + +##### Article Storage ##### +def save_article(article: dict, strategy: str = None): + # Only MongoDB is supported now + try: + articles_collection.insert_one(article) + logger.info(f"Saved article {article.get('article_id', '')} to MongoDB") + except Exception as e: + logger.error(f"Failed to save article with error: {str(e)}. 
\n Article: {article} \n Article Type: {type(article)}") + raise + +###### Feed Storage ###### +RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json") + + +def update_rss_feed(feed: dict, last_pub_dt: int): + try: + if not os.path.exists(RSS_FEEDS_FILE): + return + with open(RSS_FEEDS_FILE, "r") as f: + feeds = json.load(f) + for item in feeds: + if item.get("u") == feed["u"]: + item["dt"] = int(last_pub_dt) + with open(RSS_FEEDS_FILE, "w") as f: + json.dump(feeds, f) + logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}") + except Exception as e: + logger.error(f"Failed to update RSS feed: {str(e)}") \ No newline at end of file diff --git a/src/feed_processing/exceptions.py b/src/feed_processing/exceptions.py new file mode 100644 index 0000000..c6c0702 --- /dev/null +++ b/src/feed_processing/exceptions.py @@ -0,0 +1,11 @@ +class RSSProcessingError(Exception): + """Exception raised for errors in the RSS processing.""" + pass + +class ArticleExtractionError(Exception): + """Exception raised for errors in the article extraction.""" + pass + +class DataStorageError(Exception): + """Exception raised for errors in data storage operations.""" + pass \ No newline at end of file diff --git a/src/feed_processing/feed_processor.py b/src/feed_processing/feed_processor.py new file mode 100644 index 0000000..7ddef76 --- /dev/null +++ b/src/feed_processing/feed_processor.py @@ -0,0 +1,135 @@ +import feedparser +from datetime import datetime +from dateutil import parser +import queue +import threading +import logging +from .utils import generate_key +from .article_extractor import extract_article +from .article_cleaning import clean_text + +logger = logging.getLogger() + +def process_feed(feed: dict): + output_queue = queue.Queue() + stop_thread = threading.Event() + thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread)) + thread.daemon = True + thread.start() + + logger.debug(f"Thread Started: {feed['u']}") + thread.join(timeout=90) + + if thread.is_alive(): + stop_thread.set() + logger.debug(f"Killing Thread: {feed['u']}") + return None + else: + try: + output = output_queue.get_nowait() + logger.info(f"Thread Succeeded: {feed['u']}") + return output + except queue.Empty: + logger.info(f"Thread Failed: {feed['u']}") + return None + +def extract_feed_threading(rss: dict, output_queue, stop_thread): + articles = [] + feed_url = rss['u'] + last_date = rss['dt'] + max_date = last_date + entry = None # Initialize entry variable + + try: + feed = feedparser.parse(feed_url) + for entry in feed['entries']: + if stop_thread.is_set(): + break + + pub_date = parse_pub_date(entry.get('published', '')) + + if pub_date > last_date: + title, text = extract_article(entry.link) + title, text = clean_text(title or ''), clean_text(text or '') + article = { + 'link': entry.link, + 'rss': feed_url, + 'title': title, + 'content': text, + 'unixTime': pub_date, + 'rss_id': generate_key(feed_url), + 'article_id': generate_key(entry.link), + 'llm_summary': None, + 'embedding': None, + 'tags': [] + } + articles.append(article) + max_date = max(max_date, pub_date) + + output = { + 'articles': articles, + 'max_date': max_date, + 'feed': rss + } + output_queue.put(output) + except Exception as e: + logger.error(f"Feed URL: {feed_url}") + if entry: + logger.error(f"Current entry: {entry.get('link', 'unknown')}") + logger.error(f"Feed failed due to error: {e}") + +def extract_feed(rss: dict): + articles = [] + feed_url = rss['u'] + last_date = rss['dt'] + max_date = 
last_date + + try: + feed = feedparser.parse(feed_url) + for entry in feed['entries']: + pub_date = parse_pub_date(entry.get('published', '')) + + if pub_date > last_date: + title, text = extract_article(entry.link) + article = { + 'link': entry.link, + 'rss': feed_url, + 'title': title, + 'content': text, + 'unixTime': pub_date, + 'rss_id': generate_key(feed_url), + 'article_id': generate_key(entry.link), + 'llm_summary': None, + 'embedding': None, + 'tags': [] + } + articles.append(article) + max_date = max(max_date, pub_date) + + output = { + 'articles': articles, + 'max_date': max_date, + 'feed': rss + } + return output + except Exception as e: + logger.error(f"Feed URL: {feed_url}") + logger.error(f"Feed failed due to error: {e}") + +def parse_pub_date(date_string: str) -> int: + """Parse publication date from various formats""" + if not date_string: + return int(datetime.now().timestamp()) + + try: + return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp()) + except ValueError: + try: + return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp()) + except ValueError: + try: + return int(parser.parse(date_string).timestamp()) + except (ValueError, TypeError): + pass + + return int(datetime.now().timestamp()) # Return current time if no date is found \ No newline at end of file diff --git a/src/feed_processing/feed_processor.py.orig b/src/feed_processing/feed_processor.py.orig new file mode 100644 index 0000000..b34a518 --- /dev/null +++ b/src/feed_processing/feed_processor.py.orig @@ -0,0 +1,131 @@ +import feedparser +from datetime import datetime +from dateutil import parser +import queue +import threading +import logging +from utils import generate_key +from article_extractor import extract_article +from article_cleaning import clean_text + +logger = logging.getLogger() + +def process_feed(feed: dict): + output_queue = queue.Queue() + stop_thread = threading.Event() + thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread)) + thread.daemon = True + thread.start() + + logger.debug(f"Thread Started: {feed['u']}") + thread.join(timeout=90) + + if thread.is_alive(): + stop_thread.set() + logger.debug(f"Killing Thread: {feed['u']}") + return None + else: + try: + output = output_queue.get_nowait() + logger.info(f"Thread Succeeded: {feed['u']}") + return output + except queue.Empty: + logger.info(f"Thread Failed: {feed['u']}") + return None + +def extract_feed_threading(rss: dict, output_queue, stop_thread): + articles = [] + feed_url = rss['u'] + last_date = rss['dt'] + max_date = last_date + + try: + feed = feedparser.parse(feed_url) + for entry in feed['entries']: + if stop_thread.is_set(): + break + + pub_date = parse_pub_date(entry['published']) + + if pub_date > last_date: + title, text = extract_article(entry.link) + title, text = clean_text(title), clean_text(text) + article = { + 'link': entry.link, + 'rss': feed_url, + 'title': title, + 'content': text, + 'unixTime': pub_date, + 'rss_id': generate_key(feed_url), + 'article_id': generate_key(entry.link), + 'llm_summary': None, + 'embedding': None + } + articles.append(article) + max_date = max(max_date, pub_date) + + output = { + 'articles': articles, + 'max_date': max_date, + 'feed': rss + } + output_queue.put(output) + except Exception as e: + logger.error(f"Feed: {entry}") + logger.error(f"Feed failed due to error: {e}") + +def extract_feed(rss: dict): + articles = [] + feed_url = rss['u'] + last_date = rss['dt'] + max_date = last_date + + try: + 
feed = feedparser.parse(feed_url) + for entry in feed['entries']: + pub_date = parse_pub_date(entry['published']) + + if pub_date > last_date: + title, text = extract_article(entry.link) + article = { + 'link': entry.link, + 'rss': feed_url, + 'title': title, + 'content': text, + 'unixTime': pub_date, + 'rss_id': generate_key(feed_url), + 'article_id': generate_key(entry.link), + 'llm_summary': None, + 'embedding': None + } + articles.append(article) + max_date = max(max_date, pub_date) + + output = { + 'articles': articles, + 'max_date': max_date, + 'feed': rss + } + print(output) + return output + except Exception as e: + logger.error(f"Feed: {entry}") + logger.error(f"Feed failed due to error: {e}") + +def parse_pub_date(entry:dict): + + if 'published' in entry: + date_string = entry['published'] + + try: + return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp()) + except ValueError: + try: + return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp()) + except ValueError: + try: + return int(parser.parse(date_string).timestamp()) + except ValueError: + pass + + return int(datetime.now().timestamp()) # Return current time if no date is found \ No newline at end of file diff --git a/src/feed_processing/metrics.py b/src/feed_processing/metrics.py new file mode 100644 index 0000000..9d9fe1c --- /dev/null +++ b/src/feed_processing/metrics.py @@ -0,0 +1,46 @@ +"""Prometheus metrics utilities for the RSS feed processor.""" + +import os +from prometheus_client import Counter, Histogram, start_http_server + + +# Start a Prometheus metrics HTTP server exposing ``/metrics``. The port can be +# customised with the ``METRICS_PORT`` environment variable. This block is safe +# to run multiple times as it silently ignores port binding errors. 
+_metrics_port = int(os.environ.get("METRICS_PORT", "8000"))
+if not os.environ.get("METRICS_SERVER_STARTED"):
+    try:
+        start_http_server(_metrics_port)
+        os.environ["METRICS_SERVER_STARTED"] = "1"
+    except OSError:
+        pass
+
+
+# Metric definitions
+_processed_articles = Counter(
+    "processed_articles_total",
+    "Total number of processed articles",
+)
+_processing_time = Histogram(
+    "rss_processing_seconds",
+    "Time spent processing RSS feeds",
+)
+_extraction_errors = Counter(
+    "extraction_errors_total",
+    "Number of article extraction errors",
+)
+
+
+def record_processed_articles(count: int) -> None:
+    """Increment the processed articles counter."""
+    _processed_articles.inc(count)
+
+
+def record_processing_time(duration: float) -> None:
+    """Record how long a feed took to process."""
+    _processing_time.observe(duration)
+
+
+def record_extraction_errors(count: int) -> None:
+    """Increment the extraction errors counter."""
+    _extraction_errors.inc(count)
diff --git a/src/feed_processing/utils.py b/src/feed_processing/utils.py
new file mode 100644
index 0000000..b7dfcd7
--- /dev/null
+++ b/src/feed_processing/utils.py
@@ -0,0 +1,18 @@
+import logging
+import os
+import hashlib
+
+def setup_logging():
+    logger = logging.getLogger()
+    log_level = os.environ.get('LOG_LEVEL', 'INFO')
+    logger.setLevel(logging.getLevelName(log_level))
+    return logger
+
+
+def generate_key(input_string, length=10):
+    # Create a SHA256 hash of the input string
+    hash_object = hashlib.sha256(input_string.encode())
+    hex_dig = hash_object.hexdigest()
+
+    # Return the first 'length' characters of the hash
+    return hex_dig[:length]
diff --git a/src/feed_processing/worker_main.py b/src/feed_processing/worker_main.py
new file mode 100644
index 0000000..68bb048
--- /dev/null
+++ b/src/feed_processing/worker_main.py
@@ -0,0 +1,63 @@
+import json
+import time
+import os
+import redis
+from .feed_processor import extract_feed
+from .data_storage import save_article, update_rss_feed
+from .utils import setup_logging
+from .config import REDIS_URL, REDIS_QUEUE_NAME
+from .exceptions import RSSProcessingError, DataStorageError
+from .metrics import (
+    record_processed_articles,
+    record_processing_time,
+    record_extraction_errors,
+)
+
+logger = setup_logging()
+storage_strategy = os.environ.get("STORAGE_STRATEGY")
+redis_client = redis.Redis.from_url(REDIS_URL)
+
+
+def worker_main():
+    logger.info("Starting RSS feed processing")
+    start_time = time.time()
+
+    try:
+        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
+        if not feed_data:
+            logger.info("No messages in queue")
+            return
+        feed = json.loads(feed_data)
+
+        result = extract_feed(feed)
+        logger.info(f"Process Feed Result Dictionary: {result}")
+
+        if result:
+            last_pub_dt = result["max_date"]
+            for article in result["articles"]:
+                try:
+                    save_article(article, storage_strategy)
+                except DataStorageError as e:
+                    logger.error(f"Failed to save article: {str(e)}")
+                    record_extraction_errors(1)
+
+            update_rss_feed(result["feed"], last_pub_dt)
+            logger.info(f"Processed feed: {feed['u']}")
+            record_processed_articles(len(result["articles"]))
+        else:
+            logger.warning(f"Failed to process feed: {feed['u']}")
+            record_extraction_errors(1)
+
+    except RSSProcessingError as e:
+        logger.error(f"RSS Processing Error: {str(e)}")
+        return
+
+    except Exception as e:
+        logger.error(f"Unexpected error: {str(e)}")
+        return
+
+    finally:
+        end_time = time.time()
+        processing_time = end_time - start_time
+        record_processing_time(processing_time)
+        logger.info(f"Worker execution time: {processing_time:.2f} seconds")