Mirror of https://github.com/aljazceru/IngestRSS.git (synced 2025-12-17 05:54:22 +01:00)
Added package-lock.json
package-lock.json (generated, new file, 6 lines)
@@ -0,0 +1,6 @@
{
  "name": "project",
  "lockfileVersion": 3,
  "requires": true,
  "packages": {}
}
@@ -0,0 +1,211 @@
import boto3
import os
import zipfile
import io
import requests
import json
from botocore.exceptions import ClientError
from src.utils.retry_logic import retry_with_backoff
import time
import sys
from src.infra.deploy_infrastructure import get_or_create_kms_key
from dotenv import load_dotenv

load_dotenv(override=True)

import logging

logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))

# Set variables
LAMBDA_NAME = os.getenv('LAMBDA_FUNCTION_NAME')
ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID')
REGION = os.getenv("AWS_REGION")
LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN")
LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT'))
LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY'))
LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME')
LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
LAMBDA_HANDLER = "lambda_function.lambda_handler"
LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"
S3_LAYER_KEY = os.getenv('S3_LAYER_KEY_NAME') + '.zip'


def zip_directory(path):
    print(f"Creating deployment package from {path}...")
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
        for root, _, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, path)
                zip_file.write(file_path, arcname)
    return zip_buffer.getvalue()


@retry_with_backoff()
def update_function_code(lambda_client, function_name, zip_file):
    return lambda_client.update_function_code(
        FunctionName=function_name,
        ZipFile=zip_file
    )


def get_or_create_lambda_layer():
    layer_arn = os.getenv('LAMBDA_LAYER_ARN')
    return layer_arn


@retry_with_backoff(max_retries=50, initial_backoff=5, backoff_multiplier=2)  # Note: this call usually takes a long time to succeed.
def update_function_configuration(lambda_client, function_name, handler, role, timeout, memory, layers, kms_key_id):
    config = {
        'FunctionName': function_name,
        'Handler': handler,
        'Role': role,
        'Timeout': timeout,
        'MemorySize': memory,
        'Layers': layers
    }

    if kms_key_id:
        config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}"

    try:
        response = lambda_client.update_function_configuration(**config)
        print(f"Update request sent successfully for {function_name}.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceConflictException':
            logging.info(f"Function {function_name} is currently being updated. Retrying...")
        raise e


@retry_with_backoff()
def configure_sqs_trigger(lambda_client, function_name, queue_arn):
    """Placeholder for backward compatibility. The Redis deployment uses no SQS trigger."""
    return


@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy):
    config = {
        'FunctionName': function_name,
        'Runtime': runtime,
        'Role': role,
        'Handler': handler,
        'Code': {'ZipFile': zip_file},
        'Timeout': timeout,
        'MemorySize': memory,
        'Layers': layers
    }
    print(policy)

    if kms_key_id:
        config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}"

    try:
        return lambda_client.create_function(**config)
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidParameterValueException':
            print(f"Error creating function: {e}")
            print("Ensure that the IAM role has the correct trust relationship and permissions.")
            print("There might be a delay in role propagation. Please wait a few minutes and try again.")
        raise


def get_pillow_layer_arn():
    url = f"https://api.klayers.cloud/api/v2/p{os.getenv('PYTHON_VERSION')}/layers/latest/{os.getenv('AWS_REGION')}/json"
    try:
        response = requests.get(url)
        response.raise_for_status()
        layers_data = response.json()

        pillow_layer = next((layer for layer in layers_data if layer['package'] == 'Pillow'), None)

        if pillow_layer:
            return pillow_layer['arn']
        else:
            print("Pillow layer not found in the API response.")
            return None
    except requests.RequestException as e:
        print(f"Error fetching Pillow layer ARN: {e}")
        return None

def get_lambda_policy():
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": "arn:aws:logs:*:*:*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": "arn:aws:s3:::your-bucket-name/*"
            }
        ]
    }
    return policy

def deploy_lambda():
    lambda_client = boto3.client('lambda', region_name=REGION)

    print(f"Starting deployment of Lambda function: {LAMBDA_NAME}")
    deployment_package = zip_directory('src/infra/lambdas/RSSFeedProcessorLambda/src')

    layer_arn = get_or_create_lambda_layer()
    if layer_arn:
        print(f"Using Lambda Layer ARN: {layer_arn}")
    else:
        print("Warning: Lambda Layer not found or created. Proceeding without Layer.")

    pillow_layer_arn = get_pillow_layer_arn()
    if pillow_layer_arn:
        print(f"Using Pillow Layer ARN: {pillow_layer_arn}")
    else:
        print("Warning: Pillow Layer not found. Proceeding without Pillow Layer.")

    kms_key_id = get_or_create_kms_key()
    if kms_key_id:
        print(f"Using KMS Key ID: {kms_key_id}")
    else:
        print("Error: KMS Key not found or created. Aborting deployment.")
        sys.exit(1)

    try:
        # Check if the function exists
        try:
            lambda_client.get_function(FunctionName=LAMBDA_NAME)
            function_exists = True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                function_exists = False
            else:
                raise e

        # Combine the layers
        layers = [layer_arn] if layer_arn else []
        if pillow_layer_arn:
            layers.append(pillow_layer_arn)

        if function_exists:
            print("Updating existing Lambda function...")
            update_function_configuration(lambda_client, LAMBDA_NAME, LAMBDA_HANDLER, LAMBDA_ROLE_ARN, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id)
            update_function_code(lambda_client, LAMBDA_NAME, deployment_package)
        else:
            print(f"Lambda function '{LAMBDA_NAME}' not found. Creating new function...")
            policy = get_lambda_policy()
            create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy)

        print("Lambda deployment completed successfully!")

    except Exception as e:
        print(f"Error during Lambda deployment: {str(e)}")
        raise


if __name__ == "__main__":
    deploy_lambda()
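The deploy script relies on retry_with_backoff from src.utils.retry_logic, which is not part of this commit. A minimal sketch of what such a decorator could look like, assuming only the keyword arguments used above (max_retries, initial_backoff, backoff_multiplier):

import functools
import time

def retry_with_backoff(max_retries=5, initial_backoff=1, backoff_multiplier=2):
    """Illustrative sketch: retry the wrapped callable with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_backoff
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # give up after the final attempt
                    time.sleep(delay)
                    delay *= backoff_multiplier
        return wrapper
    return decorator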
@@ -0,0 +1,4 @@
newspaper3k
feedparser
python-dateutil
lxml
@@ -0,0 +1,60 @@
import os
import requests

from qdrant_client import QdrantClient, models

from utils import setup_logging

logger = setup_logging()

qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
qdrant_api_key = os.getenv("QDRANT_API_KEY")
collection_name = os.getenv("QDRANT_COLLECTION_NAME")

embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM")
vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine")

ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")

client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)


def get_index():
    collections = client.get_collections().collections
    if collection_name not in [c.name for c in collections]:
        raise KeyError(f"Collection {collection_name} not found")
    return client


def vectorize(article: str) -> list[float]:
    response = requests.post(
        f"{ollama_host}/api/embeddings",
        json={"model": ollama_embedding_model, "prompt": article},
        timeout=30,
    )
    response.raise_for_status()
    return response.json().get("embedding", [])


def upsert_vectors(index: QdrantClient, data: list[dict]):
    points = [
        models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload"))
        for item in data
    ]
    index.upsert(collection_name=collection_name, points=points)


def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None):
    if len(vector) != int(embedding_dim):
        raise ValueError("Length of vector does not match the embedding dimension")
    return index.search(
        collection_name=collection_name,
        query_vector=vector,
        limit=top_k,
        with_payload=True,
        query_filter=filter_query,
    )


if __name__ == "__main__":
    paragraph = "This is a test."
    vectorize(paragraph)
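A hedged usage sketch of the helpers above, run from the same module, assuming the collection named by QDRANT_COLLECTION_NAME already exists, VECTOR_EMBEDDING_DIM matches the embedding model, and Ollama is reachable:

index = get_index()                               # raises KeyError if the collection is missing
vector = vectorize("Example article body text")   # embeds via Ollama's /api/embeddings endpoint
upsert_vectors(index, [{"id": 1, "vector": vector, "payload": {"title": "Example"}}])
hits = query_vectors(index, vector, top_k=5)      # validates len(vector) against VECTOR_EMBEDDING_DIM
for hit in hits:
    print(hit.id, hit.score)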
@@ -0,0 +1,6 @@
def summarize(text: str):
    # Incomplete stub: only the summarization prompt is defined; no model call is wired up yet.
    sub_prompt = "Summarize the following passage"
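The summarize stub above stops at the prompt. One possible way to finish it, mirroring how vectorize in this commit calls Ollama over HTTP; the OLLAMA_SUMMARY_MODEL variable and its default are assumptions, not something this commit defines:

import os
import requests

def summarize(text: str) -> str:
    sub_prompt = "Summarize the following passage"
    response = requests.post(
        f"{os.getenv('OLLAMA_HOST', 'http://localhost:11434')}/api/generate",
        json={
            "model": os.getenv("OLLAMA_SUMMARY_MODEL", "llama3"),  # assumed env var and model name
            "prompt": f"{sub_prompt}:\n\n{text}",
            "stream": False,  # return one JSON object instead of a token stream
        },
        timeout=60,
    )
    response.raise_for_status()
    return response.json().get("response", "")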
@@ -0,0 +1,14 @@
import re


def remove_newlines(text: str) -> str:
    return text.replace('\n', '')


def remove_urls(text: str) -> str:
    url_pattern = re.compile(r'http\S+|www\S+')
    return url_pattern.sub('', text)


def clean_text(text: str) -> str:
    text = remove_newlines(text)
    text = remove_urls(text)
    return text
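For illustration, clean_text (called in the same module) strips newlines first and then URLs:

sample = "Breaking news\nRead more at https://example.com/story today"
print(clean_text(sample))  # -> "Breaking newsRead more at  today"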
@@ -0,0 +1,31 @@
import newspaper
import logging


logger = logging.getLogger()


def extract_article(url):
    """
    Extracts the title and text of an article from the given URL.

    Args:
        url (str): The URL of the article.
    Returns:
        A tuple containing the title and text of the article, respectively.
    """
    logger.debug(f"Starting Newspaper Article Extraction {url}")
    config = newspaper.Config()
    config.request_timeout = 60
    article = newspaper.Article(url, config=config)  # pass the config so the request timeout takes effect

    try:
        article.download()
        logger.debug(f"Downloaded Article {url}")
        article.parse()
        logger.debug(f"Parsed Article {url}")

        return article.title, article.text
    except Exception as e:
        logger.error(f"Failed to extract article {url}: {str(e)}")
        return None, None
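Hypothetical usage of extract_article; the URL is a placeholder, and both values come back as None on failure:

title, text = extract_article("https://example.com/some-article")
if title is None and text is None:
    print("Extraction failed; see the error log.")
else:
    print(title, text[:200])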
src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py (new file, 116 lines)
@@ -0,0 +1,116 @@
import boto3
from minio import Minio
import json
import os
import logging
from datetime import datetime
from pymongo import MongoClient

from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize

logger = logging.getLogger()

s3 = boto3.client('s3')

minio_client = Minio(
    os.getenv("MINIO_ENDPOINT"),
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False
)
CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')

MONGODB_URL = os.getenv("MONGODB_URL")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds")

mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]


##### Article Storage #####
def save_article(article: dict, strategy: str):
    if strategy == "s3":
        s3_save_article(article)
    elif strategy == "qdrant":
        qdrant_save_article(article)
    elif strategy == "both":
        qdrant_save_article(article)
        s3_save_article(article)
    else:
        raise ValueError(f"Invalid storage strategy: {strategy}")


def qdrant_save_article(article: dict):
    logger.info("Saving article to Qdrant")
    index = get_index()

    data = {
        "id": article["article_id"],
        "vector": vectorize(article["content"]),
        "payload": {"rss": article.get("rss"), "title": article.get("title")},
    }

    upsert_vectors(index, [data])


def s3_save_article(article: dict):
    logger.info("Saving article to MinIO")

    now = datetime.now()
    article_id = article['article_id']
    if not article_id:
        logger.error(f"Missing article_id in article: {article}")
        return

    file_path = f"/tmp/{article_id}-article.json"
    file_key = f"{now.year}/{now.month}/{now.day}/{article_id}.json"

    # Save article to /tmp json file
    with open(file_path, "w") as f:
        json.dump(article, f)

    try:
        metadata = {
            "rss": article.get("rss", ""),
            "title": article.get("title", ""),
            "unixTime": str(article.get("unixTime", "")),
            "article_id": article.get("article_id", ""),
            "link": article.get("link", ""),
            "rss_id": article.get("rss_id", "")
        }
        minio_client.fput_object(
            CONTENT_BUCKET,
            file_key,
            file_path,
            content_type="application/json",
            metadata=metadata
        )
        logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}")

    except Exception as e:
        logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")


###### Feed Storage ######
RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")


def update_rss_feed(feed: dict, last_pub_dt: int):
    try:
        if not os.path.exists(RSS_FEEDS_FILE):
            return
        with open(RSS_FEEDS_FILE, "r") as f:
            feeds = json.load(f)
        for item in feeds:
            if item.get("u") == feed["u"]:
                item["dt"] = int(last_pub_dt)
        with open(RSS_FEEDS_FILE, "w") as f:
            json.dump(feeds, f)
        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
    except Exception as e:
        logger.error(f"Failed to update RSS feed: {str(e)}")
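A minimal article dict accepted by save_article, using the same fields feed_processor builds (all values below are placeholders):

article = {
    "article_id": "abc123",
    "rss_id": "feed456",
    "rss": "https://example.com/rss.xml",
    "link": "https://example.com/post",
    "title": "Example title",
    "content": "Example body text",
    "unixTime": 1700000000,
    "llm_summary": None,
    "embedding": None,
}
save_article(article, strategy="s3")      # writes a JSON object to the MinIO bucket
save_article(article, strategy="qdrant")  # embeds the content and upserts it into Qdrant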
src/infra/lambdas/RSSFeedProcessorLambda/src/feed_processor.py (new file, 131 lines)
@@ -0,0 +1,131 @@
import feedparser
from datetime import datetime
from dateutil import parser
import queue
import threading
import logging
from utils import generate_key
from article_extractor import extract_article
from article_cleaning import clean_text

logger = logging.getLogger()


def process_feed(feed: dict):
    output_queue = queue.Queue()
    stop_thread = threading.Event()
    thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
    thread.daemon = True
    thread.start()

    logger.debug(f"Thread Started: {feed['u']}")
    thread.join(timeout=90)

    if thread.is_alive():
        stop_thread.set()
        logger.debug(f"Killing Thread: {feed['u']}")
        return None
    else:
        try:
            output = output_queue.get_nowait()
            logger.info(f"Thread Succeeded: {feed['u']}")
            return output
        except queue.Empty:
            logger.info(f"Thread Failed: {feed['u']}")
            return None


def extract_feed_threading(rss: dict, output_queue, stop_thread):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            if stop_thread.is_set():
                break

            pub_date = parse_pub_date(entry)

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                title, text = clean_text(title), clean_text(text)
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        output_queue.put(output)
    except Exception as e:
        logger.error(f"Feed: {feed_url}")
        logger.error(f"Feed failed due to error: {e}")


def extract_feed(rss: dict):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
    max_date = last_date

    try:
        feed = feedparser.parse(feed_url)
        for entry in feed['entries']:
            pub_date = parse_pub_date(entry)

            if pub_date > last_date:
                title, text = extract_article(entry.link)
                article = {
                    'link': entry.link,
                    'rss': feed_url,
                    'title': title,
                    'content': text,
                    'unixTime': pub_date,
                    'rss_id': generate_key(feed_url),
                    'article_id': generate_key(entry.link),
                    'llm_summary': None,
                    'embedding': None
                }
                articles.append(article)
                max_date = max(max_date, pub_date)

        output = {
            'articles': articles,
            'max_date': max_date,
            'feed': rss
        }
        print(output)
        return output
    except Exception as e:
        logger.error(f"Feed: {feed_url}")
        logger.error(f"Feed failed due to error: {e}")


def parse_pub_date(entry: dict):

    if 'published' in entry:
        date_string = entry['published']

        try:
            return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
        except ValueError:
            try:
                return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
            except ValueError:
                try:
                    return int(parser.parse(date_string).timestamp())
                except ValueError:
                    pass

    return int(datetime.now().timestamp())  # Return current time if no date is found
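parse_pub_date takes a feed entry and tries RFC 822, then ISO 8601, then a generic dateutil parse before falling back to the current time, for example:

print(parse_pub_date({"published": "Mon, 06 Sep 2021 12:00:00 +0000"}))  # RFC 822 -> 1630929600
print(parse_pub_date({"published": "2021-09-06T12:00:00Z"}))             # ISO 8601 (parsed as naive local time)
print(parse_pub_date({}))                                                # no date -> current unix time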
@@ -0,0 +1,65 @@
import json
import time
import os
import redis
from feed_processor import extract_feed
from data_storage import save_article, update_rss_feed
from utils import setup_logging
from config import REDIS_URL, REDIS_QUEUE_NAME
from exceptions import RSSProcessingError, DataStorageError
from metrics import (
    record_processed_articles,
    record_processing_time,
    record_extraction_errors,
)

logger = setup_logging()
storage_strategy = os.environ.get("STORAGE_STRATEGY")
redis_client = redis.Redis.from_url(REDIS_URL)


def lambda_handler(event, context):
    logger.info("Starting RSS feed processing")
    start_time = time.time()

    try:
        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
        if not feed_data:
            logger.info("No messages in queue")
            return {"statusCode": 200, "body": json.dumps("No feeds to process")}
        feed = json.loads(feed_data)

        result = extract_feed(feed)
        logger.info(f"Process Feed Result Dictionary: {result}")

        if result:
            last_pub_dt = result["max_date"]
            for article in result["articles"]:
                try:
                    save_article(article, storage_strategy)
                except DataStorageError as e:
                    logger.error(f"Failed to save article: {str(e)}")
                    record_extraction_errors(1)

            update_rss_feed(result["feed"], last_pub_dt)
            logger.info(f"Processed feed: {feed['u']}")
            record_processed_articles(len(result["articles"]))
        else:
            logger.warning(f"Failed to process feed: {feed['u']}")
            record_extraction_errors(1)

    except RSSProcessingError as e:
        logger.error(f"RSS Processing Error: {str(e)}")
        return {"statusCode": 500, "body": json.dumps("RSS processing failed")}

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")}

    finally:
        end_time = time.time()
        processing_time = end_time - start_time
        record_processing_time(processing_time)
        logger.info(f"Lambda execution time: {processing_time:.2f} seconds")

    return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")}
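The handler pops one JSON-encoded feed per invocation from the Redis list named by REDIS_QUEUE_NAME. A hedged sketch of seeding that queue, with placeholder values for the feed URL and dt:

import json
import redis

from config import REDIS_URL, REDIS_QUEUE_NAME  # same config module the handler imports

r = redis.Redis.from_url(REDIS_URL)
feed = {"u": "https://example.com/rss.xml", "dt": 0}  # "u": feed URL, "dt": last-seen publish time (unix)
r.lpush(REDIS_QUEUE_NAME, json.dumps(feed))           # the handler consumes with rpop, so lpush keeps FIFO order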
src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py (new file, 81 lines)
@@ -0,0 +1,81 @@
import os
import zipfile
import logging
import boto3
from dotenv import load_dotenv
from src.infra.deploy_infrastructure import deploy_cloudformation

# Load environment variables
load_dotenv(override=True)

# Set up logging
logging.basicConfig(level=os.getenv('LOG_LEVEL'))

# Set up S3 client
s3 = boto3.client('s3')


def zip_lambda_code():
    lambda_dir = 'src/infra/lambdas/RSSQueueFiller/lambda'
    zip_path = 'tmp/lambda_function.zip'

    os.makedirs(zip_path.split("/")[0], exist_ok=True)

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(lambda_dir):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, lambda_dir)
                zipf.write(file_path, arcname)

    return zip_path


def upload_to_s3(file_path):
    s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
    bucket_name = os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
    s3.upload_file(file_path, bucket_name, s3_key)
    return f's3://{bucket_name}/{s3_key}'


def deploy_sqs_filler():
    zip_file = zip_lambda_code()
    upload_to_s3(zip_file)

    # Deploy CloudFormation
    deploy_cloudformation('rss_lambda_stack.yaml', 'LambdaSQSFiller',
        parameters=[
            {
                'ParameterKey': 'QueueFillerLambdaName',
                'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME')
            },
            {
                'ParameterKey': 'RedisUrl',
                'ParameterValue': os.getenv('REDIS_URL')
            },
            {
                'ParameterKey': 'RedisQueueName',
                'ParameterValue': os.getenv('REDIS_QUEUE_NAME')
            },
            {
                'ParameterKey': 'LambdaCodeS3Bucket',
                'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
            },
            {
                'ParameterKey': 'LambdaCodeS3Key',
                'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
            },
            {
                'ParameterKey': 'LambdaRuntime',
                'ParameterValue': os.getenv('LAMBDA_RUNTIME')
            },
            {
                'ParameterKey': 'LambdaTimeout',
                'ParameterValue': os.getenv('LAMBDA_TIMEOUT')
            }
        ])

    # Clean up local zip file
    os.remove(zip_file)


if __name__ == "__main__":
    deploy_sqs_filler()
src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py (new file, 58 lines)
@@ -0,0 +1,58 @@
import json
import os
import logging
import boto3
from decimal import Decimal
from pymongo import MongoClient
from datetime import datetime
import redis

logger = logging.getLogger()
logger.setLevel("INFO")

sqs = boto3.client('sqs')

SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
MONGODB_URL = os.environ['MONGODB_URL']
MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')

mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]


class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return int(obj)
        return super(DecimalEncoder, self).default(obj)


def handler(event, context):
    messages_sent = 0

    # Iterate over all feeds in MongoDB
    for item in feeds_collection.find({}):
        rss_url = item.get('url')
        rss_dt = item.get('dt')

        logger.debug(f"Processing RSS feed: {rss_url}")
        logger.debug(f"Last published date: {rss_dt}")

        if rss_url:
            message = {
                'u': rss_url,
                'dt': rss_dt
            }
            logger.debug(f"message: {message}")
            try:
                sqs.send_message(
                    QueueUrl=SQS_QUEUE_URL,
                    MessageBody=json.dumps(message, cls=DecimalEncoder)
                )
                messages_sent += 1
            except Exception as e:
                logger.error(f"Error sending message to SQS: {str(e)}")

    logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")

    return {
        "statusCode": 200,
        "body": json.dumps(f"Sent {messages_sent} RSS URLs to SQS"),
    }
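The queue filler reads feed documents with url and dt fields from MongoDB. A sketch of registering a feed; the connection string and database name are placeholders, while the rss_feeds collection name is the default used above:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
feeds = client["ingestrss"]["rss_feeds"]           # placeholder DB name; collection default matches the code above
feeds.insert_one({"url": "https://example.com/rss.xml", "dt": 0})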
@@ -0,0 +1,117 @@
#!/bin/bash

set -e

####### Section 1: Checking Python Existence ########
echo "Section 1: Checking Python Existence"

# Ensure python3.12 is installed
if ! command -v python3.12 &> /dev/null; then
    echo "Python 3.12 is not installed. Please install it before running this script."
    exit 1
fi
echo "Python 3.12 found. Proceeding..."

####### Section 2: Installing Dependencies ########
echo "Section 2: Installing Dependencies"

# Install dependencies
python3.12 -m pip install --upgrade Pillow feedfinder2==0.0.4 python-dateutil newspaper3k==0.2.8 feedparser lxml[html5lib] lxml_html_clean lxml[html_clean] qdrant-client ollama -t python/
echo "Dependencies installed successfully."

####### Section 3: Creating ZIP File ########
echo "Section 3: Creating ZIP File"

# Create ZIP file
zip -r OpenRSSLambdaLayer.zip python/
echo "ZIP file created."

# Check if ZIP file was created and is not empty
if [ ! -s OpenRSSLambdaLayer.zip ]; then
    echo "Error: ZIP file is empty or was not created."
    exit 1
fi
echo "ZIP file check passed."

####### Section 4: Getting AWS Regions ########
echo "Section 4: Getting AWS Regions"

# Get list of all AWS regions
REGIONS=$(aws ec2 describe-regions --query 'Regions[].RegionName' --output text)
echo "Retrieved AWS regions: $REGIONS"

####### Section 5: Creating Buckets, Uploading, and Publishing Layer ########
echo "Section 5: Creating Buckets, Uploading, and Publishing Layer"

create_bucket_upload_and_publish_layer() {
    local region=$1
    local bucket_name="rss-feed-processor-layers-$region"
    local layer_name="ingest-rss-lambda-layer-$region"

    echo "Processing region: $region"

    # Create bucket if it doesn't exist
    if ! aws s3api head-bucket --bucket "$bucket_name" --region "$region" 2>/dev/null; then
        echo "Creating bucket $bucket_name in $region"
        if [ "$region" == "us-east-1" ]; then
            aws s3api create-bucket --bucket "$bucket_name" --region "$region"
        else
            aws s3api create-bucket --bucket "$bucket_name" --region "$region" --create-bucket-configuration LocationConstraint=$region
        fi
    else
        echo "Bucket $bucket_name already exists in $region"
    fi

    # Upload ZIP to the region-specific bucket
    echo "Uploading ZIP to $bucket_name"
    aws s3 cp OpenRSSLambdaLayer.zip "s3://$bucket_name/" --region "$region"

    # Create and publish Lambda layer
    echo "Creating Lambda layer in region: $region"
    LAYER_VERSION=$(aws lambda publish-layer-version \
        --region "$region" \
        --layer-name $layer_name \
        --description "Layer with dependencies for RSS processing" \
        --license-info "MIT" \
        --content "S3Bucket=$bucket_name,S3Key=OpenRSSLambdaLayer.zip" \
        --compatible-runtimes python3.12 \
        --query 'Version' \
        --output text
    )

    if [ -z "$LAYER_VERSION" ]; then
        echo "Failed to create Lambda layer in region $region."
        return 1
    fi

    echo "Making layer public in region: $region"
    aws lambda add-layer-version-permission \
        --region "$region" \
        --layer-name $layer_name \
        --version-number "$LAYER_VERSION" \
        --statement-id public \
        --action lambda:GetLayerVersion \
        --principal '*'

    ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
    ARN="arn:aws:lambda:${region}:${ACCOUNT_ID}:layer:$layer_name:${LAYER_VERSION}"
    echo "Layer ARN for region $region: $ARN"
    echo "$region:$ARN" >> layer_arns.txt
}

# Process all regions
for region in $REGIONS; do
    if create_bucket_upload_and_publish_layer "$region"; then
        echo "Successfully processed region: $region"
    else
        echo "Failed to process region: $region. Continuing with next region..."
    fi
done

####### Section 6: Completion ########
echo "Section 6: Completion"

echo "Setup complete! OpenRSSLambdaLayer is now available in all processed regions."
echo "Layer ARNs have been saved to layer_arns.txt"

echo "Script execution completed successfully."
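The layer script appends region:ARN pairs to layer_arns.txt. A small sketch of looking up the ARN for the current region, which could then be exported as LAMBDA_LAYER_ARN for get_or_create_lambda_layer() in the deploy script to pick up:

import os

def layer_arn_for_region(region: str, path: str = "layer_arns.txt") -> str | None:
    """Return the published layer ARN recorded for the given region, if any."""
    with open(path) as f:
        for line in f:
            recorded_region, _, arn = line.strip().partition(":")  # the ARN itself contains colons, so split once
            if recorded_region == region:
                return arn
    return None

arn = layer_arn_for_region(os.getenv("AWS_REGION", "us-east-1"))
if arn:
    print(f"LAMBDA_LAYER_ARN={arn}")  # e.g. append this line to the project's .env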