diff --git a/src/dashboard/Dockerfile b/src/dashboard/Dockerfile
new file mode 100644
index 0000000..e9c8906
--- /dev/null
+++ b/src/dashboard/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY . /app
+
+RUN pip install fastapi uvicorn pymongo
+
+EXPOSE 8000
+
+CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
new file mode 100644
index 0000000..8fdfffd
--- /dev/null
+++ b/src/dashboard/app.py
@@ -0,0 +1,47 @@
+from fastapi import FastAPI, Request
+from fastapi.staticfiles import StaticFiles
+from fastapi.responses import FileResponse, JSONResponse
+from pymongo import MongoClient
+import os
+
+app = FastAPI()
+
+# Serve static files (HTML/JS)
+app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static")
+
+# MongoDB setup
+MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
+MONGODB_FEEDS_DB_NAME = os.environ.get("MONGODB_FEEDS_DB_NAME", "feeds_db")
+MONGODB_FEEDS_COLLECTION_NAME = os.environ.get("MONGODB_FEEDS_COLLECTION_NAME", "rss_feeds")
+mongo_client = MongoClient(MONGODB_URL)
+feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME]
+
+@app.get("/")
+def root():
+ return FileResponse(os.path.join(os.path.dirname(__file__), "static", "index.html"))
+
+@app.get("/api/feeds")
+def get_feeds():
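+    """Return all configured RSS feeds, excluding MongoDB's internal _id field."""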
+ feeds = list(feeds_collection.find({}, {"_id": 0}))
+ return JSONResponse(content=feeds)
+
+@app.post("/api/feeds")
+async def add_feed(request: Request):
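+    """Register a feed URL; the upsert keeps the call idempotent and initialises dt to 0 for new feeds."""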
+ data = await request.json()
+ url = data.get("url")
+ if not url:
+ return JSONResponse(content={"error": "Missing URL"}, status_code=400)
+ feeds_collection.update_one({"url": url}, {"$setOnInsert": {"url": url, "dt": 0}}, upsert=True)
+ return JSONResponse(content={"status": "ok"})
+
+@app.delete("/api/feeds")
+async def delete_feed(request: Request):
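+    """Remove a feed URL from the collection."""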
+ data = await request.json()
+ url = data.get("url")
+ if not url:
+ return JSONResponse(content={"error": "Missing URL"}, status_code=400)
+ feeds_collection.delete_one({"url": url})
+ return JSONResponse(content={"status": "ok"})
+
+# Add similar endpoints for schedules and job status as needed
diff --git a/src/dashboard/static/index.html b/src/dashboard/static/index.html
new file mode 100644
index 0000000..3bbfe65
--- /dev/null
+++ b/src/dashboard/static/index.html
@@ -0,0 +1,23 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="utf-8">
+    <title>IngestRSS Dashboard</title>
+</head>
+<body>
+    <h1>IngestRSS Dashboard</h1>
+    <h2>Feeds</h2>
+    <table id="feeds-table">
+        <thead>
+            <tr><th>URL</th><th></th></tr>
+        </thead>
+        <tbody></tbody>
+    </table>
+    <p>
+        <input type="text" id="feed-url" placeholder="https://example.com/rss.xml">
+        <button onclick="addFeed()">Add Feed</button>
+    </p>
+    <script src="/static/main.js"></script>
+</body>
+</html>
diff --git a/src/dashboard/static/main.js b/src/dashboard/static/main.js
new file mode 100644
index 0000000..732fc55
--- /dev/null
+++ b/src/dashboard/static/main.js
@@ -0,0 +1,35 @@
+async function fetchFeeds() {
+ const res = await fetch('/api/feeds');
+ const feeds = await res.json();
+ const tbody = document.querySelector('#feeds-table tbody');
+ tbody.innerHTML = '';
+ feeds.forEach(feed => {
+ const tr = document.createElement('tr');
+        tr.innerHTML = `<td>${feed.url}</td>
+            <td><button onclick="deleteFeed('${feed.url}')">Delete</button></td>`;
+ tbody.appendChild(tr);
+ });
+}
+
+async function addFeed() {
+ const url = document.getElementById('feed-url').value;
+ if (!url) return alert('Enter a URL');
+ await fetch('/api/feeds', {
+ method: 'POST',
+ headers: {'Content-Type': 'application/json'},
+ body: JSON.stringify({url})
+ });
+ document.getElementById('feed-url').value = '';
+ fetchFeeds();
+}
+
+async function deleteFeed(url) {
+ await fetch('/api/feeds', {
+ method: 'DELETE',
+ headers: {'Content-Type': 'application/json'},
+ body: JSON.stringify({url})
+ });
+ fetchFeeds();
+}
+
+window.onload = fetchFeeds;
diff --git a/src/feed_management/scheduler_main.py b/src/feed_management/scheduler_main.py
new file mode 100644
index 0000000..0e6b988
--- /dev/null
+++ b/src/feed_management/scheduler_main.py
@@ -0,0 +1,57 @@
+import json
+import os
+import logging
+import time
+
+from pymongo import MongoClient
+from datetime import datetime
+import redis
+
+logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'INFO'))
+logger = logging.getLogger()
+
+# Local deployment - Redis only
+redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
+REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue')
+
+MONGODB_URL = os.environ['MONGODB_URL']
+MONGODB_FEEDS_DB_NAME = os.environ.get('MONGODB_FEEDS_DB_NAME', 'feeds_db')
+MONGODB_FEEDS_COLLECTION_NAME = os.environ.get('MONGODB_FEEDS_COLLECTION_NAME', 'rss_feeds')
+
+mongo_client = MongoClient(MONGODB_URL)
+feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME]
+
+# On startup, reset every feed's last-seen timestamp to 48 hours ago so the
+# first scheduling pass backfills articles published in that window.
+dt_48h_ago = int(time.time()) - 48 * 3600
+feeds_collection.update_many({}, {"$set": {"dt": dt_48h_ago}})
+
+def scheduler_main():
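+    """Enqueue every feed stored in MongoDB onto the Redis work queue as a {'u': url, 'dt': last_seen} message."""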
+ messages_sent = 0
+
+ # Iterate over all feeds in MongoDB
+ for item in feeds_collection.find({}):
+ rss_url = item.get('url')
+ rss_dt = item.get('dt')
+
+ logger.debug(f"Processing RSS feed: {rss_url}")
+ logger.debug(f"Last published date: {rss_dt}")
+
+ if rss_url:
+ message = {
+ 'u': rss_url,
+ 'dt': rss_dt
+ }
+ logger.debug(f"Message: {message}")
+
+ try:
+ # Send to Redis for local deployment
+ redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message))
+ messages_sent += 1
+ except Exception as e:
+ logger.error(f"Error sending message to queue: {str(e)}")
+
+ logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}")
+
+ return
\ No newline at end of file
diff --git a/src/feed_processing/analytics/embeddings/vector_db.py b/src/feed_processing/analytics/embeddings/vector_db.py
new file mode 100644
index 0000000..9bd4ee3
--- /dev/null
+++ b/src/feed_processing/analytics/embeddings/vector_db.py
@@ -0,0 +1,72 @@
+import os
+import requests
+
+from qdrant_client import QdrantClient, models
+
+try:
+ from ...utils import setup_logging
+except ImportError:
+ # Fallback for when running standalone
+ import logging
+ def setup_logging():
+ return logging.getLogger(__name__)
+
+logger = setup_logging()
+
+qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
+qdrant_api_key = os.getenv("QDRANT_API_KEY")
+collection_name = os.getenv("QDRANT_COLLECTION_NAME")
+
+embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM")
+vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine")
+
+ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
+ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
+
+client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)
+
+def get_index():
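+    """Return the Qdrant client after verifying that the configured collection exists."""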
+ collections = client.get_collections().collections
+ if collection_name not in [c.name for c in collections]:
+ raise KeyError(f"Collection {collection_name} not found")
+ return client
+
+def vectorize(article: str) -> list[float]:
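+    """Generate an embedding for the article text via Ollama's /api/embeddings endpoint."""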
+ try:
+ response = requests.post(
+ f"{ollama_host}/api/embeddings",
+ json={"model": ollama_embedding_model, "prompt": article},
+ timeout=30,
+ )
+ response.raise_for_status()
+ return response.json().get("embedding", [])
+ except requests.RequestException as e:
+ logger.error(f"Error generating embedding: {e}")
+ # Return a zero vector of the expected dimension as fallback
+        dim = int(embedding_dim) if embedding_dim else 768  # nomic-embed-text returns 768-dimensional vectors
+ return [0.0] * dim
+
+
+def upsert_vectors(index: QdrantClient, data: list[dict]):
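+    """Upsert points (id, vector, optional payload) into the configured Qdrant collection."""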
+ points = [
+ models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload"))
+ for item in data
+ ]
+ index.upsert(collection_name=collection_name, points=points)
+
+
+def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None):
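+    """Run a nearest-neighbour search against the collection, optionally constrained by a filter."""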
+ if embedding_dim and len(vector) != int(embedding_dim):
+ raise ValueError("Length of vector does not match the embedding dimension")
+ return index.search(
+ collection_name=collection_name,
+ query_vector=vector,
+ limit=top_k,
+ with_payload=True,
+ query_filter=filter_query,
+ )
+
+
+if __name__ == "__main__":
+ paragraph = "This is a test."
+ vectorize(paragraph)
\ No newline at end of file
diff --git a/src/feed_processing/analytics/genai/summarization.py b/src/feed_processing/analytics/genai/summarization.py
new file mode 100644
index 0000000..3e109e9
--- /dev/null
+++ b/src/feed_processing/analytics/genai/summarization.py
@@ -0,0 +1,6 @@
+
+
+def summarize(text: str):
+    """Placeholder for LLM-based article summarization; not yet implemented."""
+    sub_prompt = "Summarize the following passage"  # intended prompt prefix for the eventual LLM call
+    return None
+
+
\ No newline at end of file
diff --git a/src/feed_processing/article_cleaning.py b/src/feed_processing/article_cleaning.py
new file mode 100644
index 0000000..a03a743
--- /dev/null
+++ b/src/feed_processing/article_cleaning.py
@@ -0,0 +1,14 @@
+import re
+
+def remove_newlines(text: str) -> str:
+    # Replace newlines with spaces so words on adjacent lines are not concatenated
+    return text.replace('\n', ' ')
+
+def remove_urls(text: str) -> str:
+ url_pattern = re.compile(r'http\S+|www\S+')
+ return url_pattern.sub('', text)
+
+
+def clean_text(text: str) -> str:
+ text = remove_newlines(text)
+ text = remove_urls(text)
+ return text
\ No newline at end of file
diff --git a/src/feed_processing/article_extractor.py b/src/feed_processing/article_extractor.py
new file mode 100644
index 0000000..2a26d90
--- /dev/null
+++ b/src/feed_processing/article_extractor.py
@@ -0,0 +1,31 @@
+import newspaper
+import logging
+
+
+
+logger = logging.getLogger()
+
+def extract_article(url):
+ """
+ Extracts the title and text of an article from the given URL.
+
+ Args:
+ url (str): The URL of the article.
+ Returns:
+ A tuple containing the title and text of the article, respectively.
+ """
+ logger.debug(f"Starting Newspaper Article Extraction {url}")
+ config = newspaper.Config()
+ config.request_timeout = 60
+    article = newspaper.Article(url, config=config)  # pass the config so the 60-second timeout is actually applied
+
+ try:
+ article.download()
+ logger.debug(f"Downloaded Article {url}")
+ article.parse()
+ logger.debug(f"Parsed Article {url}")
+
+ return article.title, article.text
+ except Exception as e:
+ logger.error(f"Failed to extract article {url}: {str(e)}")
+ return None, None
\ No newline at end of file
diff --git a/src/feed_processing/config.py b/src/feed_processing/config.py
new file mode 100644
index 0000000..6c36b63
--- /dev/null
+++ b/src/feed_processing/config.py
@@ -0,0 +1,15 @@
+import os
+
+# Redis Configuration
+REDIS_URL = os.environ["REDIS_URL"]
+REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
+
+# Logging Configuration
+LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
+
+# RSS Feed Processing Configuration
+MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
+FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))
+
+# Article Extraction Configuration
+ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))
diff --git a/src/feed_processing/data_storage.py b/src/feed_processing/data_storage.py
new file mode 100644
index 0000000..abd33aa
--- /dev/null
+++ b/src/feed_processing/data_storage.py
@@ -0,0 +1,50 @@
+import json
+import os
+import logging
+from pymongo import MongoClient
+
+logger = logging.getLogger()
+
+# Try to import vector DB components, but make them optional
+try:
+ from .analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
+ VECTOR_DB_AVAILABLE = True
+except ImportError:
+ VECTOR_DB_AVAILABLE = False
+ logger.warning("Vector DB components not available. Qdrant storage will not work.")
+
+MONGODB_URL = os.getenv("MONGODB_URL")
+MONGODB_ARTICLES_DB_NAME = os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db")
+MONGODB_ARTICLES_COLLECTION_NAME = os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles")
+
+mongo_client = MongoClient(MONGODB_URL)
+articles_collection = mongo_client[MONGODB_ARTICLES_DB_NAME][MONGODB_ARTICLES_COLLECTION_NAME]
+
+##### Article Storage #####
+def save_article(article: dict, strategy: str = None):
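+    """Insert an article document into MongoDB; the strategy argument is accepted for backwards compatibility but ignored."""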
+ # Only MongoDB is supported now
+ try:
+ articles_collection.insert_one(article)
+ logger.info(f"Saved article {article.get('article_id', '')} to MongoDB")
+ except Exception as e:
+ logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
+ raise
+
+###### Feed Storage ######
+RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")
+
+
+def update_rss_feed(feed: dict, last_pub_dt: int):
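+    """Record the feed's newest publication timestamp in the local rss_feeds.json file, if that file exists."""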
+ try:
+ if not os.path.exists(RSS_FEEDS_FILE):
+ return
+ with open(RSS_FEEDS_FILE, "r") as f:
+ feeds = json.load(f)
+ for item in feeds:
+ if item.get("u") == feed["u"]:
+ item["dt"] = int(last_pub_dt)
+ with open(RSS_FEEDS_FILE, "w") as f:
+ json.dump(feeds, f)
+ logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
+ except Exception as e:
+ logger.error(f"Failed to update RSS feed: {str(e)}")
\ No newline at end of file
diff --git a/src/feed_processing/exceptions.py b/src/feed_processing/exceptions.py
new file mode 100644
index 0000000..c6c0702
--- /dev/null
+++ b/src/feed_processing/exceptions.py
@@ -0,0 +1,11 @@
+class RSSProcessingError(Exception):
+ """Exception raised for errors in the RSS processing."""
+ pass
+
+class ArticleExtractionError(Exception):
+ """Exception raised for errors in the article extraction."""
+ pass
+
+class DataStorageError(Exception):
+ """Exception raised for errors in data storage operations."""
+ pass
\ No newline at end of file
diff --git a/src/feed_processing/feed_processor.py b/src/feed_processing/feed_processor.py
new file mode 100644
index 0000000..7ddef76
--- /dev/null
+++ b/src/feed_processing/feed_processor.py
@@ -0,0 +1,135 @@
+import feedparser
+from datetime import datetime
+from dateutil import parser
+import queue
+import threading
+import logging
+from .utils import generate_key
+from .article_extractor import extract_article
+from .article_cleaning import clean_text
+
+logger = logging.getLogger()
+
+def process_feed(feed: dict):
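+    """Extract a feed in a daemon thread with a 90-second timeout; return the result dict or None on timeout/failure."""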
+ output_queue = queue.Queue()
+ stop_thread = threading.Event()
+ thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
+ thread.daemon = True
+ thread.start()
+
+ logger.debug(f"Thread Started: {feed['u']}")
+ thread.join(timeout=90)
+
+ if thread.is_alive():
+ stop_thread.set()
+ logger.debug(f"Killing Thread: {feed['u']}")
+ return None
+ else:
+ try:
+ output = output_queue.get_nowait()
+ logger.info(f"Thread Succeeded: {feed['u']}")
+ return output
+ except queue.Empty:
+ logger.info(f"Thread Failed: {feed['u']}")
+ return None
+
+def extract_feed_threading(rss: dict, output_queue, stop_thread):
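+    """Thread target: parse the feed, extract articles published after rss['dt'], and put the result on output_queue."""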
+ articles = []
+ feed_url = rss['u']
+ last_date = rss['dt']
+ max_date = last_date
+ entry = None # Initialize entry variable
+
+ try:
+ feed = feedparser.parse(feed_url)
+ for entry in feed['entries']:
+ if stop_thread.is_set():
+ break
+
+ pub_date = parse_pub_date(entry.get('published', ''))
+
+ if pub_date > last_date:
+ title, text = extract_article(entry.link)
+ title, text = clean_text(title or ''), clean_text(text or '')
+ article = {
+ 'link': entry.link,
+ 'rss': feed_url,
+ 'title': title,
+ 'content': text,
+ 'unixTime': pub_date,
+ 'rss_id': generate_key(feed_url),
+ 'article_id': generate_key(entry.link),
+ 'llm_summary': None,
+ 'embedding': None,
+ 'tags': []
+ }
+ articles.append(article)
+ max_date = max(max_date, pub_date)
+
+ output = {
+ 'articles': articles,
+ 'max_date': max_date,
+ 'feed': rss
+ }
+ output_queue.put(output)
+ except Exception as e:
+ logger.error(f"Feed URL: {feed_url}")
+ if entry:
+ logger.error(f"Current entry: {entry.get('link', 'unknown')}")
+ logger.error(f"Feed failed due to error: {e}")
+
+def extract_feed(rss: dict):
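+    """Synchronous variant of extract_feed_threading: return the articles published after rss['dt'] and the newest timestamp."""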
+ articles = []
+ feed_url = rss['u']
+ last_date = rss['dt']
+ max_date = last_date
+
+ try:
+ feed = feedparser.parse(feed_url)
+ for entry in feed['entries']:
+ pub_date = parse_pub_date(entry.get('published', ''))
+
+ if pub_date > last_date:
+ title, text = extract_article(entry.link)
+ article = {
+ 'link': entry.link,
+ 'rss': feed_url,
+ 'title': title,
+ 'content': text,
+ 'unixTime': pub_date,
+ 'rss_id': generate_key(feed_url),
+ 'article_id': generate_key(entry.link),
+ 'llm_summary': None,
+ 'embedding': None,
+ 'tags': []
+ }
+ articles.append(article)
+ max_date = max(max_date, pub_date)
+
+ output = {
+ 'articles': articles,
+ 'max_date': max_date,
+ 'feed': rss
+ }
+ return output
+ except Exception as e:
+ logger.error(f"Feed URL: {feed_url}")
+ logger.error(f"Feed failed due to error: {e}")
+
+def parse_pub_date(date_string: str) -> int:
+ """Parse publication date from various formats"""
+ if not date_string:
+ return int(datetime.now().timestamp())
+
+ try:
+ return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
+ except ValueError:
+ try:
+ return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
+ except ValueError:
+ try:
+ return int(parser.parse(date_string).timestamp())
+ except (ValueError, TypeError):
+ pass
+
+ return int(datetime.now().timestamp()) # Return current time if no date is found
\ No newline at end of file
diff --git a/src/feed_processing/metrics.py b/src/feed_processing/metrics.py
new file mode 100644
index 0000000..9d9fe1c
--- /dev/null
+++ b/src/feed_processing/metrics.py
@@ -0,0 +1,46 @@
+"""Prometheus metrics utilities for the RSS feed processor."""
+
+import os
+from prometheus_client import Counter, Histogram, start_http_server
+
+
+# Start a Prometheus metrics HTTP server exposing ``/metrics``. The port can be
+# customised with the ``METRICS_PORT`` environment variable. This block is safe
+# to run multiple times as it silently ignores port binding errors.
+_metrics_port = int(os.environ.get("METRICS_PORT", "8000"))
+if not os.environ.get("METRICS_SERVER_STARTED"):
+ try:
+ start_http_server(_metrics_port)
+ os.environ["METRICS_SERVER_STARTED"] = "1"
+ except OSError:
+ pass
+
+
+# Metric definitions
+_processed_articles = Counter(
+ "processed_articles_total",
+ "Total number of processed articles",
+)
+_processing_time = Histogram(
+ "rss_processing_seconds",
+ "Time spent processing RSS feeds",
+)
+_extraction_errors = Counter(
+ "extraction_errors_total",
+ "Number of article extraction errors",
+)
+
+
+def record_processed_articles(count: int) -> None:
+ """Increment the processed articles counter."""
+ _processed_articles.inc(count)
+
+
+def record_processing_time(duration: float) -> None:
+ """Record how long a feed took to process."""
+ _processing_time.observe(duration)
+
+
+def record_extraction_errors(count: int) -> None:
+ """Increment the extraction errors counter."""
+ _extraction_errors.inc(count)
diff --git a/src/feed_processing/utils.py b/src/feed_processing/utils.py
new file mode 100644
index 0000000..b7dfcd7
--- /dev/null
+++ b/src/feed_processing/utils.py
@@ -0,0 +1,18 @@
+import logging
+import os
+import hashlib
+
+def setup_logging():
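+    """Set the root logger level from the LOG_LEVEL environment variable and return the logger."""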
+ logger = logging.getLogger()
+ log_level = os.environ.get('LOG_LEVEL', 'INFO')
+ logger.setLevel(logging.getLevelName(log_level))
+ return logger
+
+
+def generate_key(input_string, length=10):
+ # Create a SHA256 hash of the input string
+ hash_object = hashlib.sha256(input_string.encode())
+ hex_dig = hash_object.hexdigest()
+
+ # Return the first 'length' characters of the hash
+ return hex_dig[:length]
diff --git a/src/feed_processing/worker_main.py b/src/feed_processing/worker_main.py
new file mode 100644
index 0000000..68bb048
--- /dev/null
+++ b/src/feed_processing/worker_main.py
@@ -0,0 +1,63 @@
+import json
+import time
+import os
+import redis
+from .feed_processor import extract_feed
+from .data_storage import save_article, update_rss_feed
+from .utils import setup_logging
+from .config import REDIS_URL, REDIS_QUEUE_NAME
+from .exceptions import RSSProcessingError, DataStorageError
+from .metrics import (
+ record_processed_articles,
+ record_processing_time,
+ record_extraction_errors,
+)
+
+logger = setup_logging()
+storage_strategy = os.environ.get("STORAGE_STRATEGY")
+redis_client = redis.Redis.from_url(REDIS_URL)
+
+
+def worker_main():
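+    """Pop one feed message from the Redis queue, extract and store its new articles, and record metrics."""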
+ logger.info("Starting RSS feed processing")
+ start_time = time.time()
+
+ try:
+ feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
+ if not feed_data:
+ logger.info("No messages in queue")
+ return
+ feed = json.loads(feed_data)
+
+ result = extract_feed(feed)
+        logger.info(f"Process Feed Result Dictionary: {result}")
+
+        if result:
+            last_pub_dt = result["max_date"]
+ for article in result["articles"]:
+ try:
+ save_article(article, storage_strategy)
+ except DataStorageError as e:
+ logger.error(f"Failed to save article: {str(e)}")
+ record_extraction_errors(1)
+
+ update_rss_feed(result["feed"], last_pub_dt)
+ logger.info(f"Processed feed: {feed['u']}")
+ record_processed_articles(len(result["articles"]))
+ else:
+ logger.warning(f"Failed to process feed: {feed['u']}")
+ record_extraction_errors(1)
+
+ except RSSProcessingError as e:
+ logger.error(f"RSS Processing Error: {str(e)}")
+ return
+
+ except Exception as e:
+ logger.error(f"Unexpected error: {str(e)}")
+ return
+
+ finally:
+ end_time = time.time()
+ processing_time = end_time - start_time
+ record_processing_time(processing_time)
+ logger.info(f"Worker execution time: {processing_time:.2f} seconds")