2025-06-03 13:36:40 +02:00
parent 404c6dd2ce
commit d28b9243f8
35 changed files with 91 additions and 1823 deletions

View File

@@ -1,4 +1,3 @@
-version: '3.8'
 services:
   redis:
     image: redis:7
@@ -20,22 +19,6 @@ services:
      interval: 10s
      timeout: 5s
      retries: 5
-  minio:
-    image: minio/minio
-    command: server /data --console-address ":9001"
-    environment:
-      MINIO_ROOT_USER: minioadmin
-      MINIO_ROOT_PASSWORD: minioadmin
-    ports:
-      - "9000:9000"
-      - "9001:9001"
-    volumes:
-      - minio-data:/data
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
-      interval: 30s
-      timeout: 20s
-      retries: 5
   worker:
     build:
       context: .
@@ -45,19 +28,15 @@ services:
         condition: service_healthy
       mongodb:
         condition: service_healthy
-      minio:
-        condition: service_healthy
     environment:
       REDIS_URL: redis://redis:6379
       REDIS_QUEUE_NAME: rss-feed-queue
       MONGODB_URL: mongodb://mongodb:27017
-      MONGODB_DB_NAME: ingestrss
-      MONGODB_COLLECTION_NAME: rss_feeds
-      MINIO_ENDPOINT: minio:9000
-      MINIO_ACCESS_KEY: minioadmin
-      MINIO_SECRET_KEY: minioadmin
-      MINIO_BUCKET: ingestrss
-      STORAGE_STRATEGY: s3
+      MONGODB_ARTICLES_DB_NAME: articles_db
+      MONGODB_ARTICLES_COLLECTION_NAME: articles
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
+      STORAGE_STRATEGY: mongodb
       LOG_LEVEL: INFO
   scheduler:
     build:
@@ -72,9 +51,25 @@ services:
       REDIS_URL: redis://redis:6379
       REDIS_QUEUE_NAME: rss-feed-queue
       MONGODB_URL: mongodb://mongodb:27017
-      MONGODB_DB_NAME: ingestrss
-      MONGODB_COLLECTION_NAME: rss_feeds
+      MONGODB_ARTICLES_DB_NAME: articles_db
+      MONGODB_ARTICLES_COLLECTION_NAME: articles
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
       LOG_LEVEL: INFO
+  dashboard:
+    build:
+      context: .
+      dockerfile: src/dashboard/Dockerfile
+    depends_on:
+      mongodb:
+        condition: service_healthy
+    environment:
+      MONGODB_URL: mongodb://mongodb:27017
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
+    volumes:
+      - ./src/dashboard:/app
+    ports:
+      - "18000:8000"
 volumes:
   mongo-data:
-  minio-data:
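
For reference, a minimal sketch of how a container in this compose file can reach the new MongoDB settings; the variable names follow the environment block above, while the connection check itself is illustrative rather than code from the repository:

import os

from pymongo import MongoClient

# Settings as wired in docker-compose above; defaults are for running outside Docker.
mongo_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
articles_db = os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db")
articles_coll = os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles")

client = MongoClient(mongo_url)
collection = client[articles_db][articles_coll]
print(collection.estimated_document_count())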

View File

@@ -3,4 +3,5 @@ WORKDIR /app
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
 COPY . .
+ENV PYTHONPATH=/app
 CMD ["python", "local_services/scheduler.py"]

View File

@@ -3,4 +3,5 @@ WORKDIR /app
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
 COPY . .
+ENV PYTHONPATH=/app
 CMD ["python", "local_services/worker.py"]

View File

@@ -4,8 +4,8 @@ import logging
 import importlib
 handler = importlib.import_module(
-    'src.infra.lambdas.RSSQueueFiller.lambda.lambda_function'
-).handler
+    'src.feed_management.scheduler_main'
+).scheduler_main
 logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
 logger = logging.getLogger(__name__)
@@ -17,7 +17,7 @@ def main():
     logger.info("Starting scheduler loop")
     while True:
         try:
-            handler({}, None)
+            handler()
         except Exception as exc:
             logger.exception("Scheduler job failed: %s", exc)
         time.sleep(INTERVAL_MINUTES * 60)

View File

@@ -2,7 +2,7 @@ import os
 import time
 import logging
-from src.infra.lambdas.RSSFeedProcessorLambda.src.lambda_function import lambda_handler
+from src.feed_processing.worker_main import worker_main
 logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
 logger = logging.getLogger(__name__)
@@ -13,7 +13,7 @@ def main():
     logger.info("Starting worker loop")
     while True:
         try:
-            lambda_handler({}, None)
+            worker_main()
         except Exception as exc:
             logger.exception("Worker iteration failed: %s", exc)
         time.sleep(SLEEP_SECONDS)
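
The loops above call the new entry points with no arguments, unlike the old Lambda handlers that received (event, context). The modules src.feed_management.scheduler_main and src.feed_processing.worker_main are not shown in this commit; hypothetical minimal signatures compatible with the loops would be:

def worker_main() -> None:
    """Hypothetical: pop one feed from the Redis queue, process it, store the articles."""
    ...

def scheduler_main() -> None:
    """Hypothetical: push every feed document from MongoDB onto the Redis queue."""
    ...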

View File

@@ -1,8 +1,6 @@
-boto3==1.35.*
 pymongo==4.*
 python-dotenv==1.0.*
 requests==2.32.*
-constructs==10.2.69
 # Vector database and embedding libraries
 qdrant-client
 ollama
@@ -14,4 +12,6 @@ schedule==1.*
 feedparser
 newspaper3k
 python-dateutil
 lxml
+lxml[html_clean]
+boto3

Binary file not shown (deleted image, 154 KiB).

View File

@@ -1,58 +0,0 @@
from minio import Minio
import os
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from collections import defaultdict
def get_s3_object_creation_dates(bucket_name):
client = Minio(
os.getenv("MINIO_ENDPOINT"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=False
)
creation_dates = []
for obj in client.list_objects(bucket_name, recursive=True):
creation_dates.append(obj.last_modified.date())
return creation_dates
def plot_creation_dates(dates):
# Count objects created on each date
date_counts = defaultdict(int)
for date in dates:
date_counts[date] += 1
# Sort dates and get counts
sorted_dates = sorted(date_counts.keys())
counts = [date_counts[date] for date in sorted_dates]
# Create the plot
plt.figure(figsize=(15, 8))
bars = plt.bar(sorted_dates, counts)
plt.title('S3 Object Creation Dates')
plt.xlabel('Date')
plt.ylabel('Number of Objects Created')
plt.xticks(rotation=45, ha='right')
# Label each bar with its height
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom')
plt.tight_layout()
# Save the plot
plt.savefig('s3_object_creation_dates.png', dpi=300, bbox_inches='tight')
print("Graph saved as 's3_object_creation_dates.png'")
def main():
bucket_name = os.getenv('MINIO_BUCKET')
dates = get_s3_object_creation_dates(bucket_name)
plot_creation_dates(dates)
if __name__ == "__main__":
main()
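
This removed script counted MinIO objects per creation day. An equivalent query against the new MongoDB article store is not part of this commit; a hypothetical sketch grouping on the unixTime field (seconds since epoch) stored with each article could look like:

import os

from pymongo import MongoClient

# Hypothetical MongoDB counterpart to the deleted MinIO script: articles per calendar day.
client = MongoClient(os.getenv("MONGODB_URL", "mongodb://localhost:27017"))
articles = client[os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db")][
    os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles")
]
pipeline = [
    {"$project": {"day": {"$dateToString": {
        "format": "%Y-%m-%d",
        "date": {"$toDate": {"$multiply": ["$unixTime", 1000]}},
    }}}},
    {"$group": {"_id": "$day", "count": {"$sum": 1}}},
    {"$sort": {"_id": 1}},
]
for row in articles.aggregate(pipeline):
    print(row["_id"], row["count"])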

View File

@@ -6,6 +6,10 @@ import logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
+MONGODB_URL = os.getenv('MONGODB_URL', 'mongodb://localhost:27017')
+MONGODB_FEEDS_DB_NAME = os.getenv('MONGODB_FEEDS_DB_NAME', 'feeds_db')
+MONGODB_FEEDS_COLLECTION_NAME = os.getenv('MONGODB_FEEDS_COLLECTION_NAME', 'rss_feeds')
 def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name):
     client = MongoClient(mongo_url)
     collection = client[db_name][collection_name]
@@ -33,10 +37,7 @@ def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name):
     )
 if __name__ == "__main__":
-    mongo_url = os.getenv('MONGODB_URL', 'mongodb://localhost:27017')
-    db_name = os.getenv('MONGODB_DB_NAME', 'ingestrss')
-    collection_name = os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds')
     with open('rss_feeds.json') as f:
         rss_feeds = json.load(f)
     logger.info(f"Loaded RSS feeds: {rss_feeds}")
-    upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name)
+    upload_rss_feeds(rss_feeds, MONGODB_URL, MONGODB_FEEDS_DB_NAME, MONGODB_FEEDS_COLLECTION_NAME)
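
A hedged usage sketch for the script above: the exact schema of rss_feeds.json is not shown here, but the queue filler elsewhere in the repository reads url and dt fields from the feeds collection, so entries of that shape are assumed:

# Hypothetical feed list; field names assumed from the queue filler's reads of `url` and `dt`.
rss_feeds = [
    {"url": "https://example.com/rss.xml", "dt": 0},
]
upload_rss_feeds(
    rss_feeds,
    "mongodb://localhost:27017",
    "feeds_db",
    "rss_feeds",
)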

View File

@@ -1,38 +0,0 @@
Parameters:
LambdaFunctionArn:
Type: String
Description: ARN of the RSS Feed Processor Lambda function
Resources:
EventBridgeScheduleRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: scheduler.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: EventBridgeSchedulePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: lambda:InvokeFunction
Resource: !Ref LambdaFunctionArn
EventBridgeSchedule:
Type: AWS::Scheduler::Schedule
Properties:
Name: rss-feed-processor-schedule
Description: Runs the RSS Feed Processor Lambda function every hour
State: ENABLED
ScheduleExpression: rate(240 minutes)
FlexibleTimeWindow:
Mode: FLEXIBLE
MaximumWindowInMinutes: 1
Target:
Arn: !Ref LambdaFunctionArn
RoleArn: !GetAtt EventBridgeScheduleRole.Arn

View File

@@ -1,68 +0,0 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'IAM Role for RSS Feed Processor Lambda Function with Environment Variable Encryption'
Parameters:
LambdaExecutionRoleName:
Type: String
Description: "Name of the Lambda Execution Role"
LambdaKMSKeyArn:
Type: String
Description: "ARN of the KMS Key for Lambda environment variable encryption"
Region:
Type: String
Description: "AWS Region for deployment"
Default: "us-east-1"
Resources:
LambdaExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: !Ref LambdaExecutionRoleName
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
Policies:
- PolicyName: 'RSSFeedProcessorLambdaPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'sqs:*'
- 's3:*'
- 'lambda:*'
- 'logs:*'
- 'xray:*'
- 'cloudwatch:*'
- 'events:*'
Resource: '*'
- Effect: Allow
Action:
- 'kms:Decrypt'
- 'kms:GenerateDataKey'
Resource: !Ref LambdaKMSKeyArn
Outputs:
LambdaRoleArn:
Description: 'ARN of the Lambda Execution Role'
Value: !GetAtt LambdaExecutionRole.Arn
Export:
Name: !Sub '${AWS::StackName}-LambdaRoleArn'
LambdaKMSKeyArn:
Description: 'ARN of the KMS Key for Lambda'
Value: !Ref LambdaKMSKeyArn
Export:
Name: !Sub '${AWS::StackName}-LambdaKMSKeyArn'
Region:
Description: 'AWS Region for deployment'
Value: !Ref Region
Export:
Name: !Sub '${AWS::StackName}-Region'

View File

@@ -1,78 +0,0 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: Redis Queue Filler Lambda Stack
Parameters:
QueueFillerLambdaName:
Type: String
Description: Name of the Lambda function
RedisUrl:
Type: String
Description: URL of the Redis instance
RedisQueueName:
Type: String
Description: Name of the Redis queue
LambdaCodeS3Bucket:
Type: String
Description: S3 bucket containing the Lambda function code
LambdaCodeS3Key:
Type: String
Description: S3 key for the Lambda function code
LambdaRuntime:
Type: String
Description: Lambda runtime
Default: python3.12
LambdaTimeout:
Type: Number
Description: Lambda timeout in seconds
Default: 300
Resources:
SqsFillerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Ref QueueFillerLambdaName
Runtime: !Ref LambdaRuntime
Handler: lambda_function.handler
Code:
S3Bucket: !Ref LambdaCodeS3Bucket
S3Key: !Ref LambdaCodeS3Key
Timeout: !Ref LambdaTimeout
Environment:
Variables:
REDIS_URL: !Ref RedisUrl
REDIS_QUEUE_NAME: !Ref RedisQueueName
Role: !GetAtt SqsFillerFunctionRole.Arn
SqsFillerFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: LambdaExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: arn:aws:logs:*:*:*
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub arn:aws:s3:::${LambdaCodeS3Bucket}/${LambdaCodeS3Key}
Outputs:
SqsFillerFunctionArn:
Description: ARN of the Queue Filler Lambda Function
Value: !GetAtt SqsFillerFunction.Arn
SqsFillerFunctionRoleArn:
Description: ARN of the IAM Role for Queue Filler Lambda Function
Value: !GetAtt SqsFillerFunctionRole.Arn

View File

@@ -1,26 +0,0 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor S3 Bucket'
Parameters:
BucketName:
Type: String
Description: "Name of the Lambda Execution Role"
Resources:
ArticleContentBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Ref BucketName
VersioningConfiguration:
Status: Enabled
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
Outputs:
BucketName:
Description: 'Name of the S3 bucket for article content'
Value: !Ref ArticleContentBucket
Export:
Name: !Sub '${AWS::StackName}-ArticleContentBucketName'

View File

@@ -1,197 +0,0 @@
import boto3
import os
import sys
import json
from src.utils.retry_logic import retry_with_backoff
from botocore.exceptions import ClientError
from qdrant_client import QdrantClient, models
from dotenv import load_dotenv
load_dotenv(override=True)
kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION"))
stack_base = os.getenv("STACK_BASE")
@retry_with_backoff()
def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
cf_client = boto3.client('cloudformation', region_name=os.getenv("AWS_REGION"))
stack_name = f"{stack_base}-{stack_suffix}"
with open(f'src/infra/cloudformation/{template_file}', 'r') as file:
template_body = file.read()
capabilities = ['CAPABILITY_NAMED_IAM']
try:
if force_recreate:
try:
print(f"Deleting stack {stack_name} for recreation...")
cf_client.delete_stack(StackName=stack_name)
waiter = cf_client.get_waiter('stack_delete_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} deleted successfully.")
except ClientError:
print(f"Stack {stack_name} does not exist or is already deleted.")
try:
stack = cf_client.describe_stacks(StackName=stack_name)['Stacks'][0]
print(f"Updating stack {stack_name}...")
cf_client.update_stack(
StackName=stack_name,
TemplateBody=template_body,
Capabilities=capabilities,
Parameters=parameters # Add parameters here
)
waiter = cf_client.get_waiter('stack_update_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} updated successfully.")
except ClientError as e:
if 'does not exist' in str(e):
print(f"Creating stack {stack_name}...")
cf_client.create_stack(
StackName=stack_name,
TemplateBody=template_body,
Capabilities=capabilities,
Parameters=parameters # Add parameters here
)
waiter = cf_client.get_waiter('stack_create_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} created successfully.")
elif 'No updates are to be performed' in str(e):
print(f"No updates needed for stack {stack_name}.")
else:
raise
except ClientError as e:
print(f"Error handling stack {stack_name}: {str(e)}")
raise
def get_or_create_kms_key():
# Create a KMS client
kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION"))
tag_key = 'purpose'
tag_value = 'You pass butter'
description = 'KMS key for RSS Feed Processor... Oh my god'
account_id = os.getenv('AWS_ACCOUNT_ID')
try:
# List all KMS keys
response = kms_client.list_keys()
# Check each key for the specified tag
for key in response['Keys']:
try:
tags = kms_client.list_resource_tags(KeyId=key['KeyId'])['Tags']
if any(tag['TagKey'] == tag_key and tag['TagValue'] == tag_value for tag in tags) and any(tag['TagKey'] == 'region' and tag['TagValue'] == os.getenv("AWS_REGION") for tag in tags): # FIXME: This is inefficient and should be fixed and more readable.
print(f"Found existing KMS key with ID: {key['KeyId']}")
return key['KeyId']
except ClientError:
continue
# If no key found, create a new one with appropriate policy
print("No existing key found. Creating a new KMS key.")
key_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{account_id}:root"},
"Action": "kms:*",
"Resource": "*"
},
{
"Sid": "Allow Lambda to use the key",
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey*"
],
"Resource": "*"
}
]
}
response = kms_client.create_key(
Description=description,
KeyUsage='ENCRYPT_DECRYPT',
Origin='AWS_KMS',
Tags=[{'TagKey': tag_key, 'TagValue': tag_value}, {'TagKey': 'region', 'TagValue': os.getenv("AWS_REGION")}],
Policy=json.dumps(key_policy)
)
key_id = response['KeyMetadata']['KeyId']
print(f"Successfully created new KMS key with ID: {key_id}")
return key_id
except ClientError as e:
print(f"Error in KMS key operation: {e}")
sys.exit(1)
def deploy_infrastructure():
# Do some stuff with KMS keys.
kms_key_id = get_or_create_kms_key()
key_info = kms_client.describe_key(KeyId=kms_key_id)
kms_key_arn = key_info['KeyMetadata']['Arn']
deploy_cloudformation('s3.yaml', 'S3',
parameters=[
{
'ParameterKey': 'BucketName',
'ParameterValue': os.getenv('S3_BUCKET_NAME')
}
])
deploy_cloudformation('s3.yaml', 'S3-zipped',
parameters=[
{
'ParameterKey': 'BucketName',
'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
}
])
deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
parameters=[
{
'ParameterKey': 'LambdaExecutionRoleName',
'ParameterValue': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME', 'default-role-name')
},
{
'ParameterKey': 'LambdaKMSKeyArn',
'ParameterValue': kms_key_arn
}
]
)
deploy_cloudformation('eventbridge.yaml', 'Schedule',
parameters=[
{
'ParameterKey': 'LambdaFunctionArn',
'ParameterValue': f"arn:aws:lambda:{os.getenv('AWS_REGION')}:{os.getenv('AWS_ACCOUNT_ID')}:function:{os.getenv('QUEUE_FILLER_LAMBDA_NAME')}"
}
])
if os.getenv("STORAGE_STRATEGY") == 'qdrant':
client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))
collection = os.getenv("QDRANT_COLLECTION_NAME")
embedding_dim = int(os.getenv("VECTOR_EMBEDDING_DIM"))
metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine").upper()
existing = [c.name for c in client.get_collections().collections]
if collection not in existing:
client.create_collection(
collection_name=collection,
vectors_config=models.VectorParams(size=embedding_dim, distance=getattr(models.Distance, metric))
)
if __name__ == "__main__":
deploy_infrastructure()

View File

@@ -1,211 +0,0 @@
import boto3
import os
import zipfile
import io
import requests
import json
from botocore.exceptions import ClientError
from src.utils.retry_logic import retry_with_backoff
import time
import sys
from src.infra.deploy_infrastructure import get_or_create_kms_key
from dotenv import load_dotenv
load_dotenv(override=True)
import logging
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))
# Set variables
LAMBDA_NAME = os.getenv('LAMBDA_FUNCTION_NAME')
ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID')
REGION = os.getenv("AWS_REGION")
LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN")
LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT'))
LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY'))
LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME')
LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
LAMBDA_HANDLER = "lambda_function.lambda_handler"
LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"
S3_LAYER_KEY = os.getenv('S3_LAYER_KEY_NAME')+'.zip'
def zip_directory(path):
print(f"Creating deployment package from {path}...")
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
for root, _, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, path)
zip_file.write(file_path, arcname)
return zip_buffer.getvalue()
@retry_with_backoff()
def update_function_code(lambda_client, function_name, zip_file):
return lambda_client.update_function_code(
FunctionName=function_name,
ZipFile=zip_file
)
def get_or_create_lambda_layer():
layer_arn = os.getenv('LAMBDA_LAYER_ARN')
return layer_arn
@retry_with_backoff(max_retries=50, initial_backoff=5, backoff_multiplier=2) # Note: This function usually takes a long time to be successful.
def update_function_configuration(lambda_client, function_name, handler, role, timeout, memory, layers, kms_key_id):
config = {
'FunctionName': function_name,
'Handler': handler,
'Role': role,
'Timeout': timeout,
'MemorySize': memory,
'Layers': layers
}
if kms_key_id:
config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}"
try:
response = lambda_client.update_function_configuration(**config)
print(f"Update request sent successfully for {function_name}.")
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceConflictException':
logging.info(f"Function {function_name} is currently being updated. Retrying...")
raise e
@retry_with_backoff()
def configure_sqs_trigger(lambda_client, function_name, queue_arn):
"""Placeholder for backward compatibility. Redis deployment uses no SQS trigger."""
return
@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy):
config = {
'FunctionName': function_name,
'Runtime': runtime,
'Role': role,
'Handler': handler,
'Code': {'ZipFile': zip_file},
'Timeout': timeout,
'MemorySize': memory,
'Layers': layers
}
print(policy)
if kms_key_id:
config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}"
try:
return lambda_client.create_function(**config)
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterValueException':
print(f"Error creating function: {e}")
print("Ensure that the IAM role has the correct trust relationship and permissions.")
print("There might be a delay in role propagation. Please wait a few minutes and try again.")
raise
def get_pillow_layer_arn():
url = f"https://api.klayers.cloud/api/v2/p{os.getenv('PYTHON_VERSION')}/layers/latest/{os.getenv('AWS_REGION')}/json"
try:
response = requests.get(url)
response.raise_for_status()
layers_data = response.json()
pillow_layer = next((layer for layer in layers_data if layer['package'] == 'Pillow'), None)
if pillow_layer:
return pillow_layer['arn']
else:
print("Pillow layer not found in the API response.")
return None
except requests.RequestException as e:
print(f"Error fetching Pillow layer ARN: {e}")
return None
def get_lambda_policy():
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "arn:aws:s3:::your-bucket-name/*"
}
]
}
def deploy_lambda():
lambda_client = boto3.client('lambda', region_name=REGION)
print(f"Starting deployment of Lambda function: {LAMBDA_NAME}")
deployment_package = zip_directory('src/infra/lambdas/RSSFeedProcessorLambda/src')
layer_arn = get_or_create_lambda_layer()
if layer_arn:
print(f"Using Lambda Layer ARN: {layer_arn}")
else:
print("Warning: Lambda Layer not found or created. Proceeding without Layer.")
pillow_layer_arn = get_pillow_layer_arn()
if pillow_layer_arn:
print(f"Using Pillow Layer ARN: {pillow_layer_arn}")
else:
print("Warning: Pillow Layer not found. Proceeding without Pillow Layer.")
kms_key_id = get_or_create_kms_key()
if kms_key_id:
print(f"Using KMS Key ID: {kms_key_id}")
else:
print("Warning: KMS Key not found or created. Proceeding without KMS Key.")
sys.exit(1)
try:
# Check if the function exists
try:
lambda_client.get_function(FunctionName=LAMBDA_NAME)
function_exists = True
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
function_exists = False
else:
raise e
# Combine the layers
layers = [layer_arn] if layer_arn else []
if pillow_layer_arn:
layers.append(pillow_layer_arn)
if function_exists:
print("Updating existing Lambda function...")
update_function_configuration(lambda_client, LAMBDA_NAME, LAMBDA_HANDLER, LAMBDA_ROLE_ARN, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id)
update_function_code(lambda_client, LAMBDA_NAME, deployment_package)
else:
print(f"Lambda function '{LAMBDA_NAME}' not found. Creating new function...")
policy = get_lambda_policy()
create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy)
print("Lambda deployment completed successfully!")
except Exception as e:
print(f"Error during Lambda deployment: {str(e)}")
raise
if __name__ == "__main__":
deploy_lambda()

View File

@@ -1,4 +0,0 @@
newspaper3k
feedparser
python-dateutil
lxml

View File

@@ -1,72 +0,0 @@
import os
import requests
from qdrant_client import QdrantClient, models
try:
from ...utils import setup_logging
except ImportError:
# Fallback for when running standalone
import logging
def setup_logging():
return logging.getLogger(__name__)
logger = setup_logging()
qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333")
qdrant_api_key = os.getenv("QDRANT_API_KEY")
collection_name = os.getenv("QDRANT_COLLECTION_NAME")
embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM")
vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine")
ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text")
client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)
def get_index():
collections = client.get_collections().collections
if collection_name not in [c.name for c in collections]:
raise KeyError(f"Collection {collection_name} not found")
return client
def vectorize(article: str) -> list[float]:
try:
response = requests.post(
f"{ollama_host}/api/embeddings",
json={"model": ollama_embedding_model, "prompt": article},
timeout=30,
)
response.raise_for_status()
return response.json().get("embedding", [])
except requests.RequestException as e:
logger.error(f"Error generating embedding: {e}")
# Return a zero vector of the expected dimension as fallback
dim = int(embedding_dim) if embedding_dim else 384 # Default dimension
return [0.0] * dim
def upsert_vectors(index: QdrantClient, data: list[dict]):
points = [
models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload"))
for item in data
]
index.upsert(collection_name=collection_name, points=points)
def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None):
if embedding_dim and len(vector) != int(embedding_dim):
raise ValueError("Length of vector does not match the embedding dimension")
return index.search(
collection_name=collection_name,
query_vector=vector,
limit=top_k,
with_payload=True,
query_filter=filter_query,
)
if __name__ == "__main__":
paragraph = "This is a test."
vectorize(paragraph)

View File

@@ -1,6 +0,0 @@
def summarize(text:str):
sub_prompt = "Summarize the following passage"

View File

@@ -1,14 +0,0 @@
import re
def remove_newlines(text: str) -> str:
return text.replace('\n', '')
def remove_urls(text: str) -> str:
url_pattern = re.compile(r'http\S+|www\S+')
return url_pattern.sub('', text)
def clean_text(text: str) -> str:
text = remove_newlines(text)
text = remove_urls(text)
return text

View File

@@ -1,31 +0,0 @@
import newspaper
import logging
logger = logging.getLogger()
def extract_article(url):
"""
Extracts the title and text of an article from the given URL.
Args:
url (str): The URL of the article.
Returns:
A tuple containing the title and text of the article, respectively.
"""
logger.debug(f"Starting Newspaper Article Extraction {url}")
config = newspaper.Config()
config.request_timeout = 60
article = newspaper.Article(url)
try:
article.download()
logger.debug(f"Downloaded Article {url}")
article.parse()
logger.debug(f"Parsed Article {url}")
return article.title, article.text
except Exception as e:
logger.error(f"Failed to extract article {url}: {str(e)}")
return None, None

View File

@@ -1,15 +0,0 @@
import os
# Redis Configuration
REDIS_URL = os.environ["REDIS_URL"]
REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
# Logging Configuration
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
# RSS Feed Processing Configuration
MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))
# Article Extraction Configuration
ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))

View File

@@ -1,127 +0,0 @@
import boto3
from minio import Minio
import json
import os
import logging
from datetime import datetime
from pymongo import MongoClient
logger = logging.getLogger()
# Try to import vector DB components, but make them optional
try:
from .analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
VECTOR_DB_AVAILABLE = True
except ImportError:
VECTOR_DB_AVAILABLE = False
logger.warning("Vector DB components not available. Qdrant storage will not work.")
s3 = boto3.client('s3')
CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
minio_client = Minio(
os.getenv("MINIO_ENDPOINT"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=False
)
CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')
MONGODB_URL = os.getenv("MONGODB_URL")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME")
MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds")
mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
##### Article Storage #####
def save_article(article: dict, strategy: str):
if strategy == "s3":
s3_save_article(article)
elif strategy == "qdrant":
if VECTOR_DB_AVAILABLE:
qdrant_save_article(article)
else:
logger.error("Qdrant storage requested but vector DB components not available")
raise ValueError("Vector DB components not available for Qdrant storage")
elif strategy == "both":
if VECTOR_DB_AVAILABLE:
qdrant_save_article(article)
s3_save_article(article)
else:
raise ValueError(f"Invalid storage strategy: {strategy}")
def qdrant_save_article(article: dict):
logger.info("Saving article to Qdrant")
index = get_index()
data = {
"id": article["article_id"],
"vector": vectorize(article["content"]),
"payload": {"rss": article.get("rss"), "title": article.get("title")},
}
upsert_vectors(index, [data])
def s3_save_article(article:dict):
logger.info("Saving article to MinIO")
now = datetime.now()
article_id = article['article_id']
if not article_id:
logger.error(f"Missing rss_id or article_id in article: {article}")
return
file_path = f"/tmp/{article_id}-article.json"
file_key = f"{now.year}/{now.month}/{now.day}/{article_id}.json"
# Save article to /tmp json file
with open(file_path, "w") as f:
json.dump(article, f)
try:
metadata = {
"rss": article.get("rss", ""),
"title": article.get("title", ""),
"unixTime": str(article.get("unixTime", "")),
"article_id": article.get("article_id", ""),
"link": article.get("link", ""),
"rss_id": article.get("rss_id", "")
}
minio_client.fput_object(
CONTENT_BUCKET,
file_key,
file_path,
content_type="application/json",
metadata=metadata
)
logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}")
except Exception as e:
logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")
###### Feed Storage ######
RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")
def update_rss_feed(feed: dict, last_pub_dt: int):
try:
if not os.path.exists(RSS_FEEDS_FILE):
return
with open(RSS_FEEDS_FILE, "r") as f:
feeds = json.load(f)
for item in feeds:
if item.get("u") == feed["u"]:
item["dt"] = int(last_pub_dt)
with open(RSS_FEEDS_FILE, "w") as f:
json.dump(feeds, f)
logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
except Exception as e:
logger.error(f"Failed to update RSS feed: {str(e)}")

View File

@@ -1,11 +0,0 @@
class RSSProcessingError(Exception):
"""Exception raised for errors in the RSS processing."""
pass
class ArticleExtractionError(Exception):
"""Exception raised for errors in the article extraction."""
pass
class DataStorageError(Exception):
"""Exception raised for errors in data storage operations."""
pass

View File

@@ -1,133 +0,0 @@
import feedparser
from datetime import datetime
from dateutil import parser
import queue
import threading
import logging
from .utils import generate_key
from .article_extractor import extract_article
from .article_cleaning import clean_text
logger = logging.getLogger()
def process_feed(feed: dict):
output_queue = queue.Queue()
stop_thread = threading.Event()
thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
thread.daemon = True
thread.start()
logger.debug(f"Thread Started: {feed['u']}")
thread.join(timeout=90)
if thread.is_alive():
stop_thread.set()
logger.debug(f"Killing Thread: {feed['u']}")
return None
else:
try:
output = output_queue.get_nowait()
logger.info(f"Thread Succeeded: {feed['u']}")
return output
except queue.Empty:
logger.info(f"Thread Failed: {feed['u']}")
return None
def extract_feed_threading(rss: dict, output_queue, stop_thread):
articles = []
feed_url = rss['u']
last_date = rss['dt']
max_date = last_date
entry = None # Initialize entry variable
try:
feed = feedparser.parse(feed_url)
for entry in feed['entries']:
if stop_thread.is_set():
break
pub_date = parse_pub_date(entry.get('published', ''))
if pub_date > last_date:
title, text = extract_article(entry.link)
title, text = clean_text(title or ''), clean_text(text or '')
article = {
'link': entry.link,
'rss': feed_url,
'title': title,
'content': text,
'unixTime': pub_date,
'rss_id': generate_key(feed_url),
'article_id': generate_key(entry.link),
'llm_summary': None,
'embedding': None
}
articles.append(article)
max_date = max(max_date, pub_date)
output = {
'articles': articles,
'max_date': max_date,
'feed': rss
}
output_queue.put(output)
except Exception as e:
logger.error(f"Feed URL: {feed_url}")
if entry:
logger.error(f"Current entry: {entry.get('link', 'unknown')}")
logger.error(f"Feed failed due to error: {e}")
def extract_feed(rss: dict):
articles = []
feed_url = rss['u']
last_date = rss['dt']
max_date = last_date
try:
feed = feedparser.parse(feed_url)
for entry in feed['entries']:
pub_date = parse_pub_date(entry.get('published', ''))
if pub_date > last_date:
title, text = extract_article(entry.link)
article = {
'link': entry.link,
'rss': feed_url,
'title': title,
'content': text,
'unixTime': pub_date,
'rss_id': generate_key(feed_url),
'article_id': generate_key(entry.link),
'llm_summary': None,
'embedding': None
}
articles.append(article)
max_date = max(max_date, pub_date)
output = {
'articles': articles,
'max_date': max_date,
'feed': rss
}
return output
except Exception as e:
logger.error(f"Feed URL: {feed_url}")
logger.error(f"Feed failed due to error: {e}")
def parse_pub_date(date_string: str) -> int:
"""Parse publication date from various formats"""
if not date_string:
return int(datetime.now().timestamp())
try:
return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
except ValueError:
try:
return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
except ValueError:
try:
return int(parser.parse(date_string).timestamp())
except (ValueError, TypeError):
pass
return int(datetime.now().timestamp()) # Return current time if no date is found
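
For illustration, parse_pub_date above tries the RFC 822 pubDate format first, then ISO 8601, then dateutil, and finally falls back to the current time; a small usage sketch:

# RFC 822 dates (the common RSS pubDate format) parse via the first branch;
# 2024-01-01 00:00:00 UTC is unix time 1704067200.
print(parse_pub_date("Mon, 01 Jan 2024 00:00:00 +0000"))  # 1704067200
# Missing or unparseable dates fall back to the current time.
print(parse_pub_date(""))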

View File

@@ -1,65 +0,0 @@
import json
import time
import os
import redis
from feed_processor import extract_feed
from data_storage import save_article, update_rss_feed
from utils import setup_logging
from config import REDIS_URL, REDIS_QUEUE_NAME
from exceptions import RSSProcessingError, DataStorageError
from metrics import (
record_processed_articles,
record_processing_time,
record_extraction_errors,
)
logger = setup_logging()
storage_strategy = os.environ.get("STORAGE_STRATEGY")
redis_client = redis.Redis.from_url(REDIS_URL)
def lambda_handler(event, context):
logger.info("Starting RSS feed processing")
start_time = time.time()
try:
feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
if not feed_data:
logger.info("No messages in queue")
return {"statusCode": 200, "body": json.dumps("No feeds to process")}
feed = json.loads(feed_data)
result = extract_feed(feed)
logger.info(f"Process Feed Result Dictionary: {result}")
last_pub_dt = result["max_date"]
if result:
for article in result["articles"]:
try:
save_article(article, storage_strategy)
except DataStorageError as e:
logger.error(f"Failed to save article: {str(e)}")
record_extraction_errors(1)
update_rss_feed(result["feed"], last_pub_dt)
logger.info(f"Processed feed: {feed['u']}")
record_processed_articles(len(result["articles"]))
else:
logger.warning(f"Failed to process feed: {feed['u']}")
record_extraction_errors(1)
except RSSProcessingError as e:
logger.error(f"RSS Processing Error: {str(e)}")
return {"statusCode": 500, "body": json.dumps("RSS processing failed")}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")}
finally:
end_time = time.time()
processing_time = end_time - start_time
record_processing_time(processing_time)
logger.info(f"Lambda execution time: {processing_time:.2f} seconds")
return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")}

View File

@@ -1,46 +0,0 @@
"""Prometheus metrics utilities for the RSS feed processor."""
import os
from prometheus_client import Counter, Histogram, start_http_server
# Start a Prometheus metrics HTTP server exposing ``/metrics``. The port can be
# customised with the ``METRICS_PORT`` environment variable. This block is safe
# to run multiple times as it silently ignores port binding errors.
_metrics_port = int(os.environ.get("METRICS_PORT", "8000"))
if not os.environ.get("METRICS_SERVER_STARTED"):
try:
start_http_server(_metrics_port)
os.environ["METRICS_SERVER_STARTED"] = "1"
except OSError:
pass
# Metric definitions
_processed_articles = Counter(
"processed_articles_total",
"Total number of processed articles",
)
_processing_time = Histogram(
"rss_processing_seconds",
"Time spent processing RSS feeds",
)
_extraction_errors = Counter(
"extraction_errors_total",
"Number of article extraction errors",
)
def record_processed_articles(count: int) -> None:
"""Increment the processed articles counter."""
_processed_articles.inc(count)
def record_processing_time(duration: float) -> None:
"""Record how long a feed took to process."""
_processing_time.observe(duration)
def record_extraction_errors(count: int) -> None:
"""Increment the extraction errors counter."""
_extraction_errors.inc(count)

View File

@@ -1,18 +0,0 @@
import logging
import os
import hashlib
def setup_logging():
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'INFO')
logger.setLevel(logging.getLevelName(log_level))
return logger
def generate_key(input_string, length=10):
# Create a SHA256 hash of the input string
hash_object = hashlib.sha256(input_string.encode())
hex_dig = hash_object.hexdigest()
# Return the first 'length' characters of the hash
return hex_dig[:length]

View File

@@ -1,81 +0,0 @@
import os
import zipfile
import logging
import boto3
from dotenv import load_dotenv
from src.infra.deploy_infrastructure import deploy_cloudformation
# Load environment variables
load_dotenv(override=True)
# Set up logging
logging.basicConfig(level=os.getenv('LOG_LEVEL'))
# Set up S3 client
s3 = boto3.client('s3')
def zip_lambda_code():
lambda_dir = 'src/infra/lambdas/RSSQueueFiller/lambda'
zip_path = 'tmp/lambda_function.zip'
os.makedirs(zip_path.split("/")[0], exist_ok=True)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(lambda_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, lambda_dir)
zipf.write(file_path, arcname)
return zip_path
def upload_to_s3(file_path):
s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
bucket_name = os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
s3.upload_file(file_path, bucket_name, s3_key)
return f's3://{bucket_name}/{s3_key}'
def deploy_sqs_filler():
zip_file = zip_lambda_code()
upload_to_s3(zip_file)
# Deploy CloudFormation
deploy_cloudformation('rss_lambda_stack.yaml', 'LambdaSQSFiller',
parameters=[
{
'ParameterKey': 'QueueFillerLambdaName',
'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME')
},
{
'ParameterKey': 'RedisUrl',
'ParameterValue': os.getenv('REDIS_URL')
},
{
'ParameterKey': 'RedisQueueName',
'ParameterValue': os.getenv('REDIS_QUEUE_NAME')
},
{
'ParameterKey': 'LambdaCodeS3Bucket',
'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
},
{
'ParameterKey': 'LambdaCodeS3Key',
'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
},
{
'ParameterKey': 'LambdaRuntime',
'ParameterValue': os.getenv('LAMBDA_RUNTIME')
},
{
'ParameterKey': 'LambdaTimeout',
'ParameterValue': os.getenv('LAMBDA_TIMEOUT')
}
])
# Clean up local zip file
os.remove(zip_file)
if __name__ == "__main__":
deploy_sqs_filler()

View File

@@ -1,76 +0,0 @@
import json
import os
import logging
import boto3
from decimal import Decimal
from pymongo import MongoClient
from datetime import datetime
import redis
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# For AWS deployment - SQS
try:
sqs = boto3.client('sqs')
SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL', '')
AWS_DEPLOYMENT = bool(SQS_QUEUE_URL)
except Exception:
AWS_DEPLOYMENT = False
# For local deployment - Redis
if not AWS_DEPLOYMENT:
redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379'))
REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue')
MONGODB_URL = os.environ['MONGODB_URL']
MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME']
MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds')
mongo_client = MongoClient(MONGODB_URL)
feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME]
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return int(obj)
return super(DecimalEncoder, self).default(obj)
def handler(event, context):
messages_sent = 0
# Iterate over all feeds in MongoDB
for item in feeds_collection.find({}):
rss_url = item.get('url')
rss_dt = item.get('dt')
logger.debug(f"Processing RSS feed: {rss_url}")
logger.debug(f"Last published date: {rss_dt}")
if rss_url:
message = {
'u': rss_url,
'dt': rss_dt
}
logger.debug(f"Message: {message}")
try:
if AWS_DEPLOYMENT:
# Send to SQS for AWS deployment
sqs.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps(message, cls=DecimalEncoder)
)
else:
# Send to Redis for local deployment
redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message, cls=DecimalEncoder))
messages_sent += 1
except Exception as e:
logger.error(f"Error sending message to queue: {str(e)}")
logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}")
return {
"statusCode": 200,
"body": json.dumps(f"Sent {messages_sent} RSS URLs to queue"),
}

View File

@@ -1,117 +0,0 @@
#!/bin/bash
set -e
####### Section 1: Checking Python Existence ########
echo "Section 1: Checking Python Existence"
# Ensure python3.12 is installed
if ! command -v python3.12 &> /dev/null; then
echo "Python 3.12 is not installed. Please install it before running this script."
exit 1
fi
echo "Python 3.12 found. Proceeding..."
####### Section 2: Installing Dependencies ########
echo "Section 2: Installing Dependencies"
# Install dependencies
python3.12 -m pip install --upgrade Pillow feedfinder2==0.0.4 python-dateutil newspaper3k==0.2.8 feedparser lxml[html5lib] lxml_html_clean lxml[html_clean] qdrant-client ollama -t python/
echo "Dependencies installed successfully."
####### Section 3: Creating ZIP File ########
echo "Section 3: Creating ZIP File"
# Create ZIP file
zip -r OpenRSSLambdaLayer.zip python/
echo "ZIP file created."
# Check if ZIP file was created and is not empty
if [ ! -s OpenRSSLambdaLayer.zip ]; then
echo "Error: ZIP file is empty or was not created."
exit 1
fi
echo "ZIP file check passed."
####### Section 4: Getting AWS Regions ########
echo "Section 4: Getting AWS Regions"
# Get list of all AWS regions
REGIONS=$(aws ec2 describe-regions --query 'Regions[].RegionName' --output text)
echo "Retrieved AWS regions: $REGIONS"
####### Section 5: Creating Buckets, Uploading, and Publishing Layer ########
echo "Section 5: Creating Buckets, Uploading, and Publishing Layer"
create_bucket_upload_and_publish_layer() {
local region=$1
local bucket_name="rss-feed-processor-layers-$region"
local layer_name="ingest-rss-lambda-layer-$region"
echo "Processing region: $region"
# Create bucket if it doesn't exist
if ! aws s3api head-bucket --bucket "$bucket_name" --region "$region" 2>/dev/null; then
echo "Creating bucket $bucket_name in $region"
if [ "$region" == "us-east-1" ]; then
aws s3api create-bucket --bucket "$bucket_name" --region "$region"
else
aws s3api create-bucket --bucket "$bucket_name" --region "$region" --create-bucket-configuration LocationConstraint=$region
fi
else
echo "Bucket $bucket_name already exists in $region"
fi
# Upload ZIP to the region-specific bucket
echo "Uploading ZIP to $bucket_name"
aws s3 cp OpenRSSLambdaLayer.zip "s3://$bucket_name/" --region "$region"
# Create and publish Lambda layer
echo "Creating Lambda layer in region: $region"
LAYER_VERSION=$(aws lambda publish-layer-version \
--region "$region" \
--layer-name $layer_name \
--description "Layer with dependencies for RSS processing" \
--license-info "MIT" \
--content "S3Bucket=$bucket_name,S3Key=OpenRSSLambdaLayer.zip" \
--compatible-runtimes python3.12 \
--query 'Version' \
--output text
)
if [ -z "$LAYER_VERSION" ]; then
echo "Failed to create Lambda layer in region $region."
return 1
fi
echo "Making layer public in region: $region"
aws lambda add-layer-version-permission \
--region "$region" \
--layer-name $layer_name \
--version-number "$LAYER_VERSION" \
--statement-id public \
--action lambda:GetLayerVersion \
--principal '*'
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
ARN="arn:aws:lambda:${region}:${ACCOUNT_ID}:layer:$layer_name:${LAYER_VERSION}"
echo "Layer ARN for region $region: $ARN"
echo "$region:$ARN" >> layer_arns.txt
}
# Process all regions
for region in $REGIONS; do
if create_bucket_upload_and_publish_layer "$region"; then
echo "Successfully processed region: $region"
else
echo "Failed to process region: $region. Continuing with next region..."
fi
done
####### Section 6: Completion ########
echo "Section 6: Completion"
echo "Setup complete! OpenRSSLambdaLayer is now available in all processed regions."
echo "Layer ARNs have been saved to layer_arns.txt"
echo "Script execution completed successfully."

View File

@@ -1,69 +0,0 @@
import boto3
import os
from src.utils.retry_logic import retry_with_backoff
# Set variables
LAMBDA_NAME = "RSSFeedProcessor"
@retry_with_backoff()
def update_env_vars(function_name):
lambda_client = boto3.client('lambda')
env_vars = {
# Lambda Configuration
'LAMBDA_FUNCTION_NAME': os.environ.get('LAMBDA_FUNCTION_NAME'),
'STACK_BASE': os.environ.get('STACK_BASE'),
'LAMBDA_EXECUTION_ROLE_NAME': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME'),
'LAMBDA_ROLE_ARN': os.environ.get('LAMBDA_ROLE_ARN'),
'LAMBDA_LAYER_VERSION': os.environ.get('LAMBDA_LAYER_VERSION'),
'LAMBDA_LAYER_NAME': os.environ.get('LAMBDA_LAYER_NAME'),
'LAMBDA_LAYER_ARN': os.environ.get('LAMBDA_LAYER_ARN'),
'LAMBDA_RUNTIME': os.environ.get('LAMBDA_RUNTIME'),
'LAMBDA_TIMEOUT': os.environ.get('LAMBDA_TIMEOUT', '300'), # Reasonable default timeout
'LAMBDA_MEMORY': os.environ.get('LAMBDA_MEMORY', '512'), # Reasonable default memory
# S3 Configuration
'S3_BUCKET_NAME': os.environ.get('S3_BUCKET_NAME'),
'S3_LAMBDA_ZIPPED_BUCKET_NAME': os.environ.get('S3_LAMBDA_ZIPPED_BUCKET_NAME'),
'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'),
'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'),
# Redis Configuration
'REDIS_URL': os.environ.get('REDIS_URL'),
'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'),
# Queue Filler Lambda Configuration
'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'),
'QUEUE_FILLER_LAMBDA_S3_KEY': os.environ.get('QUEUE_FILLER_LAMBDA_S3_KEY'),
# Python Configuration
'PYTHON_VERSION': os.environ.get('PYTHON_VERSION', '3.12'), # Default Python version
# Application Settings
'APP_NAME': os.environ.get('APP_NAME', 'RSS Feed Processor'), # Default app name is fine
'VERSION': os.environ.get('VERSION', '1.0.0'), # Default version is fine
'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO'), # Default to INFO logging
# Storage Configuration
'STORAGE_STRATEGY': os.environ.get('STORAGE_STRATEGY', 's3'), # Default to s3 storage
# Qdrant Configuration (only used if STORAGE_STRATEGY is 'qdrant')
'QDRANT_URL': os.environ.get('QDRANT_URL'),
'QDRANT_API_KEY': os.environ.get('QDRANT_API_KEY'),
'QDRANT_COLLECTION_NAME': os.environ.get('QDRANT_COLLECTION_NAME'),
# Vector Configuration
'VECTOR_EMBEDDING_MODEL': os.environ.get('VECTOR_EMBEDDING_MODEL'),
'VECTOR_EMBEDDING_DIM': os.environ.get('VECTOR_EMBEDDING_DIM'),
'VECTOR_SEARCH_METRIC': os.environ.get('VECTOR_SEARCH_METRIC'),
# Ollama Configuration
'OLLAMA_HOST': os.environ.get('OLLAMA_HOST'),
'OLLAMA_EMBEDDING_MODEL': os.environ.get('OLLAMA_EMBEDDING_MODEL'),
}
return lambda_client.update_function_configuration(
FunctionName=LAMBDA_NAME,
Environment={'Variables': env_vars}
)

View File

@@ -33,54 +33,16 @@ def main():
     # Determine if we're in advanced mode
     advanced_mode = not Confirm.ask("Do you want to use basic mode? \n( We recommend basic for your first time ) ")
-    # AWS Configuration
-    env_vars["AWS_ACCOUNT_ID"] = get_env_value("AWS_ACCOUNT_ID", "Enter AWS Account ID:") # Could we grab this for the user instead.
-    # AWS Credentials
-    if not check_aws_region():
-        console.print("AWS region not found in environment variables.")
-        env_vars["AWS_REGION"] = get_env_value("AWS_REGION", "Enter AWS Region:", options=get_aws_regions())
-    else:
-        env_vars["AWS_REGION"] = os.environ.get('AWS_REGION')
-    if not check_aws_credentials():
-        console.print("AWS credentials not found in environment variables.")
-        if Confirm.ask("Do you want to set AWS credentials?"):
-            env_vars["AWS_ACCESS_KEY_ID"] = get_env_value("AWS_ACCESS_KEY_ID", "Enter AWS Access Key ID:")
-            env_vars["AWS_SECRET_ACCESS_KEY"] = get_env_value("AWS_SECRET_ACCESS_KEY", "Enter AWS Secret Access Key:")
-    else:
-        env_vars["AWS_ACCESS_KEY_ID"] = os.environ.get('AWS_ACCESS_KEY_ID')
-        env_vars["AWS_SECRET_ACCESS_KEY"] = os.environ.get('AWS_SECRET_ACCESS_KEY')
-        console.print("AWS credentials found in environment variables.")
-    # Resource Names
-    env_vars["LAMBDA_FUNCTION_NAME"] = get_env_value("LAMBDA_FUNCTION_NAME", "Enter Lambda Function Name:", options=["RSSFeedProcessor", "CustomRSSProcessor"], advanced=advanced_mode)
-    env_vars["STACK_BASE"] = env_vars["LAMBDA_FUNCTION_NAME"]
-    env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}"
-    env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}"
-    env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}"
+    # MongoDB and Redis Configuration
+    env_vars["MONGODB_URL"] = get_env_value("MONGODB_URL", "Enter MongoDB URL:", options=["mongodb://localhost:27017"], advanced=advanced_mode)
+    env_vars["MONGODB_ARTICLES_DB_NAME"] = get_env_value("MONGODB_ARTICLES_DB_NAME", "Enter MongoDB Articles DB Name:", options=["articles_db"], advanced=advanced_mode)
+    env_vars["MONGODB_ARTICLES_COLLECTION_NAME"] = get_env_value("MONGODB_ARTICLES_COLLECTION_NAME", "Enter MongoDB Articles Collection Name:", options=["articles"], advanced=advanced_mode)
+    env_vars["MONGODB_FEEDS_DB_NAME"] = get_env_value("MONGODB_FEEDS_DB_NAME", "Enter MongoDB Feeds DB Name:", options=["feeds_db"], advanced=advanced_mode)
+    env_vars["MONGODB_FEEDS_COLLECTION_NAME"] = get_env_value("MONGODB_FEEDS_COLLECTION_NAME", "Enter MongoDB Feeds Collection Name:", options=["rss_feeds"], advanced=advanced_mode)
     env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode)
     env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], advanced=advanced_mode)
-    # Advanced Configuration
-    env_vars["LAMBDA_LAYER_VERSION"] = 3
-    env_vars["LAMBDA_LAYER_NAME"] = f"ingest-rss-lambda-layer-{env_vars['AWS_REGION']}"
-    env_vars["LAMBDA_LAYER_ARN"] = f"arn:aws:lambda:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:layer:{env_vars['LAMBDA_LAYER_NAME']}:{env_vars['LAMBDA_LAYER_VERSION']}"
-    env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}"
-    env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}"
-    env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode)
-    env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode)
-    env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}"
-    env_vars["LAMBDA_TIMEOUT"] = get_env_value("LAMBDA_TIMEOUT", "Enter Lambda Timeout (in seconds):", options=["60", "120", "300"], advanced=advanced_mode)
-    env_vars["LAMBDA_MEMORY"] = get_env_value("LAMBDA_MEMORY", "Enter Lambda Memory (in MB):", options=["128", "256", "512", "1024"], advanced=advanced_mode)
-    env_vars["QUEUE_FILLER_LAMBDA_NAME"] = get_env_value("QUEUE_FILLER_LAMBDA_NAME", "Enter Queue Filler Lambda Name:", options=["RSSQueueFiller", "CustomQueueFiller"], advanced=advanced_mode)
-    env_vars["QUEUE_FILLER_LAMBDA_S3_KEY"] = get_env_value("QUEUE_FILLER_LAMBDA_S3_KEY", "Enter Queue Filler Lambda S3 Key:", options=["RSSQueueFiller.zip", "CustomQueueFiller.zip"], advanced=advanced_mode)
     # Logging Configuration
     env_vars["LOG_LEVEL"] = get_env_value("LOG_LEVEL", "Enter Log Level:", options=["DEBUG", "INFO", "WARNING", "ERROR"], advanced=advanced_mode)
@@ -90,7 +52,7 @@ def main():
     env_vars["TEST"] = get_env_value("TEST", "Enter Test Value:", options=["0", "1"], advanced=advanced_mode)
     # Storage Strategy
-    env_vars["STORAGE_STRATEGY"] = get_env_value("STORAGE_STRATEGY", "Choose Storage Strategy:", options=["s3", "qdrant"], advanced=advanced_mode)
+    env_vars["STORAGE_STRATEGY"] = "mongodb"
     # Qdrant Configuration (only if qdrant is selected)
     if env_vars["STORAGE_STRATEGY"] == "qdrant":

View File

@@ -1,3 +1,3 @@
-from .downloader import S3BatchDownloader
-__all__ = ['S3BatchDownloader']
+from .downloader import MongoDBBatchDownloader
+__all__ = ['MongoDBBatchDownloader']

View File

@@ -1,170 +1,62 @@
-from minio import Minio
 import pandas as pd
-from typing import Optional, List, Dict, Union, Any
+from typing import Optional, List, Dict, Any
-import json
 import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from datetime import datetime, timezone
 import logging
-from string import Template
+from pymongo import MongoClient
 from tqdm import tqdm
+from datetime import datetime
-class S3BatchDownloader:
+class MongoDBBatchDownloader:
-    """Class for batch downloading RSS articles from a MinIO bucket"""
+    """Class for batch downloading RSS articles from a MongoDB collection"""
-    DEFAULT_CONFIG = {
+    def __init__(self, mongo_url: str, db_name: str, collection_name: str):
-        "region": "${AWS_REGION}",
-        "bucket": "${RSS_BUCKET_NAME}",
-        "prefix": "${RSS_PREFIX}",
-        "max_workers": os.cpu_count() or 10
-    }
-    def __init__(self, config_path: Optional[str] = None):
-        """
-        Initialize the S3BatchDownloader
-        Args:
-            config_path: Optional path to config file. If None, uses environment variables.
-        """
         self.logger = logging.getLogger(__name__)
-        self.config = self._load_config(config_path)
+        self.mongo_url = mongo_url
-        self._validate_config()
+        self.db_name = db_name
+        self.collection_name = collection_name
-        self.s3 = Minio(
+        self.client = MongoClient(self.mongo_url)
-            os.getenv('MINIO_ENDPOINT'),
+        self.collection = self.client[self.db_name][self.collection_name]
-            access_key=os.getenv('MINIO_ACCESS_KEY'),
+        self.logger.info(f"Initialized MongoDBBatchDownloader for collection: {self.collection_name}")
-            secret_key=os.getenv('MINIO_SECRET_KEY'),
-            secure=False
-        )
-        self.logger.info(
-            f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}"
-        )
-    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
-        """Load and process configuration"""
-        if config_path and os.path.exists(config_path):
-            with open(config_path) as f:
-                template = Template(f.read())
-        else:
-            template = Template(json.dumps(self.DEFAULT_CONFIG))
-        env_vars = {
-            'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'),
-            'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET')
-        }
-        config_str = template.safe_substitute(env_vars)
-        try:
-            config = json.loads(config_str)
-            config['max_workers'] = int(config.get('max_workers', 10))
-            return config
-        except json.JSONDecodeError as e:
-            raise ValueError(f"Invalid JSON config after variable substitution: {str(e)}")
-    def _validate_config(self) -> None:
-        """Validate the configuration"""
-        required_fields = ['region', 'bucket', 'prefix']
-        missing_fields = [field for field in required_fields if field not in self.config]
-        if missing_fields:
-            raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}")
     def download_to_file(self,
                          output_path: str,
                          file_format: str = 'csv',
                          start_date: Optional[str] = None,
                          end_date: Optional[str] = None) -> str:
         """
-        Download articles from MinIO to a consolidated file
+        Download articles from MongoDB to a consolidated file
         Args:
             output_path: Path to save the output file.
             file_format: Format to save the file ('csv' or 'json').
-            start_date: Optional start date filter (YYYY-MM-DD).
+            start_date: Optional start date filter (YYYY-MM-DD, expects 'unixTime' field in article).
             end_date: Optional end date filter (YYYY-MM-DD).
         Returns:
             Path to the saved file.
         """
         self.logger.info(f"Starting batch download to {output_path}")
+        query = {}
-        # Convert date strings to UTC datetime
+        if start_date or end_date:
-        start_ts = datetime.strptime(start_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if start_date else None
+            date_query = {}
-        end_ts = datetime.strptime(end_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if end_date else None
+            if start_date:
+                start_ts = int(datetime.strptime(start_date, '%Y-%m-%d').timestamp())
-        # List and filter objects
+                date_query['$gte'] = start_ts
-        objects = self._list_objects()
+            if end_date:
+                end_ts = int(datetime.strptime(end_date, '%Y-%m-%d').timestamp())
-        if start_ts or end_ts:
+                date_query['$lte'] = end_ts
-            objects = [
+            query['unixTime'] = date_query
-                obj for obj in objects
+        articles = list(self.collection.find(query))
-                if self._is_in_date_range(obj['LastModified'], start_ts, end_ts)
+        self.logger.info(f"Found {len(articles)} articles to process")
-            ]
+        print(f"Found {len(articles)} articles to process")
-        self.logger.info(f"Found {len(objects)} objects to process")
+        # Remove MongoDB _id field for export
-        print(f"Found {len(objects)} objects to process")
+        for article in articles:
+            article.pop('_id', None)
-        # Download and merge data
-        all_data = []
-        with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor, tqdm(total=len(objects), unit="object") as progress_bar:
-            future_to_obj = {executor.submit(self._download_object, obj): obj for obj in objects}
-            for future in as_completed(future_to_obj):
-                result = future.result()
-                if result is not None:
-                    all_data.extend(result if isinstance(result, list) else [result])
-                progress_bar.update(1)
         # Save to file
-        self._save_to_file(all_data, output_path, file_format)
+        df = pd.DataFrame(articles)
-        self.logger.info(f"Successfully downloaded {len(all_data)} articles to {output_path}")
-        return output_path
-    def _list_objects(self) -> List[Dict]:
-        """List objects in bucket"""
-        objects = []
-        try:
-            for obj in self.s3.list_objects(
-                self.config['bucket'],
-                prefix=self.config['prefix'],
-                recursive=True
-            ):
-                objects.append({
-                    'Key': obj.object_name,
-                    'LastModified': obj.last_modified
-                })
-            return objects
-        except Exception as e:
-            self.logger.error(f"Error listing objects: {str(e)}")
-            raise
-    def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
-        """Download and parse single object"""
-        try:
-            response = self.s3.get_object(self.config['bucket'], obj['Key'])
-            content = response.read().decode('utf-8')
-            data = json.loads(content)
-            stat = self.s3.stat_object(self.config['bucket'], obj['Key'])
-            metadata = stat.metadata
-            if isinstance(data, dict):
-                data.update(metadata)
-                return [data]
-            elif isinstance(data, list):
-                for item in data:
-                    item.update(metadata)
-                return data
-        except Exception as e:
-            self.logger.error(f"Error downloading {obj['Key']}: {str(e)}")
-            return None
-    def _is_in_date_range(self, ts: datetime, start: Optional[datetime], end: Optional[datetime]) -> bool:
-        """Check if timestamp is within the date range"""
-        return (not start or ts >= start) and (not end or ts <= end)
-    def _save_to_file(self, data: List[Dict], output_path: str, file_format: str) -> None:
-        """Save data to file"""
-        df = pd.DataFrame(data)
         if file_format == 'csv':
             df.to_csv(output_path, index=False)
         elif file_format == 'json':
             df.to_json(output_path, orient='records', lines=True)
         else:
             raise ValueError(f"Unsupported file format: {file_format}")
+        self.logger.info(f"Successfully downloaded {len(articles)} articles to {output_path}")
+        return output_path
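For orientation (not part of the diff): a minimal usage sketch of the new downloader. The import path and the environment-variable defaults are assumptions; only the constructor and download_to_file signatures come from the code above.

    import os
    from downloader import MongoDBBatchDownloader  # hypothetical path; the package __init__ re-exports this class

    downloader = MongoDBBatchDownloader(
        mongo_url=os.getenv("MONGODB_URL", "mongodb://localhost:27017"),
        db_name=os.getenv("MONGODB_ARTICLES_DB_NAME", "articles_db"),
        collection_name=os.getenv("MONGODB_ARTICLES_COLLECTION_NAME", "articles"),
    )
    # start_date/end_date are parsed as YYYY-MM-DD and matched against the articles' 'unixTime' field.
    downloader.download_to_file("articles.csv", file_format="csv",
                                start_date="2025-01-01", end_date="2025-06-01")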

View File

@@ -6,40 +6,17 @@ from typing import List, Dict
 def check_env() -> None:
     # Variables that must be set by the user
     required_user_vars = [
-        "AWS_REGION",
+        "MONGODB_URL",
-        "AWS_ACCOUNT_ID",
+        "MONGODB_ARTICLES_DB_NAME",
-        "AWS_ACCESS_KEY_ID",
+        "MONGODB_ARTICLES_COLLECTION_NAME",
-        "AWS_SECRET_ACCESS_KEY",
+        "MONGODB_FEEDS_DB_NAME",
-        "MINIO_ENDPOINT",
+        "MONGODB_FEEDS_COLLECTION_NAME",
-        "MINIO_ACCESS_KEY",
+        "REDIS_URL",
-        "MINIO_SECRET_KEY",
+        "REDIS_QUEUE_NAME"
-        "MINIO_BUCKET"
     ]
     # Variables that are derived or have default values
     derived_vars = [
-        "AWS_DEFAULT_REGION",
-        "LAMBDA_FUNCTION_NAME",
-        "STACK_BASE",
-        "LAMBDA_EXECUTION_ROLE_NAME",
-        "LAMBDA_ROLE_ARN",
-        "S3_BUCKET_NAME",
-        "REDIS_URL",
-        "REDIS_QUEUE_NAME",
-        "LAMBDA_LAYER_VERSION",
-        "LAMBDA_LAYER_NAME",
-        "LAMBDA_LAYER_ARN",
-        "S3_LAYER_BUCKET_NAME",
-        "S3_LAYER_KEY_NAME",
-        "PYTHON_VERSION",
-        "LAMBDA_RUNTIME",
-        "LAMBDA_TIMEOUT",
-        "LAMBDA_MEMORY",
-        "MONGODB_URL",
-        "MONGODB_DB_NAME",
-        "MONGODB_COLLECTION_NAME",
-        "QUEUE_FILLER_LAMBDA_NAME",
-        "QUEUE_FILLER_LAMBDA_S3_KEY",
         "LOG_LEVEL",
         "APP_NAME",
         "VERSION",