diff --git a/docker-compose.yml b/docker-compose.yml
index af38a83..f25c76a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,3 @@
-version: '3.8'
 services:
   redis:
     image: redis:7
@@ -20,22 +19,6 @@ services:
       interval: 10s
       timeout: 5s
       retries: 5
-  minio:
-    image: minio/minio
-    command: server /data --console-address ":9001"
-    environment:
-      MINIO_ROOT_USER: minioadmin
-      MINIO_ROOT_PASSWORD: minioadmin
-    ports:
-      - "9000:9000"
-      - "9001:9001"
-    volumes:
-      - minio-data:/data
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
-      interval: 30s
-      timeout: 20s
-      retries: 5
   worker:
     build:
       context: .
@@ -45,19 +28,15 @@ services:
         condition: service_healthy
       mongodb:
         condition: service_healthy
-      minio:
-        condition: service_healthy
     environment:
       REDIS_URL: redis://redis:6379
       REDIS_QUEUE_NAME: rss-feed-queue
       MONGODB_URL: mongodb://mongodb:27017
-      MONGODB_DB_NAME: ingestrss
-      MONGODB_COLLECTION_NAME: rss_feeds
-      MINIO_ENDPOINT: minio:9000
-      MINIO_ACCESS_KEY: minioadmin
-      MINIO_SECRET_KEY: minioadmin
-      MINIO_BUCKET: ingestrss
-      STORAGE_STRATEGY: s3
+      MONGODB_ARTICLES_DB_NAME: articles_db
+      MONGODB_ARTICLES_COLLECTION_NAME: articles
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
+      STORAGE_STRATEGY: mongodb
       LOG_LEVEL: INFO
   scheduler:
     build:
@@ -72,9 +51,25 @@ services:
       REDIS_URL: redis://redis:6379
       REDIS_QUEUE_NAME: rss-feed-queue
       MONGODB_URL: mongodb://mongodb:27017
-      MONGODB_DB_NAME: ingestrss
-      MONGODB_COLLECTION_NAME: rss_feeds
+      MONGODB_ARTICLES_DB_NAME: articles_db
+      MONGODB_ARTICLES_COLLECTION_NAME: articles
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
       LOG_LEVEL: INFO
+  dashboard:
+    build:
+      context: .
+      dockerfile: src/dashboard/Dockerfile
+    depends_on:
+      mongodb:
+        condition: service_healthy
+    environment:
+      MONGODB_URL: mongodb://mongodb:27017
+      MONGODB_FEEDS_DB_NAME: feeds_db
+      MONGODB_FEEDS_COLLECTION_NAME: rss_feeds
+    volumes:
+      - ./src/dashboard:/app
+    ports:
+      - "18000:8000"
 volumes:
-  mongo-data:
-  minio-data:
\ No newline at end of file
+  mongo-data:
\ No newline at end of file
diff --git a/docker/scheduler/Dockerfile b/docker/scheduler/Dockerfile
index 11500e6..fa451e0 100644
--- a/docker/scheduler/Dockerfile
+++ b/docker/scheduler/Dockerfile
@@ -3,4 +3,5 @@ WORKDIR /app
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
 COPY . .
+ENV PYTHONPATH=/app
 CMD ["python", "local_services/scheduler.py"]
diff --git a/docker/worker/Dockerfile b/docker/worker/Dockerfile
index 4f729ac..8eb2543 100644
--- a/docker/worker/Dockerfile
+++ b/docker/worker/Dockerfile
@@ -3,4 +3,5 @@ WORKDIR /app
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
 COPY . .
+ENV PYTHONPATH=/app
 CMD ["python", "local_services/worker.py"]
diff --git a/local_services/scheduler.py b/local_services/scheduler.py
index ad798f0..a17c9a3 100644
--- a/local_services/scheduler.py
+++ b/local_services/scheduler.py
@@ -4,8 +4,8 @@ import logging
 import importlib
 
 handler = importlib.import_module(
-    'src.infra.lambdas.RSSQueueFiller.lambda.lambda_function'
-).handler
+    'src.feed_management.scheduler_main'
+).scheduler_main
 
 logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
 logger = logging.getLogger(__name__)
@@ -17,7 +17,7 @@ def main():
     logger.info("Starting scheduler loop")
     while True:
         try:
-            handler({}, None)
+            handler()
         except Exception as exc:
             logger.exception("Scheduler job failed: %s", exc)
         time.sleep(INTERVAL_MINUTES * 60)
diff --git a/local_services/worker.py b/local_services/worker.py
index 4f834a6..ca35b0f 100644
--- a/local_services/worker.py
+++ b/local_services/worker.py
@@ -2,7 +2,7 @@ import os
 import time
 import logging
 
-from src.infra.lambdas.RSSFeedProcessorLambda.src.lambda_function import lambda_handler
+from src.feed_processing.worker_main import worker_main
 
 logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
 logger = logging.getLogger(__name__)
@@ -13,7 +13,7 @@ def main():
     logger.info("Starting worker loop")
     while True:
         try:
-            lambda_handler({}, None)
+            worker_main()
         except Exception as exc:
             logger.exception("Worker iteration failed: %s", exc)
         time.sleep(SLEEP_SECONDS)
diff --git a/requirements.txt b/requirements.txt
index 8b5b1e0..7217056 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,6 @@
-boto3==1.35.*
 pymongo==4.*
 python-dotenv==1.0.*
 requests==2.32.*
-constructs==10.2.69
 # Vector database and embedding libraries
 qdrant-client
 ollama
@@ -14,4 +12,6 @@ schedule==1.*
 feedparser
 newspaper3k
 python-dateutil
-lxml
\ No newline at end of file
+lxml
+lxml[html_clean]
+boto3
\ No newline at end of file
diff --git a/src/analysis-toolkit/s3_object_creation_dates.png b/src/analysis-toolkit/s3_object_creation_dates.png
deleted file mode 100644
index e0476ae..0000000
Binary files a/src/analysis-toolkit/s3_object_creation_dates.png and /dev/null differ
diff --git a/src/analysis-toolkit/s3_object_ingestion.py b/src/analysis-toolkit/s3_object_ingestion.py
deleted file mode 100644
index 176292d..0000000
--- a/src/analysis-toolkit/s3_object_ingestion.py
+++ /dev/null
@@ -1,58 +0,0 @@
-from minio import Minio
-import os
-import matplotlib.pyplot as plt
-from datetime import datetime, timedelta
-from collections import defaultdict
-
-def get_s3_object_creation_dates(bucket_name):
-    client = Minio(
-        os.getenv("MINIO_ENDPOINT"),
-        access_key=os.getenv("MINIO_ACCESS_KEY"),
-        secret_key=os.getenv("MINIO_SECRET_KEY"),
-        secure=False
-    )
-    creation_dates = []
-
-    for obj in client.list_objects(bucket_name, recursive=True):
-        creation_dates.append(obj.last_modified.date())
-
-    return creation_dates
-
-def plot_creation_dates(dates):
-    # Count objects created on each date
-    date_counts = defaultdict(int)
-    for date in dates:
-        date_counts[date] += 1
-
-    # Sort dates and get counts
-    sorted_dates = sorted(date_counts.keys())
-    counts = [date_counts[date] for date in sorted_dates]
-
-    # Create the plot
-    plt.figure(figsize=(15, 8))
-    bars = plt.bar(sorted_dates, counts)
-    plt.title('S3 Object Creation Dates')
-    plt.xlabel('Date')
-    plt.ylabel('Number of Objects Created')
-    plt.xticks(rotation=45, ha='right')
-
-    # Label each bar with its height
-    for bar in bars:
-        height = bar.get_height()
-        plt.text(bar.get_x() + bar.get_width()/2., height,
- f'{int(height)}', - ha='center', va='bottom') - - plt.tight_layout() - - # Save the plot - plt.savefig('s3_object_creation_dates.png', dpi=300, bbox_inches='tight') - print("Graph saved as 's3_object_creation_dates.png'") - -def main(): - bucket_name = os.getenv('MINIO_BUCKET') - dates = get_s3_object_creation_dates(bucket_name) - plot_creation_dates(dates) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/src/feed_management/upload_rss_feeds.py b/src/feed_management/upload_rss_feeds.py index 3e53eb1..3f7e500 100644 --- a/src/feed_management/upload_rss_feeds.py +++ b/src/feed_management/upload_rss_feeds.py @@ -6,6 +6,10 @@ import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +MONGODB_URL = os.getenv('MONGODB_URL', 'mongodb://localhost:27017') +MONGODB_FEEDS_DB_NAME = os.getenv('MONGODB_FEEDS_DB_NAME', 'feeds_db') +MONGODB_FEEDS_COLLECTION_NAME = os.getenv('MONGODB_FEEDS_COLLECTION_NAME', 'rss_feeds') + def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name): client = MongoClient(mongo_url) collection = client[db_name][collection_name] @@ -33,10 +37,7 @@ def upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name): ) if __name__ == "__main__": - mongo_url = os.getenv('MONGODB_URL', 'mongodb://localhost:27017') - db_name = os.getenv('MONGODB_DB_NAME', 'ingestrss') - collection_name = os.getenv('MONGODB_COLLECTION_NAME', 'rss_feeds') with open('rss_feeds.json') as f: rss_feeds = json.load(f) logger.info(f"Loaded RSS feeds: {rss_feeds}") - upload_rss_feeds(rss_feeds, mongo_url, db_name, collection_name) + upload_rss_feeds(rss_feeds, MONGODB_URL, MONGODB_FEEDS_DB_NAME, MONGODB_FEEDS_COLLECTION_NAME) diff --git a/src/infra/cloudformation/eventbridge.yaml b/src/infra/cloudformation/eventbridge.yaml deleted file mode 100644 index 57a7a58..0000000 --- a/src/infra/cloudformation/eventbridge.yaml +++ /dev/null @@ -1,38 +0,0 @@ -Parameters: - LambdaFunctionArn: - Type: String - Description: ARN of the RSS Feed Processor Lambda function - -Resources: - EventBridgeScheduleRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: scheduler.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: EventBridgeSchedulePolicy - PolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Action: lambda:InvokeFunction - Resource: !Ref LambdaFunctionArn - - EventBridgeSchedule: - Type: AWS::Scheduler::Schedule - Properties: - Name: rss-feed-processor-schedule - Description: Runs the RSS Feed Processor Lambda function every hour - State: ENABLED - ScheduleExpression: rate(240 minutes) - FlexibleTimeWindow: - Mode: FLEXIBLE - MaximumWindowInMinutes: 1 - Target: - Arn: !Ref LambdaFunctionArn - RoleArn: !GetAtt EventBridgeScheduleRole.Arn \ No newline at end of file diff --git a/src/infra/cloudformation/lambda_role.yaml b/src/infra/cloudformation/lambda_role.yaml deleted file mode 100644 index 58acd32..0000000 --- a/src/infra/cloudformation/lambda_role.yaml +++ /dev/null @@ -1,68 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Description: 'IAM Role for RSS Feed Processor Lambda Function with Environment Variable Encryption' - -Parameters: - LambdaExecutionRoleName: - Type: String - Description: "Name of the Lambda Execution Role" - LambdaKMSKeyArn: - Type: String - Description: "ARN of the KMS Key for Lambda environment variable encryption" - Region: - Type: String - Description: "AWS Region for deployment" - 
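The compose file and local_services above wire the worker to src.feed_processing.worker_main.worker_main, which this diff does not include. Purely as a sketch of what that entry point is expected to do: pop one feed message from Redis and write extracted articles into the MongoDB articles collection. The {'u': url, 'dt': last_published} message shape and the MONGODB_ARTICLES_* / REDIS_* names come from elsewhere in this diff; the function body, and the use of feedparser alone for extraction, are assumptions.

# Hypothetical sketch only; not the actual src/feed_processing/worker_main.py.
import json
import os

import feedparser
import redis
from pymongo import MongoClient

def worker_main() -> None:
    r = redis.Redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379"))
    raw = r.rpop(os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue"))
    if raw is None:
        return  # nothing queued this cycle

    feed = json.loads(raw)                  # {'u': feed url, 'dt': last published unix time}
    parsed = feedparser.parse(feed["u"])
    articles = [
        {"rss": feed["u"], "title": e.get("title"), "link": e.get("link")}
        for e in parsed.entries
    ]

    client = MongoClient(os.environ.get("MONGODB_URL", "mongodb://localhost:27017"))
    collection = client[os.environ.get("MONGODB_ARTICLES_DB_NAME", "articles_db")][
        os.environ.get("MONGODB_ARTICLES_COLLECTION_NAME", "articles")
    ]
    if articles:
        collection.insert_many(articles)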
Default: "us-east-1" - -Resources: - LambdaExecutionRole: - Type: 'AWS::IAM::Role' - Properties: - RoleName: !Ref LambdaExecutionRoleName - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: - - lambda.amazonaws.com - Action: - - 'sts:AssumeRole' - ManagedPolicyArns: - - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' - Policies: - - PolicyName: 'RSSFeedProcessorLambdaPolicy' - PolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Action: - - 'sqs:*' - - 's3:*' - - 'lambda:*' - - 'logs:*' - - 'xray:*' - - 'cloudwatch:*' - - 'events:*' - Resource: '*' - - Effect: Allow - Action: - - 'kms:Decrypt' - - 'kms:GenerateDataKey' - Resource: !Ref LambdaKMSKeyArn - -Outputs: - LambdaRoleArn: - Description: 'ARN of the Lambda Execution Role' - Value: !GetAtt LambdaExecutionRole.Arn - Export: - Name: !Sub '${AWS::StackName}-LambdaRoleArn' - LambdaKMSKeyArn: - Description: 'ARN of the KMS Key for Lambda' - Value: !Ref LambdaKMSKeyArn - Export: - Name: !Sub '${AWS::StackName}-LambdaKMSKeyArn' - Region: - Description: 'AWS Region for deployment' - Value: !Ref Region - Export: - Name: !Sub '${AWS::StackName}-Region' \ No newline at end of file diff --git a/src/infra/cloudformation/rss_lambda_stack.yaml b/src/infra/cloudformation/rss_lambda_stack.yaml deleted file mode 100644 index be04071..0000000 --- a/src/infra/cloudformation/rss_lambda_stack.yaml +++ /dev/null @@ -1,78 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Description: Redis Queue Filler Lambda Stack - -Parameters: - QueueFillerLambdaName: - Type: String - Description: Name of the Lambda function - RedisUrl: - Type: String - Description: URL of the Redis instance - RedisQueueName: - Type: String - Description: Name of the Redis queue - LambdaCodeS3Bucket: - Type: String - Description: S3 bucket containing the Lambda function code - LambdaCodeS3Key: - Type: String - Description: S3 key for the Lambda function code - LambdaRuntime: - Type: String - Description: Lambda runtime - Default: python3.12 - LambdaTimeout: - Type: Number - Description: Lambda timeout in seconds - Default: 300 - -Resources: - SqsFillerFunction: - Type: AWS::Lambda::Function - Properties: - FunctionName: !Ref QueueFillerLambdaName - Runtime: !Ref LambdaRuntime - Handler: lambda_function.handler - Code: - S3Bucket: !Ref LambdaCodeS3Bucket - S3Key: !Ref LambdaCodeS3Key - Timeout: !Ref LambdaTimeout - Environment: - Variables: - REDIS_URL: !Ref RedisUrl - REDIS_QUEUE_NAME: !Ref RedisQueueName - Role: !GetAtt SqsFillerFunctionRole.Arn - - SqsFillerFunctionRole: - Type: AWS::IAM::Role - Properties: - AssumeRolePolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Principal: - Service: lambda.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: LambdaExecutionPolicy - PolicyDocument: - Version: '2012-10-17' - Statement: - - Effect: Allow - Action: - - logs:CreateLogGroup - - logs:CreateLogStream - - logs:PutLogEvents - Resource: arn:aws:logs:*:*:* - - Effect: Allow - Action: - - s3:GetObject - Resource: !Sub arn:aws:s3:::${LambdaCodeS3Bucket}/${LambdaCodeS3Key} - -Outputs: - SqsFillerFunctionArn: - Description: ARN of the Queue Filler Lambda Function - Value: !GetAtt SqsFillerFunction.Arn - SqsFillerFunctionRoleArn: - Description: ARN of the IAM Role for Queue Filler Lambda Function - Value: !GetAtt SqsFillerFunctionRole.Arn \ No newline at end of file diff --git a/src/infra/cloudformation/s3.yaml b/src/infra/cloudformation/s3.yaml deleted file mode 
100644 index fabcb0b..0000000 --- a/src/infra/cloudformation/s3.yaml +++ /dev/null @@ -1,26 +0,0 @@ -AWSTemplateFormatVersion: '2010-09-09' -Description: 'CloudFormation template for RSS Feed Processor S3 Bucket' - -Parameters: - BucketName: - Type: String - Description: "Name of the Lambda Execution Role" - -Resources: - ArticleContentBucket: - Type: AWS::S3::Bucket - Properties: - BucketName: !Ref BucketName - VersioningConfiguration: - Status: Enabled - BucketEncryption: - ServerSideEncryptionConfiguration: - - ServerSideEncryptionByDefault: - SSEAlgorithm: AES256 - -Outputs: - BucketName: - Description: 'Name of the S3 bucket for article content' - Value: !Ref ArticleContentBucket - Export: - Name: !Sub '${AWS::StackName}-ArticleContentBucketName' \ No newline at end of file diff --git a/src/infra/deploy_infrastructure.py b/src/infra/deploy_infrastructure.py deleted file mode 100644 index 4f40247..0000000 --- a/src/infra/deploy_infrastructure.py +++ /dev/null @@ -1,197 +0,0 @@ -import boto3 -import os -import sys -import json -from src.utils.retry_logic import retry_with_backoff -from botocore.exceptions import ClientError -from qdrant_client import QdrantClient, models - - - -from dotenv import load_dotenv -load_dotenv(override=True) - -kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION")) -stack_base = os.getenv("STACK_BASE") - -@retry_with_backoff() -def deploy_cloudformation(template_file, stack_suffix, force_recreate=False, parameters=[]): - cf_client = boto3.client('cloudformation', region_name=os.getenv("AWS_REGION")) - stack_name = f"{stack_base}-{stack_suffix}" - - with open(f'src/infra/cloudformation/{template_file}', 'r') as file: - template_body = file.read() - - capabilities = ['CAPABILITY_NAMED_IAM'] - - try: - if force_recreate: - try: - print(f"Deleting stack {stack_name} for recreation...") - cf_client.delete_stack(StackName=stack_name) - waiter = cf_client.get_waiter('stack_delete_complete') - waiter.wait(StackName=stack_name) - print(f"Stack {stack_name} deleted successfully.") - except ClientError: - print(f"Stack {stack_name} does not exist or is already deleted.") - - try: - stack = cf_client.describe_stacks(StackName=stack_name)['Stacks'][0] - print(f"Updating stack {stack_name}...") - cf_client.update_stack( - StackName=stack_name, - TemplateBody=template_body, - Capabilities=capabilities, - Parameters=parameters # Add parameters here - ) - waiter = cf_client.get_waiter('stack_update_complete') - waiter.wait(StackName=stack_name) - print(f"Stack {stack_name} updated successfully.") - except ClientError as e: - if 'does not exist' in str(e): - print(f"Creating stack {stack_name}...") - cf_client.create_stack( - StackName=stack_name, - TemplateBody=template_body, - Capabilities=capabilities, - Parameters=parameters # Add parameters here - ) - waiter = cf_client.get_waiter('stack_create_complete') - waiter.wait(StackName=stack_name) - print(f"Stack {stack_name} created successfully.") - elif 'No updates are to be performed' in str(e): - print(f"No updates needed for stack {stack_name}.") - else: - raise - - except ClientError as e: - print(f"Error handling stack {stack_name}: {str(e)}") - raise - -def get_or_create_kms_key(): - # Create a KMS client - kms_client = boto3.client('kms', region_name=os.getenv("AWS_REGION")) - tag_key = 'purpose' - tag_value = 'You pass butter' - description = 'KMS key for RSS Feed Processor... 
Oh my god' - - account_id = os.getenv('AWS_ACCOUNT_ID') - - try: - # List all KMS keys - response = kms_client.list_keys() - - # Check each key for the specified tag - for key in response['Keys']: - try: - tags = kms_client.list_resource_tags(KeyId=key['KeyId'])['Tags'] - if any(tag['TagKey'] == tag_key and tag['TagValue'] == tag_value for tag in tags) and any(tag['TagKey'] == 'region' and tag['TagValue'] == os.getenv("AWS_REGION") for tag in tags): # FIXME: This is inefficient and should be fixed and more readable. - print(f"Found existing KMS key with ID: {key['KeyId']}") - return key['KeyId'] - except ClientError: - continue - - # If no key found, create a new one with appropriate policy - print("No existing key found. Creating a new KMS key.") - key_policy = { - "Version": "2012-10-17", - "Statement": [ - { - "Sid": "Enable IAM User Permissions", - "Effect": "Allow", - "Principal": {"AWS": f"arn:aws:iam::{account_id}:root"}, - "Action": "kms:*", - "Resource": "*" - }, - { - "Sid": "Allow Lambda to use the key", - "Effect": "Allow", - "Principal": {"Service": "lambda.amazonaws.com"}, - "Action": [ - "kms:Decrypt", - "kms:GenerateDataKey*" - ], - "Resource": "*" - } - ] - } - - response = kms_client.create_key( - Description=description, - KeyUsage='ENCRYPT_DECRYPT', - Origin='AWS_KMS', - Tags=[{'TagKey': tag_key, 'TagValue': tag_value}, {'TagKey': 'region', 'TagValue': os.getenv("AWS_REGION")}], - Policy=json.dumps(key_policy) - ) - - key_id = response['KeyMetadata']['KeyId'] - print(f"Successfully created new KMS key with ID: {key_id}") - - return key_id - - except ClientError as e: - print(f"Error in KMS key operation: {e}") - sys.exit(1) - - - -def deploy_infrastructure(): - # Do some stuff with KMS keys. - kms_key_id = get_or_create_kms_key() - - key_info = kms_client.describe_key(KeyId=kms_key_id) - kms_key_arn = key_info['KeyMetadata']['Arn'] - - - - deploy_cloudformation('s3.yaml', 'S3', - parameters=[ - { - 'ParameterKey': 'BucketName', - 'ParameterValue': os.getenv('S3_BUCKET_NAME') - } - ]) - deploy_cloudformation('s3.yaml', 'S3-zipped', - parameters=[ - { - 'ParameterKey': 'BucketName', - 'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME') - } - ]) - deploy_cloudformation('lambda_role.yaml', 'Lambda', force_recreate=True, - parameters=[ - { - 'ParameterKey': 'LambdaExecutionRoleName', - 'ParameterValue': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME', 'default-role-name') - }, - { - 'ParameterKey': 'LambdaKMSKeyArn', - 'ParameterValue': kms_key_arn - } - ] - ) - - deploy_cloudformation('eventbridge.yaml', 'Schedule', - parameters=[ - { - 'ParameterKey': 'LambdaFunctionArn', - 'ParameterValue': f"arn:aws:lambda:{os.getenv('AWS_REGION')}:{os.getenv('AWS_ACCOUNT_ID')}:function:{os.getenv('QUEUE_FILLER_LAMBDA_NAME')}" - } - ]) - - if os.getenv("STORAGE_STRATEGY") == 'qdrant': - client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY")) - collection = os.getenv("QDRANT_COLLECTION_NAME") - embedding_dim = int(os.getenv("VECTOR_EMBEDDING_DIM")) - metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine").upper() - - existing = [c.name for c in client.get_collections().collections] - if collection not in existing: - client.create_collection( - collection_name=collection, - vectors_config=models.VectorParams(size=embedding_dim, distance=getattr(models.Distance, metric)) - ) - - -if __name__ == "__main__": - deploy_infrastructure() \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py 
b/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py deleted file mode 100644 index 5ee5953..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/deploy_rss_feed_lambda.py +++ /dev/null @@ -1,211 +0,0 @@ -import boto3 -import os -import zipfile -import io -import requests -import json -from botocore.exceptions import ClientError -from src.utils.retry_logic import retry_with_backoff -import time -import sys -from src.infra.deploy_infrastructure import get_or_create_kms_key -from dotenv import load_dotenv -load_dotenv(override=True) - -import logging -logging.basicConfig(level=os.getenv('LOG_LEVEL', 'INFO')) - -# Set variables - -LAMBDA_NAME = os.getenv('LAMBDA_FUNCTION_NAME') - -ACCOUNT_NUM = os.getenv('AWS_ACCOUNT_ID') -REGION = os.getenv("AWS_REGION") -LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN") -LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT')) -LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY')) -LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME') -LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}" -LAMBDA_HANDLER = "lambda_function.lambda_handler" -LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer" -S3_LAYER_KEY = os.getenv('S3_LAYER_KEY_NAME')+'.zip' - -def zip_directory(path): - print(f"Creating deployment package from {path}...") - zip_buffer = io.BytesIO() - with zipfile.ZipFile(zip_buffer, 'a', zipfile.ZIP_DEFLATED, False) as zip_file: - for root, _, files in os.walk(path): - for file in files: - file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, path) - zip_file.write(file_path, arcname) - return zip_buffer.getvalue() - -@retry_with_backoff() -def update_function_code(lambda_client, function_name, zip_file): - return lambda_client.update_function_code( - FunctionName=function_name, - ZipFile=zip_file - ) - -def get_or_create_lambda_layer(): - layer_arn = os.getenv('LAMBDA_LAYER_ARN') - - return layer_arn - -@retry_with_backoff(max_retries=50, initial_backoff=5, backoff_multiplier=2) # Note: This function usually takes a long time to be successful. -def update_function_configuration(lambda_client, function_name, handler, role, timeout, memory, layers, kms_key_id): - - config = { - 'FunctionName': function_name, - 'Handler': handler, - 'Role': role, - 'Timeout': timeout, - 'MemorySize': memory, - 'Layers': layers - } - - - if kms_key_id: - config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}" - - try: - response = lambda_client.update_function_configuration(**config) - print(f"Update request sent successfully for {function_name}.") - - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceConflictException': - logging.info(f"Function {function_name} is currently being updated. Retrying...") - raise e - -@retry_with_backoff() -def configure_sqs_trigger(lambda_client, function_name, queue_arn): - """Placeholder for backward compatibility. 
Redis deployment uses no SQS trigger.""" - return - -@retry_with_backoff() -def create_function(lambda_client, function_name, runtime, role, handler, zip_file, timeout, memory, layers, kms_key_id, policy): - config = { - 'FunctionName': function_name, - 'Runtime': runtime, - 'Role': role, - 'Handler': handler, - 'Code': {'ZipFile': zip_file}, - 'Timeout': timeout, - 'MemorySize': memory, - 'Layers': layers - } - print(policy) - - if kms_key_id: - config['KMSKeyArn'] = f"arn:aws:kms:{REGION}:{ACCOUNT_NUM}:key/{kms_key_id}" - - try: - return lambda_client.create_function(**config) - except ClientError as e: - if e.response['Error']['Code'] == 'InvalidParameterValueException': - print(f"Error creating function: {e}") - print("Ensure that the IAM role has the correct trust relationship and permissions.") - print("There might be a delay in role propagation. Please wait a few minutes and try again.") - raise - -def get_pillow_layer_arn(): - url = f"https://api.klayers.cloud/api/v2/p{os.getenv('PYTHON_VERSION')}/layers/latest/{os.getenv('AWS_REGION')}/json" - try: - response = requests.get(url) - response.raise_for_status() - layers_data = response.json() - - pillow_layer = next((layer for layer in layers_data if layer['package'] == 'Pillow'), None) - - if pillow_layer: - return pillow_layer['arn'] - else: - print("Pillow layer not found in the API response.") - return None - except requests.RequestException as e: - print(f"Error fetching Pillow layer ARN: {e}") - return None - -def get_lambda_policy(): - policy = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents" - ], - "Resource": "arn:aws:logs:*:*:*" - }, - { - "Effect": "Allow", - "Action": [ - "s3:GetObject", - "s3:PutObject" - ], - "Resource": "arn:aws:s3:::your-bucket-name/*" - } - ] -} - -def deploy_lambda(): - lambda_client = boto3.client('lambda', region_name=REGION) - - print(f"Starting deployment of Lambda function: {LAMBDA_NAME}") - deployment_package = zip_directory('src/infra/lambdas/RSSFeedProcessorLambda/src') - - layer_arn = get_or_create_lambda_layer() - if layer_arn: - print(f"Using Lambda Layer ARN: {layer_arn}") - else: - print("Warning: Lambda Layer not found or created. Proceeding without Layer.") - - pillow_layer_arn = get_pillow_layer_arn() - if pillow_layer_arn: - print(f"Using Pillow Layer ARN: {pillow_layer_arn}") - else: - print("Warning: Pillow Layer not found. Proceeding without Pillow Layer.") - - kms_key_id = get_or_create_kms_key() - if kms_key_id: - print(f"Using KMS Key ID: {kms_key_id}") - else: - print("Warning: KMS Key not found or created. Proceeding without KMS Key.") - sys.exit(1) - - try: - # Check if the function exists - try: - lambda_client.get_function(FunctionName=LAMBDA_NAME) - function_exists = True - except ClientError as e: - if e.response['Error']['Code'] == 'ResourceNotFoundException': - function_exists = False - else: - raise e - - # Combine the layers - layers = [layer_arn] if layer_arn else [] - if pillow_layer_arn: - layers.append(pillow_layer_arn) - - if function_exists: - print("Updating existing Lambda function...") - update_function_configuration(lambda_client, LAMBDA_NAME, LAMBDA_HANDLER, LAMBDA_ROLE_ARN, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id) - update_function_code(lambda_client, LAMBDA_NAME, deployment_package) - else: - print(f"Lambda function '{LAMBDA_NAME}' not found. 
Creating new function...") - policy = get_lambda_policy() - create_function(lambda_client, LAMBDA_NAME, LAMBDA_RUNTIME, LAMBDA_ROLE_ARN, LAMBDA_HANDLER, deployment_package, LAMBDA_TIMEOUT, LAMBDA_MEMORY, layers, kms_key_id, policy) - - print("Lambda deployment completed successfully!") - - except Exception as e: - print(f"Error during Lambda deployment: {str(e)}") - raise - -if __name__ == "__main__": - deploy_lambda() \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/layers/requirements.txt b/src/infra/lambdas/RSSFeedProcessorLambda/layers/requirements.txt deleted file mode 100644 index faf552c..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/layers/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -newspaper3k -feedparser -python-dateutil -lxml \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py deleted file mode 100644 index 9bd4ee3..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py +++ /dev/null @@ -1,72 +0,0 @@ -import os -import requests - -from qdrant_client import QdrantClient, models - -try: - from ...utils import setup_logging -except ImportError: - # Fallback for when running standalone - import logging - def setup_logging(): - return logging.getLogger(__name__) - -logger = setup_logging() - -qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333") -qdrant_api_key = os.getenv("QDRANT_API_KEY") -collection_name = os.getenv("QDRANT_COLLECTION_NAME") - -embedding_dim = os.getenv("VECTOR_EMBEDDING_DIM") -vector_search_metric = os.getenv("VECTOR_SEARCH_METRIC", "cosine") - -ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434") -ollama_embedding_model = os.getenv("OLLAMA_EMBEDDING_MODEL", "nomic-embed-text") - -client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key) - -def get_index(): - collections = client.get_collections().collections - if collection_name not in [c.name for c in collections]: - raise KeyError(f"Collection {collection_name} not found") - return client - -def vectorize(article: str) -> list[float]: - try: - response = requests.post( - f"{ollama_host}/api/embeddings", - json={"model": ollama_embedding_model, "prompt": article}, - timeout=30, - ) - response.raise_for_status() - return response.json().get("embedding", []) - except requests.RequestException as e: - logger.error(f"Error generating embedding: {e}") - # Return a zero vector of the expected dimension as fallback - dim = int(embedding_dim) if embedding_dim else 384 # Default dimension - return [0.0] * dim - - -def upsert_vectors(index: QdrantClient, data: list[dict]): - points = [ - models.PointStruct(id=item["id"], vector=item["vector"], payload=item.get("payload")) - for item in data - ] - index.upsert(collection_name=collection_name, points=points) - - -def query_vectors(index: QdrantClient, vector: list[float], top_k: int, filter_query: dict | None = None): - if embedding_dim and len(vector) != int(embedding_dim): - raise ValueError("Length of vector does not match the embedding dimension") - return index.search( - collection_name=collection_name, - query_vector=vector, - limit=top_k, - with_payload=True, - query_filter=filter_query, - ) - - -if __name__ == "__main__": - paragraph = "This is a test." 
- vectorize(paragraph) \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/genai/summarization.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/genai/summarization.py deleted file mode 100644 index 3e109e9..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/genai/summarization.py +++ /dev/null @@ -1,6 +0,0 @@ - - -def summarize(text:str): - sub_prompt = "Summarize the following passage" - - \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/article_cleaning.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/article_cleaning.py deleted file mode 100644 index a03a743..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/article_cleaning.py +++ /dev/null @@ -1,14 +0,0 @@ -import re - -def remove_newlines(text: str) -> str: - return text.replace('\n', '') - -def remove_urls(text: str) -> str: - url_pattern = re.compile(r'http\S+|www\S+') - return url_pattern.sub('', text) - - -def clean_text(text: str) -> str: - text = remove_newlines(text) - text = remove_urls(text) - return text \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/article_extractor.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/article_extractor.py deleted file mode 100644 index 2a26d90..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/article_extractor.py +++ /dev/null @@ -1,31 +0,0 @@ -import newspaper -import logging - - - -logger = logging.getLogger() - -def extract_article(url): - """ - Extracts the title and text of an article from the given URL. - - Args: - url (str): The URL of the article. - Returns: - A tuple containing the title and text of the article, respectively. - """ - logger.debug(f"Starting Newspaper Article Extraction {url}") - config = newspaper.Config() - config.request_timeout = 60 - article = newspaper.Article(url) - - try: - article.download() - logger.debug(f"Downloaded Article {url}") - article.parse() - logger.debug(f"Parsed Article {url}") - - return article.title, article.text - except Exception as e: - logger.error(f"Failed to extract article {url}: {str(e)}") - return None, None \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py deleted file mode 100644 index 6c36b63..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/config.py +++ /dev/null @@ -1,15 +0,0 @@ -import os - -# Redis Configuration -REDIS_URL = os.environ["REDIS_URL"] -REDIS_QUEUE_NAME = os.environ.get("REDIS_QUEUE_NAME", "rss-feed-queue") - -# Logging Configuration -LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") - -# RSS Feed Processing Configuration -MAX_ARTICLES_PER_FEED = int(os.environ.get("MAX_ARTICLES_PER_FEED", "10")) -FEED_PROCESSING_TIMEOUT = int(os.environ.get("FEED_PROCESSING_TIMEOUT", "90")) - -# Article Extraction Configuration -ARTICLE_EXTRACTION_TIMEOUT = int(os.environ.get("ARTICLE_EXTRACTION_TIMEOUT", "30")) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py deleted file mode 100644 index 44e6f75..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py +++ /dev/null @@ -1,127 +0,0 @@ -import boto3 -from minio import Minio -import json -import os -import logging -from datetime import datetime -from pymongo import MongoClient - -logger = logging.getLogger() - -# Try to import vector DB components, but make them optional -try: - from 
.analytics.embeddings.vector_db import get_index, upsert_vectors, vectorize - VECTOR_DB_AVAILABLE = True -except ImportError: - VECTOR_DB_AVAILABLE = False - logger.warning("Vector DB components not available. Qdrant storage will not work.") - -s3 = boto3.client('s3') - -CONTENT_BUCKET = os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET")) - -minio_client = Minio( - os.getenv("MINIO_ENDPOINT"), - access_key=os.getenv("MINIO_ACCESS_KEY"), - secret_key=os.getenv("MINIO_SECRET_KEY"), - secure=False -) -CONTENT_BUCKET = os.getenv("MINIO_BUCKET", os.getenv("S3_BUCKET_NAME", os.getenv("CONTENT_BUCKET"))) -DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE_NAME") -storage_strategy = os.environ.get('STORAGE_STRATEGY') - -MONGODB_URL = os.getenv("MONGODB_URL") -MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME") -MONGODB_COLLECTION_NAME = os.getenv("MONGODB_COLLECTION_NAME", "rss_feeds") - -mongo_client = MongoClient(MONGODB_URL) -feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME] - -##### Article Storage ##### -def save_article(article: dict, strategy: str): - if strategy == "s3": - s3_save_article(article) - elif strategy == "qdrant": - if VECTOR_DB_AVAILABLE: - qdrant_save_article(article) - else: - logger.error("Qdrant storage requested but vector DB components not available") - raise ValueError("Vector DB components not available for Qdrant storage") - elif strategy == "both": - if VECTOR_DB_AVAILABLE: - qdrant_save_article(article) - s3_save_article(article) - else: - raise ValueError(f"Invalid storage strategy: {strategy}") - - -def qdrant_save_article(article: dict): - logger.info("Saving article to Qdrant") - index = get_index() - - data = { - "id": article["article_id"], - "vector": vectorize(article["content"]), - "payload": {"rss": article.get("rss"), "title": article.get("title")}, - } - - upsert_vectors(index, [data]) - - -def s3_save_article(article:dict): - logger.info("Saving article to MinIO") - - now = datetime.now() - article_id = article['article_id'] - - if not article_id: - logger.error(f"Missing rss_id or article_id in article: {article}") - return - - file_path = f"/tmp/{article_id}-article.json" - file_key = f"{now.year}/{now.month}/{now.day}/{article_id}.json" - - # Save article to /tmp json file - with open(file_path, "w") as f: - json.dump(article, f) - - try: - metadata = { - "rss": article.get("rss", ""), - "title": article.get("title", ""), - "unixTime": str(article.get("unixTime", "")), - "article_id": article.get("article_id", ""), - "link": article.get("link", ""), - "rss_id": article.get("rss_id", "") - } - minio_client.fput_object( - CONTENT_BUCKET, - file_key, - file_path, - content_type="application/json", - metadata=metadata - ) - logger.info(f"Saved article {article_id} to bucket {CONTENT_BUCKET}") - - except Exception as e: - logger.error(f"Failed to save article with error: {str(e)}. 
\n Article: {article} \n Article Type: {type(article)}") - - -###### Feed Storage ###### -RSS_FEEDS_FILE = os.getenv("RSS_FEEDS_FILE", "rss_feeds.json") - - -def update_rss_feed(feed: dict, last_pub_dt: int): - try: - if not os.path.exists(RSS_FEEDS_FILE): - return - with open(RSS_FEEDS_FILE, "r") as f: - feeds = json.load(f) - for item in feeds: - if item.get("u") == feed["u"]: - item["dt"] = int(last_pub_dt) - with open(RSS_FEEDS_FILE, "w") as f: - json.dump(feeds, f) - logger.info(f"Updated RSS feed {feed['u']} with dt: {last_pub_dt}") - except Exception as e: - logger.error(f"Failed to update RSS feed: {str(e)}") \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/exceptions.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/exceptions.py deleted file mode 100644 index c6c0702..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/exceptions.py +++ /dev/null @@ -1,11 +0,0 @@ -class RSSProcessingError(Exception): - """Exception raised for errors in the RSS processing.""" - pass - -class ArticleExtractionError(Exception): - """Exception raised for errors in the article extraction.""" - pass - -class DataStorageError(Exception): - """Exception raised for errors in data storage operations.""" - pass \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/feed_processor.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/feed_processor.py deleted file mode 100644 index 725da2a..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/feed_processor.py +++ /dev/null @@ -1,133 +0,0 @@ -import feedparser -from datetime import datetime -from dateutil import parser -import queue -import threading -import logging -from .utils import generate_key -from .article_extractor import extract_article -from .article_cleaning import clean_text - -logger = logging.getLogger() - -def process_feed(feed: dict): - output_queue = queue.Queue() - stop_thread = threading.Event() - thread = threading.Thread(target=extract_feed_threading, args=(feed, output_queue, stop_thread)) - thread.daemon = True - thread.start() - - logger.debug(f"Thread Started: {feed['u']}") - thread.join(timeout=90) - - if thread.is_alive(): - stop_thread.set() - logger.debug(f"Killing Thread: {feed['u']}") - return None - else: - try: - output = output_queue.get_nowait() - logger.info(f"Thread Succeeded: {feed['u']}") - return output - except queue.Empty: - logger.info(f"Thread Failed: {feed['u']}") - return None - -def extract_feed_threading(rss: dict, output_queue, stop_thread): - articles = [] - feed_url = rss['u'] - last_date = rss['dt'] - max_date = last_date - entry = None # Initialize entry variable - - try: - feed = feedparser.parse(feed_url) - for entry in feed['entries']: - if stop_thread.is_set(): - break - - pub_date = parse_pub_date(entry.get('published', '')) - - if pub_date > last_date: - title, text = extract_article(entry.link) - title, text = clean_text(title or ''), clean_text(text or '') - article = { - 'link': entry.link, - 'rss': feed_url, - 'title': title, - 'content': text, - 'unixTime': pub_date, - 'rss_id': generate_key(feed_url), - 'article_id': generate_key(entry.link), - 'llm_summary': None, - 'embedding': None - } - articles.append(article) - max_date = max(max_date, pub_date) - - output = { - 'articles': articles, - 'max_date': max_date, - 'feed': rss - } - output_queue.put(output) - except Exception as e: - logger.error(f"Feed URL: {feed_url}") - if entry: - logger.error(f"Current entry: {entry.get('link', 'unknown')}") - logger.error(f"Feed 
failed due to error: {e}") - -def extract_feed(rss: dict): - articles = [] - feed_url = rss['u'] - last_date = rss['dt'] - max_date = last_date - - try: - feed = feedparser.parse(feed_url) - for entry in feed['entries']: - pub_date = parse_pub_date(entry.get('published', '')) - - if pub_date > last_date: - title, text = extract_article(entry.link) - article = { - 'link': entry.link, - 'rss': feed_url, - 'title': title, - 'content': text, - 'unixTime': pub_date, - 'rss_id': generate_key(feed_url), - 'article_id': generate_key(entry.link), - 'llm_summary': None, - 'embedding': None - } - articles.append(article) - max_date = max(max_date, pub_date) - - output = { - 'articles': articles, - 'max_date': max_date, - 'feed': rss - } - return output - except Exception as e: - logger.error(f"Feed URL: {feed_url}") - logger.error(f"Feed failed due to error: {e}") - -def parse_pub_date(date_string: str) -> int: - """Parse publication date from various formats""" - if not date_string: - return int(datetime.now().timestamp()) - - try: - return int(datetime.strptime(date_string, "%a, %d %b %Y %H:%M:%S %z").timestamp()) - except ValueError: - try: - return int(datetime.strptime(date_string, "%Y-%m-%dT%H:%M:%SZ").timestamp()) - except ValueError: - try: - return int(parser.parse(date_string).timestamp()) - except (ValueError, TypeError): - pass - - return int(datetime.now().timestamp()) # Return current time if no date is found \ No newline at end of file diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py deleted file mode 100644 index b8e42ff..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py +++ /dev/null @@ -1,65 +0,0 @@ -import json -import time -import os -import redis -from feed_processor import extract_feed -from data_storage import save_article, update_rss_feed -from utils import setup_logging -from config import REDIS_URL, REDIS_QUEUE_NAME -from exceptions import RSSProcessingError, DataStorageError -from metrics import ( - record_processed_articles, - record_processing_time, - record_extraction_errors, -) - -logger = setup_logging() -storage_strategy = os.environ.get("STORAGE_STRATEGY") -redis_client = redis.Redis.from_url(REDIS_URL) - - -def lambda_handler(event, context): - logger.info("Starting RSS feed processing") - start_time = time.time() - - try: - feed_data = redis_client.rpop(REDIS_QUEUE_NAME) - if not feed_data: - logger.info("No messages in queue") - return {"statusCode": 200, "body": json.dumps("No feeds to process")} - feed = json.loads(feed_data) - - result = extract_feed(feed) - logger.info(f"Process Feed Result Dictionary: {result}") - last_pub_dt = result["max_date"] - - if result: - for article in result["articles"]: - try: - save_article(article, storage_strategy) - except DataStorageError as e: - logger.error(f"Failed to save article: {str(e)}") - record_extraction_errors(1) - - update_rss_feed(result["feed"], last_pub_dt) - logger.info(f"Processed feed: {feed['u']}") - record_processed_articles(len(result["articles"])) - else: - logger.warning(f"Failed to process feed: {feed['u']}") - record_extraction_errors(1) - - except RSSProcessingError as e: - logger.error(f"RSS Processing Error: {str(e)}") - return {"statusCode": 500, "body": json.dumps("RSS processing failed")} - - except Exception as e: - logger.error(f"Unexpected error: {str(e)}") - return {"statusCode": 500, "body": json.dumps("An unexpected error occurred")} - - finally: - end_time = 
time.time() - processing_time = end_time - start_time - record_processing_time(processing_time) - logger.info(f"Lambda execution time: {processing_time:.2f} seconds") - - return {"statusCode": 200, "body": json.dumps("RSS feed processed successfully")} diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/metrics.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/metrics.py deleted file mode 100644 index 9d9fe1c..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/metrics.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Prometheus metrics utilities for the RSS feed processor.""" - -import os -from prometheus_client import Counter, Histogram, start_http_server - - -# Start a Prometheus metrics HTTP server exposing ``/metrics``. The port can be -# customised with the ``METRICS_PORT`` environment variable. This block is safe -# to run multiple times as it silently ignores port binding errors. -_metrics_port = int(os.environ.get("METRICS_PORT", "8000")) -if not os.environ.get("METRICS_SERVER_STARTED"): - try: - start_http_server(_metrics_port) - os.environ["METRICS_SERVER_STARTED"] = "1" - except OSError: - pass - - -# Metric definitions -_processed_articles = Counter( - "processed_articles_total", - "Total number of processed articles", -) -_processing_time = Histogram( - "rss_processing_seconds", - "Time spent processing RSS feeds", -) -_extraction_errors = Counter( - "extraction_errors_total", - "Number of article extraction errors", -) - - -def record_processed_articles(count: int) -> None: - """Increment the processed articles counter.""" - _processed_articles.inc(count) - - -def record_processing_time(duration: float) -> None: - """Record how long a feed took to process.""" - _processing_time.observe(duration) - - -def record_extraction_errors(count: int) -> None: - """Increment the extraction errors counter.""" - _extraction_errors.inc(count) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/utils.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/utils.py deleted file mode 100644 index b7dfcd7..0000000 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/utils.py +++ /dev/null @@ -1,18 +0,0 @@ -import logging -import os -import hashlib - -def setup_logging(): - logger = logging.getLogger() - log_level = os.environ.get('LOG_LEVEL', 'INFO') - logger.setLevel(logging.getLevelName(log_level)) - return logger - - -def generate_key(input_string, length=10): - # Create a SHA256 hash of the input string - hash_object = hashlib.sha256(input_string.encode()) - hex_dig = hash_object.hexdigest() - - # Return the first 'length' characters of the hash - return hex_dig[:length] diff --git a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py b/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py deleted file mode 100644 index 2897f78..0000000 --- a/src/infra/lambdas/RSSQueueFiller/deploy_sqs_filler_lambda.py +++ /dev/null @@ -1,81 +0,0 @@ -import os -import zipfile -import logging -import boto3 -from dotenv import load_dotenv -from src.infra.deploy_infrastructure import deploy_cloudformation - -# Load environment variables -load_dotenv(override=True) - -# Set up logging - -logging.basicConfig(level=os.getenv('LOG_LEVEL')) - - -# Set up S3 client -s3 = boto3.client('s3') - -def zip_lambda_code(): - lambda_dir = 'src/infra/lambdas/RSSQueueFiller/lambda' - zip_path = 'tmp/lambda_function.zip' - - os.makedirs(zip_path.split("/")[0], exist_ok=True) - - with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: - for root, _, files in os.walk(lambda_dir): - for file in files: - 
file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, lambda_dir) - zipf.write(file_path, arcname) - - return zip_path - -def upload_to_s3(file_path): - s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY') - bucket_name = os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME') - s3.upload_file(file_path, bucket_name, s3_key) - return f's3://{bucket_name}/{s3_key}' - -def deploy_sqs_filler(): - zip_file = zip_lambda_code() - upload_to_s3(zip_file) - - # Deploy CloudFormation - deploy_cloudformation('rss_lambda_stack.yaml', 'LambdaSQSFiller', - parameters=[ - { - 'ParameterKey': 'QueueFillerLambdaName', - 'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_NAME') - }, - { - 'ParameterKey': 'RedisUrl', - 'ParameterValue': os.getenv('REDIS_URL') - }, - { - 'ParameterKey': 'RedisQueueName', - 'ParameterValue': os.getenv('REDIS_QUEUE_NAME') - }, - { - 'ParameterKey': 'LambdaCodeS3Bucket', - 'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME') - }, - { - 'ParameterKey': 'LambdaCodeS3Key', - 'ParameterValue': os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY') - }, - { - 'ParameterKey': 'LambdaRuntime', - 'ParameterValue': os.getenv('LAMBDA_RUNTIME') - }, - { - 'ParameterKey': 'LambdaTimeout', - 'ParameterValue': os.getenv('LAMBDA_TIMEOUT') - } - ]) - - # Clean up local zip file - os.remove(zip_file) - -if __name__ == "__main__": - deploy_sqs_filler() \ No newline at end of file diff --git a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py b/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py deleted file mode 100644 index e8bf1b9..0000000 --- a/src/infra/lambdas/RSSQueueFiller/lambda/lambda_function.py +++ /dev/null @@ -1,76 +0,0 @@ -import json -import os -import logging -import boto3 -from decimal import Decimal -from pymongo import MongoClient -from datetime import datetime -import redis - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -# For AWS deployment - SQS -try: - sqs = boto3.client('sqs') - SQS_QUEUE_URL = os.environ.get('SQS_QUEUE_URL', '') - AWS_DEPLOYMENT = bool(SQS_QUEUE_URL) -except Exception: - AWS_DEPLOYMENT = False - -# For local deployment - Redis -if not AWS_DEPLOYMENT: - redis_client = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379')) - REDIS_QUEUE_NAME = os.environ.get('REDIS_QUEUE_NAME', 'rss-feed-queue') - -MONGODB_URL = os.environ['MONGODB_URL'] -MONGODB_DB_NAME = os.environ['MONGODB_DB_NAME'] -MONGODB_COLLECTION_NAME = os.environ.get('MONGODB_COLLECTION_NAME', 'rss_feeds') - -mongo_client = MongoClient(MONGODB_URL) -feeds_collection = mongo_client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME] - -class DecimalEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, Decimal): - return int(obj) - return super(DecimalEncoder, self).default(obj) - -def handler(event, context): - messages_sent = 0 - - # Iterate over all feeds in MongoDB - for item in feeds_collection.find({}): - rss_url = item.get('url') - rss_dt = item.get('dt') - - logger.debug(f"Processing RSS feed: {rss_url}") - logger.debug(f"Last published date: {rss_dt}") - - if rss_url: - message = { - 'u': rss_url, - 'dt': rss_dt - } - logger.debug(f"Message: {message}") - - try: - if AWS_DEPLOYMENT: - # Send to SQS for AWS deployment - sqs.send_message( - QueueUrl=SQS_QUEUE_URL, - MessageBody=json.dumps(message, cls=DecimalEncoder) - ) - else: - # Send to Redis for local deployment - redis_client.lpush(REDIS_QUEUE_NAME, json.dumps(message, cls=DecimalEncoder)) - messages_sent += 1 - except Exception as e: - logger.error(f"Error sending message to 
queue: {str(e)}") - - logger.info(f"Sent {messages_sent} messages to queue at {datetime.now().isoformat()}") - - return { - "statusCode": 200, - "body": json.dumps(f"Sent {messages_sent} RSS URLs to queue"), - } \ No newline at end of file diff --git a/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh b/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh deleted file mode 100644 index c6cbf99..0000000 --- a/src/infra/lambdas/lambda_utils/lambda_layer/lambda_layer_cloud9.sh +++ /dev/null @@ -1,117 +0,0 @@ -#!/bin/bash - -set -e - -####### Section 1: Checking Python Existence ######## -echo "Section 1: Checking Python Existence" - -# Ensure python3.12 is installed -if ! command -v python3.12 &> /dev/null; then - echo "Python 3.12 is not installed. Please install it before running this script." - exit 1 -fi -echo "Python 3.12 found. Proceeding..." - -####### Section 2: Installing Dependencies ######## -echo "Section 2: Installing Dependencies" - -# Install dependencies -python3.12 -m pip install --upgrade Pillow feedfinder2==0.0.4 python-dateutil newspaper3k==0.2.8 feedparser lxml[html5lib] lxml_html_clean lxml[html_clean] qdrant-client ollama -t python/ -echo "Dependencies installed successfully." - -####### Section 3: Creating ZIP File ######## -echo "Section 3: Creating ZIP File" - -# Create ZIP file -zip -r OpenRSSLambdaLayer.zip python/ -echo "ZIP file created." - -# Check if ZIP file was created and is not empty -if [ ! -s OpenRSSLambdaLayer.zip ]; then - echo "Error: ZIP file is empty or was not created." - exit 1 -fi -echo "ZIP file check passed." - -####### Section 4: Getting AWS Regions ######## -echo "Section 4: Getting AWS Regions" - -# Get list of all AWS regions -REGIONS=$(aws ec2 describe-regions --query 'Regions[].RegionName' --output text) -echo "Retrieved AWS regions: $REGIONS" - -####### Section 5: Creating Buckets, Uploading, and Publishing Layer ######## -echo "Section 5: Creating Buckets, Uploading, and Publishing Layer" - -create_bucket_upload_and_publish_layer() { - local region=$1 - local bucket_name="rss-feed-processor-layers-$region" - local layer_name="ingest-rss-lambda-layer-$region" - - echo "Processing region: $region" - - # Create bucket if it doesn't exist - if ! aws s3api head-bucket --bucket "$bucket_name" --region "$region" 2>/dev/null; then - echo "Creating bucket $bucket_name in $region" - if [ "$region" == "us-east-1" ]; then - aws s3api create-bucket --bucket "$bucket_name" --region "$region" - else - aws s3api create-bucket --bucket "$bucket_name" --region "$region" --create-bucket-configuration LocationConstraint=$region - fi - else - echo "Bucket $bucket_name already exists in $region" - fi - - # Upload ZIP to the region-specific bucket - echo "Uploading ZIP to $bucket_name" - aws s3 cp OpenRSSLambdaLayer.zip "s3://$bucket_name/" --region "$region" - - # Create and publish Lambda layer - echo "Creating Lambda layer in region: $region" - LAYER_VERSION=$(aws lambda publish-layer-version \ - --region "$region" \ - --layer-name $layer_name \ - --description "Layer with dependencies for RSS processing" \ - --license-info "MIT" \ - --content "S3Bucket=$bucket_name,S3Key=OpenRSSLambdaLayer.zip" \ - --compatible-runtimes python3.12 \ - --query 'Version' \ - --output text - ) - - if [ -z "$LAYER_VERSION" ]; then - echo "Failed to create Lambda layer in region $region." 
- return 1 - fi - - echo "Making layer public in region: $region" - aws lambda add-layer-version-permission \ - --region "$region" \ - --layer-name $layer_name \ - --version-number "$LAYER_VERSION" \ - --statement-id public \ - --action lambda:GetLayerVersion \ - --principal '*' - - ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) - ARN="arn:aws:lambda:${region}:${ACCOUNT_ID}:layer:$layer_name:${LAYER_VERSION}" - echo "Layer ARN for region $region: $ARN" - echo "$region:$ARN" >> layer_arns.txt -} - -# Process all regions -for region in $REGIONS; do - if create_bucket_upload_and_publish_layer "$region"; then - echo "Successfully processed region: $region" - else - echo "Failed to process region: $region. Continuing with next region..." - fi -done - -####### Section 6: Completion ######## -echo "Section 6: Completion" - -echo "Setup complete! OpenRSSLambdaLayer is now available in all processed regions." -echo "Layer ARNs have been saved to layer_arns.txt" - -echo "Script execution completed successfully." \ No newline at end of file diff --git a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py deleted file mode 100644 index 9d50ec0..0000000 --- a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py +++ /dev/null @@ -1,69 +0,0 @@ -import boto3 -import os -from src.utils.retry_logic import retry_with_backoff - -# Set variables -LAMBDA_NAME = "RSSFeedProcessor" - -@retry_with_backoff() -def update_env_vars(function_name): - lambda_client = boto3.client('lambda') - - env_vars = { - - # Lambda Configuration - 'LAMBDA_FUNCTION_NAME': os.environ.get('LAMBDA_FUNCTION_NAME'), - 'STACK_BASE': os.environ.get('STACK_BASE'), - 'LAMBDA_EXECUTION_ROLE_NAME': os.environ.get('LAMBDA_EXECUTION_ROLE_NAME'), - 'LAMBDA_ROLE_ARN': os.environ.get('LAMBDA_ROLE_ARN'), - 'LAMBDA_LAYER_VERSION': os.environ.get('LAMBDA_LAYER_VERSION'), - 'LAMBDA_LAYER_NAME': os.environ.get('LAMBDA_LAYER_NAME'), - 'LAMBDA_LAYER_ARN': os.environ.get('LAMBDA_LAYER_ARN'), - 'LAMBDA_RUNTIME': os.environ.get('LAMBDA_RUNTIME'), - 'LAMBDA_TIMEOUT': os.environ.get('LAMBDA_TIMEOUT', '300'), # Reasonable default timeout - 'LAMBDA_MEMORY': os.environ.get('LAMBDA_MEMORY', '512'), # Reasonable default memory - - # S3 Configuration - 'S3_BUCKET_NAME': os.environ.get('S3_BUCKET_NAME'), - 'S3_LAMBDA_ZIPPED_BUCKET_NAME': os.environ.get('S3_LAMBDA_ZIPPED_BUCKET_NAME'), - 'S3_LAYER_BUCKET_NAME': os.environ.get('S3_LAYER_BUCKET_NAME'), - 'S3_LAYER_KEY_NAME': os.environ.get('S3_LAYER_KEY_NAME'), - - # Redis Configuration - 'REDIS_URL': os.environ.get('REDIS_URL'), - 'REDIS_QUEUE_NAME': os.environ.get('REDIS_QUEUE_NAME'), - - # Queue Filler Lambda Configuration - 'QUEUE_FILLER_LAMBDA_NAME': os.environ.get('QUEUE_FILLER_LAMBDA_NAME'), - 'QUEUE_FILLER_LAMBDA_S3_KEY': os.environ.get('QUEUE_FILLER_LAMBDA_S3_KEY'), - - # Python Configuration - 'PYTHON_VERSION': os.environ.get('PYTHON_VERSION', '3.12'), # Default Python version - - # Application Settings - 'APP_NAME': os.environ.get('APP_NAME', 'RSS Feed Processor'), # Default app name is fine - 'VERSION': os.environ.get('VERSION', '1.0.0'), # Default version is fine - 'LOG_LEVEL': os.environ.get('LOG_LEVEL', 'INFO'), # Default to INFO logging - - # Storage Configuration - 'STORAGE_STRATEGY': os.environ.get('STORAGE_STRATEGY', 's3'), # Default to s3 storage - - # Qdrant Configuration (only used if STORAGE_STRATEGY is 'qdrant') - 'QDRANT_URL': os.environ.get('QDRANT_URL'), - 'QDRANT_API_KEY': 
-        'QDRANT_COLLECTION_NAME': os.environ.get('QDRANT_COLLECTION_NAME'),
-
-        # Vector Configuration
-        'VECTOR_EMBEDDING_MODEL': os.environ.get('VECTOR_EMBEDDING_MODEL'),
-        'VECTOR_EMBEDDING_DIM': os.environ.get('VECTOR_EMBEDDING_DIM'),
-        'VECTOR_SEARCH_METRIC': os.environ.get('VECTOR_SEARCH_METRIC'),
-
-        # Ollama Configuration
-        'OLLAMA_HOST': os.environ.get('OLLAMA_HOST'),
-        'OLLAMA_EMBEDDING_MODEL': os.environ.get('OLLAMA_EMBEDDING_MODEL'),
-    }
-
-    return lambda_client.update_function_configuration(
-        FunctionName=LAMBDA_NAME,
-        Environment={'Variables': env_vars}
-    )
diff --git a/src/launch/launch_env.py b/src/launch/launch_env.py
index dd9e899..3107bd7 100644
--- a/src/launch/launch_env.py
+++ b/src/launch/launch_env.py
@@ -33,54 +33,16 @@ def main():
     # Determine if we're in advanced mode
     advanced_mode = not Confirm.ask("Do you want to use basic mode? \n( We recommend basic for your first time ) ")
-
-    # AWS Configuration
-
-
-    env_vars["AWS_ACCOUNT_ID"] = get_env_value("AWS_ACCOUNT_ID", "Enter AWS Account ID:") # Could we grab this for the user instead.
-
-    # AWS Credentials
-    if not check_aws_region():
-        console.print("AWS region not found in environment variables.")
-        env_vars["AWS_REGION"] = get_env_value("AWS_REGION", "Enter AWS Region:", options=get_aws_regions())
-    else:
-        env_vars["AWS_REGION"] = os.environ.get('AWS_REGION')
-    if not check_aws_credentials():
-        console.print("AWS credentials not found in environment variables.")
-        if Confirm.ask("Do you want to set AWS credentials?"):
-            env_vars["AWS_ACCESS_KEY_ID"] = get_env_value("AWS_ACCESS_KEY_ID", "Enter AWS Access Key ID:")
-            env_vars["AWS_SECRET_ACCESS_KEY"] = get_env_value("AWS_SECRET_ACCESS_KEY", "Enter AWS Secret Access Key:")
-    else:
-        env_vars["AWS_ACCESS_KEY_ID"] = os.environ.get('AWS_ACCESS_KEY_ID')
-        env_vars["AWS_SECRET_ACCESS_KEY"] = os.environ.get('AWS_SECRET_ACCESS_KEY')
-        console.print("AWS credentials found in environment variables.")
-
-    # Resource Names
-    env_vars["LAMBDA_FUNCTION_NAME"] = get_env_value("LAMBDA_FUNCTION_NAME", "Enter Lambda Function Name:", options=["RSSFeedProcessor", "CustomRSSProcessor"], advanced=advanced_mode)
-    env_vars["STACK_BASE"] = env_vars["LAMBDA_FUNCTION_NAME"]
-    env_vars["LAMBDA_EXECUTION_ROLE_NAME"] = f"rss-feed-processor-role-{env_vars['AWS_REGION']}"
-    env_vars["LAMBDA_ROLE_ARN"] = f"arn:aws:iam::{env_vars['AWS_ACCOUNT_ID']}:role/{env_vars['LAMBDA_EXECUTION_ROLE_NAME']}"
-    env_vars["S3_BUCKET_NAME"] = f"open-rss-articles-{env_vars['AWS_REGION']}"
+    # MongoDB and Redis Configuration
+    env_vars["MONGODB_URL"] = get_env_value("MONGODB_URL", "Enter MongoDB URL:", options=["mongodb://localhost:27017"], advanced=advanced_mode)
+    env_vars["MONGODB_ARTICLES_DB_NAME"] = get_env_value("MONGODB_ARTICLES_DB_NAME", "Enter MongoDB Articles DB Name:", options=["articles_db"], advanced=advanced_mode)
+    env_vars["MONGODB_ARTICLES_COLLECTION_NAME"] = get_env_value("MONGODB_ARTICLES_COLLECTION_NAME", "Enter MongoDB Articles Collection Name:", options=["articles"], advanced=advanced_mode)
+    env_vars["MONGODB_FEEDS_DB_NAME"] = get_env_value("MONGODB_FEEDS_DB_NAME", "Enter MongoDB Feeds DB Name:", options=["feeds_db"], advanced=advanced_mode)
+    env_vars["MONGODB_FEEDS_COLLECTION_NAME"] = get_env_value("MONGODB_FEEDS_COLLECTION_NAME", "Enter MongoDB Feeds Collection Name:", options=["rss_feeds"], advanced=advanced_mode)
     env_vars["REDIS_URL"] = get_env_value("REDIS_URL", "Enter Redis URL:", options=["redis://localhost:6379"], advanced=advanced_mode)
env_vars["REDIS_QUEUE_NAME"] = get_env_value("REDIS_QUEUE_NAME", "Enter Redis Queue Name:", options=["rss-feed-queue"], advanced=advanced_mode) - # Advanced Configuration - env_vars["LAMBDA_LAYER_VERSION"] = 3 - env_vars["LAMBDA_LAYER_NAME"] = f"ingest-rss-lambda-layer-{env_vars['AWS_REGION']}" - env_vars["LAMBDA_LAYER_ARN"] = f"arn:aws:lambda:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:layer:{env_vars['LAMBDA_LAYER_NAME']}:{env_vars['LAMBDA_LAYER_VERSION']}" - env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}" - env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}" - env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode) - - env_vars["PYTHON_VERSION"] = get_env_value("PYTHON_VERSION", "Enter Python Version:", options=["3.8", "3.9", "3.10", "3.11", "3.12"], advanced=advanced_mode) - env_vars["LAMBDA_RUNTIME"] = f"python{env_vars['PYTHON_VERSION']}" - env_vars["LAMBDA_TIMEOUT"] = get_env_value("LAMBDA_TIMEOUT", "Enter Lambda Timeout (in seconds):", options=["60", "120", "300"], advanced=advanced_mode) - env_vars["LAMBDA_MEMORY"] = get_env_value("LAMBDA_MEMORY", "Enter Lambda Memory (in MB):", options=["128", "256", "512", "1024"], advanced=advanced_mode) - - env_vars["QUEUE_FILLER_LAMBDA_NAME"] = get_env_value("QUEUE_FILLER_LAMBDA_NAME", "Enter Queue Filler Lambda Name:", options=["RSSQueueFiller", "CustomQueueFiller"], advanced=advanced_mode) - env_vars["QUEUE_FILLER_LAMBDA_S3_KEY"] = get_env_value("QUEUE_FILLER_LAMBDA_S3_KEY", "Enter Queue Filler Lambda S3 Key:", options=["RSSQueueFiller.zip", "CustomQueueFiller.zip"], advanced=advanced_mode) - # Logging Configuration env_vars["LOG_LEVEL"] = get_env_value("LOG_LEVEL", "Enter Log Level:", options=["DEBUG", "INFO", "WARNING", "ERROR"], advanced=advanced_mode) @@ -90,7 +52,7 @@ def main(): env_vars["TEST"] = get_env_value("TEST", "Enter Test Value:", options=["0", "1"], advanced=advanced_mode) # Storage Strategy - env_vars["STORAGE_STRATEGY"] = get_env_value("STORAGE_STRATEGY", "Choose Storage Strategy:", options=["s3", "qdrant"], advanced=advanced_mode) + env_vars["STORAGE_STRATEGY"] = "mongodb" # Qdrant Configuration (only if qdrant is selected) if env_vars["STORAGE_STRATEGY"] == "qdrant": diff --git a/src/search/batch/__init__.py b/src/search/batch/__init__.py index cfd1741..6dd80ec 100644 --- a/src/search/batch/__init__.py +++ b/src/search/batch/__init__.py @@ -1,3 +1,3 @@ -from .downloader import S3BatchDownloader +from .downloader import MongoDBBatchDownloader -__all__ = ['S3BatchDownloader'] \ No newline at end of file +__all__ = ['MongoDBBatchDownloader'] \ No newline at end of file diff --git a/src/search/batch/downloader.py b/src/search/batch/downloader.py index c4725fe..d9f25ab 100644 --- a/src/search/batch/downloader.py +++ b/src/search/batch/downloader.py @@ -1,170 +1,62 @@ -from minio import Minio import pandas as pd -from typing import Optional, List, Dict, Union, Any -import json +from typing import Optional, List, Dict, Any import os -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime, timezone import logging -from string import Template +from pymongo import MongoClient from tqdm import tqdm +from datetime import datetime -class S3BatchDownloader: - """Class for batch downloading RSS articles from a MinIO bucket""" - - DEFAULT_CONFIG = { - "region": "${AWS_REGION}", - 
"bucket": "${RSS_BUCKET_NAME}", - "prefix": "${RSS_PREFIX}", - "max_workers": os.cpu_count() or 10 - } - - def __init__(self, config_path: Optional[str] = None): - """ - Initialize the S3BatchDownloader - - Args: - config_path: Optional path to config file. If None, uses environment variables. - """ +class MongoDBBatchDownloader: + """Class for batch downloading RSS articles from a MongoDB collection""" + + def __init__(self, mongo_url: str, db_name: str, collection_name: str): self.logger = logging.getLogger(__name__) - self.config = self._load_config(config_path) - self._validate_config() - - self.s3 = Minio( - os.getenv('MINIO_ENDPOINT'), - access_key=os.getenv('MINIO_ACCESS_KEY'), - secret_key=os.getenv('MINIO_SECRET_KEY'), - secure=False - ) - self.logger.info( - f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}" - ) - - def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]: - """Load and process configuration""" - if config_path and os.path.exists(config_path): - with open(config_path) as f: - template = Template(f.read()) - else: - template = Template(json.dumps(self.DEFAULT_CONFIG)) - - env_vars = { - 'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'), - 'RSS_BUCKET_NAME': os.getenv('MINIO_BUCKET') - } - - config_str = template.safe_substitute(env_vars) - - try: - config = json.loads(config_str) - config['max_workers'] = int(config.get('max_workers', 10)) - return config - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON config after variable substitution: {str(e)}") - - def _validate_config(self) -> None: - """Validate the configuration""" - required_fields = ['region', 'bucket', 'prefix'] - missing_fields = [field for field in required_fields if field not in self.config] - if missing_fields: - raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}") - + self.mongo_url = mongo_url + self.db_name = db_name + self.collection_name = collection_name + self.client = MongoClient(self.mongo_url) + self.collection = self.client[self.db_name][self.collection_name] + self.logger.info(f"Initialized MongoDBBatchDownloader for collection: {self.collection_name}") + def download_to_file(self, output_path: str, file_format: str = 'csv', start_date: Optional[str] = None, end_date: Optional[str] = None) -> str: """ - Download articles from MinIO to a consolidated file - + Download articles from MongoDB to a consolidated file Args: output_path: Path to save the output file. file_format: Format to save the file ('csv' or 'json'). - start_date: Optional start date filter (YYYY-MM-DD). + start_date: Optional start date filter (YYYY-MM-DD, expects 'unixTime' field in article). end_date: Optional end date filter (YYYY-MM-DD). - Returns: Path to the saved file. 
""" self.logger.info(f"Starting batch download to {output_path}") - - # Convert date strings to UTC datetime - start_ts = datetime.strptime(start_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if start_date else None - end_ts = datetime.strptime(end_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if end_date else None - - # List and filter objects - objects = self._list_objects() - - if start_ts or end_ts: - objects = [ - obj for obj in objects - if self._is_in_date_range(obj['LastModified'], start_ts, end_ts) - ] - self.logger.info(f"Found {len(objects)} objects to process") - print(f"Found {len(objects)} objects to process") - - # Download and merge data - all_data = [] - with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor, tqdm(total=len(objects), unit="object") as progress_bar: - future_to_obj = {executor.submit(self._download_object, obj): obj for obj in objects} - for future in as_completed(future_to_obj): - result = future.result() - if result is not None: - all_data.extend(result if isinstance(result, list) else [result]) - progress_bar.update(1) - + query = {} + if start_date or end_date: + date_query = {} + if start_date: + start_ts = int(datetime.strptime(start_date, '%Y-%m-%d').timestamp()) + date_query['$gte'] = start_ts + if end_date: + end_ts = int(datetime.strptime(end_date, '%Y-%m-%d').timestamp()) + date_query['$lte'] = end_ts + query['unixTime'] = date_query + articles = list(self.collection.find(query)) + self.logger.info(f"Found {len(articles)} articles to process") + print(f"Found {len(articles)} articles to process") + # Remove MongoDB _id field for export + for article in articles: + article.pop('_id', None) # Save to file - self._save_to_file(all_data, output_path, file_format) - self.logger.info(f"Successfully downloaded {len(all_data)} articles to {output_path}") - return output_path - - def _list_objects(self) -> List[Dict]: - """List objects in bucket""" - objects = [] - try: - for obj in self.s3.list_objects( - self.config['bucket'], - prefix=self.config['prefix'], - recursive=True - ): - objects.append({ - 'Key': obj.object_name, - 'LastModified': obj.last_modified - }) - return objects - except Exception as e: - self.logger.error(f"Error listing objects: {str(e)}") - raise - - def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]: - """Download and parse single object""" - try: - response = self.s3.get_object(self.config['bucket'], obj['Key']) - content = response.read().decode('utf-8') - data = json.loads(content) - stat = self.s3.stat_object(self.config['bucket'], obj['Key']) - metadata = stat.metadata - if isinstance(data, dict): - data.update(metadata) - return [data] - elif isinstance(data, list): - for item in data: - item.update(metadata) - return data - except Exception as e: - self.logger.error(f"Error downloading {obj['Key']}: {str(e)}") - return None - - def _is_in_date_range(self, ts: datetime, start: Optional[datetime], end: Optional[datetime]) -> bool: - """Check if timestamp is within the date range""" - return (not start or ts >= start) and (not end or ts <= end) - - def _save_to_file(self, data: List[Dict], output_path: str, file_format: str) -> None: - """Save data to file""" - df = pd.DataFrame(data) + df = pd.DataFrame(articles) if file_format == 'csv': df.to_csv(output_path, index=False) elif file_format == 'json': df.to_json(output_path, orient='records', lines=True) else: raise ValueError(f"Unsupported file format: {file_format}") + self.logger.info(f"Successfully downloaded {len(articles)} 
articles to {output_path}") + return output_path diff --git a/src/utils/check_env.py b/src/utils/check_env.py index 521f26c..695f312 100644 --- a/src/utils/check_env.py +++ b/src/utils/check_env.py @@ -6,40 +6,17 @@ from typing import List, Dict def check_env() -> None: # Variables that must be set by the user required_user_vars = [ - "AWS_REGION", - "AWS_ACCOUNT_ID", - "AWS_ACCESS_KEY_ID", - "AWS_SECRET_ACCESS_KEY", - "MINIO_ENDPOINT", - "MINIO_ACCESS_KEY", - "MINIO_SECRET_KEY", - "MINIO_BUCKET" + "MONGODB_URL", + "MONGODB_ARTICLES_DB_NAME", + "MONGODB_ARTICLES_COLLECTION_NAME", + "MONGODB_FEEDS_DB_NAME", + "MONGODB_FEEDS_COLLECTION_NAME", + "REDIS_URL", + "REDIS_QUEUE_NAME" ] # Variables that are derived or have default values derived_vars = [ - "AWS_DEFAULT_REGION", - "LAMBDA_FUNCTION_NAME", - "STACK_BASE", - "LAMBDA_EXECUTION_ROLE_NAME", - "LAMBDA_ROLE_ARN", - "S3_BUCKET_NAME", - "REDIS_URL", - "REDIS_QUEUE_NAME", - "LAMBDA_LAYER_VERSION", - "LAMBDA_LAYER_NAME", - "LAMBDA_LAYER_ARN", - "S3_LAYER_BUCKET_NAME", - "S3_LAYER_KEY_NAME", - "PYTHON_VERSION", - "LAMBDA_RUNTIME", - "LAMBDA_TIMEOUT", - "LAMBDA_MEMORY", - "MONGODB_URL", - "MONGODB_DB_NAME", - "MONGODB_COLLECTION_NAME", - "QUEUE_FILLER_LAMBDA_NAME", - "QUEUE_FILLER_LAMBDA_S3_KEY", "LOG_LEVEL", "APP_NAME", "VERSION",