Mirror of https://github.com/aljazceru/IngestRSS.git (synced 2026-02-01 12:24:28 +01:00)
dashboard
11 src/dashboard/Dockerfile Normal file
@@ -0,0 +1,11 @@
FROM python:3.12-slim

WORKDIR /app

COPY . /app

RUN pip install fastapi uvicorn pymongo

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
47 src/dashboard/app.py Normal file
@@ -0,0 +1,47 @@
from fastapi import FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pymongo import MongoClient
import os
import json

app = FastAPI()

# Serve static files (HTML/JS)
app.mount("/static", StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")), name="static")

# MongoDB setup
MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
MONGODB_FEEDS_DB_NAME = os.environ.get("MONGODB_FEEDS_DB_NAME", "feeds_db")
MONGODB_FEEDS_COLLECTION_NAME = os.environ.get("MONGODB_FEEDS_COLLECTION_NAME", "rss_feeds")
mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME]

@app.get("/")
def root():
    return FileResponse(os.path.join(os.path.dirname(__file__), "static", "index.html"))

@app.get("/api/feeds")
def get_feeds():
    feeds = list(feeds_collection.find({}, {"_id": 0}))
    return JSONResponse(content=feeds)

@app.post("/api/feeds")
async def add_feed(request: Request):
    data = await request.json()
    url = data.get("url")
    if not url:
        return JSONResponse(content={"error": "Missing URL"}, status_code=400)
    feeds_collection.update_one({"url": url}, {"$setOnInsert": {"url": url, "dt": 0}}, upsert=True)
    return JSONResponse(content={"status": "ok"})

@app.delete("/api/feeds")
async def delete_feed(request: Request):
    data = await request.json()
    url = data.get("url")
    if not url:
        return JSONResponse(content={"error": "Missing URL"}, status_code=400)
    feeds_collection.delete_one({"url": url})
    return JSONResponse(content={"status": "ok"})

# Add similar endpoints for schedules and job status as needed
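The closing comment above suggests further endpoints for schedules and job status. As a rough sketch only: the route name, payload shape, and reuse of feeds_collection below are assumptions, not something this commit defines.

# Hypothetical read-only status endpoint built on the same MongoDB collection.
@app.get("/api/status")
def get_status():
    # Feed count comes straight from MongoDB; reporting queue depth would
    # additionally need a Redis client, which the dashboard does not have yet.
    return JSONResponse(content={"feed_count": feeds_collection.count_documents({})})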
23 src/dashboard/static/index.html Normal file
@@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
  <title>IngestRSS Dashboard</title>
  <style>
    body { font-family: sans-serif; margin: 2em; }
    input, button { margin: 0.5em; }
    table { border-collapse: collapse; margin-top: 1em; }
    th, td { border: 1px solid #ccc; padding: 0.5em; }
  </style>
</head>
<body>
  <h1>IngestRSS Dashboard</h1>
  <h2>Feeds</h2>
  <input id="feed-url" placeholder="Feed URL">
  <button onclick="addFeed()">Add Feed</button>
  <table id="feeds-table">
    <thead><tr><th>URL</th><th>Actions</th></tr></thead>
    <tbody></tbody>
  </table>
  <!-- app.py mounts static assets under /static, so the script must be referenced there -->
  <script src="/static/main.js"></script>
</body>
</html>
35 src/dashboard/static/main.js Normal file
@@ -0,0 +1,35 @@
async function fetchFeeds() {
  const res = await fetch('/api/feeds');
  const feeds = await res.json();
  const tbody = document.querySelector('#feeds-table tbody');
  tbody.innerHTML = '';
  feeds.forEach(feed => {
    const tr = document.createElement('tr');
    tr.innerHTML = `<td>${feed.url}</td>
      <td><button onclick="deleteFeed('${feed.url}')">Delete</button></td>`;
    tbody.appendChild(tr);
  });
}

async function addFeed() {
  const url = document.getElementById('feed-url').value;
  if (!url) return alert('Enter a URL');
  await fetch('/api/feeds', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({url})
  });
  document.getElementById('feed-url').value = '';
  fetchFeeds();
}

async function deleteFeed(url) {
  await fetch('/api/feeds', {
    method: 'DELETE',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({url})
  });
  fetchFeeds();
}

window.onload = fetchFeeds;
57 src/feed_management/scheduler_main.py Normal file
@@ -0,0 +1,57 @@
import json
import os
import logging
import time

from pymongo import MongoClient
from datetime import datetime
import redis

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Local deployment - Redis only
redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue')

MONGODB_URL = os.environ['MONGODB_URL']
MONGODB_FEEDS_DB_NAME = os.environ.get('MONGODB_FEEDS_DB_NAME', 'feeds_db')
MONGODB_FEEDS_COLLECTION_NAME = os.environ.get('MONGODB_FEEDS_COLLECTION_NAME', 'rss_feeds')

mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_FEEDS_DB_NAME][MONGODB_FEEDS_COLLECTION_NAME]

# Calculate timestamp for 48 hours ago
dt_48h_ago = int(time.time()) - 48 * 3600

# At import time, reset every feed's last-processed timestamp so the past 48 hours are re-fetched
feeds_collection.update_many({}, {"$set": {"dt": dt_48h_ago}})

def scheduler_main():
    messages_sent = 0

    # Iterate over all feeds in MongoDB
    for item in feeds_collection.find({}):
        rss_url = item.get('url')
        rss_dt = item.get('dt')

        logger.debug(f"Processing RSS feed: {rss_url}")
        logger.debug(f"Last published date: {rss_dt}")

        if rss_url:
            message = {
                'u': rss_url,
                'dt': rss_dt
            }
            logger.debug(f"Message: {message}")

            try:
                # Send to Redis for local deployment
                redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message))
                messages_sent += 1
            except Exception as e:
                logger.error(f"Error sending message to queue: {str(e)}")

    logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}")

    return
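scheduler_main() is defined above but nothing in this file invokes it. A minimal way to drive it on a fixed interval might look like the sketch below; the 5-minute interval and the __main__ guard are assumptions, not part of this commit.

# Hypothetical entry point: enqueue all feeds every 5 minutes.
if __name__ == "__main__":
    while True:
        scheduler_main()
        time.sleep(300)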
72 src/feed_processing/analytics/embeddings/vector_db.py Normal file
@@ -0,0 +1,72 @@
import os
import requests

from qdrant_client import QdrantClient, models

try:
    from ...utils import setup_logging
except ImportError:
    # Fallback for when running standalone
    import logging
    def setup_logging():
        return logging.getLogger(__name__)

logger = setup_logging()

qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
qdrant_api_key = os.getenv("QDRANT_API_KEY")
collection_name = os.getenv("QDRANT_COLLECTION_NAME")

embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM")
vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine")

ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")

client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)

def get_index():
    collections = client.get_collections().collections
    if collection_name not in [c.name for c in collections]:
        raise KeyError(f"Collection {collection_name} not found")
    return client

def vectorize(article: str) -> list[float]:
    try:
        response = requests.post(
            f"{ollama_host}/api/embeddings",
            json={"model": ollama_embedding_model, "prompt": article},
            timeout=30,
        )
        response.raise_for_status()
        return response.json().get("embedding", [])
    except requests.RequestException as e:
        logger.error(f"Error generating embedding: {e}")
        # Return a zero vector of the expected dimension as fallback
        dim = int(embedding_dim) if embedding_dim else 384  # Default dimension
        return [0.0] * dim


def upsert_vectors(index: QdrantClient, data: list[dict]):
    points = [
        models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload"))
        for item in data
    ]
    index.upsert(collection_name=collection_name, points=points)


def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None):
    if embedding_dim and len(vector) != int(embedding_dim):
        raise ValueError("Length of vector does not match the embedding dimension")
    return index.search(
        collection_name=collection_name,
        query_vector=vector,
        limit=top_k,
        with_payload=True,
        query_filter=filter_query,
    )


if __name__ == "__main__":
    paragraph = "This is a test."
    vectorize(paragraph)
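Putting the helpers above together, an end-to-end call might look like the following sketch. The point id and payload are illustrative, and it assumes QDRANT_COLLECTION_NAME points at an existing collection so get_index() does not raise.

# Hypothetical usage: embed an article, store it, then query for neighbours.
index = get_index()
vec = vectorize("Example article text")
upsert_vectors(index, [{"id": 1, "vector": vec, "payload": {"title": "Example"}}])
hits = query_vectors(index, vec, top_k=5)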
6 src/feed_processing/analytics/genai/summarization.py Normal file
@@ -0,0 +1,6 @@


def summarize(text: str):
    sub_prompt = "Summarize the following passage"


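As committed, summarize() is a stub: it builds a prompt prefix and returns nothing. One possible completion, reusing the local Ollama server that the embedding code already talks to, might look like this sketch; the OLLAMA_SUMMARY_MODEL variable and the choice of the /api/generate endpoint are assumptions here, not something this commit wires up.

# Hypothetical completion of summarize() via a local Ollama server.
import os
import requests

def summarize(text: str) -> str:
    sub_prompt = "Summarize the following passage"
    host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
    model = os.getenv("OLLAMA_SUMMARY_MODEL", "llama3")  # assumed variable name and default
    resp = requests.post(
        f"{host}/api/generate",
        json={"model": model, "prompt": f"{sub_prompt}:\n\n{text}", "stream": False},
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json().get("response", "")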
14 src/feed_processing/article_cleaning.py Normal file
@@ -0,0 +1,14 @@
import re

def remove_newlines(text: str) -> str:
    return text.replace('\n', '')

def remove_urls(text: str) -> str:
    url_pattern = re.compile(r'http\S+|www\S+')
    return url_pattern.sub('', text)


def clean_text(text: str) -> str:
    text = remove_newlines(text)
    text = remove_urls(text)
    return text
31 src/feed_processing/article_extractor.py Normal file
@@ -0,0 +1,31 @@
import newspaper
import logging


logger = logging.getLogger()

def extract_article(url):
    """
    Extracts the title and text of an article from the given URL.

    Args:
        url (str): The URL of the article.
    Returns:
        A tuple containing the title and text of the article, respectively.
    """
    logger.debug(f"Starting Newspaper Article Extraction {url}")
    config = newspaper.Config()
    config.request_timeout = 60
    # Pass the config so the 60-second request timeout is actually applied
    article = newspaper.Article(url, config=config)

    try:
        article.download()
        logger.debug(f"Downloaded Article {url}")
        article.parse()
        logger.debug(f"Parsed Article {url}")

        return article.title, article.text
    except Exception as e:
        logger.error(f"Failed to extract article {url}: {str(e)}")
        return None, None
15 src/feed_processing/config.py Normal file
@@ -0,0 +1,15 @@
import os

# Redis Configuration
REDIS_URL = os.environ["REDIS_URL"]
REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")

# Logging Configuration
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")

# RSS Feed Processing Configuration
MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))

# Article Extraction Configuration
ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))
50 src/feed_processing/data_storage.py Normal file
@@ -0,0 +1,50 @@
import json
import os
import logging
from pymongo import MongoClient

logger = logging.getLogger()

# Try to import vector DB components, but make them optional
try:
    from .analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
    VECTOR_DB_AVAILABLE = True
except ImportError:
    VECTOR_DB_AVAILABLE = False
    logger.warning("Vector DB components not available. Qdrant storage will not work.")

MONGODB_URL = os.getenv("MONGODB_URL")
MONGODB_ARTICLES_DB_NAME = os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db")
MONGODB_ARTICLES_COLLECTION_NAME = os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles")

mongo_client = MongoClient(MONGODB_URL)
articles_collection = mongo_client[MONGODB_ARTICLES_DB_NAME][MONGODB_ARTICLES_COLLECTION_NAME]

##### Article Storage #####
def save_article(article: dict, strategy: str = None):
    # Only MongoDB is supported now
    try:
        articles_collection.insert_one(article)
        logger.info(f"Saved article {article.get('article_id', '')} to MongoDB")
    except Exception as e:
        logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
        raise

###### Feed Storage ######
RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")


def update_rss_feed(feed: dict, last_pub_dt: int):
    try:
        if not os.path.exists(RSS_FEEDS_FILE):
            return
        with open(RSS_FEEDS_FILE, "r") as f:
            feeds = json.load(f)
        for item in feeds:
            if item.get("u") == feed["u"]:
                item["dt"] = int(last_pub_dt)
        with open(RSS_FEEDS_FILE, "w") as f:
            json.dump(feeds, f)
        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
    except Exception as e:
        logger.error(f"Failed to update RSS feed: {str(e)}")
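Note that update_rss_feed() above persists the last-published timestamp to a local JSON file, while the scheduler and dashboard read feeds from MongoDB. A MongoDB-backed variant might look like the sketch below; it assumes the same feeds_db/rss_feeds collection and 'dt' field used by scheduler_main.py and app.py, and the helper name is illustrative.

# Hypothetical Mongo-backed variant of update_rss_feed().
feeds_collection = mongo_client[
    os.getenv("MONGODB_FEEDS_DB_NAME", "feeds_db")
][os.getenv("MONGODB_FEEDS_COLLECTION_NAME", "rss_feeds")]

def update_rss_feed_mongo(feed: dict, last_pub_dt: int):
    # The scheduler enqueues the feed URL under the 'u' key and reads 'dt' back.
    feeds_collection.update_one({"url": feed["u"]}, {"$set": {"dt": int(last_pub_dt)}})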
11 src/feed_processing/exceptions.py Normal file
@@ -0,0 +1,11 @@
class RSSProcessingError(Exception):
    """Exception raised for errors in the RSS processing."""
    pass

class ArticleExtractionError(Exception):
    """Exception raised for errors in the article extraction."""
    pass

class DataStorageError(Exception):
    """Exception raised for errors in data storage operations."""
    pass
135 src/feed_processing/feed_processor.py Normal file
@@ -0,0 +1,135 @@
import feedparser
from datetime import datetime
from dateutil import parser
import queue
import threading
import logging
from .utils import generate_key
from .article_extractor import extract_article
from .article_cleaning import clean_text

logger = logging.getLogger()

def process_feed(feed: dict):
    output_queue = queue.Queue()
    stop_thread = threading.Event()
    thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
    thread.daemon = True
    thread.start()

    logger.debug(f"Thread Started: {feed['u']}")
    thread.join(timeout=90)

    if thread.is_alive():
        stop_thread.set()
        logger.debug(f"Killing Thread: {feed['u']}")
        return None
    else:
        try:
            output = output_queue.get_nowait()
            logger.info(f"Thread Succeeded: {feed['u']}")
            return output
        except queue.Empty:
            logger.info(f"Thread Failed: {feed['u']}")
            return None

def extract_feed_threading(rss: dict, output_queue, stop_thread):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date
    entry = None  # Initialize entry variable

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            if stop_thread.is_set():
                break

            pub_date = parse_pub_date(entry.get('published', ''))

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                title, text = clean_text(title or ''), clean_text(text or '')
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None,
                    'tags': []
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        output_queue.put(output)
    except Exception as e:
        logger.error(f"Feed URL: {feed_url}")
        if entry:
            logger.error(f"Current entry: {entry.get('link', 'unknown')}")
        logger.error(f"Feed failed due to error: {e}")

def extract_feed(rss: dict):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            pub_date = parse_pub_date(entry.get('published', ''))

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None,
                    'tags': []
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        return output
    except Exception as e:
        logger.error(f"Feed URL: {feed_url}")
        logger.error(f"Feed failed due to error: {e}")

def parse_pub_date(date_string: str) -> int:
    """Parse publication date from various formats"""
    if not date_string:
        return int(datetime.now().timestamp())

    try:
        return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
    except ValueError:
        try:
            return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
        except ValueError:
            try:
                return int(parser.parse(date_string).timestamp())
            except (ValueError, TypeError):
                pass

    return int(datetime.now().timestamp())  # Return current time if no date is found
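For reference, process_feed() and extract_feed() expect the same compact dict the scheduler enqueues: 'u' for the feed URL and 'dt' for the last-seen Unix timestamp. A minimal call might look like the following; the URL is illustrative.

# Hypothetical call: fetch everything published in the last day from one feed.
import time
feed = {"u": "https://example.com/rss.xml", "dt": int(time.time()) - 86400}
result = process_feed(feed)
if result:
    print(len(result["articles"]), "new articles, newest at", result["max_date"])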
131 src/feed_processing/feed_processor.py.orig Normal file
@@ -0,0 +1,131 @@
import feedparser
from datetime import datetime
from dateutil import parser
import queue
import threading
import logging
from utils import generate_key
from article_extractor import extract_article
from article_cleaning import clean_text

logger = logging.getLogger()

def process_feed(feed: dict):
    output_queue = queue.Queue()
    stop_thread = threading.Event()
    thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
    thread.daemon = True
    thread.start()

    logger.debug(f"Thread Started: {feed['u']}")
    thread.join(timeout=90)

    if thread.is_alive():
        stop_thread.set()
        logger.debug(f"Killing Thread: {feed['u']}")
        return None
    else:
        try:
            output = output_queue.get_nowait()
            logger.info(f"Thread Succeeded: {feed['u']}")
            return output
        except queue.Empty:
            logger.info(f"Thread Failed: {feed['u']}")
            return None

def extract_feed_threading(rss: dict, output_queue, stop_thread):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            if stop_thread.is_set():
                break

            pub_date = parse_pub_date(entry['published'])

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                title, text = clean_text(title), clean_text(text)
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        output_queue.put(output)
    except Exception as e:
        logger.error(f"Feed: {entry}")
        logger.error(f"Feed failed due to error: {e}")

def extract_feed(rss: dict):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            pub_date = parse_pub_date(entry['published'])

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        print(output)
        return output
    except Exception as e:
        logger.error(f"Feed: {entry}")
        logger.error(f"Feed failed due to error: {e}")


def parse_pub_date(entry: dict):

    if 'published' in entry:
        date_string = entry['published']

        try:
            return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
        except ValueError:
            try:
                return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
            except ValueError:
                try:
                    return int(parser.parse(date_string).timestamp())
                except ValueError:
                    pass

    return int(datetime.now().timestamp())  # Return current time if no date is found
46 src/feed_processing/metrics.py Normal file
@@ -0,0 +1,46 @@
"""Prometheus metrics utilities for the RSS feed processor."""

import os
from prometheus_client import Counter, Histogram, start_http_server


# Start a Prometheus metrics HTTP server exposing ``/metrics``. The port can be
# customised with the ``METRICS_PORT`` environment variable. This block is safe
# to run multiple times as it silently ignores port binding errors.
_metrics_port = int(os.environ.get("METRICS_PORT", "8000"))
if not os.environ.get("METRICS_SERVER_STARTED"):
    try:
        start_http_server(_metrics_port)
        os.environ["METRICS_SERVER_STARTED"] = "1"
    except OSError:
        pass


# Metric definitions
_processed_articles = Counter(
    "processed_articles_total",
    "Total number of processed articles",
)
_processing_time = Histogram(
    "rss_processing_seconds",
    "Time spent processing RSS feeds",
)
_extraction_errors = Counter(
    "extraction_errors_total",
    "Number of article extraction errors",
)


def record_processed_articles(count: int) -> None:
    """Increment the processed articles counter."""
    _processed_articles.inc(count)


def record_processing_time(duration: float) -> None:
    """Record how long a feed took to process."""
    _processing_time.observe(duration)


def record_extraction_errors(count: int) -> None:
    """Increment the extraction errors counter."""
    _extraction_errors.inc(count)
18 src/feed_processing/utils.py Normal file
@@ -0,0 +1,18 @@
import logging
import os
import hashlib

def setup_logging():
    logger = logging.getLogger()
    log_level = os.environ.get('LOG_LEVEL', 'INFO')
    logger.setLevel(logging.getLevelName(log_level))
    return logger


def generate_key(input_string, length=10):
    # Create a SHA256 hash of the input string
    hash_object = hashlib.sha256(input_string.encode())
    hex_dig = hash_object.hexdigest()

    # Return the first 'length' characters of the hash
    return hex_dig[:length]
63 src/feed_processing/worker_main.py Normal file
@@ -0,0 +1,63 @@
import json
import time
import os
import redis
from .feed_processor import extract_feed
from .data_storage import save_article, update_rss_feed
from .utils import setup_logging
from .config import REDIS_URL, REDIS_QUEUE_NAME
from .exceptions import RSSProcessingError, DataStorageError
from .metrics import (
    record_processed_articles,
    record_processing_time,
    record_extraction_errors,
)

logger = setup_logging()
storage_strategy = os.environ.get("STORAGE_STRATEGY")
redis_client = redis.Redis.from_url(REDIS_URL)


def worker_main():
    logger.info("Starting RSS feed processing")
    start_time = time.time()

    try:
        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
        if not feed_data:
            logger.info("No messages in queue")
            return
        feed = json.loads(feed_data)

        result = extract_feed(feed)
        logger.info(f"Process Feed Result Dictionary: {result}")

        if result:
            # Read max_date only after confirming extraction succeeded;
            # extract_feed returns None when the feed fails.
            last_pub_dt = result["max_date"]
            for article in result["articles"]:
                try:
                    save_article(article, storage_strategy)
                except DataStorageError as e:
                    logger.error(f"Failed to save article: {str(e)}")
                    record_extraction_errors(1)

            update_rss_feed(result["feed"], last_pub_dt)
            logger.info(f"Processed feed: {feed['u']}")
            record_processed_articles(len(result["articles"]))
        else:
            logger.warning(f"Failed to process feed: {feed['u']}")
            record_extraction_errors(1)

    except RSSProcessingError as e:
        logger.error(f"RSS Processing Error: {str(e)}")
        return

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return

    finally:
        end_time = time.time()
        processing_time = end_time - start_time
        record_processing_time(processing_time)
        logger.info(f"Worker execution time: {processing_time:.2f} seconds")
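worker_main() drains at most one message per call and then returns. A long-running worker would need an outer loop; a minimal sketch follows, where the poll interval and the __main__ guard are assumptions rather than part of this commit.

# Hypothetical runner: poll the Redis queue until the process is stopped.
if __name__ == "__main__":
    while True:
        worker_main()
        time.sleep(5)  # back off briefly when the queue is empty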