finished:

- eventbridge
- sqs
- lambda bugs
This commit is contained in:
Charles E. Gormley
2024-09-07 20:37:58 -04:00
parent 6b0862a871
commit ba6f5752e7
10 changed files with 130 additions and 36 deletions

View File

@@ -0,0 +1,15 @@
+Parameters:
+  LambdaFunctionArn:
+    Type: String
+    Description: ARN of the RSS Feed Processor Lambda function
+
+Resources:
+  EventBridgeSchedule:
+    Type: AWS::Events::Rule
+    Properties:
+      Name: rss-feed-processor-schedule
+      Description: Runs the RSS Feed Processor Lambda function every hour
+      ScheduleExpression: rate(1 hour) # TODO: Turn this into a variable.
+      Targets:
+        - Arn: !Ref LambdaFunctionArn
+          Id: rss-feed-processor-lambda
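
For the rule's target to actually fire, the Lambda function generally also needs a resource-based permission for EventBridge. That permission is not part of this commit, so the sketch below is an assumption; it reuses the environment variables the launch script already reads (AWS_REGION, AWS_ACCOUNT_ID, QUEUE_FILLER_LAMBDA_NAME) and the rule name defined above, and the StatementId is a hypothetical name.

import os
import boto3

# Assumed follow-up step, not in this commit: allow the EventBridge rule above
# to invoke the queue-filler Lambda.
lambda_client = boto3.client('lambda')
lambda_client.add_permission(
    FunctionName=os.getenv('QUEUE_FILLER_LAMBDA_NAME'),
    StatementId='rss-feed-processor-schedule-invoke',  # hypothetical statement id
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=f"arn:aws:events:{os.getenv('AWS_REGION')}:{os.getenv('AWS_ACCOUNT_ID')}:rule/rss-feed-processor-schedule"
)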

View File

@@ -11,7 +11,8 @@ Resources:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref SQSQueueName
-     VisibilityTimeout: 300
+     VisibilityTimeout: 300 # Should be set to the 3rd standard deviation of your lambda runtime distribution.
+     ReceiveMessageWaitTimeSeconds: 20
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn
        maxReceiveCount: 3
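
A rough sketch of the sizing rule suggested in the comment above (mean plus three standard deviations of observed Lambda runtimes). The duration samples here are hypothetical placeholders; in practice they would come from the function's Duration metric or its logs.

import math
import statistics

# Hypothetical runtime samples in seconds.
durations = [42.0, 55.3, 61.8, 48.9, 70.2]

# Mean + 3 standard deviations, rounded up, per the comment above.
visibility_timeout = math.ceil(statistics.mean(durations) + 3 * statistics.stdev(durations))
print(f"Suggested VisibilityTimeout: {visibility_timeout} seconds")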

View File

@@ -176,6 +176,14 @@ def deploy_infrastructure():
        ]
    )

+    deploy_cloudformation('eventbridge.yaml', 'EventBridgeSchedule',
+                          parameters=[
+                              {
+                                  'ParameterKey': 'LambdaFunctionArn',
+                                  'ParameterValue': f"arn:aws:lambda:{os.getenv('AWS_REGION')}:{os.getenv('AWS_ACCOUNT_ID')}:function:{os.getenv('QUEUE_FILLER_LAMBDA_NAME')}"
+                              }
+                          ])

    # TODO: Figure out KMS Stuff, but for now just do it in the console. I would like to get the rest of the cloudformation working
    # before I start messing with KMS keys.

View File

@@ -76,6 +76,28 @@ def update_function_configuration(lambda_client, function_name, handler, role, t
        logging.info(f"Function {function_name} is currently being updated. Retrying...")
        raise e

+@retry_with_backoff()
+def configure_sqs_trigger(lambda_client, function_name, queue_arn):
+    event_source_mapping = {
+        'FunctionName': function_name,
+        'EventSourceArn': queue_arn,
+        'BatchSize': 1,
+        'MaximumBatchingWindowInSeconds': 0,
+        'ScalingConfig': {
+            'MaximumConcurrency': 50
+        }
+    }
+    try:
+        response = lambda_client.create_event_source_mapping(**event_source_mapping)
+        print(f"SQS trigger configured successfully for {function_name}")
+    except ClientError as e:
+        if e.response['Error']['Code'] == 'ResourceConflictException':
+            print(f"SQS trigger already exists for {function_name}. Updating configuration...")
+            # If you want to update existing trigger, you'd need to list existing mappings and update them
+            # This is left as an exercise as it requires additional error handling and logic
+            pass  # placeholder so the branch stays valid Python
+        else:
+            raise e

@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id):
@@ -162,6 +184,13 @@ def deploy_lambda():
        print(f"Lambda function '{LAMBDA_NAME}' not found. Creating new function...")
        create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id)

+       # Configure SQS trigger
+       queue_arn = os.getenv('SQS_QUEUE_ARN')  # Make sure to set this environment variable
+       if queue_arn:
+           configure_sqs_trigger(lambda_client, LAMBDA_NAME, queue_arn)
+       else:
+           print("Warning: SQS_QUEUE_ARN not set. Skipping SQS trigger configuration.")

        print("Lambda deployment completed successfully!")
    except Exception as e:
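
The comments in configure_sqs_trigger leave updating an existing trigger as an exercise; below is a minimal sketch of that path, reusing the same batch size and concurrency values. The helper name is hypothetical and not part of this commit.

def update_sqs_trigger(lambda_client, function_name, queue_arn):
    # Find the existing mapping(s) for this queue/function pair and update them.
    mappings = lambda_client.list_event_source_mappings(
        EventSourceArn=queue_arn,
        FunctionName=function_name
    )['EventSourceMappings']
    for mapping in mappings:
        lambda_client.update_event_source_mapping(
            UUID=mapping['UUID'],
            BatchSize=1,
            MaximumBatchingWindowInSeconds=0,
            ScalingConfig={'MaximumConcurrency': 50}
        )
        print(f"Updated SQS trigger {mapping['UUID']} for {function_name}")

It could be called from the ResourceConflictException branch in configure_sqs_trigger with the same arguments.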

View File

@@ -3,11 +3,6 @@ import os
# SQS Configuration
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

-# S3 Configuration
-CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
-# DynamoDB Configuration
-DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

# Logging Configuration
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')

View File

@@ -11,7 +11,7 @@ logger = logging.getLogger()
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
-CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME")
+CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')
@@ -29,21 +29,30 @@ def pinecone_save_article(article:dict):
def dynamodb_save_article(article:dict):
    pass

def s3_save_article(article:dict):
-    logger.info("Saving article to S3")
    rss_feed_id = article['rss_id']
    article_id = article['article_id']
-    logger.info(f"Content ")
-    try:
-        key = f"articles/{rss_feed_id}/{article_id}/article.json"
-        s3.put_object(
-            Bucket=CONTENT_BUCKET,
-            Key=key,
-            Body=json.dumps(article)
-        )
-        logger.info(f"Saved article to S3: {key}")
+    if not rss_feed_id or not article_id:
+        logger.error(f"Missing rss_id or article_id in article: {article}")
+        return
+
+    file_path = f"/tmp/{rss_feed_id}-{article_id}-article.json"
+    file_key = f"articles/{rss_feed_id}/{article_id}/article.json"
+
+    # Save article to /tmp json file
+    with open(file_path, "w") as f:
+        json.dump(article, f)
+
+    try:
+        s3.upload_file(file_path, CONTENT_BUCKET, file_key)
+        logger.info(f"Saved article {article_id} to S3 bucket {CONTENT_BUCKET}")
    except Exception as e:
-        logger.error(f"Failed to save article: {str(e)}")
+        logger.error(f"Failed to save article with error: {str(e)}. \n Article: {article} \n Article Type: {type(article)}")

###### Feed Storage ######
###### Feed Storage ###### ###### Feed Storage ######

View File

@@ -12,7 +12,7 @@ logger = logging.getLogger()
def process_feed(feed: dict):
    output_queue = queue.Queue()
    stop_thread = threading.Event()
-    thread = threading.Thread(target=extract_feed, args=(feed, output_queue, stop_thread))
+    thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread))
    thread.daemon = True
    thread.start()
@@ -31,8 +31,8 @@ def process_feed(feed: dict):
    except queue.Empty:
        logger.info(f"Thread Failed: {feed['u']}")
        return None

-def extract_feed(rss: dict, output_queue, stop_thread):
+def extract_feed_threading(rss: dict, output_queue, stop_thread):
    articles = []
    feed_url = rss['u']
    last_date = rss['dt']
@@ -72,6 +72,44 @@ def extract_feed(rss: dict, output_queue, stop_thread):
        logger.error(f"Feed: {entry}")
        logger.error(f"Feed failed due to error: {e}")

+def extract_feed(rss: dict):
+    articles = []
+    feed_url = rss['u']
+    last_date = rss['dt']
+    max_date = last_date
+
+    try:
+        feed = feedparser.parse(feed_url)
+        for entry in feed['entries']:
+            pub_date = parse_pub_date(entry['published'])
+            if pub_date > last_date:
+                title, text = extract_article(entry.link)
+                article = {
+                    'link': entry.link,
+                    'rss': feed_url,
+                    'title': title,
+                    'content': text,
+                    'unixTime': pub_date,
+                    'rss_id': generate_key(feed_url),
+                    'article_id': generate_key(entry.link),
+                    'llm_summary': None,
+                    'embedding': None
+                }
+                articles.append(article)
+                max_date = max(max_date, pub_date)
+
+        output = {
+            'articles': articles,
+            'max_date': max_date,
+            'feed': rss
+        }
+        print(output)
+        return output
+    except Exception as e:
+        logger.error(f"Feed: {entry}")
+        logger.error(f"Feed failed due to error: {e}")

def parse_pub_date(date_string):
    try:
        return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
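
For a quick local check of the new extract_feed, a sketch assuming a feed record with the 'u' (feed URL) and 'dt' (last seen publish time, unix seconds) keys used above; the URL and timestamp values here are placeholders.

feed = {
    'u': 'https://example.com/rss.xml',  # placeholder feed URL
    'dt': 1725148800                     # placeholder unix timestamp; only newer entries are extracted
}

result = extract_feed(feed)
if result:
    print(f"Extracted {len(result['articles'])} articles, max_date={result['max_date']}")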

View File

@@ -8,6 +8,7 @@ from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageEr
from metrics import record_processed_articles, record_processing_time, record_extraction_errors
import boto3
import os
+from feed_processor import extract_feed

# Set up logging
logger = setup_logging()
@@ -21,8 +22,6 @@ def lambda_handler(event, context):
    logger.info("Starting RSS feed processing")
    start_time = time.time()
    try:
        # Receive message from SQS
        event_source = event["Records"][0]["eventSource"]
@@ -36,7 +35,8 @@ def lambda_handler(event, context):
        receipt_handle = event["Records"][0]['receiptHandle']

        # Process the feed
-        result = process_feed(feed)
+        result = extract_feed(feed)
+        print(type(result))
        logger.info("Process Feed Result Dictionary: ", result)
        last_pub_dt = result['max_date']
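
For local testing, a sketch of the SQS-shaped event this handler reads. Only eventSource and receiptHandle appear in the code above, so the body payload here is an assumed feed record, not something this commit defines.

# Hypothetical test event; the body contents are an assumption about the feed record shape.
test_event = {
    "Records": [
        {
            "eventSource": "aws:sqs",
            "receiptHandle": "example-receipt-handle",
            "body": '{"u": "https://example.com/rss.xml", "dt": 1725148800}'
        }
    ]
}
# lambda_handler(test_event, None)  # invoke locally once dependencies are stubbed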

View File

@@ -11,8 +11,8 @@ def update_env_vars(function_name):
    env_vars = {
        'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
-        'CONTENT_BUCKET': os.environ.get('S3_BUCKET_NAME'),
-        'DYNAMODB_TABLE': os.environ.get('DYNAMODB_TABLE_NAME'),
+        'S3_BUCKET_NAME': os.environ.get('S3_BUCKET_NAME'),
+        'DYNAMODB_TABLE_NAME': os.environ.get('DYNAMODB_TABLE_NAME'),
        'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO'),
        'STORAGE_STRATEGY': os.environ.get('STORAGE_STRATEGY')
    }
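
The diff only shows the env_vars dictionary; presumably update_env_vars then pushes it to the function roughly as below. The update_function_configuration call is the standard boto3 API, but the surrounding helper is an assumption, not code from this commit.

import boto3

def apply_env_vars(function_name, env_vars):
    # Assumed sketch: push the environment variables built above to the function.
    # Note: all values must be strings; a missing env var (None) would fail validation.
    lambda_client = boto3.client('lambda')
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Environment={'Variables': env_vars}
    )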

todo.md
View File

@@ -1,15 +1,13 @@
-# TODO: Fix nonetype error.
-# TODO: Try to fix forbiden url issue.
-# TODO: Add in console setup python script for new project into launch.py
-# TODO: Eventbridge set up ( make sure this works )
-# TODO: Automate eventbridge.
-# TODO: Make sure ingestion of articles actually works
+# Current Sub-Modules
+* TODO: Make sure SQS queue can't go over the concurrency limit of the account.
+* TODO: Automate eventbridge.
+* TODO: Eventbridge set up ( make sure this works )
+* TODO: More RSS Feed Module
+* TODO: Add in console setup python script for new project into launch.py

# Modules
-* More RSS Feed Module
-* Update Lambda Layer Creation Script to be comprehensive.
+* Update Lambda Layer Creation Script to be more comprehensive.

# Future Modules
* Gen AI Summarization Module
@@ -17,6 +15,7 @@
* Duplicate Article Check Module.
* Semantic Storage Module
* API Module ( Semantic Search, Retrieval )
+* Architecture Diagram

# Future Use Cases
* Betting Market Prediction
@@ -24,6 +23,6 @@
* News Aggregation
* News Letter Tooling

# Over-caffeinated Ideas
+* Make it solarpunk themed.
* Write a serverless manifesto for personal projects and where you would like to see the serverless world go.