mirror of
https://github.com/aljazceru/IngestRSS.git
synced 2025-12-18 14:34:26 +01:00
starting with rich text library and 🪲fixes for main lambda.
This commit is contained in:
@@ -69,6 +69,7 @@ def extract_feed(rss: dict, output_queue, stop_thread):
|
||||
}
|
||||
output_queue.put(output)
|
||||
except Exception as e:
|
||||
logger.error(f"Feed: {entry}")
|
||||
logger.error(f"Feed failed due to error: {e}")
|
||||
|
||||
def parse_pub_date(date_string):
|
||||
|
||||
@@ -7,34 +7,33 @@ from config import SQS_QUEUE_URL
|
||||
from exceptions import RSSProcessingError, ArticleExtractionError, DataStorageError
|
||||
from metrics import record_processed_articles, record_processing_time, record_extraction_errors
|
||||
import boto3
|
||||
import os
|
||||
|
||||
# Set up logging
|
||||
logger = setup_logging()
|
||||
|
||||
storage_strategy = os.environ.get('STORAGE_STRATEGY')
|
||||
|
||||
# Initialize AWS clients
|
||||
sqs = boto3.client('sqs')
|
||||
|
||||
def lambda_handler(event, context):
|
||||
logger.info("Starting RSS feed processing")
|
||||
print("starting rss feed, delete this later.")
|
||||
start_time = time.time()
|
||||
|
||||
|
||||
|
||||
try:
|
||||
# Receive message from SQS
|
||||
response = sqs.receive_message(
|
||||
QueueUrl=SQS_QUEUE_URL,
|
||||
MaxNumberOfMessages=1,
|
||||
WaitTimeSeconds=0
|
||||
)
|
||||
logger.debug("SQS Response: ", response)
|
||||
|
||||
if 'Messages' not in response:
|
||||
logger.info("No messages in queue")
|
||||
return {'statusCode': 200, 'body': json.dumps('No RSS feeds to process')}
|
||||
|
||||
message = response['Messages'][0]
|
||||
receipt_handle = message['ReceiptHandle']
|
||||
feed = json.loads(message['Body'])
|
||||
event_source = event["Records"][0]["eventSource"]
|
||||
if event_source == "aws:sqs":
|
||||
feed = event["Records"][0]["body"]
|
||||
logger.info(f"Received message from SQS: {feed}")
|
||||
feed = json.loads(feed)
|
||||
|
||||
|
||||
|
||||
receipt_handle = event["Records"][0]['receiptHandle']
|
||||
|
||||
# Process the feed
|
||||
result = process_feed(feed)
|
||||
@@ -45,7 +44,7 @@ def lambda_handler(event, context):
|
||||
# Save articles and update feed
|
||||
for article in result['articles']:
|
||||
try:
|
||||
save_article(article)
|
||||
save_article(article, storage_strategy)
|
||||
except DataStorageError as e:
|
||||
logger.error(f"Failed to save article: {str(e)}")
|
||||
record_extraction_errors(1)
|
||||
@@ -54,7 +53,11 @@ def lambda_handler(event, context):
|
||||
|
||||
# Delete the message from the queue
|
||||
logger.info("Deleting sqs queue message")
|
||||
sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
|
||||
try:
|
||||
sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting message from SQS: {str(e)}")
|
||||
logger.info("We can skip this but delete this block of code if it fails. This means the queue is already deleted when it triggers.")
|
||||
logger.info(f"Processed feed: {feed['u']}")
|
||||
|
||||
# Record metrics
|
||||
|
||||
@@ -4,7 +4,7 @@ import hashlib
|
||||
|
||||
def setup_logging():
|
||||
logger = logging.getLogger()
|
||||
log_level = "DEBUG"
|
||||
log_level = os.environ.get('LOG_LEVEL', 'INFO')
|
||||
logger.setLevel(logging.getLevelName(log_level))
|
||||
return logger
|
||||
|
||||
|
||||
@@ -13,7 +13,8 @@ def update_env_vars(function_name):
|
||||
'SQS_QUEUE_URL': os.environ.get('SQS_QUEUE_URL'),
|
||||
'CONTENT_BUCKET': os.environ.get('S3_BUCKET_NAME'),
|
||||
'DYNAMODB_TABLE': os.environ.get('DYNAMODB_TABLE_NAME'),
|
||||
'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO')
|
||||
'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO'),
|
||||
'STORAGE_STRATEGY': os.environ.get('STORAGE_STRATEGY')
|
||||
}
|
||||
|
||||
return lambda_client.update_function_configuration(
|
||||
|
||||
45
src/launch/launch_cmd.py
Normal file
45
src/launch/launch_cmd.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import os
|
||||
from prompt_toolkit import prompt
|
||||
from prompt_toolkit.completion import WordCompleter
|
||||
from rich.console import Console
|
||||
from rich.text import Text
|
||||
from time import sleep
|
||||
|
||||
console = Console()
|
||||
|
||||
def animate_text(text):
|
||||
for char in text:
|
||||
console.print(char, end='', style="bold green")
|
||||
sleep(0.05)
|
||||
print()
|
||||
|
||||
def set_env_var(name, value):
|
||||
os.environ[name] = value
|
||||
animate_text(f"Environment variable {name} set to {value}")
|
||||
|
||||
def list_env_vars():
|
||||
animate_text("Current environment variables:")
|
||||
for key, value in os.environ.items():
|
||||
console.print(f"{key}: {value}")
|
||||
|
||||
def main():
|
||||
while True:
|
||||
action = prompt(
|
||||
"Choose an action (set/list/quit): ",
|
||||
completer=WordCompleter(['set', 'list', 'quit'])
|
||||
)
|
||||
|
||||
if action == 'set':
|
||||
name = prompt("Enter variable name: ")
|
||||
value = prompt("Enter variable value: ")
|
||||
set_env_var(name, value)
|
||||
elif action == 'list':
|
||||
list_env_vars()
|
||||
elif action == 'quit':
|
||||
animate_text("Goodbye!")
|
||||
break
|
||||
else:
|
||||
console.print("Invalid action. Please try again.", style="bold red")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
9
todo.md
9
todo.md
@@ -1,3 +1,6 @@
|
||||
# TODO: Fix nonetype error.
|
||||
# TODO: Try to fix forbiden url issue.
|
||||
|
||||
|
||||
# TODO: Add in console setup python script for new project into launch.py
|
||||
# TODO: Eventbridge set up ( make sure this works )
|
||||
@@ -15,6 +18,12 @@
|
||||
* Semantic Storage Module
|
||||
* API Module ( Semantic Search, Retrieval )
|
||||
|
||||
# Future Use Cases
|
||||
* Betting Market Prediction
|
||||
* Financial Market Predicitions
|
||||
* News Aggregation
|
||||
* News Letter Tooling
|
||||
|
||||
# Over-caffeineted Ideas
|
||||
* Make it solarpunk themed.
|
||||
* Write a serverless manifesto for personal projects and where you would like to see the serverless world go.
|
||||
|
||||
Reference in New Issue
Block a user