mirror of https://github.com/aljazceru/IngestRSS.git (synced 2025-12-17 05:54:22 +01:00)
Switch to Redis queue
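In short, the deployment stops draining an SQS queue and instead treats a plain Redis list as the work queue: the queue filler LPUSHes one JSON message per feed and the processor RPOPs them. A minimal sketch of that pattern with redis-py, using the illustrative defaults from this commit (redis://localhost:6379 and the rss-feed-queue list):

import json
import redis

# Illustrative connection details; the Lambdas read REDIS_URL and REDIS_QUEUE_NAME instead.
r = redis.Redis.from_url("redis://localhost:6379")
queue = "rss-feed-queue"

# Producer side (queue filler): one message per feed, 'u' = feed URL, 'dt' = last-published epoch.
r.lpush(queue, json.dumps({"u": "https://example.com/rss.xml", "dt": 0}))

# Consumer side (feed processor): pop a message, or get None when the list is empty.
raw = r.rpop(queue)
if raw is not None:
    feed = json.loads(raw)
    print(feed["u"], feed["dt"])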
@@ -37,7 +37,7 @@ def main():
    logging.info("Finished Deploying Lambda")

    deploy_sqs_filler()
    logging.info("Finished Deploying SQS Filler Lambda")
    logging.info("Finished Deploying Queue Filler Lambda")

    # Update Lambda environment variables
    update_env_vars(os.getenv("LAMBDA_FUNCTION_NAME"))
@@ -48,7 +48,7 @@ def main():
    if os.path.exists(rss_feeds_file):
        with open(rss_feeds_file, 'r') as f:
            rss_feeds = json.load(f)
        upload_rss_feeds(rss_feeds, os.getenv('DYNAMODB_TABLE_NAME'))
        upload_rss_feeds(rss_feeds, rss_feeds_file)
    else:
        print(f"WARNING: {rss_feeds_file} not found. Skipping RSS feed upload.")

@@ -6,3 +6,4 @@ constructs==10.2.69
pinecone
openai
tqdm
redis

@@ -1,58 +1,23 @@
import json
import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def upload_rss_feeds(rss_feeds, table_name):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    logger.info(f"Uploading RSS feeds to table: {table_name}")

def upload_rss_feeds(rss_feeds, file_path):
    """Persist RSS feed definitions to a local JSON file."""
    try:
        # Get the table's key schema
        key_schema = table.key_schema
        partition_key = next(key['AttributeName'] for key in key_schema if key['KeyType'] == 'HASH')
    except ClientError as e:
        logger.error(f"Error getting table schema: {e.response['Error']['Message']}")
        return
        with open(file_path, "w") as f:
            json.dump(rss_feeds, f)
        logger.info(f"Saved {len(rss_feeds)} feeds to {file_path}")
    except Exception as e:
        logger.error(f"Failed to save feeds: {e}")

    new_items = 0
    existing_items = 0

    for feed in rss_feeds:
        # Check if the item already exists
        try:
            response = table.get_item(Key={partition_key: feed['u']})
        except ClientError as e:
            logger.error(f"Error checking for existing item: {e.response['Error']['Message']}")
            continue

        if 'Item' not in response:
            # Item doesn't exist, insert new item
            item = {partition_key: feed['u'], 'dt': 0}
            feed['dt'] = int(feed['dt'])
            item.update()

            try:
                table.put_item(Item=item)
                new_items += 1
            except ClientError as e:
                logger.error(f"Error inserting new item: {e.response['Error']['Message']}")
        else:
            existing_items += 1

    logger.info(f"Upload complete. {new_items} new items inserted. {existing_items} items already existed.")

if __name__ == "__main__":
    table_name = 'rss-feeds-table'
    rss_feed_path = 'rss_feeds.json'
    rss_feed_path = "rss_feeds.json"
    with open(rss_feed_path) as f:
        rss_feeds = json.load(f)
    logger.info(f"Loaded RSS feeds: {rss_feeds}")
    upload_rss_feeds(rss_feeds, table_name)
    upload_rss_feeds(rss_feeds, rss_feed_path)

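For reference, a small hypothetical driver for the new file-based upload_rss_feeds: it just builds the feed list (dicts with the 'u' and 'dt' keys used throughout this commit) and writes the same rss_feeds.json the helper produces. The example URLs are placeholders.

import json

# Placeholder feeds; 'u' is the feed URL, 'dt' the last-published epoch expected by the processor.
feeds = [
    {"u": "https://example.com/rss.xml", "dt": 0},
    {"u": "https://example.org/feed.xml", "dt": 0},
]

# Equivalent to calling the new upload_rss_feeds(feeds, "rss_feeds.json") helper.
with open("rss_feeds.json", "w") as f:
    json.dump(feeds, f)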
Binary file not shown.
@@ -1,27 +0,0 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor DynamoDB Table'

Parameters:
  DynamoDBName:
    Type: String
    Description: ""

Resources:
  RSSFeedsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Ref 'DynamoDBName'
      AttributeDefinitions:
        - AttributeName: url
          AttributeType: S
      KeySchema:
        - AttributeName: url
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

Outputs:
  TableName:
    Description: 'Name of the DynamoDB table for RSS feeds'
    Value: !Ref RSSFeedsTable
    Export:
      Name: !Sub '${AWS::StackName}-RSSFeedsTableName'

@@ -37,7 +37,6 @@ Resources:
          - Effect: Allow
            Action:
              - 'sqs:*'
              - 'dynamodb:*'
              - 's3:*'
              - 'lambda:*'
              - 'logs:*'

@@ -1,22 +1,16 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: SQS Filler Lambda Stack
Description: Redis Queue Filler Lambda Stack

Parameters:
  QueueFillerLambdaName:
    Type: String
    Description: Name of the Lambda function
  SqsQueueUrl:
  RedisUrl:
    Type: String
    Description: URL of the SQS queue
  DynamoDbTableName:
    Description: URL of the Redis instance
  RedisQueueName:
    Type: String
    Description: Name of the DynamoDB table
  DynamoDbTableArn:
    Type: String
    Description: ARN of the DynamoDB table
  SqsQueueArn:
    Type: String
    Description: ARN of the SQS queue
    Description: Name of the Redis queue
  LambdaCodeS3Bucket:
    Type: String
    Description: S3 bucket containing the Lambda function code
@@ -45,8 +39,8 @@ Resources:
      Timeout: !Ref LambdaTimeout
      Environment:
        Variables:
          SQS_QUEUE_URL: !Ref SqsQueueUrl
          DYNAMODB_TABLE_NAME: !Ref DynamoDbTableName
          REDIS_URL: !Ref RedisUrl
          REDIS_QUEUE_NAME: !Ref RedisQueueName
      Role: !GetAtt SqsFillerFunctionRole.Arn

  SqsFillerFunctionRole:
@@ -70,14 +64,6 @@ Resources:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:Scan
                Resource: !Ref DynamoDbTableArn
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !Ref SqsQueueArn
              - Effect: Allow
                Action:
                  - s3:GetObject
@@ -85,8 +71,8 @@ Resources:

Outputs:
  SqsFillerFunctionArn:
    Description: ARN of the SQS Filler Lambda Function
    Description: ARN of the Queue Filler Lambda Function
    Value: !GetAtt SqsFillerFunction.Arn
  SqsFillerFunctionRoleArn:
    Description: ARN of the IAM Role for SQS Filler Lambda Function
    Description: ARN of the IAM Role for Queue Filler Lambda Function
    Value: !GetAtt SqsFillerFunctionRole.Arn

@@ -1,36 +0,0 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for RSS Feed Processor SQS Queue'

Parameters:
  SQSQueueName:
    Type: String
    Description: ""

Resources:
  RSSFeedQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Ref SQSQueueName
      VisibilityTimeout: 900 # Should be set to the 3rd standard deviation of your lambda runtime distribution.
      ReceiveMessageWaitTimeSeconds: 20
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt RSSFeedDLQ.Arn
        maxReceiveCount: 3

  RSSFeedDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub '${AWS::StackName}-rss-feed-dlq'

Outputs:
  QueueURL:
    Description: 'URL of the SQS queue for RSS feeds'
    Value: !Ref RSSFeedQueue
    Export:
      Name: !Sub '${AWS::StackName}-RSSFeedQueueURL'

  DLQueueURL:
    Description: 'URL of the Dead Letter Queue for RSS feeds'
    Value: !Ref RSSFeedDLQ
    Export:
      Name: !Sub '${AWS::StackName}-RSSFeedDLQueueURL'

@@ -143,13 +143,6 @@ def deploy_infrastructure():
    key_info = kms_client.describe_key(KeyId=kms_key_id)
    kms_key_arn = key_info['KeyMetadata']['Arn']

    deploy_cloudformation('dynamo.yaml', 'DynamoDB',
        parameters=[
            {
                'ParameterKey': 'DynamoDBName',
                'ParameterValue': os.environ.get('DYNAMODB_TABLE_NAME', 'default-table-name')
            }
        ])

    deploy_cloudformation('s3.yaml', 'S3',
@@ -166,13 +159,6 @@ def deploy_infrastructure():
                'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
            }
        ])
    deploy_cloudformation('sqs.yaml', 'SQS',
        parameters=[
            {
                'ParameterKey': 'SQSQueueName',
                'ParameterValue': os.environ.get('SQS_QUEUE_NAME', 'default-queue-name')
            }
        ])
    deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True,
        parameters=[
            {

@@ -80,26 +80,8 @@ def update_function_configuration(lambda_client, function_name, handler, role, t

@retry_with_backoff()
def configure_sqs_trigger(lambda_client, function_name, queue_arn):
    event_source_mapping = {
        'FunctionName': function_name,
        'EventSourceArn': queue_arn,
        'BatchSize': 1,
        'MaximumBatchingWindowInSeconds': 0,
        'ScalingConfig': {
            'MaximumConcurrency': 50
        }
    }

    try:
        response = lambda_client.create_event_source_mapping(**event_source_mapping)
        print(f"SQS trigger configured successfully for {function_name}")
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceConflictException':
            print(f"SQS trigger already exists for {function_name}. Updating configuration...")
            # If you want to update existing trigger, you'd need to list existing mappings and update them
            # This is left as an exercise as it requires additional error handling and logic
        else:
            raise e
    """Placeholder for backward compatibility. Redis deployment uses no SQS trigger."""
    return

@retry_with_backoff()
def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy):
@@ -219,13 +201,6 @@ def deploy_lambda():
        policy = get_lambda_policy()
        create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy)

        # Configure SQS trigger
        queue_arn = os.getenv('SQS_QUEUE_ARN') # Make sure to set this environment variable
        if queue_arn:
            configure_sqs_trigger(lambda_client, LAMBDA_NAME, queue_arn)
        else:
            print("Warning: SQS_QUEUE_ARN not set. Skipping SQS trigger configuration.")

        print("Lambda deployment completed successfully!")

    except Exception as e:

@@ -1,15 +1,15 @@
import os

# SQS Configuration
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

# Redis Configuration
REDIS_URL = os.environ["REDIS_URL"]
REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")

# Logging Configuration
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO')
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")

# RSS Feed Processing Configuration
MAX_ARTICLES_PER_FEED = int(os.environ.get('MAX_ARTICLES_PER_FEED', '10'))
FEED_PROCESSING_TIMEOUT = int(os.environ.get('FEED_PROCESSING_TIMEOUT', '90'))
MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10"))
FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90"))

# Article Extraction Configuration
ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get('ARTICLE_EXTRACTION_TIMEOUT', '30'))
ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30"))

@@ -2,7 +2,6 @@ import boto3
import json
import os
import logging
from random import randint
from datetime import datetime

from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
@@ -10,10 +9,8 @@ from analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize
logger = logging.getLogger()

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME")
storage_strategy = os.environ.get('STORAGE_STRATEGY')

##### Article Storage #####
@@ -51,8 +48,6 @@ def pinecone_save_article(article:dict):
    logger.info("Upserting article to Pinecone")
    upsert_vectors(index, data, namespace)

def dynamodb_save_article(article:dict):
    pass

def s3_save_article(article:dict):
    logger.info("Saving article to S3")
@@ -94,14 +89,20 @@ def s3_save_article(article:dict):

###### Feed Storage ######
def update_rss_feed(feed:dict, last_pub_dt:int):
RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json")

def update_rss_feed(feed: dict, last_pub_dt: int):
    try:
        table = dynamodb.Table(DYNAMODB_TABLE)
        table.update_item(
            Key={'url': feed['u']},
            UpdateExpression='SET dt = :val',
            ExpressionAttributeValues={':val': last_pub_dt}
        )
        logger.info(f"Updated RSS feed in DynamoDB: {feed['u']} with dt: {feed['dt']}")
        if not os.path.exists(RSS_FEEDS_FILE):
            return
        with open(RSS_FEEDS_FILE, "r") as f:
            feeds = json.load(f)
        for item in feeds:
            if item.get("u") == feed["u"]:
                item["dt"] = int(last_pub_dt)
        with open(RSS_FEEDS_FILE, "w") as f:
            json.dump(feeds, f)
        logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}")
    except Exception as e:
        logger.error(f"Failed to update RSS feed: {str(e)}")

@@ -1,75 +1,60 @@
import json
import time
from feed_processor import process_feed
import os
import redis
from feed_processor import extract_feed
from data_storage import save_article, update_rss_feed
from utils import setup_logging
from config import SQS_QUEUE_URL
from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageError
from metrics import record_processed_articles, record_processing_time, record_extraction_errors
import boto3
import os
from feed_processor import extract_feed
from config import REDIS_URL, REDIS_QUEUE_NAME
from exceptions import RSSProcessingError, DataStorageError
from metrics import (
    record_processed_articles,
    record_processing_time,
    record_extraction_errors,
)

# Set up logging
logger = setup_logging()
storage_strategy = os.environ.get("STORAGE_STRATEGY")
redis_client = redis.Redis.from_url(REDIS_URL)

storage_strategy = os.environ.get('STORAGE_STRATEGY')

# Initialize AWS clients
sqs = boto3.client('sqs')

def lambda_handler(event, context):
    logger.info("Starting RSS feed processing")
    start_time = time.time()

    try:
        # Receive message from SQS
        event_source = event["Records"][0]["eventSource"]
        if event_source == "aws:sqs":
            feed = event["Records"][0]["body"]
            logger.info(f"Received message from SQS: {feed}")
            feed = json.loads(feed)
        feed_data = redis_client.rpop(REDIS_QUEUE_NAME)
        if not feed_data:
            logger.info("No messages in queue")
            return {"statusCode": 200, "body": json.dumps("No feeds to process")}
        feed = json.loads(feed_data)

        receipt_handle = event["Records"][0]['receiptHandle']

        # Process the feed
        result = extract_feed(feed)
        logger.info(f"Process Feed Result Dictionary: {result}")
        last_pub_dt = result['max_date']
        last_pub_dt = result["max_date"]

        if result:
            # Save articles and update feed
            for article in result['articles']:
            for article in result["articles"]:
                try:
                    save_article(article, storage_strategy)
                except DataStorageError as e:
                    logger.error(f"Failed to save article: {str(e)}")
                    record_extraction_errors(1)

            update_rss_feed(result['feed'], last_pub_dt)

            # Delete the message from the queue
            logger.info("Deleting sqs queue message")
            try:
                sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
            except Exception as e:
                logger.error(f"Error deleting message from SQS: {str(e)}")
                logger.info("We can skip this but delete this block of code if it fails. This means the queue is already deleted when it triggers.")
            update_rss_feed(result["feed"], last_pub_dt)
            logger.info(f"Processed feed: {feed['u']}")

            # Record metrics
            record_processed_articles(len(result['articles']))
            record_processed_articles(len(result["articles"]))
        else:
            logger.warning(f"Failed to process feed: {feed['u']}")
            record_extraction_errors(1)

    except RSSProcessingError as e:
        logger.error(f"RSS Processing Error: {str(e)}")
        return {'statusCode': 500, 'body': json.dumps('RSS processing failed')}
        return {"statusCode": 500, "body": json.dumps("RSS processing failed")}

    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return {'statusCode': 500, 'body': json.dumps('An unexpected error occurred')}
        return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")}

    finally:
        end_time = time.time()
@@ -77,7 +62,4 @@ def lambda_handler(event, context):
        record_processing_time(processing_time)
        logger.info(f"Lambda execution time: {processing_time:.2f} seconds")

        return {
            'statusCode': 200,
            'body': json.dumps('RSS feed processed successfully')
        }
        return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")}

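A rough local smoke test for the reworked handler, assuming a reachable Redis, the environment variables this commit introduces (REDIS_URL, REDIS_QUEUE_NAME, STORAGE_STRATEGY), and that the module is importable as lambda_function; the feed URL is a placeholder.

import json
import os

import redis

# Assumed environment; set before importing the handler, since config.py reads it at import time.
os.environ.setdefault("REDIS_URL", "redis://localhost:6379")
os.environ.setdefault("REDIS_QUEUE_NAME", "rss-feed-queue")

# Seed one feed message so the handler has something to pop.
r = redis.Redis.from_url(os.environ["REDIS_URL"])
r.lpush(os.environ["REDIS_QUEUE_NAME"], json.dumps({"u": "https://example.com/rss.xml", "dt": 0}))

from lambda_function import lambda_handler  # assumed module name

# The handler no longer inspects the event payload, so an empty event is enough.
print(lambda_handler({}, None))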
@@ -49,20 +49,12 @@ def deploy_sqs_filler():
                'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME')
            },
            {
                'ParameterKey': 'SqsQueueUrl',
                'ParameterValue': os.getenv('SQS_QUEUE_URL')
                'ParameterKey': 'RedisUrl',
                'ParameterValue': os.getenv('REDIS_URL')
            },
            {
                'ParameterKey': 'DynamoDbTableName',
                'ParameterValue': os.getenv('DYNAMODB_TABLE_NAME')
            },
            {
                'ParameterKey': 'DynamoDbTableArn',
                'ParameterValue': os.getenv('DYNAMODB_TABLE_ARN')
            },
            {
                'ParameterKey': 'SqsQueueArn',
                'ParameterValue': os.getenv('SQS_QUEUE_ARN')
                'ParameterKey': 'RedisQueueName',
                'ParameterValue': os.getenv('REDIS_QUEUE_NAME')
            },
            {
                'ParameterKey': 'LambdaCodeS3Bucket',

@@ -1,57 +1,39 @@
import json
import os
import boto3
from decimal import Decimal
from datetime import datetime
import logging
from datetime import datetime
import redis

logger = logging.getLogger()
logger.setLevel("INFO")

dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
REDIS_URL = os.environ["REDIS_URL"]
REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue")
RSS_FEEDS_FILE = os.environ.get("RSS_FEEDS_FILE", "rss_feeds.json")

SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME']

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return int(obj)
        return super(DecimalEncoder, self).default(obj)
redis_client = redis.Redis.from_url(REDIS_URL)

def handler(event, context):
    table = dynamodb.Table(DYNAMODB_TABLE_NAME)
    messages_sent = 0

    # Scan the DynamoDB table
    response = table.scan()

    for item in response['Items']:
        rss_url = item.get('url')
        rss_dt = item.get('dt')

        logger.debug(f"Processing RSS feed: {rss_url}")
        logger.debug(f"Last published date: {rss_dt}")

        if rss_url:
            message = {
                'u': rss_url,
                'dt': rss_dt
            }
            logger.debug("message", message)
            try:
                sqs.send_message(
                    QueueUrl=SQS_QUEUE_URL,
                    MessageBody=json.dumps(message, cls=DecimalEncoder)
                )
        with open(RSS_FEEDS_FILE, "r") as f:
            feeds = json.load(f)
    except Exception as e:
        logger.error(f"Failed to load RSS feed file: {e}")
        return {"statusCode": 500, "body": "Failed to load feeds"}

    for feed in feeds:
        message = {"u": feed.get("u"), "dt": feed.get("dt")}
        try:
            redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message))
            messages_sent += 1
        except Exception as e:
            logger.error(f"Error sending message to SQS: {str(e)}")

    logger.info(f"Sent {messages_sent} messages to SQS at {datetime.now().isoformat()}")
            logger.error(f"Error pushing message to Redis: {e}")

    logger.info(
        f"Sent {messages_sent} messages to Redis at {datetime.now().isoformat()}"
    )
    return {
        'statusCode': 200,
        'body': json.dumps(f'Sent {messages_sent} RSS URLs to SQS')
        "statusCode": 200,
        "body": json.dumps(f"Sent {messages_sent} RSS URLs to Redis"),
    }

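Similarly, a hedged sketch of exercising the rewritten queue filler locally: point RSS_FEEDS_FILE at a small local feeds file, invoke the handler (the sqs_filler module name is assumed), and confirm the messages landed on the Redis list.

import json
import os

import redis

os.environ.setdefault("REDIS_URL", "redis://localhost:6379")
os.environ.setdefault("REDIS_QUEUE_NAME", "rss-feed-queue")
os.environ.setdefault("RSS_FEEDS_FILE", "rss_feeds.json")

# Write a tiny feeds file for the filler to read; the URL is a placeholder.
with open(os.environ["RSS_FEEDS_FILE"], "w") as f:
    json.dump([{"u": "https://example.com/rss.xml", "dt": 0}], f)

from sqs_filler import handler  # assumed module name for the queue filler Lambda

print(handler({}, None))

# Check how many messages are now queued.
r = redis.Redis.from_url(os.environ["REDIS_URL"])
print("queued:", r.llen(os.environ["REDIS_QUEUE_NAME"]))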
@@ -29,14 +29,9 @@ def update_env_vars(function_name):
        'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'),
        'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'),

        # DynamoDB Configuration
        'DYNAMODB_TABLE_NAME': os.environ.get('DYNAMODB_TABLE_NAME'),
        'DYNAMODB_TABLE_ARN': os.environ.get('DYNAMODB_TABLE_ARN'),

        # SQS Configuration
        'SQS_QUEUE_NAME': os.environ.get('SQS_QUEUE_NAME'),
        'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
        'SQS_QUEUE_ARN': os.environ.get('SQS_QUEUE_ARN'),
        # Redis Configuration
        'REDIS_URL': os.environ.get('REDIS_URL'),
        'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'),

        # Queue Filler Lambda Configuration
        'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'),

@@ -62,8 +62,8 @@ def main():
    env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}"
    env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}"
    env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}"
    env_vars["DYNAMODB_TABLE_NAME"] = get_env_value("DYNAMODB_TABLE_NAME", "Enter DynamoDB Table Name:", options=["rss-feeds-table", "custom-rss-table"], advanced=advanced_mode)
    env_vars["SQS_QUEUE_NAME"] = get_env_value("SQS_QUEUE_NAME", "Enter SQS Queue Name:", options=["rss-feed-queue", "custom-rss-queue"], advanced=advanced_mode)
    env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode)
    env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], advanced=advanced_mode)

    # Advanced Configuration
    env_vars["LAMBDA_LAYER_VERSION"] = 3
@@ -72,9 +72,6 @@ def main():
    env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}"
    env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}"
    env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode)
    env_vars["SQS_QUEUE_URL"] = f"https://sqs.{env_vars['AWS_REGION']}.amazonaws.com/{env_vars['AWS_ACCOUNT_ID']}/{env_vars['SQS_QUEUE_NAME']}"
    env_vars["SQS_QUEUE_ARN"] = f"arn:aws:sqs:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:{env_vars['SQS_QUEUE_NAME']}"
    env_vars["DYNAMODB_TABLE_ARN"] = f"arn:aws:dynamodb:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:table/{env_vars['DYNAMODB_TABLE_NAME']}"

    env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode)
    env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}"

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -20,16 +20,13 @@ def check_env() -> None:
        "LAMBDA_EXECUTION_ROLE_NAME",
        "LAMBDA_ROLE_ARN",
        "S3_BUCKET_NAME",
        "DYNAMODB_TABLE_NAME",
        "SQS_QUEUE_NAME",
        "REDIS_URL",
        "REDIS_QUEUE_NAME",
        "LAMBDA_LAYER_VERSION",
        "LAMBDA_LAYER_NAME",
        "LAMBDA_LAYER_ARN",
        "S3_LAYER_BUCKET_NAME",
        "S3_LAYER_KEY_NAME",
        "SQS_QUEUE_URL",
        "SQS_QUEUE_ARN",
        "DYNAMODB_TABLE_ARN",
        "PYTHON_VERSION",
        "LAMBDA_RUNTIME",
        "LAMBDA_TIMEOUT",

@@ -13,8 +13,8 @@ STACK_BASE=${LAMBDA_FUNCTION_NAME}
LAMBDA_EXECUTION_ROLE_NAME=rss-feed-processor-role-${AWS_REGION}
LAMBDA_ROLE_ARN=arn:aws:iam::${AWS_ACCOUNT_ID}:role/${LAMBDA_EXECUTION_ROLE_NAME}
S3_BUCKET_NAME=open-rss-articles-${AWS_REGION}
DYNAMODB_TABLE_NAME=rss-feeds-table
SQS_QUEUE_NAME=rss-feed-queue
REDIS_URL=redis://localhost:6379
REDIS_QUEUE_NAME=rss-feed-queue

LAMBDA_LAYER_VERSION=6 # This is fixed.
@@ -25,10 +25,6 @@ S3_LAMBDA_ZIPPED_BUCKET_NAME=open-rss-lambda-${AWS_REGION}

S3_LAYER_BUCKET_NAME=rss-feed-processor-layers-${AWS_REGION}
S3_LAYER_KEY_NAME= RSSFeedProcessorDependencies

SQS_QUEUE_URL=https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SQS_QUEUE_NAME}
SQS_QUEUE_ARN=arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SQS_QUEUE_NAME}
DYNAMODB_TABLE_ARN=arn:aws:dynamodb:${AWS_REGION}:${AWS_ACCOUNT_ID}:table/${DYNAMODB_TABLE_NAME}
PYTHON_VERSION=3.12
LAMBDA_RUNTIME=python${PYTHON_VERSION}
LAMBDA_TIMEOUT=300