This commit is contained in:
Charles-Gormley
2024-08-25 09:11:49 -04:00
parent a31e5fdfe0
commit 049cb6a6b2
37 changed files with 1117 additions and 1 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
repo_structure.txt
.env
/layer/python*

128
README.md
View File

@@ -1 +1,127 @@
# OpenRSS Feed Processor
OpenRSS is an AWS-based RSS feed processing system that automatically fetches, processes, and stores articles from specified RSS feeds.
## Project Structure
```
OpenRSS/
├── src/
│ ├── infra/
│ │ ├── cloudformation/
│ │ │ ├── s3.yaml
│ │ │ ├── dynamo.yaml
│ │ │ └── sqs.yaml
│ │ └── deploy_infrastructure.py
│ ├── lambda_function/
│ │ ├── src/
│ │ │ ├── lambda_function.py
│ │ │ ├── feed_processor.py
│ │ │ ├── article_extractor.py
│ │ │ ├── data_storage.py
│ │ │ ├── utils.py
│ │ │ ├── config.py
│ │ │ ├── exceptions.py
│ │ │ └── metrics.py
│ │ ├── tests/
│ │ │ └── test_lambda_function.py
│ │ ├── layers/
│ │ │ └── requirements.txt
│ │ ├── deploy_lambda.py
│ │ └── update_env_vars.py
│ └── utils/
│       ├── create_lambda_layer.py
│       ├── retry_logic.py
│       └── upload_rss_feeds.py
├── launch.py
├── rss_feeds.json
├── requirements.txt
└── README.md
```
## Prerequisites
- Python 3.10+ (the Lambda function and layer are built for the `python3.10` runtime)
- AWS CLI configured with appropriate permissions
- An AWS account with necessary services (S3, DynamoDB, SQS, Lambda) enabled
## Setup
1. Clone the repository:
```
git clone https://github.com/yourusername/OpenRSS.git
cd OpenRSS
```
2. Install the required dependencies:
```
pip install -r requirements.txt
```
3. Create a `.env` file in the root directory with the following content:
```
AWS_ACCESS_KEY_ID=your_access_key_here
AWS_SECRET_ACCESS_KEY=your_secret_key_here
AWS_REGION=us-east-1
```
4. Update the `rss_feeds.json` file with the RSS feeds you want to process.
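Each entry in `rss_feeds.json` uses a compact schema: `u` is the feed URL and `dt` is the Unix timestamp of the most recently processed article (`0` for a feed that has never been processed). As a sketch, a new feed can be appended from Python like this (the URL below is just a placeholder):
```
import json

# Load the existing feed list, append a new entry, and write it back.
# The "u"/"dt" keys mirror the entries shipped in rss_feeds.json.
with open("rss_feeds.json") as f:
    feeds = json.load(f)

feeds.append({"u": "https://example.com/feed.xml", "dt": 0})

with open("rss_feeds.json", "w") as f:
    json.dump(feeds, f, indent=2)
```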
## Usage
To deploy the infrastructure and start the RSS feed processor:
```
python launch.py
```
This script will:
1. Deploy the necessary AWS infrastructure (S3, DynamoDB, SQS) using CloudFormation.
2. Create and upload the Lambda layer.
3. Deploy the Lambda function.
4. Upload the RSS feeds to DynamoDB.
5. Trigger an initial execution of the Lambda function.
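The deploy script registers the function as `RSSFeedProcessor` (see `src/lambda_function/deploy_lambda.py`). If you want to kick off an extra run manually, a minimal boto3 sketch:
```
import boto3

# Invoke the deployed function once, asynchronously.
lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="RSSFeedProcessor",
    InvocationType="Event",  # use "RequestResponse" to wait for the result
)
print(response["StatusCode"])  # 202 for asynchronous invocations
```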
## Infrastructure
The project uses the following AWS services:
- S3: Stores processed articles
- DynamoDB: Stores RSS feed information and processing status
- SQS: Queues RSS feeds for processing
- Lambda: Processes RSS feeds and extracts articles
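Each service is deployed as its own CloudFormation stack named `rss-feed-processor-<suffix>` (see `src/infra/deploy_infrastructure.py`). A quick sketch to confirm the stacks reached a healthy state after `launch.py` finishes:
```
import boto3

# Print the status of each stack created by deploy_infrastructure.py.
cf = boto3.client("cloudformation")
for suffix in ("S3", "DynamoDB", "SQS", "Lambda"):
    stack_name = f"rss-feed-processor-{suffix}"
    stack = cf.describe_stacks(StackName=stack_name)["Stacks"][0]
    print(stack_name, stack["StackStatus"])
```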
## Lambda Function
The Lambda function (`src/lambda_function/src/lambda_function.py`) is triggered periodically to process RSS feeds. It:
1. Retrieves RSS feed information from DynamoDB
2. Fetches and parses the RSS feed
3. Extracts articles using the newspaper3k library
4. Stores processed articles in S3
5. Updates the feed's last processed timestamp in DynamoDB
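Processed articles land in S3 as JSON documents under keys of the form `articles/<unixTime>/<slug>.json`, each containing `link`, `rss`, `title`, `content`, and `unixTime` fields (see `data_storage.py` and `feed_processor.py`). A sketch for browsing recent output, assuming the bucket name is available as `S3_BUCKET_NAME` in your environment (as in `template.env`):
```
import json
import os

import boto3

# List a few stored articles and print their publication time and title.
s3 = boto3.client("s3")
bucket = os.environ["S3_BUCKET_NAME"]

listing = s3.list_objects_v2(Bucket=bucket, Prefix="articles/", MaxKeys=5)
for obj in listing.get("Contents", []):
    body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
    article = json.loads(body)
    print(article["unixTime"], article["title"])
```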
## Customization
- To modify the CloudFormation templates, edit the YAML files in `src/infra/cloudformation/`.
- To change the Lambda function's behavior, modify the Python files in `src/lambda_function/src/`.
- To add or remove RSS feeds, update the `rss_feeds.json` file.
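Runtime tuning values such as `MAX_ARTICLES_PER_FEED`, `FEED_PROCESSING_TIMEOUT`, and `LOG_LEVEL` are read from environment variables in `src/lambda_function/src/config.py`, so they can be changed without redeploying code. A sketch that merges an override into the function's existing environment (note that `update_function_configuration` replaces the whole variable map, so fetch and merge first; the function name assumes the default `RSSFeedProcessor`):
```
import boto3

# Fetch the current environment, merge in the override, and push it back.
lambda_client = boto3.client("lambda")
function_name = "RSSFeedProcessor"

current = lambda_client.get_function_configuration(FunctionName=function_name)
env_vars = current.get("Environment", {}).get("Variables", {})
env_vars["MAX_ARTICLES_PER_FEED"] = "25"

lambda_client.update_function_configuration(
    FunctionName=function_name,
    Environment={"Variables": env_vars},
)
```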
## Testing
To run the tests for the Lambda function:
```
python -m pytest src/lambda_function/tests/
```
## Monitoring
The Lambda function logs its activities to CloudWatch Logs. You can monitor the function's performance and any errors through the AWS CloudWatch console.
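The function also publishes custom metrics (`ProcessedArticles`, `ProcessingTime`, `ExtractionErrors`) to the `RSS/FeedProcessor` CloudWatch namespace via `metrics.py`. A sketch for pulling recent log lines and the article count from Python, assuming the standard `/aws/lambda/<function name>` log group convention:
```
from datetime import datetime, timedelta, timezone

import boto3

# Fetch the most recent log events for the function.
logs = boto3.client("logs")
events = logs.filter_log_events(
    logGroupName="/aws/lambda/RSSFeedProcessor",
    limit=20,
)
for event in events["events"]:
    print(event["message"].rstrip())

# Sum the ProcessedArticles metric over the last 24 hours.
cloudwatch = boto3.client("cloudwatch")
stats = cloudwatch.get_metric_statistics(
    Namespace="RSS/FeedProcessor",
    MetricName="ProcessedArticles",
    StartTime=datetime.now(timezone.utc) - timedelta(days=1),
    EndTime=datetime.now(timezone.utc),
    Period=3600,
    Statistics=["Sum"],
)
print(sum(dp["Sum"] for dp in stats["Datapoints"]))
```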
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the MIT License.

66
launch.py Normal file
View File

@@ -0,0 +1,66 @@
import os
import sys
import json
import boto3
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Set AWS credentials from environment variables
os.environ['AWS_ACCESS_KEY_ID'] = os.getenv('AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = os.getenv('AWS_SECRET_ACCESS_KEY')
os.environ['AWS_DEFAULT_REGION'] = os.getenv('AWS_REGION')
TABLE_NAME = os.getenv('DYNAMODB_TABLE_NAME')
ACCOUNT_NUM = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_NAME = os.getenv("SQS_QUEUE_NAME")
REGION = os.getenv("AWS_REGION")
os.environ["SQS_QUEUE_URL"] = f"https://sqs.{REGION}.amazonaws.com/{ACCOUNT_NUM}/{SQS_QUEUE_NAME}"
lambda_client = boto3.client("lambda")
LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME")
# Add the src directory to the Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
from src.infra.deploy_infrastructure import deploy_infrastructure
from src.utils.create_lambda_layer import create_lambda_layer
from src.lambda_function.deploy_lambda import deploy_lambda
from src.lambda_function.update_lambda_env_vars import update_env_vars
from src.utils.upload_rss_feeds import upload_rss_feeds
def main():
# Deploy infrastructure
deploy_infrastructure()
# Create Lambda layer
create_lambda_layer()
print("Finished with Lambda Layer")
# Deploy Lambda function
deploy_lambda()
print("Finished Deploying Lambda")
# Update Lambda environment variables
update_env_vars(LAMBDA_FUNCTION_NAME)
print("Finished Environment Variable Updates")
# Upload RSS feeds
rss_feeds_file = os.path.join(current_dir, "rss_feeds.json")
if os.path.exists(rss_feeds_file):
with open(rss_feeds_file, 'r') as f:
rss_feeds = json.load(f)
upload_rss_feeds(rss_feeds, TABLE_NAME)
else:
print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")
print("RSS Feed Processor launched successfully!")
if __name__ == "__main__":
main()

0
requirements.txt Normal file
View File

42
rss_feeds.json Normal file
View File

@@ -0,0 +1,42 @@
[
{
"u": "http://rss.cnn.com/rss/cnn_topstories.rss",
"dt": 0
},
{
"u": "https://feeds.a.dj.com/rss/RSSWorldNews.xml",
"dt": 0
},
{
"u": "http://feeds.bbci.co.uk/news/world/rss.xml",
"dt": 0
},
{
"u": "https://feeds.npr.org/1001/rss.xml",
"dt": 0
},
{
"u": "https://www.reddit.com/r/news/.rss",
"dt": 0
},
{
"u": "https://news.ycombinator.com/rss",
"dt": 0
},
{
"u": "https://techcrunch.com/feed/",
"dt": 0
},
{
"u": "https://www.wired.com/feed/rss",
"dt": 0
},
{
"u": "https://www.sciencedaily.com/rss/all.xml",
"dt": 0
},
{
"u": "https://www.nasa.gov/rss/dyn/breaking_news.rss",
"dt": 0
}
]

View File

@@ -0,0 +1,27 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor DynamoDB Table'
Parameters:
DynamoDBName:
Type: String
Description: "Name of the DynamoDB table for RSS feeds"
Resources:
RSSFeedsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Ref 'DynamoDBName'
AttributeDefinitions:
- AttributeName: url
AttributeType: S
KeySchema:
- AttributeName: url
KeyType: HASH
BillingMode: PAY_PER_REQUEST
Outputs:
TableName:
Description: 'Name of the DynamoDB table for RSS feeds'
Value: !Ref RSSFeedsTable
Export:
Name: !Sub '${AWS::StackName}-RSSFeedsTableName'

View File

@@ -0,0 +1,48 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'IAM Role for RSS Feed Processor Lambda Function with Broad Permissions'
Parameters:
LambdaExecutionRoleName:
Type: String
Description: "Name of the Lambda Execution Role"
Resources:
LambdaExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
RoleName: !Ref LambdaExecutionRoleName
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
Policies:
- PolicyName: 'RSSFeedProcessorLambdaBroadPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'sqs:*'
- 'dynamodb:*'
- 's3:*'
- 'lambda:*'
- 'logs:*'
- 'xray:*'
- 'cloudwatch:*'
- 'events:*'
- 'kms:Decrypt'
Resource: '*'
Outputs:
LambdaRoleArn:
Description: 'ARN of the Lambda Execution Role'
Value: !GetAtt LambdaExecutionRole.Arn
Export:
Name: !Sub '${AWS::StackName}-LambdaRoleArn'

View File

@@ -0,0 +1,26 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor S3 Bucket'
Parameters:
BucketName:
Type: String
Description: "Name of the S3 bucket for article content"
Resources:
ArticleContentBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Ref BucketName
VersioningConfiguration:
Status: Enabled
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
Outputs:
BucketName:
Description: 'Name of the S3 bucket for article content'
Value: !Ref ArticleContentBucket
Export:
Name: !Sub '${AWS::StackName}-ArticleContentBucketName'

View File

@@ -0,0 +1,35 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor SQS Queue'
Parameters:
SQSQueueName:
Type: String
Description: "Name of the SQS queue for RSS feeds"
Resources:
RSSFeedQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: !Ref SQSQueueName
VisibilityTimeout: 300
RedrivePolicy:
deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn
maxReceiveCount: 3
RSSFeedDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub '${AWS::StackName}-rss-feed-dlq'
Outputs:
QueueURL:
Description: 'URL of the SQS queue for RSS feeds'
Value: !Ref RSSFeedQueue
Export:
Name: !Sub '${AWS::StackName}-RSSFeedQueueURL'
DLQueueURL:
Description: 'URL of the Dead Letter Queue for RSS feeds'
Value: !Ref RSSFeedDLQ
Export:
Name: !Sub '${AWS::StackName}-RSSFeedDLQueueURL'

View File

@@ -0,0 +1,95 @@
import boto3
import os
from botocore.exceptions import ClientError
def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
cf_client = boto3.client('cloudformation')
stack_name = f"rss-feed-processor-{stack_suffix}"
with open(f'src/infra/cloudformation/{template_file}', 'r') as file:
template_body = file.read()
print(f"Template contents:\n{template_body}")
capabilities = ['CAPABILITY_NAMED_IAM']
try:
if force_recreate:
try:
print(f"Deleting stack {stack_name} for recreation...")
cf_client.delete_stack(StackName=stack_name)
waiter = cf_client.get_waiter('stack_delete_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} deleted successfully.")
except ClientError:
print(f"Stack {stack_name} does not exist or is already deleted.")
try:
stack = cf_client.describe_stacks(StackName=stack_name)['Stacks'][0]
print(f"Updating stack {stack_name}...")
cf_client.update_stack(
StackName=stack_name,
TemplateBody=template_body,
Capabilities=capabilities,
Parameters=parameters # Add parameters here
)
waiter = cf_client.get_waiter('stack_update_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} updated successfully.")
except ClientError as e:
if 'does not exist' in str(e):
print(f"Creating stack {stack_name}...")
cf_client.create_stack(
StackName=stack_name,
TemplateBody=template_body,
Capabilities=capabilities,
Parameters=parameters # Add parameters here
)
waiter = cf_client.get_waiter('stack_create_complete')
waiter.wait(StackName=stack_name)
print(f"Stack {stack_name} created successfully.")
elif 'No updates are to be performed' in str(e):
print(f"No updates needed for stack {stack_name}.")
else:
raise
except ClientError as e:
print(f"Error handling stack {stack_name}: {str(e)}")
raise
def deploy_infrastructure():
deploy_cloudformation('s3.yaml', 'S3',
parameters=[
{
'ParameterKey': 'BucketName',
'ParameterValue': os.environ.get('S3_BUCKET_NAME', 'default-role-name')
}
])
deploy_cloudformation('dynamo.yaml', 'DynamoDB',
parameters=[
{
'ParameterKey': 'DynamoDBName',
'ParameterValue': os.environ.get('DYNAMODB_TABLE_NAME', 'default-role-name')
}
])
deploy_cloudformation('sqs.yaml', 'SQS',
parameters=[
{
'ParameterKey': 'SQSQueueName',
'ParameterValue': os.environ.get('SQS_QUEUE_NAME', 'default-role-name')
}
])
deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
parameters=[
{
'ParameterKey': 'LambdaExecutionRoleName',
'ParameterValue': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME', 'default-role-name')
}
])
if __name__ == "__main__":
deploy_infrastructure()

View File

@@ -0,0 +1,90 @@
import boto3
import os
import zipfile
import io
from botocore.exceptions import ClientError
from src.utils.retry_logic import retry_with_backoff
# Set variables
LAMBDA_NAME = "RSSFeedProcessor"
LAMBDA_HANDLER = "lambda_function.lambda_handler"
ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID')
LAMBDA_ROLE_NAME = os.getenv('LAMBDA_EXECUTION_ROLE_NAME')
LAMBDA_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_NUM}:role/{LAMBDA_ROLE_NAME}"
LAMBDA_TIMEOUT = 300
LAMBDA_MEMORY = 256
LAMBDA_RUNTIME = "python3.10"
def zip_directory(path):
print(f"Creating deployment package from {path}...")
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file:
for root, _, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, path)
zip_file.write(file_path, arcname)
return zip_buffer.getvalue()
@retry_with_backoff()
def update_function_code(lambda_client, function_name, zip_file):
return lambda_client.update_function_code(
FunctionName=function_name,
ZipFile=zip_file
)
@retry_with_backoff()
def update_function_configuration(lambda_client, function_name, handler, role, timeout, memory):
return lambda_client.update_function_configuration(
FunctionName=function_name,
Handler=handler,
Role=role,
Timeout=timeout,
MemorySize=memory
)
@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory):
return lambda_client.create_function(
FunctionName=function_name,
Runtime=runtime,
Role=role,
Handler=handler,
Code={'ZipFile': zip_file},
Timeout=timeout,
MemorySize=memory
)
def deploy_lambda():
lambda_client = boto3.client('lambda')
print(f"Starting deployment of Lambda function: {LAMBDA_NAME}")
deployment_package = zip_directory('src/lambda_function/src')
try:
# Check if the function exists
try:
lambda_client.get_function(FunctionName=LAMBDA_NAME)
function_exists = True
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
function_exists = False
else:
raise e
if function_exists:
print("Updating existing Lambda function...")
update_function_code(lambda_client, LAMBDA_NAME, deployment_package)
update_function_configuration(lambda_client, LAMBDA_NAME, LAMBDA_HANDLER, LAMBDA_ROLE_ARN, LAMBDA_TIMEOUT, LAMBDA_MEMORY)
else:
print(f"Lambda function '{LAMBDA_NAME}' not found. Creating new function...")
create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY)
print("Lambda deployment completed successfully!")
except Exception as e:
print(f"Error during Lambda deployment: {str(e)}")
raise
if __name__ == "__main__":
deploy_lambda()

View File

@@ -0,0 +1,5 @@
requests
newspaper3k
feedparser
python-dateutil
pandas

View File

@@ -0,0 +1,28 @@
import newspaper
import logging
logger = logging.getLogger()
def extract_article(url):
"""
Extracts the title and text of an article from the given URL.
Args:
url (str): The URL of the article.
Returns:
A tuple containing the title and text of the article, respectively.
"""
logger.debug(f"Starting Newspaper Article Extraction {url}")
config = newspaper.Config()
config.request_timeout = 60
article = newspaper.Article(url)
try:
article.download()
logger.debug(f"Downloaded Article {url}")
article.parse()
logger.debug(f"Parsed Article {url}")
return article.title, article.text
except Exception as e:
logger.error(f"Failed to extract article {url}: {str(e)}")
return None, None

View File

@@ -0,0 +1,20 @@
import os
# SQS Configuration
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
# S3 Configuration
CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
# DynamoDB Configuration
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
# Logging Configuration
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
# RSS Feed Processing Configuration
MAX_ARTICLES_PER_FEED = int(os.environ.get('MAX_ARTICLES_PER_FEED', '10'))
FEED_PROCESSING_TIMEOUT = int(os.environ.get('FEED_PROCESSING_TIMEOUT', '90'))
# Article Extraction Configuration
ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get('ARTICLE_EXTRACTION_TIMEOUT', '30'))

View File

@@ -0,0 +1,42 @@
import boto3
import json
import os
import logging
logger = logging.getLogger()
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
def save_article(article):
try:
# Save to S3
key = f"articles/{article['unixTime']}/{article['link'].split('/')[-1]}.json"
s3.put_object(
Bucket=CONTENT_BUCKET,
Key=key,
Body=json.dumps(article)
)
logger.info(f"Saved article to S3: {key}")
# Save to DynamoDB
table = dynamodb.Table(DYNAMODB_TABLE)
table.put_item(Item=article)
logger.info(f"Saved article to DynamoDB: {article['link']}")
except Exception as e:
logger.error(f"Failed to save article: {str(e)}")
def update_rss_feed(feed):
try:
table = dynamodb.Table(DYNAMODB_TABLE)
table.update_item(
Key={'u': feed['u']},
UpdateExpression='SET dt = :val',
ExpressionAttributeValues={':val': feed['dt']}
)
logger.info(f"Updated RSS feed in DynamoDB: {feed['u']}")
except Exception as e:
logger.error(f"Failed to update RSS feed: {str(e)}")

View File

@@ -0,0 +1,11 @@
class RSSProcessingError(Exception):
"""Exception raised for errors in the RSS processing."""
pass
class ArticleExtractionError(Exception):
"""Exception raised for errors in the article extraction."""
pass
class DataStorageError(Exception):
"""Exception raised for errors in data storage operations."""
pass

View File

@@ -0,0 +1,76 @@
import feedparser
from datetime import datetime
from dateutil import parser
import queue
import threading
import logging
from article_extractor import extract_article
logger = logging.getLogger()
def process_feed(feed: dict):
output_queue = queue.Queue()
stop_thread = threading.Event()
thread = threading.Thread(target=extract_feed, args=(feed, output_queue, stop_thread))
thread.daemon = True
thread.start()
logger.debug(f"Thread Started: {feed['u']}")
thread.join(timeout=90)
if thread.is_alive():
stop_thread.set()
logger.debug(f"Killing Thread: {feed['u']}")
return None
else:
try:
output = output_queue.get_nowait()
logger.info(f"Thread Succeeded: {feed['u']}")
return output
except queue.Empty:
logger.info(f"Thread Failed: {feed['u']}")
return None
def extract_feed(rss: dict, output_queue, stop_thread):
articles = []
feed_url = rss['u']
last_date = rss['dt']
max_date = last_date
try:
feed = feedparser.parse(feed_url)
for entry in feed['entries']:
if stop_thread.is_set():
break
pub_date = parse_pub_date(entry['published'])
if pub_date > last_date:
title, text = extract_article(entry.link)
article = {
'link': entry.link,
'rss': feed_url,
'title': title,
'content': text,
'unixTime': pub_date
}
articles.append(article)
max_date = max(max_date, pub_date)
output = {
'articles': articles,
'max_date': max_date,
'feed': rss
}
output_queue.put(output)
except Exception as e:
logger.error(f"Feed failed due to error: {e}")
def parse_pub_date(date_string):
try:
return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
except ValueError:
try:
return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
except ValueError:
return int(parser.parse(date_string).timestamp())

View File

@@ -0,0 +1,78 @@
import json
import time
from feed_processor import process_feed
from data_storage import save_article, update_rss_feed
from utils import setup_logging
from config import SQS_QUEUE_URL
from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageError
from metrics import record_processed_articles, record_processing_time, record_extraction_errors
import boto3
# Set up logging
logger = setup_logging()
# Initialize AWS clients
sqs = boto3.client('sqs')
def lambda_handler(event, context):
logger.info("Starting RSS feed processing")
start_time = time.time()
try:
# Receive message from SQS
response = sqs.receive_message(
QueueUrl=SQS_QUEUE_URL,
MaxNumberOfMessages=1,
WaitTimeSeconds=0
)
if 'Messages' not in response:
logger.info("No messages in queue")
return {'statusCode': 200, 'body': json.dumps('No RSS feeds to process')}
message = response['Messages'][0]
receipt_handle = message['ReceiptHandle']
feed = json.loads(message['Body'])
# Process the feed
result = process_feed(feed)
if result:
# Save articles and update feed
for article in result['articles']:
try:
save_article(article)
except DataStorageError as e:
logger.error(f"Failed to save article: {str(e)}")
record_extraction_errors(1)
update_rss_feed(result['feed'])
# Delete the message from the queue
sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
logger.info(f"Processed feed: {feed['u']}")
# Record metrics
record_processed_articles(len(result['articles']))
else:
logger.warning(f"Failed to process feed: {feed['u']}")
record_extraction_errors(1)
except RSSProcessingError as e:
logger.error(f"RSS Processing Error: {str(e)}")
return {'statusCode': 500, 'body': json.dumps('RSS processing failed')}
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
return {'statusCode': 500, 'body': json.dumps('An unexpected error occurred')}
finally:
end_time = time.time()
processing_time = end_time - start_time
record_processing_time(processing_time)
logger.info(f"Lambda execution time: {processing_time:.2f} seconds")
return {
'statusCode': 200,
'body': json.dumps('RSS feed processed successfully')
}

View File

@@ -0,0 +1,26 @@
import boto3
import time
cloudwatch = boto3.client('cloudwatch')
def put_metric_data(metric_name, value, unit='Count'):
cloudwatch.put_metric_data(
Namespace='RSS/FeedProcessor',
MetricData=[
{
'MetricName': metric_name,
'Value': value,
'Unit': unit,
'Timestamp': time.time()
},
]
)
def record_processed_articles(count):
put_metric_data('ProcessedArticles', count)
def record_processing_time(duration):
put_metric_data('ProcessingTime', duration, 'Seconds')
def record_extraction_errors(count):
put_metric_data('ExtractionErrors', count)

View File

@@ -0,0 +1,8 @@
import logging
import os
def setup_logging():
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'INFO')
logger.setLevel(logging.getLevelName(log_level))
return logger

View File

@@ -0,0 +1,22 @@
import boto3
import os
from src.utils.retry_logic import retry_with_backoff
# Set variables
LAMBDA_NAME = "RSSFeedProcessor"
@retry_with_backoff()
def update_env_vars(function_name):
lambda_client = boto3.client('lambda')
env_vars = {
'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
'CONTENT_BUCKET': os.environ.get('S3_BUCKET_NAME'),
'DYNAMODB_TABLE': os.environ.get('DYNAMODB_TABLE_NAME'),
'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO')
}
return lambda_client.update_function_configuration(
FunctionName=LAMBDA_NAME,
Environment={'Variables': env_vars}
)

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,87 @@
import boto3
import subprocess
import os
import shutil
from botocore.exceptions import ClientError
# Set variables
LAYER_NAME = "RSSFeedProcessorDependencies"
BUCKET_NAME = os.getenv("S3_LAYER_BUCKET_NAME")
REQUIREMENTS_FILE = "src/lambda_function/layers/requirements.txt"
ZIP_FILE = f"{LAYER_NAME}.zip"
def create_s3_bucket_if_not_exists(bucket_name, region=None):
s3_client = boto3.client('s3', region_name=region)
try:
# Check if the bucket exists
s3_client.head_bucket(Bucket=bucket_name)
print(f"Bucket '{bucket_name}' already exists.")
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
# Create the bucket
if region == 'us-east-1' or region is None:
# us-east-1 does not require LocationConstraint
s3_client.create_bucket(Bucket=bucket_name)
else:
# Other regions require LocationConstraint
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
'LocationConstraint': region
}
)
print(f"Bucket '{bucket_name}' created.")
else:
# For any other errors, re-raise the exception
raise e
def create_lambda_layer():
# Create a temporary directory for the layer
os.makedirs("layer/python", exist_ok=True)
# Install dependencies
subprocess.check_call([
"pip", "install",
"-r", REQUIREMENTS_FILE,
"-t", "layer/python"
])
print("Finished Installing Packages")
# Create ZIP file
shutil.make_archive(LAYER_NAME, 'zip', "layer")
print("Finished Zipping Package")
# Create or update Lambda layer
lambda_client = boto3.client('lambda', region_name='us-east-1')
# Make sure the S3 bucket exists
create_s3_bucket_if_not_exists(BUCKET_NAME)
# Upload the zip file to S3
s3_client = boto3.client('s3')
s3_client.upload_file(ZIP_FILE, BUCKET_NAME, ZIP_FILE)
print(f"Uploaded {ZIP_FILE} to S3 bucket '{BUCKET_NAME}'.")
# Publish the layer using the S3 object
response = lambda_client.publish_layer_version(
LayerName=LAYER_NAME,
Description="Dependencies for RSS Feed Processor",
Content={
'S3Bucket': BUCKET_NAME,
'S3Key': ZIP_FILE
},
CompatibleRuntimes=['python3.10', 'python3.11']
)
print(f"Created Lambda layer version: {response['Version']}")
# Clean up
shutil.rmtree("layer")
os.remove(ZIP_FILE)
print("Lambda layer creation complete!")
if __name__ == "__main__":
create_lambda_layer()

View File

@@ -0,0 +1,34 @@
import boto3
from botocore.exceptions import ClientError
def create_s3_bucket_if_not_exists(bucket_name, region=None):
s3_client = boto3.client('s3', region_name=region)
try:
# Check if the bucket exists
s3_client.head_bucket(Bucket=bucket_name)
print(f"Bucket '{bucket_name}' already exists.")
except ClientError as e:
# If a 404 error is caught, it means the bucket does not exist
error_code = e.response['Error']['Code']
if error_code == '404':
# Create the bucket
if region is None:
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
'LocationConstraint': region
}
)
print(f"Bucket '{bucket_name}' created.")
else:
# For any other errors, re-raise the exception
raise e
# Example usage
bucket_name = 'your-unique-bucket-name'
region = 'us-east-1' # Change this to your desired region
create_s3_bucket_if_not_exists(bucket_name, region)

27
src/utils/retry_logic.py Normal file
View File

@@ -0,0 +1,27 @@
import time
from botocore.exceptions import ClientError
def retry_with_backoff(max_retries=5, initial_backoff=1, backoff_multiplier=2):
def decorator(func):
def wrapper(*args, **kwargs):
retries = 0
backoff = initial_backoff
while retries < max_retries:
try:
return func(*args, **kwargs)
except ClientError as e:
if e.response['Error']['Code'] in ['ResourceConflictException', 'ResourceInUseException']:
if retries == max_retries - 1:
raise
wait_time = backoff * (2 ** retries)
print(f"Encountered {e.response['Error']['Code']}. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
retries += 1
backoff *= backoff_multiplier
else:
raise
raise Exception(f"Function failed after {max_retries} retries.")
return wrapper
return decorator

View File

@@ -0,0 +1,57 @@
import json
import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def upload_rss_feeds(rss_feeds, table_name):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
logger.info(f"Uploading RSS feeds to table: {table_name}")
try:
# Get the table's key schema
key_schema = table.key_schema
partition_key = next(key['AttributeName'] for key in key_schema if key['KeyType'] == 'HASH')
except ClientError as e:
logger.error(f"Error getting table schema: {e.response['Error']['Message']}")
return
new_items = 0
existing_items = 0
for feed in rss_feeds:
# Check if the item already exists
try:
response = table.get_item(Key={partition_key: feed['u']})
except ClientError as e:
logger.error(f"Error checking for existing item: {e.response['Error']['Message']}")
continue
if 'Item' not in response:
# Item doesn't exist, insert new item
item = {partition_key: feed['u'], 'dt': 0}
item.update(feed)
try:
table.put_item(Item=item)
new_items += 1
except ClientError as e:
logger.error(f"Error inserting new item: {e.response['Error']['Message']}")
else:
existing_items += 1
logger.info(f"Upload complete. {new_items} new items inserted. {existing_items} items already existed.")
if __name__ == "__main__":
table_name = 'rss-feeds-table'
rss_feed_path = 'rss_feeds.json'
with open(rss_feed_path) as f:
rss_feeds = json.load(f)
logger.info(f"Loaded RSS feeds: {rss_feeds}")
upload_rss_feeds(rss_feeds, table_name)

26
template.env Normal file
View File

@@ -0,0 +1,26 @@
# AWS Configuration
AWS_REGION=us-east-$
AWS_ACCOUNT_ID=$$$$$$$$$
# Access keys (only use these for local development, NEVER in production)
AWS_ACCESS_KEY_ID=$$$$$$$$$
AWS_SECRET_ACCESS_KEY=$$$$$$$$$
# Resource Names (without ARNs or full URLs)
LAMBDA_FUNCTION_NAME=rss-feed-processor
LAMBDA_EXECUTION_ROLE_NAME=rss-feed-processor-role
S3_BUCKET_NAME=rss-feed-processor-bucket
DYNAMODB_TABLE_NAME=rss-feeds-table
SQS_QUEUE_NAME=rss-feed-queue
S3_LAYER_BUCKET_NAME=rss-feed-processor-layers
# RSS Feed Processing Configuration
MAX_ARTICLES_PER_FEED=10
FEED_PROCESSING_TIMEOUT=90
# Logging Configuration
LOG_LEVEL=INFO
# Other Application Settings
APP_NAME=RSS Feed Processor
VERSION=1.0.0

11
todo.md Normal file
View File

@@ -0,0 +1,11 @@
* [ ] Make sure the base lambda works
* [ ] Make sure the lambda syncs up well with SQS and can easily pull items from DynamoDB.
* [ ]
* [ ] Version control lambda packages
* [ ] RSS Feed Easy Insertion
* [ ] environment variable template
* [ ] Should we do some vector database stuff with this repo as well?
* [ ] We should probably make another module which makes it fairly easy to query all this data from anywhere
* [ ] Add in a scheduler for the lambda