Batch changes.

Charles E. Gormley
2024-09-02 20:16:51 -04:00
parent 02833626c0
commit 65e91e62ed
13 changed files with 128 additions and 203 deletions

CHANGELOG.md (new file)

CONTRIBUTING.md (new file)

README.md (110 lines changed)

@@ -2,103 +2,8 @@
OpenRSS is an AWS-based RSS feed processing system that automatically fetches, processes, and stores articles from specified RSS feeds.
-## Project Structure
-```
-OpenRSS/
-├── src/
-│   ├── infra/
-│   │   ├── cloudformation/
-│   │   │   ├── s3.yaml
-│   │   │   ├── dynamo.yaml
-│   │   │   └── sqs.yaml
-│   │   └── deploy_infrastructure.py
-│   ├── lambda_function/
-│   │   ├── src/
-│   │   │   ├── lambda_function.py
-│   │   │   ├── feed_processor.py
-│   │   │   ├── article_extractor.py
-│   │   │   ├── data_storage.py
-│   │   │   ├── utils.py
-│   │   │   ├── config.py
-│   │   │   ├── exceptions.py
-│   │   │   └── metrics.py
-│   │   ├── tests/
-│   │   │   └── test_lambda_function.py
-│   │   ├── layers/
-│   │   │   └── requirements.txt
-│   │   ├── deploy_lambda.py
-│   │   └── update_env_vars.py
-│   └── utils/
-│       ├── create_lambda_layer.py
-│       └── upload_rss_feeds.py
-├── launch.py
-├── rss_feeds.json
-├── requirements.txt
-└── README.md
-```
-## Prerequisites
-- Python 3.8+
-- AWS CLI configured with appropriate permissions
-- An AWS account with necessary services (S3, DynamoDB, SQS, Lambda) enabled
-## Setup
-1. Clone the repository:
-```
-git clone https://github.com/yourusername/OpenRSS.git
-cd OpenRSS
-```
-2. Install the required dependencies:
-```
-pip install -r requirements.txt
-```
-3. Create a `.env` file in the root directory with the following content:
-```
-AWS_ACCESS_KEY_ID=your_access_key_here
-AWS_SECRET_ACCESS_KEY=your_secret_key_here
-AWS_REGION=us-east-1
-```
-4. Update the `rss_feeds.json` file with the RSS feeds you want to process.
-## Usage
-To deploy the infrastructure and start the RSS feed processor:
-```
-python launch.py
-```
-This script will:
-1. Deploy the necessary AWS infrastructure (S3, DynamoDB, SQS) using CloudFormation.
-2. Create and upload the Lambda layer.
-3. Deploy the Lambda function.
-4. Upload the RSS feeds to DynamoDB.
-5. Trigger an initial execution of the Lambda function.
-## Infrastructure
-The project uses the following AWS services:
-- S3: Stores processed articles
-- DynamoDB: Stores RSS feed information and processing status
-- SQS: Queues RSS feeds for processing
-- Lambda: Processes RSS feeds and extracts articles
-## Lambda Function
-The Lambda function (`src/lambda_function/src/lambda_function.py`) is triggered periodically to process RSS feeds. It:
-1. Retrieves RSS feed information from DynamoDB
-2. Fetches and parses the RSS feed
-3. Extracts articles using the newspaper3k library
-4. Stores processed articles in S3
-5. Updates the feed's last processed timestamp in DynamoDB
## Customization
@@ -106,22 +11,13 @@ The Lambda function (`src/lambda_function/src/lambda_function.py`) is triggered
- To change the Lambda function's behavior, modify the Python files in `src/lambda_function/src/`.
- To add or remove RSS feeds, update the `rss_feeds.json` file.
-## Testing
-To run the tests for the Lambda function:
-```
-python -m pytest src/lambda_function/tests/
-```
## Monitoring
The Lambda function logs its activities to CloudWatch Logs. You can monitor the function's performance and any errors through the AWS CloudWatch console.
## Contributing
-We are still working on a contribution framework. But they are more than welcome! Feel free to submit a PR which will be approved by the team.
+Contributions are welcome! Please feel free to submit a Pull Request. Check
## License
-[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT)
+This project is licensed under the MIT License.
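For reference, the five-step Lambda flow described in the removed README section corresponds to a handler shaped roughly like the sketch below. This is a minimal illustration, not the project's actual `lambda_function.py`: the bucket name, table schema, and SQS message shape are all assumptions.

```python
import json
import time

import boto3
import feedparser              # RSS parsing
from newspaper import Article  # newspaper3k, as named in the README

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")

BUCKET = "openrss-processed-articles"  # hypothetical bucket name
TABLE = "openrss-feeds"                # hypothetical table name

def lambda_handler(event, context):
    table = dynamodb.Table(TABLE)
    for record in event.get("Records", []):     # 1. feed info arrives via SQS
        feed = json.loads(record["body"])
        parsed = feedparser.parse(feed["url"])  # 2. fetch and parse the feed
        for entry in parsed.entries:
            article = Article(entry.link)       # 3. extract text with newspaper3k
            article.download()
            article.parse()
            s3.put_object(                      # 4. store the processed article
                Bucket=BUCKET,
                Key=f"{feed['url']}/{entry.get('id', entry.link)}.json",
                Body=json.dumps({"title": article.title, "text": article.text}),
            )
        table.update_item(                      # 5. record last-processed time
            Key={"feed_url": feed["url"]},
            UpdateExpression="SET last_processed = :ts",
            ExpressionAttributeValues={":ts": int(time.time())},
        )
    return {"statusCode": 200}
```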


@@ -4,6 +4,7 @@ import json
import boto3
from dotenv import load_dotenv
import logging
+from src.infra.lambdas.RSSQueueFiller.deploy_sqs_filler_lambda import deploy_sqs_filler
# Load environment variables
load_dotenv()
@@ -36,18 +37,30 @@ from src.feed_management.upload_rss_feeds import upload_rss_feeds
def main():
    # Deploy infrastructure
-    # deploy_infrastructure() # TODO: Add in sqs lambda filler here.
-    # logging.info("Finished Deploying Infrastructure")
+    deploy_infrastructure()
+    logging.info("Finished Deploying Infrastructure")
    # Deploy Lambda function
    deploy_lambda()
-    print("Finished Deploying Lambda")
+    logging.info("Finished Deploying Lambda")
+    deploy_sqs_filler()
+    logging.info("Finished Deploying SQS Filler Lambda")
    # Update Lambda environment variables
    update_env_vars(LAMBDA_FUNCTION_NAME)
    print("Finished Environment Variable Updates")
+    # TODO: Add in an eventbridge timer to trigger the lambda.
+    # TODO: Add in a 2x check to make sure the queue trigger and the eb trigger are enabled.
    # Upload RSS feeds
    rss_feeds_file = os.path.join(current_dir, "rss_feeds.json")
    if os.path.exists(rss_feeds_file):
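The two TODOs added above ask for an EventBridge timer and a check that both triggers are enabled. A minimal sketch of the timer half, using only standard boto3 calls; the rule name and the one-hour cadence are assumptions, not project settings:

```python
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

def add_hourly_trigger(function_name: str, rule_name: str = "openrss-hourly") -> None:
    """Create (or update) an hourly EventBridge rule targeting the Lambda."""
    rule_arn = events.put_rule(
        Name=rule_name,
        ScheduleExpression="rate(1 hour)",  # hypothetical cadence
        State="ENABLED",
    )["RuleArn"]
    fn_arn = lambda_client.get_function(
        FunctionName=function_name)["Configuration"]["FunctionArn"]
    # Allow EventBridge to invoke the function; skip if already granted.
    try:
        lambda_client.add_permission(
            FunctionName=function_name,
            StatementId=f"{rule_name}-invoke",
            Action="lambda:InvokeFunction",
            Principal="events.amazonaws.com",
            SourceArn=rule_arn,
        )
    except lambda_client.exceptions.ResourceConflictException:
        pass  # permission already present from a previous run
    events.put_targets(Rule=rule_name, Targets=[{"Id": "1", "Arn": fn_arn}])
```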


@@ -8,7 +8,7 @@ load_dotenv()
region = os.getenv("AWS_REGION")
index_name = os.getenv("PINECONE_DB_NAME")
index_name = "quickstart" # TODO: Remove this line after we are done testing with vector dbs.
if index_name not in pc.list_indexes().names():
    pc.create_index(
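The `create_index` call is truncated in this view. For orientation, with the v3-style `pinecone` client that `pc.list_indexes().names()` implies, creating a serverless index typically looks like the sketch below; the dimension, metric, and region here are placeholders, not this project's actual settings:

```python
import os

from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = os.getenv("PINECONE_DB_NAME", "quickstart")

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=1536,   # placeholder: must match the embedding model
        metric="cosine",  # placeholder similarity metric
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
```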


@@ -5,6 +5,9 @@ import json
from src.utils.retry_logic import retry_with_backoff
from botocore.exceptions import ClientError
+from dotenv import load_dotenv
+load_dotenv()
region_name = os.getenv("AWS_REGION")
kms_client = boto3.client('kms', region_name=region_name)
stack_base = os.getenv("STACK_BASE")
@@ -13,6 +16,7 @@ stack_base = os.getenv("STACK_BASE")
def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
    cf_client = boto3.client('cloudformation')
    stack_name = f"{stack_base}-{stack_suffix}"
+    print(stack_name)
    with open(f'src/infra/cloudformation/{template_file}', 'r') as file:
        template_body = file.read()
@@ -58,11 +62,11 @@ def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
        elif 'No updates are to be performed' in str(e):
            print(f"No updates needed for stack {stack_name}.")
        else:
-            raise ClientError
+            raise
    except ClientError as e:
        print(f"Error handling stack {stack_name}: {str(e)}")
-        raise ClientError
+        raise

def get_or_create_kms_key():
    # Create a KMS client
@@ -160,7 +164,7 @@ def deploy_infrastructure():
        }
    ])
    deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
        parameters=[
            {
                'ParameterKey': 'LambdaExecutionRoleName',
                'ParameterValue': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME', 'default-role-name')
@@ -169,7 +173,8 @@ def deploy_infrastructure():
                'ParameterKey': 'LambdaKMSKeyArn',
                'ParameterValue': kms_key_arn
            }
-        ])
+        ]
+    )
    # TODO: Figure out KMS Stuff, but for now just do it in the console. I would like to get the rest of the cloudformation working
    # before I start messing with KMS keys.
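The two `raise ClientError` → `raise` changes above are real bug fixes, not style tweaks: `raise ClientError` raises the exception *class*, which both discards the caught error and fails on its own (a `ClientError` cannot be constructed without arguments), whereas a bare `raise` re-raises the original exception with its traceback intact. In miniature:

```python
import boto3
from botocore.exceptions import ClientError

cf = boto3.client("cloudformation")

def check_stack(stack_name: str) -> None:
    try:
        cf.describe_stacks(StackName=stack_name)  # raises ClientError if missing
    except ClientError as e:
        print(f"Error handling stack {stack_name}: {e}")
        raise  # correct: re-raises the caught exception with its traceback
        # `raise ClientError` would instead raise the class itself, failing
        # with a TypeError (missing error_response and operation_name) and
        # throwing away the original error entirely.
```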


@@ -1,77 +0,0 @@
# TODO: Delete this... probably? If not move it somewhere elssse.
import boto3
import os
from botocore.exceptions import ClientError
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Set up AWS clients
sqs = boto3.client('sqs')
lambda_client = boto3.client('lambda')

# Get environment variables
region = os.getenv("AWS_REGION")
account_id = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_NAME = os.getenv("SQS_QUEUE_NAME")
LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME")

def get_or_create_sqs_queue():
    try:
        # Try to get the queue URL first
        response = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)
        queue_url = response['QueueUrl']
        print(f"SQS queue already exists. URL: {queue_url}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
            # Queue doesn't exist, so create it
            try:
                response = sqs.create_queue(QueueName=SQS_QUEUE_NAME)
                queue_url = response['QueueUrl']
                print(f"SQS queue created. URL: {queue_url}")
            except ClientError as create_error:
                print(f"Error creating SQS queue: {create_error}")
                return None
        else:
            print(f"Error getting SQS queue: {e}")
            return None
    return queue_url

def configure_lambda_trigger(function_name, queue_url):
    try:
        # Get the SQS queue ARN
        queue_attributes = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )
        queue_arn = queue_attributes['Attributes']['QueueArn']

        # Check if Lambda function exists
        try:
            lambda_client.get_function(FunctionName=function_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                print(f"Lambda function '{function_name}' does not exist. Please create it first.")
                return
            else:
                raise

        # Add the SQS queue as an event source for the Lambda function
        response = lambda_client.create_event_source_mapping(
            EventSourceArn=queue_arn,
            FunctionName=function_name,
            Enabled=True,
            BatchSize=10  # Number of messages to process in one batch
        )
        print(f"SQS trigger configured for Lambda. UUID: {response['UUID']}")
    except ClientError as e:
        print(f"Error configuring Lambda trigger: {e}")

if __name__ == "__main__":
    queue_url = get_or_create_sqs_queue()
    if queue_url:
        configure_lambda_trigger(LAMBDA_FUNCTION_NAME, queue_url)
    else:
        print("Failed to get or create SQS queue. Lambda trigger configuration aborted.")
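One caveat the deleted script never handled: `create_event_source_mapping` is not idempotent, so a second run raises a `ResourceConflictException` once the mapping exists. If this logic resurfaces elsewhere (per the TODO at the top of the file), a guard like the following sketch is worth carrying along; the function and variable names here are illustrative:

```python
import boto3

lambda_client = boto3.client("lambda")

def ensure_sqs_trigger(function_name: str, queue_arn: str) -> str:
    """Create the SQS->Lambda mapping only if it doesn't already exist."""
    existing = lambda_client.list_event_source_mappings(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
    )["EventSourceMappings"]
    if existing:
        return existing[0]["UUID"]  # already wired up; nothing to do
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        Enabled=True,
        BatchSize=10,
    )
    return response["UUID"]
```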


@@ -14,18 +14,18 @@ import logging
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))

# Set variables
-# TODO: Set environment variables
LAMBDA_NAME = os.getenv('LAMBDA_FUNCTION_NAME')
-LAMBDA_HANDLER = "lambda_function.lambda_handler"
ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID')
-LAMBDA_ROLE_NAME = os.getenv('LAMBDA_EXECUTION_ROLE_NAME')
-LAMBDA_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_NUM}:role/{LAMBDA_ROLE_NAME}"
+LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN")
LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT'))
LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY'))
LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME')
-LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
-LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"
S3_LAYER_BUCKET_NAME = os.getenv('S3_LAYER_BUCKET_NAME')
+LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
+LAMBDA_HANDLER = "lambda_function.lambda_handler"
+LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"
S3_LAYER_KEY = os.getenv('S3_LAYER_KEY_NAME')+'.zip'

def zip_directory(path):
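A caveat on the configuration block above: `int(os.getenv('LAMBDA_TIMEOUT'))` fails with an opaque `TypeError` whenever the variable is unset, and the switch to `LAMBDA_ROLE_ARN` adds one more required variable. A small fail-fast guard is cheap insurance; this is a sketch, with the variable list inferred from the diff rather than taken from the project:

```python
import os

REQUIRED_VARS = [
    "LAMBDA_FUNCTION_NAME", "AWS_ACCOUNT_ID", "LAMBDA_ROLE_ARN",
    "LAMBDA_TIMEOUT", "LAMBDA_MEMORY", "LAMBDA_RUNTIME",
    "STACK_BASE", "S3_LAYER_BUCKET_NAME", "S3_LAYER_KEY_NAME",
]

# Fail fast with a clear message instead of a TypeError downstream.
missing = [name for name in REQUIRED_VARS if os.getenv(name) is None]
if missing:
    raise EnvironmentError(f"Missing required environment variables: {missing}")
```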


@@ -1,18 +1,23 @@
import os
import zipfile
+import logging
import boto3
from dotenv import load_dotenv
-from deploy_infrastructure import deploy_cloudformation
+from src.infra.deploy_infrastructure import deploy_cloudformation

# Load environment variables
load_dotenv()

+# Set up logging
+logging.basicConfig(level=os.getenv('LOG_LEVEL'))

# Set up S3 client
s3 = boto3.client('s3')

def zip_lambda_code():
-    lambda_dir = 'src/infra/RSSQueueFillerLambda/lambda'
+    lambda_dir = 'src/infra/lambdas/RSSQueueFiller/lambda'
    zip_path = 'tmp/lambda_function.zip'

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
@@ -27,7 +32,6 @@ def zip_lambda_code():
def upload_to_s3(file_path):
    s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
    bucket_name = os.getenv('S3_LAYER_BUCKET_NAME')
    s3.upload_file(file_path, bucket_name, s3_key)
    return f's3://{bucket_name}/{s3_key}'
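One fragility in `zip_lambda_code` worth noting: opening `'tmp/lambda_function.zip'` for writing raises `FileNotFoundError` when `tmp/` is absent, which it may be on a fresh clone even though the tree below shows it tracked. A defensive variant of the function, offered as a sketch:

```python
import os
import zipfile

def zip_lambda_code(lambda_dir: str = 'src/infra/lambdas/RSSQueueFiller/lambda',
                    zip_path: str = 'tmp/lambda_function.zip') -> str:
    # Make sure the output directory exists before opening the archive.
    os.makedirs(os.path.dirname(zip_path), exist_ok=True)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _dirs, files in os.walk(lambda_dir):
            for name in files:
                full = os.path.join(root, name)
                # Store paths relative to the lambda dir so the handler
                # sits at the archive root, as Lambda expects.
                zipf.write(full, os.path.relpath(full, lambda_dir))
    return zip_path
```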


@@ -1,9 +1,12 @@
-# TODO: Clean up aws cdk stuff.
-# TODO: Reorganize infra folder structure.
# TODO: Add in console setup python script for new project into launch.py
+# TODO: Eventbridge set up ( make sure this works )
+# TODO: Automate eventbridge.
+# TODO: Make sure ingestion of articles actually works

# Modules
* More RSS Feed Module
+* Update Lambda Layer Creation Script to be comprehensive.

# Future Modules
* Gen AI Summarization Module

tree.md (new file, 81 lines)

@@ -0,0 +1,81 @@
.
├── README.md
├── launch.py
├── requirements.txt
├── rss_feeds.json
├── src
│   ├── article_storage
│   │   ├── __pycache__
│   │   │   └── initialize.cpython-310.pyc
│   │   ├── create_index.py
│   │   └── initialize.py
│   ├── feed_management
│   │   ├── __pycache__
│   │   │   └── upload_rss_feeds.cpython-312.pyc
│   │   └── upload_rss_feeds.py
│   ├── infra
│   │   ├── __pycache__
│   │   │   └── deploy_infrastructure.cpython-312.pyc
│   │   ├── cloudformation
│   │   │   ├── dynamo.yaml
│   │   │   ├── lambda_role.yaml
│   │   │   ├── rss_lambda_stack.yaml
│   │   │   ├── s3.yaml
│   │   │   └── sqs.yaml
│   │   ├── deploy_infrastructure.py
│   │   ├── lambdas
│   │   │   ├── RSSFeedProcessorLambda
│   │   │   │   ├── __pycache__
│   │   │   │   │   ├── deploy_lambda.cpython-310.pyc
│   │   │   │   │   ├── deploy_lambda.cpython-311.pyc
│   │   │   │   │   ├── deploy_lambda.cpython-312.pyc
│   │   │   │   │   ├── deploy_rss_feed_lambda.cpython-312.pyc
│   │   │   │   │   ├── update_env_vars.cpython-310.pyc
│   │   │   │   │   ├── update_lambda_env_vars.cpython-310.pyc
│   │   │   │   │   ├── update_lambda_env_vars.cpython-311.pyc
│   │   │   │   │   └── update_lambda_env_vars.cpython-312.pyc
│   │   │   │   ├── deploy_rss_feed_lambda.py
│   │   │   │   ├── layers
│   │   │   │   │   └── requirements.txt
│   │   │   │   └── src
│   │   │   │       ├── __pycache__
│   │   │   │       │   └── utils.cpython-310.pyc
│   │   │   │       ├── article_extractor.py
│   │   │   │       ├── config.py
│   │   │   │       ├── data_storage.py
│   │   │   │       ├── exceptions.py
│   │   │   │       ├── feed_processor.py
│   │   │   │       ├── lambda_function.py
│   │   │   │       ├── metrics.py
│   │   │   │       └── utils.py
│   │   │   ├── RSSQueueFiller
│   │   │   │   ├── deploy_sqs_filler_lambda.py
│   │   │   │   └── lambda
│   │   │   │       └── lambda_function.py
│   │   │   └── lambda_utils
│   │   │       ├── __pycache__
│   │   │       │   └── update_lambda_env_vars.cpython-312.pyc
│   │   │       ├── lambda_layer
│   │   │       │   └── lambda_layer_cloud9.sh
│   │   │       └── update_lambda_env_vars.py
│   │   └── tmp
│   └── utils
│       ├── __pycache__
│       │   ├── create_lambda_layer.cpython-310.pyc
│       │   ├── create_lambda_layer.cpython-311.pyc
│       │   ├── create_lambda_layer.cpython-312.pyc
│       │   ├── create_s3_bucket.cpython-310.pyc
│       │   ├── kms_update.cpython-310.pyc
│       │   ├── kms_update.cpython-311.pyc
│       │   ├── kms_update.cpython-312.pyc
│       │   ├── retry_logic.cpython-310.pyc
│       │   ├── retry_logic.cpython-311.pyc
│       │   ├── retry_logic.cpython-312.pyc
│       │   ├── upload_rss_feeds.cpython-310.pyc
│       │   ├── upload_rss_feeds.cpython-311.pyc
│       │   └── upload_rss_feeds.cpython-312.pyc
│       └── retry_logic.py
├── template.env
├── tmp
├── todo.md
└── tree.md