mirror of
https://github.com/aljazceru/IngestRSS.git
synced 2025-12-18 22:44:27 +01:00
9/1 commit
This commit is contained in:
43
src/infra/RSSQueueFillerLambda/lambda/lambda_function.py
Normal file
43
src/infra/RSSQueueFillerLambda/lambda/lambda_function.py
Normal file
@@ -0,0 +1,43 @@
|
||||
# File: lambda/lambda_function.py
|
||||
|
||||
import json
|
||||
import os
|
||||
import boto3
|
||||
from datetime import datetime
|
||||
|
||||
# AWS service handles created at module import time so they are reused
# across warm Lambda invocations.
dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')

# Required configuration; os.environ[...] raises KeyError at import time
# if either variable is missing, surfacing misconfiguration immediately.
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME']
|
||||
|
||||
def handler(event, context):
    """Scan the RSS-feed DynamoDB table and enqueue every feed URL onto SQS.

    Args:
        event: Lambda invocation payload (unused).
        context: Lambda runtime context (unused).

    Returns:
        dict: {'statusCode': 200, 'body': summary of how many URLs were sent}.
    """
    table = dynamodb.Table(DYNAMODB_TABLE_NAME)
    messages_sent = 0

    # Scan the DynamoDB table, following LastEvaluatedKey so that tables
    # larger than a single 1 MB scan page are fully enumerated.  (A lone
    # table.scan() call returns at most one page and silently drops the rest.)
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)

        for item in response['Items']:
            rss_url = item.get('url')
            if not rss_url:
                continue
            message = {
                'rss_url': rss_url,
                'timestamp': datetime.now().isoformat()
            }
            try:
                sqs.send_message(
                    QueueUrl=SQS_QUEUE_URL,
                    MessageBody=json.dumps(message)
                )
                messages_sent += 1
            except Exception as e:
                # Best-effort: log and continue so one bad send does not
                # abort the whole queue fill.
                print(f"Error sending message to SQS: {str(e)}")

        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    print(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")

    return {
        'statusCode': 200,
        'body': json.dumps(f'Sent {messages_sent} RSS URLs to SQS')
    }
|
||||
50
src/infra/RSSQueueFillerLambda/rss_lambda_stack.py
Normal file
50
src/infra/RSSQueueFillerLambda/rss_lambda_stack.py
Normal file
@@ -0,0 +1,50 @@
|
||||
# File: rss_lambda_stack.py
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
from aws_cdk import (
|
||||
App,
|
||||
Stack,
|
||||
aws_lambda as _lambda,
|
||||
aws_iam as iam,
|
||||
Duration
|
||||
)
|
||||
from constructs import Construct
|
||||
|
||||
class SqsFillerLambdaStack(Stack):
    """CDK stack defining the Lambda that fills the RSS SQS queue.

    Provisions one Python 3.12 Lambda function and grants it permission to
    scan the RSS-feed DynamoDB table and to send messages to the target SQS
    queue.  Names and ARNs come from environment variables (loaded via
    dotenv at import time).
    """

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Runtime configuration handed to the Lambda function itself.
        filler_env = {
            "SQS_QUEUE_URL": os.getenv("SQS_QUEUE_URL"),
            "DYNAMODB_TABLE_NAME": os.getenv("DYNAMODB_TABLE_NAME")
        }

        # Create Lambda Function
        self.sqs_filler = _lambda.Function(
            self, "SqsFillerFunction",
            function_name=os.getenv("QUEUE_FILLER_LAMBDA_NAME"),
            runtime=_lambda.Runtime.PYTHON_3_12,
            handler="lambda_function.handler",
            code=_lambda.Code.from_asset("src/infra/RSSQueueFillerLambda/lambda"),
            timeout=Duration.minutes(5),
            environment=filler_env
        )

        # Grant Lambda permission to scan DynamoDB
        scan_statement = iam.PolicyStatement(
            actions=["dynamodb:Scan"],
            resources=[os.getenv("DYNAMODB_TABLE_ARN")]
        )
        self.sqs_filler.add_to_role_policy(scan_statement)

        # Grant Lambda permission to send messages to SQS
        send_statement = iam.PolicyStatement(
            actions=["sqs:SendMessage"],
            resources=[os.getenv("SQS_QUEUE_ARN")]
        )
        self.sqs_filler.add_to_role_policy(send_statement)
|
||||
|
||||
# Main
# Allows deploying this stack standalone: running the file synthesizes the
# CloudFormation template through the CDK app lifecycle.
if __name__ == "__main__":
    app = App()
    SqsFillerLambdaStack(app, "SqsFillerLambdaStack")
    app.synth()
|
||||
0
src/infra/deploy_
Normal file
0
src/infra/deploy_
Normal file
0
src/infra/deploy_eventbridge.py
Normal file
0
src/infra/deploy_eventbridge.py
Normal file
76
src/infra/deploy_sqs.py
Normal file
76
src/infra/deploy_sqs.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import boto3
|
||||
import os
|
||||
from botocore.exceptions import ClientError
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Set up AWS clients (credentials/region resolved by boto3's default chain)
sqs = boto3.client('sqs')
lambda_client = boto3.client('lambda')

# Get environment variables
# NOTE(review): os.getenv returns None when a variable is unset — the
# functions below assume these are present; verify the .env defines them.
region = os.getenv("AWS_REGION")
account_id = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_NAME = os.getenv("SQS_QUEUE_NAME")
LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME")
|
||||
|
||||
def get_or_create_sqs_queue():
    """Return the URL of the queue named SQS_QUEUE_NAME, creating it if absent.

    Returns:
        str | None: the queue URL on success, or None when lookup or
        creation fails (the error is printed, not raised).
    """
    try:
        # Try to get the queue URL first
        lookup = sqs.get_queue_url(QueueName=SQS_QUEUE_NAME)
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code != 'AWS.SimpleQueueService.NonExistentQueue':
            print(f"Error getting SQS queue: {e}")
            return None
        # Queue doesn't exist, so create it
        try:
            created = sqs.create_queue(QueueName=SQS_QUEUE_NAME)
        except ClientError as create_error:
            print(f"Error creating SQS queue: {create_error}")
            return None
        queue_url = created['QueueUrl']
        print(f"SQS queue created. URL: {queue_url}")
        return queue_url

    queue_url = lookup['QueueUrl']
    print(f"SQS queue already exists. URL: {queue_url}")
    return queue_url
|
||||
|
||||
def configure_lambda_trigger(function_name, queue_url):
    """Attach the SQS queue as an event source for the given Lambda.

    Args:
        function_name: Name of an existing Lambda function.
        queue_url: URL of the SQS queue to attach as the trigger.

    Prints progress/errors instead of raising; only an unexpected
    ClientError from the existence check is re-raised to the outer handler.
    """
    try:
        # Get the SQS queue ARN
        queue_attributes = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=['QueueArn']
        )
        queue_arn = queue_attributes['Attributes']['QueueArn']

        # Check if Lambda function exists
        try:
            lambda_client.get_function(FunctionName=function_name)
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                print(f"Lambda function '{function_name}' does not exist. Please create it first.")
                return
            else:
                # Unexpected failure — let the outer handler report it.
                raise

        # Add the SQS queue as an event source for the Lambda function
        response = lambda_client.create_event_source_mapping(
            EventSourceArn=queue_arn,
            FunctionName=function_name,
            Enabled=True,
            BatchSize=10  # Number of messages to process in one batch
        )

        print(f"SQS trigger configured for Lambda. UUID: {response['UUID']}")
    except ClientError as e:
        print(f"Error configuring Lambda trigger: {e}")
|
||||
|
||||
# Entry point: ensure the queue exists, then wire it to the Lambda.
if __name__ == "__main__":
    queue_url = get_or_create_sqs_queue()
    if queue_url:
        configure_lambda_trigger(LAMBDA_FUNCTION_NAME, queue_url)
    else:
        print("Failed to get or create SQS queue. Lambda trigger configuration aborted.")
|
||||
33
src/infra/test_sqs.py
Normal file
33
src/infra/test_sqs.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import boto3
|
||||
import json
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Set up AWS client
sqs = boto3.client('sqs')
region = os.getenv("AWS_REGION")
account_id = os.getenv("AWS_ACCOUNT_ID")
SQS_QUEUE_NAME = os.getenv("SQS_QUEUE_NAME")
# Queue URL built from its canonical https form.
# NOTE(review): if any env var above is unset this silently embeds "None"
# in the URL — confirm AWS_REGION/AWS_ACCOUNT_ID/SQS_QUEUE_NAME are defined.
SQS_QUEUE_URL = f"https://sqs.{region}.amazonaws.com/{account_id}/{SQS_QUEUE_NAME}"
LAMBDA_FUNCTION_NAME = os.getenv("LAMBDA_FUNCTION_NAME")
|
||||
|
||||
def send_test_message():
    """Publish a fixed test payload to the configured SQS queue.

    Sends a small JSON message so the queue-to-Lambda trigger can be
    verified end to end, then prints the resulting MessageId.
    """
    # Create a test message
    payload = {
        'test_key': 'test_value',
        'message': 'This is a test message for the Lambda trigger'
    }
    body = json.dumps(payload)

    # Send the message to SQS
    result = sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=body)

    print(f"Message sent. MessageId: {result['MessageId']}")
