batch update

This commit is contained in:
Charles-Gormley
2024-11-10 21:18:48 -05:00
parent e798dce237
commit 758c2ccdde
20 changed files with 885 additions and 51 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ repo_structure.txt
/layer/python*
*__pycache__*
*feeds.json
todo.md

View File

@@ -0,0 +1,200 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# RSS Article Batch Download Examples\n",
"\n",
"This notebook demonstrates how to batch download RSS articles from S3."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"import os\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"from src.search import S3BatchDownloader\n",
"\n",
"# Configure logging\n",
"import logging\n",
"logging.basicConfig(level=logging.INFO)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the Downloader"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Optional: Set environment variables\n",
"os.environ['AWS_REGION'] = 'eu-west-3'\n",
"os.environ['RSS_BUCKET_NAME'] = 'your-bucket'\n",
"os.environ['RSS_PREFIX'] = 'articles/'\n",
"\n",
"# Initialize downloader\n",
"downloader = S3BatchDownloader()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Check Storage Statistics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Get storage stats\n",
"stats = downloader.get_storage_stats()\n",
"\n",
"print(f\"Total objects: {stats['total_objects']:,}\")\n",
"print(f\"Total size: {stats['total_size_mb']:.2f} MB\")\n",
"print(f\"Average object size: {stats['average_size_kb']:.2f} KB\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Download Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Download last 7 days of articles\n",
"start_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')\n",
"\n",
"recent_articles_path = downloader.download_to_csv(\n",
" output_path='recent_articles.csv',\n",
" start_date=start_date\n",
")\n",
"\n",
"# Load and display sample\n",
"recent_df = pd.read_csv(recent_articles_path)\n",
"print(f\"\\nDownloaded {len(recent_df)} recent articles\")\n",
"recent_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Download articles from specific source prefix\n",
"tech_articles_path = downloader.download_to_csv(\n",
" output_path='tech_articles.csv',\n",
" prefix='articles/tech/'\n",
")\n",
"\n",
"tech_df = pd.read_csv(tech_articles_path)\n",
"print(f\"\\nDownloaded {len(tech_df)} tech articles\")\n",
"tech_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Analysis Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Analyze downloaded data\n",
"if 'source' in tech_df.columns:\n",
" source_counts = tech_df['source'].value_counts()\n",
" \n",
" print(\"\\nArticles per Source:\")\n",
" print(source_counts)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Time-based analysis\n",
"if 'published_date' in tech_df.columns:\n",
" tech_df['published_date'] = pd.to_datetime(tech_df['published_date'])\n",
" daily_counts = tech_df.resample('D', on='published_date').size()\n",
" \n",
" # Plot\n",
" import matplotlib.pyplot as plt\n",
" \n",
" plt.figure(figsize=(15, 6))\n",
" daily_counts.plot(kind='bar')\n",
" plt.title('Articles per Day')\n",
" plt.xlabel('Date')\n",
" plt.ylabel('Number of Articles')\n",
" plt.xticks(rotation=45)\n",
" plt.tight_layout()\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Export Filtered Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Filter and export specific data\n",
"if 'title' in tech_df.columns and 'content' in tech_df.columns:\n",
" # Filter articles containing specific keywords\n",
" ai_articles = tech_df[\n",
" tech_df['title'].str.contains('AI|artificial intelligence', case=False, na=False) |\n",
" tech_df['content'].str.contains('AI|artificial intelligence', case=False, na=False)\n",
" ]\n",
" \n",
" # Export filtered data\n",
" ai_articles.to_csv('ai_articles.csv', index=False)\n",
" print(f\"\\nExported {len(ai_articles)} AI-related articles to ai_articles.csv\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Cleanup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Optional: Remove downloaded CSV files\n",
"import os\n",
"\n",
"for file in ['recent_articles.csv', 'tech_articles.csv', 'ai_articles.csv']:\n",
" if os.path.exists(file):\n",
" os.remove(file)\n",
" print(f\"Removed {file}\")"
]
}
]
}
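
The notebook above only exercises start_date and a prefix filter; download_to_csv (added below in src/search/batch/downloader.py) also accepts end_date and batch_size. A minimal sketch of a bounded date-range pull, assuming the same environment and bucket values used in the notebook:

import os
from datetime import datetime, timedelta
from src.search import S3BatchDownloader

# Assumed placeholder values; replace with your own bucket/prefix
os.environ.setdefault('AWS_REGION', 'eu-west-3')
os.environ.setdefault('RSS_BUCKET_NAME', 'your-bucket')
os.environ.setdefault('RSS_PREFIX', 'articles/')

downloader = S3BatchDownloader()

# Pull a fixed two-week window, processing 500 objects per download batch
end = datetime.now()
start = end - timedelta(days=14)
path = downloader.download_to_csv(
    output_path='window_articles.csv',
    start_date=start.strftime('%Y-%m-%d'),
    end_date=end.strftime('%Y-%m-%d'),
    batch_size=500,
)
print(f"Saved bounded window to {path}")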

View File

@@ -0,0 +1,199 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# RSS Article Query Examples\n",
"\n",
"This notebook demonstrates how to query RSS articles using the ArticleQuerier class."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"import os\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"from src.search import ArticleQuerier\n",
"\n",
"# Configure logging if needed\n",
"import logging\n",
"logging.basicConfig(level=logging.INFO)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the Querier\n",
"\n",
"You can either set environment variables or use default values:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Optional: Set environment variables\n",
"os.environ['AWS_REGION'] = 'eu-west-3'\n",
"os.environ['RSS_DATABASE_NAME'] = 'rss_articles'\n",
"os.environ['RSS_TABLE_NAME'] = 'articles'\n",
"os.environ['RSS_BUCKET_NAME'] = 'your-bucket'\n",
"\n",
"# Initialize querier\n",
"querier = ArticleQuerier()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Basic Source Analysis"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Get all sources and their article counts\n",
"sources = querier.get_sources()\n",
"\n",
"# Display top sources\n",
"print(\"Top Sources by Article Count:\")\n",
"sources.head(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Search Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Search articles containing 'python' in title\n",
"python_articles = querier.search(title=\"python\", limit=5)\n",
"print(\"\\nArticles about Python:\")\n",
"python_articles[['title', 'source', 'published_date']]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Search with multiple filters\n",
"# Get recent AWS articles from specific source\n",
"filtered_articles = querier.search(\n",
" content=\"aws\",\n",
" source=\"techcrunch\",\n",
" date_from=(datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d'),\n",
" limit=5\n",
")\n",
"\n",
"print(\"\\nRecent AWS articles from TechCrunch:\")\n",
"filtered_articles[['title', 'published_date', 'url']]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Custom SQL Queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Articles per month by source\n",
"monthly_stats = querier.query(\"\"\"\n",
" SELECT \n",
" source,\n",
" DATE_TRUNC('month', published_date) as month,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '6' MONTH\n",
" GROUP BY 1, 2\n",
" ORDER BY 2 DESC, 3 DESC\n",
"\"\"\")\n",
"\n",
"print(\"\\nMonthly Article Counts:\")\n",
"monthly_stats.head(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Topic analysis (searching for specific keywords)\n",
"topic_analysis = querier.query(\"\"\"\n",
" SELECT \n",
" CASE\n",
" WHEN LOWER(title) LIKE '%python%' THEN 'Python'\n",
" WHEN LOWER(title) LIKE '%javascript%' OR LOWER(title) LIKE '%js%' THEN 'JavaScript'\n",
" WHEN LOWER(title) LIKE '%aws%' THEN 'AWS'\n",
" WHEN LOWER(title) LIKE '%ai%' OR LOWER(title) LIKE '%artificial intelligence%' THEN 'AI'\n",
" END as topic,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '30' DAY\n",
" GROUP BY 1\n",
" HAVING topic IS NOT NULL\n",
" ORDER BY 2 DESC\n",
"\"\"\")\n",
"\n",
"print(\"\\nTopic Analysis (Last 30 Days):\")\n",
"topic_analysis"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Visualization Example"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Get daily article counts\n",
"daily_counts = querier.query(\"\"\"\n",
" SELECT \n",
" DATE_TRUNC('day', published_date) as date,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '30' DAY\n",
" GROUP BY 1\n",
" ORDER BY 1\n",
"\"\"\")\n",
"\n",
"# Plot\n",
"plt.figure(figsize=(15, 6))\n",
"plt.plot(daily_counts['date'], daily_counts['article_count'])\n",
"plt.title('Daily Article Counts (Last 30 Days)')\n",
"plt.xlabel('Date')\n",
"plt.ylabel('Number of Articles')\n",
"plt.grid(True)\n",
"plt.xticks(rotation=45)\n",
"plt.tight_layout()\n",
"plt.show()"
]
}
]
}
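
One caveat when adapting these queries: ArticleQuerier.search (see src/search/query/querier.py below) interpolates its filter strings directly into the SQL it sends to Athena, so a term containing a single quote will break the generated query. A minimal sketch of a hypothetical escaping wrapper, assuming the querier instance created in the notebook:

def safe_search(querier, **filters):
    """Double any single quotes in string filters before they reach the LIKE clauses.
    Hypothetical helper, not part of the package."""
    cleaned = {
        key: value.replace("'", "''") if isinstance(value, str) else value
        for key, value in filters.items()
    }
    return querier.search(**cleaned)

# A title containing an apostrophe no longer terminates the SQL string early
results = safe_search(querier, title="o'reilly", limit=5)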

34
roadmap.md Normal file
View File

@@ -0,0 +1,34 @@
# Application Modules
* Vector Storage
* Vector Analysis
* LLM Summarization
* Semantic Understanding & Reasoning
* AWS Budget, Pinecone Budget, & LLM Budget
* Integration with bumblebee (Easily Handle standardization with embedding models & LLMs)
* Visualization System ( Ingesting, Clustering, etc...)
* API Infrastructure.
# Misc
* Duplicate Article Check Module.
* Github Issues & Github Actions.
# Future Modules
* Other Add-ons with text classification of articles ( Sentiment Analysis, political polarity, etc... )
* Article Clustering Module
* API Module ( Semantic Search, Retrieval )
* Architecture Diagram
* Error Handling and Removing Error Prone Feeds.
# Possible Use Cases/Examples
* Betting Market Prediction
* Financial Market Predictions
* News Aggregation
* Newsletter Tooling
# Marketing
* Add some datasets & analysis of text data to Kaggle.
* Once you hit a certain scale, maybe consider something with Hacker News.
# Over-caffeinated Ideas ☕
* Make it solarpunk themed.
* Write a serverless manifesto for personal projects and where you would like to see the serverless world go.

Binary file not shown (new image, 187 KiB)

View File

@@ -0,0 +1,55 @@
import boto3
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from collections import defaultdict
def get_s3_object_creation_dates(bucket_name):
s3 = boto3.client('s3')
creation_dates = []
# List all objects in the bucket
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name):
for obj in page.get('Contents', []):
creation_dates.append(obj['LastModified'].date())
return creation_dates
def plot_creation_dates(dates):
# Count objects created on each date
date_counts = defaultdict(int)
for date in dates:
date_counts[date] += 1
# Sort dates and get counts
sorted_dates = sorted(date_counts.keys())
counts = [date_counts[date] for date in sorted_dates]
# Create the plot
plt.figure(figsize=(15, 8))
bars = plt.bar(sorted_dates, counts)
plt.title('S3 Object Creation Dates')
plt.xlabel('Date')
plt.ylabel('Number of Objects Created')
plt.xticks(rotation=45, ha='right')
# Label each bar with its height
for bar in bars:
height = bar.get_height()
plt.text(bar.get_x() + bar.get_width()/2., height,
f'{int(height)}',
ha='center', va='bottom')
plt.tight_layout()
# Save the plot
plt.savefig('s3_object_creation_dates.png', dpi=300, bbox_inches='tight')
print("Graph saved as 's3_object_creation_dates.png'")
def main():
bucket_name = 'open-rss-articles-us-east-1'
dates = get_s3_object_creation_dates(bucket_name)
plot_creation_dates(dates)
if __name__ == "__main__":
main()

View File

@@ -28,8 +28,8 @@ Resources:
Properties:
Name: rss-feed-processor-schedule
Description: Runs the RSS Feed Processor Lambda function every hour
State: DISABLED
ScheduleExpression: rate(30 minutes)
State: ENABLED
ScheduleExpression: rate(120 minutes)
FlexibleTimeWindow:
Mode: FLEXIBLE
MaximumWindowInMinutes: 1
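
A quick way to confirm the change took effect after deployment, assuming the deployed resource is an EventBridge Scheduler schedule keeping the name defined above (a sanity-check sketch, not part of the deploy scripts):

import boto3

scheduler = boto3.client('scheduler')
schedule = scheduler.get_schedule(Name='rss-feed-processor-schedule')

# Expect 'ENABLED' and 'rate(120 minutes)' once this commit is deployed
print(schedule['State'], schedule['ScheduleExpression'])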

View File

@@ -155,7 +155,13 @@ def deploy_infrastructure():
'ParameterValue': os.getenv('S3_BUCKET_NAME')
}
])
deploy_cloudformation('s3.yaml', 'S3-zipped',
parameters=[
{
'ParameterKey': 'BucketName',
'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
}
])
deploy_cloudformation('sqs.yaml', 'SQS',
parameters=[
{

View File

@@ -25,7 +25,6 @@ LAMBDA_ROLE_ARN = os.getenv("LAMBDA_ROLE_ARN")
LAMBDA_TIMEOUT = int(os.getenv('LAMBDA_TIMEOUT'))
LAMBDA_MEMORY = int(os.getenv('LAMBDA_MEMORY'))
LAMBDA_RUNTIME = os.getenv('LAMBDA_RUNTIME')
S3_LAYER_BUCKET_NAME = os.getenv('S3_LAYER_BUCKET_NAME')
LAMBDA_STACK_NAME = os.getenv("STACK_BASE") + f"-{LAMBDA_NAME}"
LAMBDA_HANDLER = "lambda_function.lambda_handler"
LAMBDA_LAYER_NAME = LAMBDA_NAME + "Layer"

View File

@@ -20,6 +20,8 @@ def zip_lambda_code():
lambda_dir = 'src/infra/lambdas/RSSQueueFiller/lambda'
zip_path = 'tmp/lambda_function.zip'
os.makedirs(zip_path.split("/")[0], exist_ok=True)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(lambda_dir):
for file in files:
@@ -31,7 +33,7 @@ def zip_lambda_code():
def upload_to_s3(file_path):
s3_key = os.getenv('QUEUE_FILLER_LAMBDA_S3_KEY')
bucket_name = os.getenv('S3_LAYER_BUCKET_NAME')
bucket_name = os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
s3.upload_file(file_path, bucket_name, s3_key)
return f's3://{bucket_name}/{s3_key}'
@@ -64,7 +66,7 @@ def deploy_sqs_filler():
},
{
'ParameterKey': 'LambdaCodeS3Bucket',
'ParameterValue': os.getenv('S3_LAYER_BUCKET_NAME')
'ParameterValue': os.getenv('S3_LAMBDA_ZIPPED_BUCKET_NAME')
},
{
'ParameterKey': 'LambdaCodeS3Key',

View File

@@ -70,6 +70,7 @@ def main():
env_vars["LAMBDA_LAYER_NAME"] = f"ingest-rss-lambda-layer-{env_vars['AWS_REGION']}"
env_vars["LAMBDA_LAYER_ARN"] = f"arn:aws:lambda:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:layer:{env_vars['LAMBDA_LAYER_NAME']}:{env_vars['LAMBDA_LAYER_VERSION']}"
env_vars["S3_LAYER_BUCKET_NAME"] = f"rss-feed-processor-layers-{env_vars['AWS_REGION']}"
env_vars["S3_LAMBDA_ZIPPED_BUCKET_NAME"] = f"open-rss-lambda-{env_vars['AWS_REGION']}"
env_vars["S3_LAYER_KEY_NAME"] = get_env_value("S3_LAYER_KEY_NAME", "Enter S3 Layer Key Name:", options=["RSSFeedProcessorDependencies", "CustomDependencies"], advanced=advanced_mode)
env_vars["SQS_QUEUE_URL"] = f"https://sqs.{env_vars['AWS_REGION']}.amazonaws.com/{env_vars['AWS_ACCOUNT_ID']}/{env_vars['SQS_QUEUE_NAME']}"
env_vars["SQS_QUEUE_ARN"] = f"arn:aws:sqs:{env_vars['AWS_REGION']}:{env_vars['AWS_ACCOUNT_ID']}:{env_vars['SQS_QUEUE_NAME']}"

4
src/search/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .query.querier import ArticleQuerier
from .batch.downloader import S3BatchDownloader
__all__ = ['ArticleQuerier', 'S3BatchDownloader']

View File

@@ -0,0 +1,3 @@
from .downloader import S3BatchDownloader
__all__ = ['S3BatchDownloader']

View File

@@ -0,0 +1,185 @@
import boto3
import pandas as pd
from typing import Optional, List, Dict, Union, Any
import json
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
import logging
from string import Template
class S3BatchDownloader:
"""Class for batch downloading RSS articles from S3"""
DEFAULT_CONFIG = {
"region": "${AWS_REGION}",
"bucket": "${RSS_BUCKET_NAME}",
"prefix": "${RSS_PREFIX}",
"max_workers": 10
}
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the S3BatchDownloader
Args:
config_path: Optional path to config file. If None, uses environment variables.
"""
self.logger = logging.getLogger(__name__)
self.config = self._load_config(config_path)
self._validate_config()
self.s3 = boto3.client('s3', region_name=self.config['region'])
self.logger.info(f"Initialized S3BatchDownloader for bucket: {self.config['bucket']}")
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
"""Load and process configuration"""
if config_path and os.path.exists(config_path):
with open(config_path) as f:
template = Template(f.read())
else:
template = Template(json.dumps(self.DEFAULT_CONFIG))
env_vars = {
'AWS_REGION': os.getenv('AWS_REGION', 'eu-west-3'),
'RSS_BUCKET_NAME': os.getenv('RSS_BUCKET_NAME', 'your-bucket'),
'RSS_PREFIX': os.getenv('RSS_PREFIX', 'articles/'),
}
config_str = template.safe_substitute(env_vars)
try:
config = json.loads(config_str)
# Ensure max_workers is an integer
config['max_workers'] = int(config.get('max_workers', 10))
return config
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON config after variable substitution: {str(e)}")
def _validate_config(self) -> None:
"""Validate the configuration"""
required_fields = ['region', 'bucket', 'prefix']
missing_fields = [field for field in required_fields if field not in self.config]
if missing_fields:
raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}")
def download_to_csv(self,
output_path: str,
prefix: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
batch_size: int = 1000) -> str:
"""
Download articles from S3 to CSV file
Args:
output_path: Path to save CSV file
prefix: Optional S3 prefix filter
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)
batch_size: Number of objects to process in each batch
Returns:
Path to the saved CSV file
"""
self.logger.info(f"Starting batch download to {output_path}")
# Convert dates if provided
start_ts = datetime.strptime(start_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if start_date else None
end_ts = datetime.strptime(end_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if end_date else None
# Get list of all objects
objects = self._list_objects(prefix)
# Filter by date if specified
if start_ts or end_ts:
objects = [
obj for obj in objects
if self._is_in_date_range(obj['LastModified'], start_ts, end_ts)
]
self.logger.info(f"Found {len(objects)} objects to process")
# Process in batches
all_data = []
for i in range(0, len(objects), batch_size):
batch = objects[i:i + batch_size]
self.logger.info(f"Processing batch {i//batch_size + 1}/{(len(objects)-1)//batch_size + 1}")
# Download batch in parallel
with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor:
results = list(executor.map(self._download_object, batch))
# Add successful downloads to results
for result in results:
if result is not None:
all_data.extend(result if isinstance(result, list) else [result])
# Convert to DataFrame and save
df = pd.DataFrame(all_data)
df.to_csv(output_path, index=False)
self.logger.info(f"Successfully downloaded {len(df)} articles to {output_path}")
return output_path
def _list_objects(self, prefix: Optional[str] = None) -> List[Dict]:
"""List objects in S3 bucket"""
objects = []
paginator = self.s3.get_paginator('list_objects_v2')
try:
for page in paginator.paginate(
Bucket=self.config['bucket'],
Prefix=prefix or self.config['prefix']
):
if 'Contents' in page:
objects.extend(page['Contents'])
return objects
except Exception as e:
self.logger.error(f"Error listing objects: {str(e)}")
raise
def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
"""Download and parse single S3 object"""
try:
response = self.s3.get_object(
Bucket=self.config['bucket'],
Key=obj['Key']
)
content = response['Body'].read().decode('utf-8')
# Handle both single JSON objects and arrays
data = json.loads(content)
return data if isinstance(data, list) else [data]
except Exception as e:
self.logger.error(f"Error downloading {obj['Key']}: {str(e)}")
return None
def _is_in_date_range(self,
ts: datetime,
start: Optional[datetime],
end: Optional[datetime]) -> bool:
"""Check if timestamp is within date range"""
if start and ts < start:
return False
if end and ts > end:
return False
return True
def get_storage_stats(self) -> Dict[str, Union[int, float]]:
"""
Get storage statistics
Returns:
Dict containing total objects, total size, etc.
"""
objects = self._list_objects()
return {
'total_objects': len(objects),
'total_size_mb': sum(obj['Size'] for obj in objects) / (1024 * 1024),
'average_size_kb': sum(obj['Size'] for obj in objects) / len(objects) / 1024 if objects else 0
}
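
Besides environment variables, S3BatchDownloader accepts a config_path pointing at a JSON file; ${VAR} placeholders in it are resolved from the environment via string.Template.safe_substitute. A minimal sketch with an illustrative file name and values:

import json
import os

from src.search import S3BatchDownloader

# Illustrative config file; ${AWS_REGION} is resolved from the environment at load time
config = {
    "region": "${AWS_REGION}",
    "bucket": "my-rss-articles-bucket",
    "prefix": "articles/",
    "max_workers": 20,
}
with open('downloader_config.json', 'w') as f:
    json.dump(config, f)

os.environ['AWS_REGION'] = 'eu-west-3'
downloader = S3BatchDownloader(config_path='downloader_config.json')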

View File

@@ -0,0 +1,3 @@
from .querier import ArticleQuerier
__all__ = ['ArticleQuerier']

175
src/search/query/querier.py Normal file
View File

@@ -0,0 +1,175 @@
import boto3
import pandas as pd
from typing import Optional, Dict, List, Any
import json
import os
from datetime import datetime
from string import Template
import logging
import time
class ArticleQuerier:
"""Class for querying RSS articles using Amazon Athena"""
DEFAULT_CONFIG = {
"region": "${AWS_REGION}",
"database": "${RSS_DATABASE_NAME}",
"table": "${RSS_TABLE_NAME}",
"output_location": "s3://${RSS_BUCKET_NAME}/athena-output/"
}
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the ArticleQuerier
Args:
config_path: Optional path to config file. If None, uses environment variables.
"""
self.logger = logging.getLogger(__name__)
self.config = self._load_config(config_path)
self._validate_config()
self.athena = boto3.client('athena', region_name=self.config['region'])
self.logger.info(f"Initialized ArticleQuerier with database: {self.config['database']}")
def _load_config(self, config_path: Optional[str]) -> Dict[str, str]:
"""Load and process configuration"""
if config_path and os.path.exists(config_path):
with open(config_path) as f:
template = Template(f.read())
else:
template = Template(json.dumps(self.DEFAULT_CONFIG))
env_vars = {
'AWS_REGION': os.getenv('AWS_REGION', 'eu-west-3'),
'RSS_DATABASE_NAME': os.getenv('RSS_DATABASE_NAME', 'rss_articles'),
'RSS_TABLE_NAME': os.getenv('RSS_TABLE_NAME', 'articles'),
'RSS_BUCKET_NAME': os.getenv('RSS_BUCKET_NAME', 'your-bucket'),
}
config_str = template.safe_substitute(env_vars)
try:
return json.loads(config_str)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON config after variable substitution: {str(e)}")
def _validate_config(self) -> None:
"""Validate the configuration"""
required_fields = ['region', 'database', 'table', 'output_location']
missing_fields = [field for field in required_fields if field not in self.config]
if missing_fields:
raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}")
if not self.config['output_location'].startswith('s3://'):
raise ValueError("output_location must be an S3 URL (s3://...)")
def search(self,
title: Optional[str] = None,
content: Optional[str] = None,
source: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
limit: int = 100) -> pd.DataFrame:
"""
Search articles using various filters
Args:
title: Search in article titles
content: Search in article content
source: Filter by source
date_from: Start date (YYYY-MM-DD)
date_to: End date (YYYY-MM-DD)
limit: Maximum number of results
Returns:
DataFrame containing the results
"""
conditions = []
if title:
conditions.append(f"LOWER(title) LIKE LOWER('%{title}%')")
if content:
conditions.append(f"LOWER(content) LIKE LOWER('%{content}%')")
if source:
conditions.append(f"source = '{source}'")
if date_from:
conditions.append(f"published_date >= TIMESTAMP '{date_from}'")
if date_to:
conditions.append(f"published_date <= TIMESTAMP '{date_to}'")
where_clause = " AND ".join(conditions) if conditions else "1=1"
query = f"""
SELECT *
FROM {self.config['database']}.{self.config['table']}
WHERE {where_clause}
ORDER BY published_date DESC
LIMIT {limit}
"""
return self.query(query)
def query(self, query: str) -> pd.DataFrame:
"""
Execute custom SQL query
Args:
query: SQL query string
Returns:
DataFrame containing the results
"""
try:
self.logger.debug(f"Executing query: {query}")
response = self.athena.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': self.config['database']},
ResultConfiguration={'OutputLocation': self.config['output_location']}
)
return self._get_query_results(response['QueryExecutionId'])
except Exception as e:
self.logger.error(f"Query execution failed: {str(e)}")
raise
def get_sources(self) -> pd.DataFrame:
"""
Get list of sources and their article counts
Returns:
DataFrame with source statistics
"""
query = f"""
SELECT
source,
COUNT(*) as article_count,
MIN(published_date) as earliest_article,
MAX(published_date) as latest_article
FROM {self.config['database']}.{self.config['table']}
GROUP BY source
ORDER BY article_count DESC
"""
return self.query(query)
def _get_query_results(self, query_id: str) -> pd.DataFrame:
"""Helper method to get query results"""
while True:
status = self.athena.get_query_execution(QueryExecutionId=query_id)
state = status['QueryExecution']['Status']['State']
if state == 'SUCCEEDED':
break
elif state in ['FAILED', 'CANCELLED']:
error_message = status['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
raise Exception(f"Query failed: {error_message}")
time.sleep(1)  # brief pause before polling Athena again
results = []
columns = None
paginator = self.athena.get_paginator('get_query_results')
for page in paginator.paginate(QueryExecutionId=query_id):
rows = page['ResultSet']['Rows']
if not columns:
columns = [col['Name'] for col in page['ResultSet']['ResultSetMetadata']['ColumnInfo']]
rows = rows[1:]  # only the first page repeats the header row
for row in rows:
results.append([field.get('VarCharValue', '') for field in row['Data']])
return pd.DataFrame(results, columns=columns)
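
Because _get_query_results builds the DataFrame from Athena's VarCharValue fields, every column comes back as a string; callers that want numeric or datetime dtypes should cast after the query. A minimal sketch, assuming a querier instance as in the notebook above:

import pandas as pd

sources = querier.get_sources()

# Athena results arrive as strings; cast before sorting, math, or plotting
sources['article_count'] = pd.to_numeric(sources['article_count'])
sources['earliest_article'] = pd.to_datetime(sources['earliest_article'])
sources['latest_article'] = pd.to_datetime(sources['latest_article'])

print(sources.dtypes)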

View File

@@ -103,7 +103,7 @@ def check_env() -> None:
raise EnvironmentError("Missing or improperly set environment variables")
else:
print("Someone followed directions!🐝🐝🐝")
print("All required environment variables are properly set. P")
print("All required environment variables are properly set.")
# Example usage
if __name__ == "__main__":

View File

@@ -21,6 +21,8 @@ LAMBDA_LAYER_VERSION=6 # This is fixed.
LAMBDA_LAYER_NAME=ingest-rss-lambda-layer-${AWS_REGION}
LAMBDA_LAYER_ARN=arn:aws:lambda:${AWS_REGION}:966265353179:layer:${LAMBDA_LAYER_NAME}:${LAMBDA_LAYER_VERSION}
S3_LAMBDA_ZIPPED_BUCKET_NAME=open-rss-lambda-${AWS_REGION}
S3_LAYER_BUCKET_NAME=rss-feed-processor-layers-${AWS_REGION}
S3_LAYER_KEY_NAME= RSSFeedProcessorDependencies

51
todo.md
View File

@@ -1,49 +1,14 @@
# Before Public Launch
* Testing from 3rd party aws account.
* Fix Issue with KMS Keys & IAM Role [ Done ]
* Debug the Errors that are at scale.
* Testing from 3rd party aws account. [Today]
* Make sure that the scraping works from a 3rd Party Account [ Today ]
* API Tool - to Pull data that you have down.
* Tips on where to gather RSS Feeds.
* Public Launch Posts
* Reddit
* Twitter
* Kaggle
* Test Large Amounts of Feeds ( Decrease the cadence of ingesting. ) [ Today ]
* Test out how long an S3 Full Pull will take on the full thing.
** First Run ( ~30 Minutes)
** Second Run.
* Test out Vector Databases at Small Scale.
* Test out Vector Databases at Scale.
* Test out LLM Summarization At Small Scale
* Test out LLM Summarization At Scale
* Re-enable the Scheduler
# Application Modules
* Vector Storage
* Vector Analysis
* LLM Summarization
* Semantic Understanding & Reasoning
* AWS Budget, Pinecone Budget, & LLM Budget
* Integration with bumblebee (Easily Handle standardization with embedding models & LLMs)
* Visualization System ( Ingesting, Clustering, etc...)
* API Infrastructure.
# Misc
* Duplicate Article Check Module.
* Github Issues & Github Actions.
# Future Modules
* Other Add-ons with text classification of articles ( Sentiment Analysis, political polarity, etc... )
* Article Clustering Module
* API Module ( Semantic Search, Retrieval )
* Architecture Diagram
# Possible Use Cases/Examples
* Betting Market Prediction
* Financial Market Predictions
* News Aggregation
* Newsletter Tooling
* Add some datasets & analysis of text data to kaggle.
# Over-caffeinated Ideas ☕
* Make it solarpunk themed.
* Write a serverless manifesto for personal projects and where you would like to see the serverless world go.