mirror of https://github.com/aljazceru/IngestRSS.git

Commit: Sunday Error Patching
@@ -28,6 +28,7 @@ Resources:
     Properties:
       Name: rss-feed-processor-schedule
       Description: Runs the RSS Feed Processor Lambda function every hour
+      State: DISABLED
       ScheduleExpression: rate(30 minutes)
       FlexibleTimeWindow:
         Mode: FLEXIBLE
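The hunk above leaves the schedule disabled (State: DISABLED) while the pipeline is being tested; the todo list later in this commit notes it still has to be re-enabled. A minimal sketch for checking the schedule's state from Python, assuming the template defines an EventBridge Scheduler schedule (the FlexibleTimeWindow property implies AWS::Scheduler::Schedule) named rss-feed-processor-schedule and that AWS_REGION is set:

# Minimal sketch, not part of the repository: confirm the schedule stayed DISABLED.
import os
import boto3

scheduler = boto3.client("scheduler", region_name=os.getenv("AWS_REGION"))
schedule = scheduler.get_schedule(Name="rss-feed-processor-schedule")

print(schedule["State"])               # expected: DISABLED after this commit
print(schedule["ScheduleExpression"])  # expected: rate(30 minutes)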
@@ -6,24 +6,21 @@ from src.utils.retry_logic import retry_with_backoff
 from botocore.exceptions import ClientError
 
 from dotenv import load_dotenv
-load_dotenv()
+load_dotenv(override=True)
 
-region_name = os.getenv("AWS_REGION")
-kms_client = boto3.client('kms', region_name=region_name)
+kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION"))
 stack_base = os.getenv("STACK_BASE")
 
 @retry_with_backoff()
 def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
-    cf_client = boto3.client('cloudformation')
+    cf_client = boto3.client('cloudformation', region_name=os.getenv("AWS_REGION"))
     stack_name = f"{stack_base}-{stack_suffix}"
 
-
     with open(f'src/infra/cloudformation/{template_file}', 'r') as file:
         template_body = file.read()
 
     capabilities = ['CAPABILITY_NAMED_IAM']
 
-
     try:
         if force_recreate:
             try:
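The switch to load_dotenv(override=True) matters because python-dotenv by default does not overwrite variables that already exist in the process environment, so a stale AWS_REGION exported in the shell would silently win over the value in .env. A small sketch of the difference, assuming a hypothetical .env containing AWS_REGION=eu-central-1:

# Sketch only: demonstrates load_dotenv's override behaviour.
import os
from dotenv import load_dotenv

os.environ["AWS_REGION"] = "us-east-1"   # pretend the shell already exports a region

load_dotenv()                            # default: existing variables are left untouched
print(os.getenv("AWS_REGION"))           # still us-east-1

load_dotenv(override=True)               # values from .env now take precedence
print(os.getenv("AWS_REGION"))           # eu-central-1, if that is what .env contains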
@@ -70,7 +67,7 @@ def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]):
 
 def get_or_create_kms_key():
     # Create a KMS client
-    kms_client = boto3.client('kms', region_name=region_name)
+    kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION"))
     tag_key = 'purpose'
     tag_value = 'You pass butter'
     description = 'KMS key for RSS Feed Processor... Oh my god'
@@ -85,7 +82,7 @@ def get_or_create_kms_key():
     for key in response['Keys']:
         try:
             tags = kms_client.list_resource_tags(KeyId=key['KeyId'])['Tags']
-            if any(tag['TagKey'] == tag_key and tag['TagValue'] == tag_value for tag in tags):
+            if any(tag['TagKey'] == tag_key and tag['TagValue'] == tag_value for tag in tags) and any(tag['TagKey'] == 'region' and tag['TagValue'] == os.getenv("AWS_REGION") for tag in tags):  # TODO: This is inefficient and should be fixed and more readable.
                 print(f"Found existing KMS key with ID: {key['KeyId']}")
                 return key['KeyId']
         except ClientError:
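The TODO in the new condition admits the double any() scan over the tag list is hard to read. One possible cleanup, sketched here rather than taken from the repository, is to collapse the tag list into a dict once and compare both tags in a single expression (key_matches is a hypothetical helper name):

# Hypothetical helper, not in the repository: readable tag matching for the KMS key lookup.
import os

def key_matches(tags, tag_key, tag_value):
    tag_map = {t['TagKey']: t['TagValue'] for t in tags}
    return tag_map.get(tag_key) == tag_value and tag_map.get('region') == os.getenv("AWS_REGION")

# usage inside the loop above (sketch):
#   tags = kms_client.list_resource_tags(KeyId=key['KeyId'])['Tags']
#   if key_matches(tags, tag_key, tag_value):
#       return key['KeyId']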
@@ -120,7 +117,7 @@ def get_or_create_kms_key():
         Description=description,
         KeyUsage='ENCRYPT_DECRYPT',
         Origin='AWS_KMS',
-        Tags=[{'TagKey': tag_key, 'TagValue': tag_value}],
+        Tags=[{'TagKey': tag_key, 'TagValue': tag_value}, {'TagKey': 'region', 'TagValue': os.getenv("AWS_REGION")}],
         Policy=json.dumps(key_policy)
     )
 
@@ -10,7 +10,7 @@ import time
 import sys
 from src.infra.deploy_infrastructure import get_or_create_kms_key
 from dotenv import load_dotenv
-load_dotenv()
+load_dotenv(override=True)
 
 import logging
 logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO'))
@@ -43,7 +43,7 @@ def extract_feed_threading(rss: dict, output_queue, stop_thread):
     for entry in feed['entries']:
         if stop_thread.is_set():
             break
 
-        pub_date = parse_pub_date(entry['published'])
+        pub_date = parse_pub_date(entry)
 
         if pub_date > last_date:
@@ -110,11 +110,20 @@ def extract_feed(rss: dict):
         logger.error(f"Feed: {entry}")
         logger.error(f"Feed failed due to error: {e}")
 
-def parse_pub_date(date_string):
-    try:
-        return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
-    except ValueError:
-        try:
-            return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
-        except ValueError:
-            return int(parser.parse(date_string).timestamp())
+def parse_pub_date(entry:dict):
+
+    if 'published' in entry:
+        date_string = entry['published']
+
+        try:
+            return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
+        except ValueError:
+            try:
+                return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
+            except ValueError:
+                try:
+                    return int(parser.parse(date_string).timestamp())
+                except ValueError:
+                    pass
+
+    return int(datetime.now().timestamp()) # Return current time if no date is found
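The rewritten parse_pub_date tries an RFC 822 timestamp first, then an ISO 8601 one, then hands anything else to dateutil, and finally falls back to the current time when the entry has no usable date. A standalone sketch exercising those branches with made-up sample dates (not data from the repository):

# Sketch of the fallback chain used by the new parse_pub_date, with hypothetical sample dates.
from datetime import datetime
from dateutil import parser

samples = [
    "Mon, 06 Sep 2021 10:30:00 +0000",  # RFC 822 style -> first strptime format
    "2021-09-06T10:30:00Z",             # ISO 8601 style -> second strptime format
    "September 6, 2021 10:30 AM UTC",   # anything else  -> dateutil fallback
]

for date_string in samples:
    try:
        ts = int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp())
    except ValueError:
        try:
            ts = int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp())
        except ValueError:
            ts = int(parser.parse(date_string).timestamp())
    print(date_string, "->", ts)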
@@ -6,7 +6,7 @@ from dotenv import load_dotenv
 from src.infra.deploy_infrastructure import deploy_cloudformation
 
 # Load environment variables
-load_dotenv()
+load_dotenv(override=True)
 
 # Set up logging
 
todo.md
@@ -1,13 +1,19 @@
-# Testing🧪
+# Before Public Launch
 * Testing from 3rd party aws account.
+* Fix Issue with KMS Keys & IAM Role [ Done ]
+* Debug the Errors that are at scale.
 
 * Test Large Amounts of Feeds ( Decrease the cadence of ingesting. ) [ Today ]
-* Test out how long an S3 Full Pull will take on the
-** First Run
+* Test out how long an S3 Full Pull will take on the full thing.
+** First Run ( ~30 Minutes)
 ** Second Run.
 * Test out Vector Databases at Small Scale.
 * Test out Vector Databases at Scale.
 * Test out LLM Summarization At Small Scale
 * Test out LLM Summarization At Scale
 
+* Re-enable the Scheduler
+
+
+
 
 # Application Modules
@@ -18,6 +24,7 @@
 * AWS Budget, Pinecone Budget, & LLM Budget
 * Integration with bumblebee (Easily Handle standardization with embedding models & LLMs)
 * Visualization System ( Ingesting, Clustering, etc...)
+* API Infrastructure.
 
 # Misc
 * Duplicate Article Check Module.