|
||||
|
||||
# Manual smoke test: run directly to push one message onto the queue.
if __name__ == "__main__":
    send_test_message()
|
||||
@@ -1,7 +1,10 @@
|
||||
import os

# SQS Configuration
# Required: a missing SQS_QUEUE_URL raises KeyError at import, failing fast.
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
# os.getenv is a function — it must be CALLED, not subscripted.
# The original os.getenv["..."] raised
# "TypeError: 'function' object is not subscriptable" at import time.
region = os.getenv("AWS_REGION")
account_id = os.getenv("AWS_ACCOUNT_ID")
sqs_name = os.getenv("SQS_QUEUE_NAME")


# S3 Configuration
CONTENT_BUCKET = os.environ['CONTENT_BUCKET']
|
||||
|
||||
19
todo.md
19
todo.md
@@ -1,11 +1,8 @@
|
||||
* [ ] Make sure lambda works base
|
||||
* [ ] Make sure the lambda syncs up well with the sqs and can easily pull items from dynamoDB.
|
||||
* [ ]
|
||||
|
||||
* [ ] Version control lambda packages
|
||||
* [ ] RSS Feed Easy Insertion
|
||||
* [ ] environment variable template
|
||||
* [ ] Should we do some vector database stuff with this repo as well?
|
||||
* [ ] We should probably make another module which makes it fairly easy to query all this data from
|
||||
anywhere
|
||||
* [ ] Add in a scheduler for the lambda
|
||||
# Modules
|
||||
* Gen AI Module
|
||||
* More RSS Feed Module
|
||||
* Duplicate Article Check Module
|
||||
* Semantic Storage Module
|
||||
* API Module ( Semantic Search, Retrieval )
|
||||
* Way to start the repo, enabling all the different modules from the launch script ( Make it fun ).
|
||||
* Make it solarpunk themed.
|
||||
Reference in New Issue
Block a user