mirror of https://github.com/aljazceru/IngestRSS.git (synced 2025-12-18 06:24:21 +01:00)
9/2
1 .gitignore vendored
@@ -1,3 +1,4 @@
repo_structure.txt
.env
/layer/python*
*__pycache__*
3 .vscode/settings.json vendored Normal file
@@ -0,0 +1,3 @@
{
    "todo-tree.tree.showBadges": false
}
15 launch.py
@@ -3,10 +3,13 @@ import sys
import json
import boto3
from dotenv import load_dotenv

import logging

# Load environment variables
load_dotenv()

# Set up logging
logging.basicConfig(level=os.getenv('LOG_LEVEL'))

# Set AWS credentials from environment variables
os.environ['AWS_ACCESS_KEY_ID'] = os.getenv('AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = os.getenv('AWS_SECRET_ACCESS_KEY')
@@ -27,14 +30,14 @@ current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

from src.infra.deploy_infrastructure import deploy_infrastructure
from src.utils.create_lambda_layer import create_lambda_layer
from src.lambda_function.deploy_lambda import deploy_lambda
from src.lambda_function.update_lambda_env_vars import update_env_vars
from src.utils.upload_rss_feeds import upload_rss_feeds
from src.infra.lambdas.RSSFeedProcessorLambda.deploy_rss_feed_lambda import deploy_lambda
from src.infra.lambdas.lambda_utils.update_lambda_env_vars import update_env_vars
from src.feed_management.upload_rss_feeds import upload_rss_feeds

def main():
    # Deploy infrastructure
    deploy_infrastructure()
    # deploy_infrastructure()  # TODO: Add in sqs lambda filler here.
    # logging.info("Finished Deploying Infrastructure")

    # Deploy Lambda function
@@ -1,3 +1,6 @@
boto3
python-dotenv
requests
boto3==1.35.*
python-dotenv==1.0.*
requests==2.32.*
constructs==10.2.69
# Optional for the core pipeline, but required for the Pinecone SDK functionality.
pinecone==5.1.*
62 src/article_storage/create_index.py Normal file
@@ -0,0 +1,62 @@
from pinecone import Pinecone, ServerlessSpec

from initialize import pc

import os
from dotenv import load_dotenv
load_dotenv()

region = os.getenv("AWS_REGION")
index_name = os.getenv("PINECONE_DB_NAME")
index_name = "quickstart"  # TODO: Remove this line after we are done testing with vector dbs.

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=2,
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )

index = pc.Index(index_name)

index.upsert(
    vectors=[
        {"id": "vec1", "values": [1.0, 1.5]},
        {"id": "vec2", "values": [2.0, 1.0]},
        {"id": "vec3", "values": [0.1, 3.0]},
    ],
    namespace="example-namespace1"
)

index.upsert(
    vectors=[
        {"id": "vec1", "values": [1.0, -2.5]},
        {"id": "vec2", "values": [3.0, -2.0]},
        {"id": "vec3", "values": [0.5, -1.5]},
    ],
    namespace="example-namespace2"
)

print(index.describe_index_stats())

query_results1 = index.query(
    namespace="example-namespace1",
    vector=[1.0, 1.5],
    top_k=3,
    include_values=True
)

print(query_results1)

query_results2 = index.query(
    namespace="example-namespace2",
    vector=[1.0, -2.5],
    top_k=3,
    include_values=True
)

print(query_results2)
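The "quickstart" index above is explicitly temporary (see the TODO). A minimal cleanup sketch, not part of this commit, reusing the pc client from initialize.py:

from initialize import pc

# Drop the throwaway test index once the vector-db experiments are done.
if "quickstart" in pc.list_indexes().names():
    pc.delete_index("quickstart")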
9 src/article_storage/initialize.py Normal file
@@ -0,0 +1,9 @@
from pinecone import Pinecone
import os
from dotenv import load_dotenv
load_dotenv()

# Set up Pinecone client
api_key = os.getenv("PINECONE_API_KEY")

pc = Pinecone(api_key=api_key)
@@ -1,50 +0,0 @@
# File: rss_lambda_stack.py
import os
from dotenv import load_dotenv
load_dotenv()

from aws_cdk import (
    App,
    Stack,
    aws_lambda as _lambda,
    aws_iam as iam,
    Duration
)
from constructs import Construct

class SqsFillerLambdaStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Create Lambda Function
        self.sqs_filler = _lambda.Function(
            self, "SqsFillerFunction",
            function_name=os.getenv("QUEUE_FILLER_LAMBDA_NAME"),
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="lambda_function.handler",
            code=_lambda.Code.from_asset("src/infra/RSSQueueFillerLambda/lambda"),
            timeout=Duration.minutes(5),
            environment={
                "SQS_QUEUE_URL": os.getenv("SQS_QUEUE_URL"),
                "DYNAMODB_TABLE_NAME": os.getenv("DYNAMODB_TABLE_NAME")
            }
        )

        # Grant Lambda permission to scan DynamoDB
        self.sqs_filler.add_to_role_policy(iam.PolicyStatement(
            actions=["dynamodb:Scan"],
            resources=[os.getenv("DYNAMODB_TABLE_ARN")]
        ))

        # Grant Lambda permission to send messages to SQS
        self.sqs_filler.add_to_role_policy(iam.PolicyStatement(
            actions=["sqs:SendMessage"],
            resources=[os.getenv("SQS_QUEUE_ARN")]
        ))

# Main
if __name__ == "__main__":
    app = App()
    SqsFillerLambdaStack(app, "SqsFillerLambdaStack")
    app.synth()
Binary files not shown.
92 src/infra/cloudformation/rss_lambda_stack.yaml Normal file
@@ -0,0 +1,92 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: SQS Filler Lambda Stack

Parameters:
  QueueFillerLambdaName:
    Type: String
    Description: Name of the Lambda function
  SqsQueueUrl:
    Type: String
    Description: URL of the SQS queue
  DynamoDbTableName:
    Type: String
    Description: Name of the DynamoDB table
  DynamoDbTableArn:
    Type: String
    Description: ARN of the DynamoDB table
  SqsQueueArn:
    Type: String
    Description: ARN of the SQS queue
  LambdaCodeS3Bucket:
    Type: String
    Description: S3 bucket containing the Lambda function code
  LambdaCodeS3Key:
    Type: String
    Description: S3 key for the Lambda function code
  LambdaRuntime:
    Type: String
    Description: Lambda runtime
    Default: python3.12
  LambdaTimeout:
    Type: Number
    Description: Lambda timeout in seconds
    Default: 300

Resources:
  SqsFillerFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref QueueFillerLambdaName
      Runtime: !Ref LambdaRuntime
      Handler: lambda_function.handler
      Code:
        S3Bucket: !Ref LambdaCodeS3Bucket
        S3Key: !Ref LambdaCodeS3Key
      Timeout: !Ref LambdaTimeout
      Environment:
        Variables:
          SQS_QUEUE_URL: !Ref SqsQueueUrl
          DYNAMODB_TABLE_NAME: !Ref DynamoDbTableName
      Role: !GetAtt SqsFillerFunctionRole.Arn

  SqsFillerFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: LambdaExecutionPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:Scan
                Resource: !Ref DynamoDbTableArn
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !Ref SqsQueueArn
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub arn:aws:s3:::${LambdaCodeS3Bucket}/${LambdaCodeS3Key}

Outputs:
  SqsFillerFunctionArn:
    Description: ARN of the SQS Filler Lambda Function
    Value: !GetAtt SqsFillerFunction.Arn
  SqsFillerFunctionRoleArn:
    Description: ARN of the IAM Role for SQS Filler Lambda Function
    Value: !GetAtt SqsFillerFunctionRole.Arn
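A quick sanity check of the template before deploying it, a minimal sketch using boto3's CloudFormation validate_template call (syntactic validation only; parameter values are not checked):

import boto3

cf = boto3.client("cloudformation")
with open("src/infra/cloudformation/rss_lambda_stack.yaml") as f:
    # Raises a ClientError if the template is malformed.
    cf.validate_template(TemplateBody=f.read())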
@@ -2,20 +2,21 @@ import boto3
import os
import sys
import json
from src.utils.retry_logic import retry_with_backoff
from botocore.exceptions import ClientError

region_name = os.getenv("AWS_REGION")
kms_client = boto3.client('kms', region_name=region_name)
stack_base = os.getenv("STACK_BASE")

@retry_with_backoff()
def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
    cf_client = boto3.client('cloudformation')
    stack_name = f"rss-feed-processor-{stack_suffix}"
    stack_name = f"{stack_base}-{stack_suffix}"

    with open(f'src/infra/cloudformation/{template_file}', 'r') as file:
        template_body = file.read()

    print(f"Template contents:\n{template_body}")

    capabilities = ['CAPABILITY_NAMED_IAM']
@@ -57,11 +58,11 @@ def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, par
        elif 'No updates are to be performed' in str(e):
            print(f"No updates needed for stack {stack_name}.")
        else:
            raise
            raise ClientError

    except ClientError as e:
        print(f"Error handling stack {stack_name}: {str(e)}")
        raise
        raise ClientError

def get_or_create_kms_key():
    # Create a KMS client
@@ -72,8 +73,6 @@ def get_or_create_kms_key():
    account_id = os.getenv('AWS_ACCOUNT_ID')

    try:
        # List all KMS keys
        response = kms_client.list_keys()
@@ -143,7 +142,7 @@ def deploy_infrastructure():
        parameters=[
            {
                'ParameterKey': 'BucketName',
                'ParameterValue': os.environ.get('S3_BUCKET_NAME', 'default-bucket-name')
                'ParameterValue': os.getenv('S3_BUCKET_NAME')
            }
        ])
    deploy_cloudformation('dynamo.yaml', 'DynamoDB',
@@ -172,7 +171,8 @@ def deploy_infrastructure():
            }
        ])

    # TODO: Figure out KMS Stuff, but for now just do it in the console
    # TODO: Figure out KMS Stuff, but for now just do it in the console. I would like to get the rest of the cloudformation working
    # before I start messing with KMS keys.

if __name__ == "__main__":
    deploy_infrastructure()
@@ -1,3 +1,4 @@
# TODO: Delete this... probably? If not, move it somewhere else.
import boto3
import os
from botocore.exceptions import ClientError
@@ -10,17 +10,21 @@ import time
import sys
from src.infra.deploy_infrastructure import get_or_create_kms_key

import logging
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))

# Set variables
LAMBDA_NAME = "RSSFeedProcessor"
# TODO: Set environment variables
LAMBDA_NAME = os.getenv('LAMBDA_FUNCTION_NAME')
LAMBDA_HANDLER = "lambda_function.lambda_handler"
ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID')
LAMBDA_ROLE_NAME = os.getenv('LAMBDA_EXECUTION_ROLE_NAME')
LAMBDA_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_NUM}:role/{LAMBDA_ROLE_NAME}"
LAMBDA_TIMEOUT = 300
LAMBDA_MEMORY = 256
LAMBDA_RUNTIME = "python3.11"
LAMBDA_STACK_NAME = "rss-feed-processor-Lambda"
LAMBDA_LAYER_NAME = "RSSFeedProcessorLayer"
LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT'))
LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY'))
LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME')
LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"
S3_LAYER_BUCKET_NAME = os.getenv('S3_LAYER_BUCKET_NAME')
S3_LAYER_KEY = os.getenv('S3_LAYER_KEY_NAME') + '.zip'
@@ -42,36 +46,13 @@ def update_function_code(lambda_client, function_name, zip_file):
        ZipFile=zip_file
    )

@retry_with_backoff()
def get_or_create_lambda_layer():
    layer_arn = 'arn:aws:lambda:us-east-1:966265353179:layer:OpenRSSLambdaLayer:3'
    layer_arn = os.getenv('LAMBDA_LAYER_ARN')

    return layer_arn


def wait_for_function_update_to_complete(lambda_client, function_name, max_attempts=30, delay=10):
    for attempt in range(max_attempts):
        try:
            response = lambda_client.get_function(FunctionName=function_name)
            state = response['Configuration']['State']
            if state == 'Active':
                return True
            elif state == 'Failed':
                print(f"Function update failed: {response['Configuration'].get('StateReason')}")
                return False
            print(f"Function {function_name} is in {state} state. Waiting...")
        except ClientError as e:
            print(f"Error checking function state: {e}")
            return False
        time.sleep(delay)
    print(f"Timeout waiting for function {function_name} to become active.")
    return False

@retry_with_backoff()
@retry_with_backoff(max_retries=50, initial_backoff=5, backoff_multiplier=2)  # Note: This function usually takes a long time to be successful.
def update_function_configuration(lambda_client, function_name, handler, role, timeout, memory, layers, kms_key_id):
    # First, wait for any ongoing updates to complete
    if not wait_for_function_update_to_complete(lambda_client, function_name):
        raise Exception(f"Function {function_name} is not in a state to be updated.")

    config = {
        'FunctionName': function_name,
@@ -85,43 +66,16 @@ def update_function_configuration(lambda_client, function_name, handler, role, t
    if kms_key_id:
        config['KMSKeyArn'] = f"arn:aws:kms:{os.environ['AWS_REGION']}:{ACCOUNT_NUM}:key/{kms_key_id}"

    print(f"Updating function configuration for {function_name}... with {config}")

    max_retries = 5  # TODO: Get rid of this dumb retry logic and just use the wrapper I created.
    for attempt in range(max_retries):
        try:
            response = lambda_client.update_function_configuration(**config)
            print(f"Update request sent successfully for {function_name}.")

            # Wait for the update to complete
            if wait_for_function_update_to_complete(lambda_client, function_name):
                print(f"Function {function_name} updated successfully.")
                return response
            else:
                print(f"Function {function_name} update may not have completed successfully.")
                if attempt < max_retries - 1:
                    print(f"Retrying in 30 seconds... (Attempt {attempt + 1}/{max_retries})")
                    time.sleep(30)
                else:
                    raise Exception(f"Failed to update function {function_name} after {max_retries} attempts.")
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceConflictException':
                if attempt < max_retries - 1:
                    print(f"Another operation is in progress for {function_name}. Retrying in 30 seconds... (Attempt {attempt + 1}/{max_retries})")
                    time.sleep(30)
                else:
                    raise Exception(f"Failed to update function {function_name} after {max_retries} attempts due to ongoing operations.")
            elif 'The role defined for the function cannot be assumed by Lambda' in str(e):
                if attempt < max_retries - 1:
                    print(f"IAM role not ready. Retrying in 30 seconds... (Attempt {attempt + 1}/{max_retries})")
                    time.sleep(30)
                else:
                    raise Exception(f"Failed to update function {function_name} after {max_retries} attempts. IAM role could not be assumed by Lambda.")
            else:
                print(f"Error updating function configuration: {e}")
                raise
            logging.info(f"Function {function_name} is currently being updated. Retrying...")
            raise e

    raise Exception(f"Failed to update function {function_name} after {max_retries} attempts.")
@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id):
@@ -163,7 +117,7 @@ def deploy_lambda():
    lambda_client = boto3.client('lambda')

    print(f"Starting deployment of Lambda function: {LAMBDA_NAME}")
    deployment_package = zip_directory('src/lambda_function/src')
    deployment_package = zip_directory('src/infra/lambdas/RSSFeedProcessorLambda/src')

    layer_arn = get_or_create_lambda_layer()
    if layer_arn:
@@ -1,10 +1,7 @@
import os

# SQS Configuration
region = os.getenv["AWS_REGION"]
account_id = os.getenv["AWS_ACCOUNT_ID"]
sqs_name = os.getenv["SQS_QUEUE_NAME"]

SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

# S3 Configuration
CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
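The os.getenv[...] lines above (apparently the lines dropped by this hunk) subscript os.getenv, which is a function, not a mapping; that form raises a TypeError at import time. For reference:

import os

region = os.getenv("AWS_REGION")         # function call; returns None when unset
queue_url = os.environ["SQS_QUEUE_URL"]  # mapping lookup; raises KeyError when unset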
60 src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py Normal file
@@ -0,0 +1,60 @@
import boto3
import json
import os
import logging
from random import randint

from utils import generate_key

logger = logging.getLogger()

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME")
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')

##### Article Storage #####
def save_article(article: dict, strategy: str):
    if strategy == "s3":
        s3_save_article(article)
    else:
        raise ValueError(f"Invalid storage strategy: {strategy}")


def pinecone_save_article(article: dict):
    pass

def dynamodb_save_article(article: dict):
    pass

def s3_save_article(article: dict):
    rss_feed_id = article['rss_id']
    article_id = article['article_id']

    try:
        key = f"articles/{rss_feed_id}/{article_id}/article.json"
        s3.put_object(
            Bucket=CONTENT_BUCKET,
            Key=key,
            Body=json.dumps(article)
        )
        logger.info(f"Saved article to S3: {key}")

    except Exception as e:
        logger.error(f"Failed to save article: {str(e)}")


###### Feed Storage ######
def update_rss_feed(feed: dict, last_pub_dt: int):
    try:
        table = dynamodb.Table(DYNAMODB_TABLE)
        table.update_item(
            Key={'url': feed['u']},
            UpdateExpression='SET dt = :val',
            ExpressionAttributeValues={':val': last_pub_dt}
        )
        logger.info(f"Updated RSS feed in DynamoDB: {feed['u']} with dt: {last_pub_dt}")
    except Exception as e:
        logger.error(f"Failed to update RSS feed: {str(e)}")
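A minimal usage sketch for save_article (not part of this commit); the field names mirror the article dict built in extract_feed below, and all values here are hypothetical:

from data_storage import save_article

article = {
    "rss": "https://example.com/rss",
    "title": "Example article",
    "content": "Full article text",
    "unixTime": 1725148800,
    "rss_id": "0f5d2c1a9b",      # generate_key(feed_url)
    "article_id": "7e3a1b4c8d",  # generate_key(entry.link)
    "llm_summary": None,
    "embedding": None,
}
save_article(article, strategy="s3")  # writes articles/0f5d2c1a9b/7e3a1b4c8d/article.json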
@@ -4,6 +4,7 @@ from dateutil import parser
import queue
import threading
import logging
from utils import generate_key
from article_extractor import extract_article

logger = logging.getLogger()
@@ -52,7 +53,11 @@ def extract_feed(rss: dict, output_queue, stop_thread):
            'rss': feed_url,
            'title': title,
            'content': text,
            'unixTime': pub_date
            'unixTime': pub_date,
            'rss_id': generate_key(feed_url),
            'article_id': generate_key(entry.link),
            'llm_summary': None,
            'embedding': None
        }
        articles.append(article)
        max_date = max(max_date, pub_date)
@@ -16,6 +16,7 @@ sqs = boto3.client('sqs')
def lambda_handler(event, context):
    logger.info("Starting RSS feed processing")
    print("starting rss feed, delete this later.")
    start_time = time.time()

    try:
@@ -25,6 +26,7 @@ def lambda_handler(event, context):
            MaxNumberOfMessages=1,
            WaitTimeSeconds=0
        )
        logger.debug("SQS Response: %s", response)

        if 'Messages' not in response:
            logger.info("No messages in queue")
@@ -36,6 +38,8 @@ def lambda_handler(event, context):
        # Process the feed
        result = process_feed(feed)
        logger.info("Process Feed Result Dictionary: %s", result)
        last_pub_dt = result['max_date']

        if result:
            # Save articles and update feed
@@ -46,9 +50,10 @@ def lambda_handler(event, context):
                    logger.error(f"Failed to save article: {str(e)}")
                    record_extraction_errors(1)

            update_rss_feed(result['feed'])
            update_rss_feed(result['feed'], last_pub_dt)

            # Delete the message from the queue
            logger.info("Deleting sqs queue message")
            sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
            logger.info(f"Processed feed: {feed['u']}")
18 src/infra/lambdas/RSSFeedProcessorLambda/src/utils.py Normal file
@@ -0,0 +1,18 @@
import logging
import os
import hashlib

def setup_logging():
    logger = logging.getLogger()
    log_level = "DEBUG"
    logger.setLevel(logging.getLevelName(log_level))
    return logger


def generate_key(input_string, length=10):
    # Create a SHA256 hash of the input string
    hash_object = hashlib.sha256(input_string.encode())
    hex_dig = hash_object.hexdigest()

    # Return the first 'length' characters of the hash
    return hex_dig[:length]
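Usage sketch for generate_key: the id is deterministic, which is what keeps the S3 key layout in data_storage.py stable across runs:

# The same URL always hashes to the same 10-character id.
rss_id = generate_key("https://example.com/rss")
assert rss_id == generate_key("https://example.com/rss")
assert len(rss_id) == 10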
83 src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py Normal file
@@ -0,0 +1,83 @@
import os
import zipfile
import boto3
from dotenv import load_dotenv
from deploy_infrastructure import deploy_cloudformation

# Load environment variables
load_dotenv()

# Set up S3 client
s3 = boto3.client('s3')

def zip_lambda_code():
    lambda_dir = 'src/infra/RSSQueueFillerLambda/lambda'
    zip_path = 'tmp/lambda_function.zip'

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(lambda_dir):
            for file in files:
                file_path = os.path.join(root, file)
                arcname = os.path.relpath(file_path, lambda_dir)
                zipf.write(file_path, arcname)

    return zip_path

def upload_to_s3(file_path):
    s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
    bucket_name = os.getenv('S3_LAYER_BUCKET_NAME')

    s3.upload_file(file_path, bucket_name, s3_key)
    return f's3://{bucket_name}/{s3_key}'

def deploy_sqs_filler():
    zip_file = zip_lambda_code()
    upload_to_s3(zip_file)

    # Deploy CloudFormation
    deploy_cloudformation('rss_lambda_stack.yaml', 'LambdaSQSFiller',
        parameters=[
            {
                'ParameterKey': 'QueueFillerLambdaName',
                'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME')
            },
            {
                'ParameterKey': 'SqsQueueUrl',
                'ParameterValue': os.getenv('SQS_QUEUE_URL')
            },
            {
                'ParameterKey': 'DynamoDbTableName',
                'ParameterValue': os.getenv('DYNAMODB_TABLE_NAME')
            },
            {
                'ParameterKey': 'DynamoDbTableArn',
                'ParameterValue': os.getenv('DYNAMODB_TABLE_ARN')
            },
            {
                'ParameterKey': 'SqsQueueArn',
                'ParameterValue': os.getenv('SQS_QUEUE_ARN')
            },
            {
                'ParameterKey': 'LambdaCodeS3Bucket',
                'ParameterValue': os.getenv('S3_LAYER_BUCKET_NAME')
            },
            {
                'ParameterKey': 'LambdaCodeS3Key',
                'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
            },
            {
                'ParameterKey': 'LambdaRuntime',
                'ParameterValue': os.getenv('LAMBDA_RUNTIME')
            },
            {
                'ParameterKey': 'LambdaTimeout',
                'ParameterValue': os.getenv('LAMBDA_TIMEOUT')
            }
        ])

    # Clean up local zip file
    os.remove(zip_file)

if __name__ == "__main__":
    deploy_sqs_filler()
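A sketch (not part of this commit) for smoke-testing the deployed function, assuming the same QUEUE_FILLER_LAMBDA_NAME variable from the .env file:

import os
import boto3

lambda_client = boto3.client('lambda')
# Fire-and-forget invocation; the handler logs "Sent N messages to SQS ..." to CloudWatch.
lambda_client.invoke(
    FunctionName=os.getenv('QUEUE_FILLER_LAMBDA_NAME'),
    InvocationType='Event',
)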
@@ -1,9 +1,12 @@
# File: lambda/lambda_function.py

import json
import os
import boto3
from decimal import Decimal
from datetime import datetime
import logging

logger = logging.getLogger()
logger.setLevel("INFO")

dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
@@ -11,6 +14,12 @@ sqs = boto3.client('sqs')
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME']

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return int(obj)
        return super(DecimalEncoder, self).default(obj)

def handler(event, context):
    table = dynamodb.Table(DYNAMODB_TABLE_NAME)
    messages_sent = 0
@@ -20,22 +29,27 @@ def handler(event, context):
    for item in response['Items']:
        rss_url = item.get('url')
        rss_dt = item.get('dt')

        logger.debug(f"Processing RSS feed: {rss_url}")
        logger.debug(f"Last published date: {rss_dt}")

        if rss_url:
            message = {
                'rss_url': rss_url,
                'timestamp': datetime.now().isoformat()
                'u': rss_url,
                'dt': rss_dt
            }

            logger.debug("message: %s", message)
            try:
                sqs.send_message(
                    QueueUrl=SQS_QUEUE_URL,
                    MessageBody=json.dumps(message)
                    MessageBody=json.dumps(message, cls=DecimalEncoder)
                )
                messages_sent += 1
            except Exception as e:
                print(f"Error sending message to SQS: {str(e)}")
                logger.error(f"Error sending message to SQS: {str(e)}")

    print(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")
    logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")

    return {
        'statusCode': 200,
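DynamoDB's resource API returns numeric attributes as Decimal, which the stock json.dumps rejects with a TypeError; the cls=DecimalEncoder argument added above is what makes the feed's dt serializable. A quick illustration:

import json
from decimal import Decimal

payload = {'u': 'https://example.com/rss', 'dt': Decimal('1725148800')}
print(json.dumps(payload, cls=DecimalEncoder))
# -> {"u": "https://example.com/rss", "dt": 1725148800}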
@@ -1,5 +1,5 @@
#!/bin/bash

# TODO: This needs to be completely overhauled
# Update system packages
echo "Updating system packages..."
sudo yum update -y
@@ -1,33 +0,0 @@
import boto3
import json
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Set up AWS client
sqs = boto3.client('sqs')
region = os.getenv("AWS_REGION")
account_id = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_NAME = os.getenv("SQS_QUEUE_NAME")
SQS_QUEUE_URL = f"https://sqs.{region}.amazonaws.com/{account_id}/{SQS_QUEUE_NAME}"
LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME")

def send_test_message():
    # Create a test message
    message = {
        'test_key': 'test_value',
        'message': 'This is a test message for the Lambda trigger'
    }

    # Send the message to SQS
    response = sqs.send_message(
        QueueUrl=SQS_QUEUE_URL,
        MessageBody=json.dumps(message)
    )

    print(f"Message sent. MessageId: {response['MessageId']}")

if __name__ == "__main__":
    send_test_message()
Binary files not shown.
@@ -1,42 +0,0 @@
import boto3
import json
import os
import logging

logger = logging.getLogger()

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

def save_article(article):
    try:
        # Save to S3
        key = f"articles/{article['unixTime']}/{article['link'].split('/')[-1]}.json"
        s3.put_object(
            Bucket=CONTENT_BUCKET,
            Key=key,
            Body=json.dumps(article)
        )
        logger.info(f"Saved article to S3: {key}")

        # Save to DynamoDB
        table = dynamodb.Table(DYNAMODB_TABLE)
        table.put_item(Item=article)
        logger.info(f"Saved article to DynamoDB: {article['link']}")
    except Exception as e:
        logger.error(f"Failed to save article: {str(e)}")

def update_rss_feed(feed):
    try:
        table = dynamodb.Table(DYNAMODB_TABLE)
        table.update_item(
            Key={'u': feed['u']},
            UpdateExpression='SET dt = :val',
            ExpressionAttributeValues={':val': feed['dt']}
        )
        logger.info(f"Updated RSS feed in DynamoDB: {feed['u']}")
    except Exception as e:
        logger.error(f"Failed to update RSS feed: {str(e)}")
@@ -1,8 +0,0 @@
import logging
import os

def setup_logging():
    logger = logging.getLogger()
    log_level = os.environ.get('LOG_LEVEL', 'INFO')
    logger.setLevel(logging.getLevelName(log_level))
    return logger
Binary file not shown.
@@ -1,95 +0,0 @@
import boto3
import subprocess
import os
import shutil
from botocore.exceptions import ClientError

# Set variables
LAYER_NAME = os.getenv('S3_LAYER_KEY_NAME')
BUCKET_NAME = os.getenv("S3_LAYER_BUCKET_NAME")
REQUIREMENTS_FILE = "src/lambda_function/layers/requirements.txt"
ZIP_FILE = f"{LAYER_NAME}.zip"

def create_s3_bucket_if_not_exists(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)

    try:
        # Check if the bucket exists
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' already exists.")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            # Create the bucket
            if region == 'us-east-1' or region is None:
                # us-east-1 does not require LocationConstraint
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                # Other regions require LocationConstraint
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={
                        'LocationConstraint': region
                    }
                )
            print(f"Bucket '{bucket_name}' created.")
        else:
            # For any other errors, re-raise the exception
            raise e


def install_requirements(requirements_file, target_dir):
    subprocess.check_call([
        "pip", "install",
        "-r", requirements_file,
        "-t", target_dir
    ])


def create_lambda_layer():
    # Create a temporary directory for the layer
    os.makedirs("layer/python", exist_ok=True)

    # Install dependencies from requirements.txt
    install_requirements(REQUIREMENTS_FILE, "layer/python")
    print("Finished Installing Packages from requirements.txt")

    # Create ZIP file
    shutil.make_archive(LAYER_NAME, 'zip', "layer")
    print("Finished Zipping Package")

    # Create or update Lambda layer
    lambda_client = boto3.client('lambda', region_name='us-east-1')

    # Make sure the S3 bucket exists
    create_s3_bucket_if_not_exists(BUCKET_NAME)

    # Upload the zip file to S3
    s3_client = boto3.client('s3')
    s3_client.upload_file(ZIP_FILE, BUCKET_NAME, ZIP_FILE)
    print(f"Uploaded {ZIP_FILE} to S3 bucket '{BUCKET_NAME}'.")

    # Publish the layer using the S3 object
    response = lambda_client.publish_layer_version(
        LayerName=LAYER_NAME,
        Description="Dependencies for RSS Feed Processor",
        Content={
            'S3Bucket': BUCKET_NAME,
            'S3Key': ZIP_FILE
        },
        CompatibleRuntimes=['python3.11']
    )

    print(f"Created Lambda layer version: {response['Version']}")

    # Clean up
    shutil.rmtree("layer")
    os.remove(ZIP_FILE)

    print("Lambda layer creation complete!")

if __name__ == "__main__":
    create_lambda_layer()
@@ -1,34 +0,0 @@
import boto3
from botocore.exceptions import ClientError

def create_s3_bucket_if_not_exists(bucket_name, region=None):
    s3_client = boto3.client('s3', region_name=region)

    try:
        # Check if the bucket exists
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' already exists.")
    except ClientError as e:
        # If a 404 error is caught, it means the bucket does not exist
        error_code = e.response['Error']['Code']
        if error_code == '404':
            # Create the bucket
            if region is None:
                s3_client.create_bucket(Bucket=bucket_name)
            else:
                s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={
                        'LocationConstraint': region
                    }
                )
            print(f"Bucket '{bucket_name}' created.")
        else:
            # For any other errors, re-raise the exception
            raise e

# Example usage
bucket_name = 'your-unique-bucket-name'
region = 'us-east-1'  # Change this to your desired region

create_s3_bucket_if_not_exists(bucket_name, region)
@@ -1,5 +1,8 @@
import time
from botocore.exceptions import ClientError
import logging
import os
logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))

def retry_with_backoff(max_retries=20, initial_backoff=1, backoff_multiplier=4):
    def decorator(func):
@@ -16,7 +19,7 @@ def retry_with_backoff(max_retries=20, initial_backoff=1, backoff_multiplier=4):
                if retries == max_retries - 1:
                    raise
                wait_time = backoff * (2 ** retries)
                print(f"Encountered {e.response['Error']['Code']}. Retrying in {wait_time} seconds...")
                logging.info(f"Encountered {e.response['Error']['Code']}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                retries += 1
                backoff *= backoff_multiplier
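Usage sketch for the decorator (names here are hypothetical): a ClientError from the wrapped call is retried with a growing wait, up to max_retries attempts:

@retry_with_backoff(max_retries=5, initial_backoff=2, backoff_multiplier=2)
def put_feed(table, item):
    # Retried automatically on throttling or other ClientError responses.
    table.put_item(Item=item)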
15 todo.md
@@ -1,8 +1,17 @@
# TODO: Clean up aws cdk stuff.
# TODO: Reorganize infra folder structure.
# TODO: Add in console setup python script for new project into launch.py

# Modules
* Gen AI Module
* More RSS Feed Module
* Duplicate Article Check Module

# Future Modules
* Gen AI Summarization Module
* Other add-ons with text classification of articles (sentiment analysis, political polarity, etc.)
* Duplicate Article Check Module.
* Semantic Storage Module
* API Module (Semantic Search, Retrieval)
* A way to start the repo, enabling all the different modules from the launch script (make it fun).

# Over-caffeinated Ideas
* Make it solarpunk themed.
* Write a serverless manifesto for personal projects and where you would like to see the serverless world go.