sped up batch ingestion of articles.

This commit is contained in:
Charles-Gormley
2024-11-20 18:31:11 -05:00
parent f400c8bda5
commit 28148f1181
8 changed files with 289613 additions and 630 deletions

View File

@@ -11,17 +11,33 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"from src.search import S3BatchDownloader\n",
"from time import time\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Root imports\n",
"os.chdir('..')\n",
"\n",
"# Configure logging\n",
"import logging\n",
"logging.basicConfig(level=logging.INFO)"
"from src.search.batch import S3BatchDownloader\n",
"from dotenv import load_dotenv\n",
"\n",
"load_dotenv(override=True)\n",
"\n",
"downloader = S3BatchDownloader()\n",
"\n",
"os.chdir('example-notebooks')\n"
]
},
{
@@ -31,18 +47,226 @@
"## Initialize the Downloader"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"List permission: Allowed\n"
]
}
],
"source": [
"import boto3\n",
"from botocore.exceptions import ClientError\n",
"\n",
"s3 = boto3.client('s3')\n",
"\n",
"# Test ListObjects\n",
"try:\n",
" response = s3.list_objects_v2(Bucket='open-rss-articles-us-east-1')\n",
" print(\"List permission: Allowed\")\n",
"except ClientError as e:\n",
" print(\"List permission: Denied\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Downloaded data to consolidated_data.csv in 59.01 seconds\n"
]
}
],
"source": [
"# Optional: Set environment variables\n",
"os.environ['AWS_REGION'] = 'eu-west-3'\n",
"os.environ['RSS_BUCKET_NAME'] = 'your-bucket'\n",
"os.environ['RSS_PREFIX'] = 'articles/'\n",
"start = time()\n",
"output_path = \"consolidated_data.csv\" # or \"consolidated_data.json\"\n",
"\n",
"# Initialize downloader\n",
"downloader = S3BatchDownloader()"
"# Define date range\n",
"start_date = \"2024-11-17\"\n",
"end_date = \"2024-11-20\"\n",
"\n",
"# Start downloading\n",
"downloader.download_to_file(\n",
" output_path=output_path,\n",
" file_format=\"csv\", # or \"json\"\n",
" start_date=start_date,\n",
" end_date=end_date\n",
")\n",
"\n",
"print(f\"Downloaded data to {output_path} in {time() - start:.2f} seconds\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"df = pd.read_csv(output_path)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>link</th>\n",
" <th>rss</th>\n",
" <th>title</th>\n",
" <th>content</th>\n",
" <th>unixTime</th>\n",
" <th>rss_id</th>\n",
" <th>article_id</th>\n",
" <th>llm_summary</th>\n",
" <th>embedding</th>\n",
" <th>unixtime</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>https://www.cnet.com/home/internet/comporium-i...</td>\n",
" <td>https://www.cnet.com/rss/news/</td>\n",
" <td>Comporium Home Internet: Pricing, Speeds and A...</td>\n",
" <td>Unavailable in Provider unavailable in 90001 E...</td>\n",
" <td>1731883654</td>\n",
" <td>f5e6f52c79</td>\n",
" <td>00089f7505</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>1731883654</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>https://kotaku.com/lady-gaga-wednesday-season-...</td>\n",
" <td>https://kotaku.com/rss</td>\n",
" <td>Lady Gaga Appearing In Wednesday Season 2 Is A...</td>\n",
" <td>The next stop on Lady Gagas years-long tour t...</td>\n",
" <td>1731883665</td>\n",
" <td>a0840ab3b4</td>\n",
" <td>009f880a86</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>1731883665</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>https://expeditionportal.com/classifieds-honda...</td>\n",
" <td>https://www.expeditionportal.com/feed/</td>\n",
" <td>2008 Honda Element EX ECamper :: Classifieds</td>\n",
" <td>Overlanding is about experiences first and for...</td>\n",
" <td>1731883665</td>\n",
" <td>ecdc66bb02</td>\n",
" <td>00ac77e95f</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>1731883665</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>http://www.influencerupdate.biz/news/68624/twi...</td>\n",
" <td>https://www.influencerupdate.biz/rss/news/</td>\n",
" <td>Twitch is testing mid-roll ads on creator broa...</td>\n",
" <td>Streaming giant Twitch is planning to test mid...</td>\n",
" <td>1731883669</td>\n",
" <td>e34caba76d</td>\n",
" <td>0041bc4abf</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>1731883669</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>https://thejournal.com/Articles/2024/10/29/Rep...</td>\n",
" <td>https://thejournal.com/rss-feeds/news.aspx</td>\n",
" <td>Report: 90% of Schools Depend on E-rate Fundin...</td>\n",
" <td>Report: 90% of Schools Depend on E-rate Fundin...</td>\n",
" <td>1731883616</td>\n",
" <td>efd9bb9654</td>\n",
" <td>000a3da3b6</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>1731883616</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" link \\\n",
"0 https://www.cnet.com/home/internet/comporium-i... \n",
"1 https://kotaku.com/lady-gaga-wednesday-season-... \n",
"2 https://expeditionportal.com/classifieds-honda... \n",
"3 http://www.influencerupdate.biz/news/68624/twi... \n",
"4 https://thejournal.com/Articles/2024/10/29/Rep... \n",
"\n",
" rss \\\n",
"0 https://www.cnet.com/rss/news/ \n",
"1 https://kotaku.com/rss \n",
"2 https://www.expeditionportal.com/feed/ \n",
"3 https://www.influencerupdate.biz/rss/news/ \n",
"4 https://thejournal.com/rss-feeds/news.aspx \n",
"\n",
" title \\\n",
"0 Comporium Home Internet: Pricing, Speeds and A... \n",
"1 Lady Gaga Appearing In Wednesday Season 2 Is A... \n",
"2 2008 Honda Element EX ECamper :: Classifieds \n",
"3 Twitch is testing mid-roll ads on creator broa... \n",
"4 Report: 90% of Schools Depend on E-rate Fundin... \n",
"\n",
" content unixTime rss_id \\\n",
"0 Unavailable in Provider unavailable in 90001 E... 1731883654 f5e6f52c79 \n",
"1 The next stop on Lady Gagas years-long tour t... 1731883665 a0840ab3b4 \n",
"2 Overlanding is about experiences first and for... 1731883665 ecdc66bb02 \n",
"3 Streaming giant Twitch is planning to test mid... 1731883669 e34caba76d \n",
"4 Report: 90% of Schools Depend on E-rate Fundin... 1731883616 efd9bb9654 \n",
"\n",
" article_id llm_summary embedding unixtime \n",
"0 00089f7505 NaN NaN 1731883654 \n",
"1 009f880a86 NaN NaN 1731883665 \n",
"2 00ac77e95f NaN NaN 1731883665 \n",
"3 0041bc4abf NaN NaN 1731883669 \n",
"4 000a3da3b6 NaN NaN 1731883616 "
]
},
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.head()"
]
},
{
@@ -51,150 +275,27 @@
"source": [
"## 1. Check Storage Statistics"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Get storage stats\n",
"stats = downloader.get_storage_stats()\n",
"\n",
"print(f\"Total objects: {stats['total_objects']:,}\")\n",
"print(f\"Total size: {stats['total_size_mb']:.2f} MB\")\n",
"print(f\"Average object size: {stats['average_size_kb']:.2f} KB\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Download Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Download last 7 days of articles\n",
"start_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')\n",
"\n",
"recent_articles_path = downloader.download_to_csv(\n",
" output_path='recent_articles.csv',\n",
" start_date=start_date\n",
")\n",
"\n",
"# Load and display sample\n",
"recent_df = pd.read_csv(recent_articles_path)\n",
"print(f\"\\nDownloaded {len(recent_df)} recent articles\")\n",
"recent_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Download articles from specific source prefix\n",
"tech_articles_path = downloader.download_to_csv(\n",
" output_path='tech_articles.csv',\n",
" prefix='articles/tech/'\n",
")\n",
"\n",
"tech_df = pd.read_csv(tech_articles_path)\n",
"print(f\"\\nDownloaded {len(tech_df)} tech articles\")\n",
"tech_df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Analysis Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Analyze downloaded data\n",
"if 'source' in tech_df.columns:\n",
" source_counts = tech_df['source'].value_counts()\n",
" \n",
" print(\"\\nArticles per Source:\")\n",
" print(source_counts)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Time-based analysis\n",
"if 'published_date' in tech_df.columns:\n",
" tech_df['published_date'] = pd.to_datetime(tech_df['published_date'])\n",
" daily_counts = tech_df.resample('D', on='published_date').size()\n",
" \n",
" # Plot\n",
" import matplotlib.pyplot as plt\n",
" \n",
" plt.figure(figsize=(15, 6))\n",
" daily_counts.plot(kind='bar')\n",
" plt.title('Articles per Day')\n",
" plt.xlabel('Date')\n",
" plt.ylabel('Number of Articles')\n",
" plt.xticks(rotation=45)\n",
" plt.tight_layout()\n",
" plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Export Filtered Data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Filter and export specific data\n",
"if 'title' in tech_df.columns and 'content' in tech_df.columns:\n",
" # Filter articles containing specific keywords\n",
" ai_articles = tech_df[\n",
" tech_df['title'].str.contains('AI|artificial intelligence', case=False, na=False) |\n",
" tech_df['content'].str.contains('AI|artificial intelligence', case=False, na=False)\n",
" ]\n",
" \n",
" # Export filtered data\n",
" ai_articles.to_csv('ai_articles.csv', index=False)\n",
" print(f\"\\nExported {len(ai_articles)} AI-related articles to ai_articles.csv\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. Cleanup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Optional: Remove downloaded CSV files\n",
"import os\n",
"\n",
"for file in ['recent_articles.csv', 'tech_articles.csv', 'ai_articles.csv']:\n",
" if os.path.exists(file):\n",
" os.remove(file)\n",
" print(f\"Removed {file}\")"
]
}
]
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

File diff suppressed because one or more lines are too long

View File

@@ -1,199 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# RSS Article Query Examples\n",
"\n",
"This notebook demonstrates how to query RSS articles using the ArticleQuerier class."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"import os\n",
"import pandas as pd\n",
"from datetime import datetime, timedelta\n",
"from src.search import ArticleQuerier\n",
"\n",
"# Configure logging if needed\n",
"import logging\n",
"logging.basicConfig(level=logging.INFO)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the Querier\n",
"\n",
"You can either set environment variables or use default values:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Optional: Set environment variables\n",
"os.environ['AWS_REGION'] = 'eu-west-3'\n",
"os.environ['RSS_DATABASE_NAME'] = 'rss_articles'\n",
"os.environ['RSS_TABLE_NAME'] = 'articles'\n",
"os.environ['RSS_BUCKET_NAME'] = 'your-bucket'\n",
"\n",
"# Initialize querier\n",
"querier = ArticleQuerier()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Basic Source Analysis"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Get all sources and their article counts\n",
"sources = querier.get_sources()\n",
"\n",
"# Display top sources\n",
"print(\"Top Sources by Article Count:\")\n",
"sources.head(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. Search Examples"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Search articles containing 'python' in title\n",
"python_articles = querier.search(title=\"python\", limit=5)\n",
"print(\"\\nArticles about Python:\")\n",
"python_articles[['title', 'source', 'published_date']]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Search with multiple filters\n",
"# Get recent AWS articles from specific source\n",
"filtered_articles = querier.search(\n",
" content=\"aws\",\n",
" source=\"techcrunch\",\n",
" date_from=(datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d'),\n",
" limit=5\n",
")\n",
"\n",
"print(\"\\nRecent AWS articles from TechCrunch:\")\n",
"filtered_articles[['title', 'published_date', 'url']]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Custom SQL Queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Articles per month by source\n",
"monthly_stats = querier.query(\"\"\"\n",
" SELECT \n",
" source,\n",
" DATE_TRUNC('month', published_date) as month,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '6' MONTH\n",
" GROUP BY 1, 2\n",
" ORDER BY 2 DESC, 3 DESC\n",
"\"\"\")\n",
"\n",
"print(\"\\nMonthly Article Counts:\")\n",
"monthly_stats.head(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"# Topic analysis (searching for specific keywords)\n",
"topic_analysis = querier.query(\"\"\"\n",
" SELECT \n",
" CASE\n",
" WHEN LOWER(title) LIKE '%python%' THEN 'Python'\n",
" WHEN LOWER(title) LIKE '%javascript%' OR LOWER(title) LIKE '%js%' THEN 'JavaScript'\n",
" WHEN LOWER(title) LIKE '%aws%' THEN 'AWS'\n",
" WHEN LOWER(title) LIKE '%ai%' OR LOWER(title) LIKE '%artificial intelligence%' THEN 'AI'\n",
" END as topic,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '30' DAY\n",
" GROUP BY 1\n",
" HAVING topic IS NOT NULL\n",
" ORDER BY 2 DESC\n",
"\"\"\")\n",
"\n",
"print(\"\\nTopic Analysis (Last 30 Days):\")\n",
"topic_analysis"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. Visualization Example"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Get daily article counts\n",
"daily_counts = querier.query(\"\"\"\n",
" SELECT \n",
" DATE_TRUNC('day', published_date) as date,\n",
" COUNT(*) as article_count\n",
" FROM articles\n",
" WHERE published_date >= CURRENT_DATE - INTERVAL '30' DAY\n",
" GROUP BY 1\n",
" ORDER BY 1\n",
"\"\"\")\n",
"\n",
"# Plot\n",
"plt.figure(figsize=(15, 6))\n",
"plt.plot(daily_counts['date'], daily_counts['article_count'])\n",
"plt.title('Daily Article Counts (Last 30 Days)')\n",
"plt.xlabel('Date')\n",
"plt.ylabel('Number of Articles')\n",
"plt.grid(True)\n",
"plt.xticks(rotation=45)\n",
"plt.tight_layout()\n",
"plt.show()"
]
}
]
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 187 KiB

After

Width:  |  Height:  |  Size: 154 KiB

View File

@@ -3,8 +3,8 @@ import pandas as pd
from typing import Optional, List, Dict, Union, Any
import json
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
import logging
from string import Template
@@ -15,7 +15,7 @@ class S3BatchDownloader:
"region": "${AWS_REGION}",
"bucket": "${RSS_BUCKET_NAME}",
"prefix": "${RSS_PREFIX}",
"max_workers": 10
"max_workers": os.cpu_count() or 10
}
def __init__(self, config_path: Optional[str] = None):
@@ -41,16 +41,14 @@ class S3BatchDownloader:
template = Template(json.dumps(self.DEFAULT_CONFIG))
env_vars = {
'AWS_REGION': os.getenv('AWS_REGION', 'eu-west-3'),
'RSS_BUCKET_NAME': os.getenv('RSS_BUCKET_NAME', 'your-bucket'),
'RSS_PREFIX': os.getenv('RSS_PREFIX', 'articles/'),
'AWS_REGION': os.getenv('AWS_REGION', 'us-east-1'),
'RSS_BUCKET_NAME': os.getenv('S3_BUCKET_NAME')
}
config_str = template.safe_substitute(env_vars)
try:
config = json.loads(config_str)
# Ensure max_workers is an integer
config['max_workers'] = int(config.get('max_workers', 10))
return config
except json.JSONDecodeError as e:
@@ -60,84 +58,65 @@ class S3BatchDownloader:
"""Validate the configuration"""
required_fields = ['region', 'bucket', 'prefix']
missing_fields = [field for field in required_fields if field not in self.config]
if missing_fields:
raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}")
def download_to_csv(self,
output_path: str,
prefix: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
batch_size: int = 1000) -> str:
def download_to_file(self,
output_path: str,
file_format: str = 'csv',
start_date: Optional[str] = None,
end_date: Optional[str] = None) -> str:
"""
Download articles from S3 to CSV file
Download articles from S3 to a consolidated file
Args:
output_path: Path to save CSV file
prefix: Optional S3 prefix filter
start_date: Optional start date filter (YYYY-MM-DD)
end_date: Optional end date filter (YYYY-MM-DD)
batch_size: Number of objects to process in each batch
output_path: Path to save the output file.
file_format: Format to save the file ('csv' or 'json').
start_date: Optional start date filter (YYYY-MM-DD).
end_date: Optional end date filter (YYYY-MM-DD).
Returns:
Path to the saved CSV file
Path to the saved file.
"""
self.logger.info(f"Starting batch download to {output_path}")
# Convert dates if provided
start_ts = datetime.strptime(start_date, '%Y-%m-%d') if start_date else None
end_ts = datetime.strptime(end_date, '%Y-%m-%D') if end_date else None
# Convert date strings to UTC datetime
start_ts = datetime.strptime(start_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if start_date else None
end_ts = datetime.strptime(end_date, '%Y-%m-%d').replace(tzinfo=timezone.utc) if end_date else None
# Get list of all objects
objects = self._list_objects(prefix)
# List and filter objects
objects = self._list_objects()
# Filter by date if specified
if start_ts or end_ts:
objects = [
obj for obj in objects
if self._is_in_date_range(obj['LastModified'], start_ts, end_ts)
]
self.logger.info(f"Found {len(objects)} objects to process")
# Process in batches
# Download and merge data
all_data = []
for i in range(0, len(objects), batch_size):
batch = objects[i:i + batch_size]
self.logger.info(f"Processing batch {i//batch_size + 1}/{(len(objects)-1)//batch_size + 1}")
# Download batch in parallel
with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor:
results = list(executor.map(self._download_object, batch))
# Add successful downloads to results
for result in results:
with ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor:
future_to_obj = {executor.submit(self._download_object, obj): obj for obj in objects}
for future in as_completed(future_to_obj):
result = future.result()
if result is not None:
all_data.extend(result if isinstance(result, list) else [result])
# Convert to DataFrame and save
df = pd.DataFrame(all_data)
df.to_csv(output_path, index=False)
self.logger.info(f"Successfully downloaded {len(df)} articles to {output_path}")
# Save to file
self._save_to_file(all_data, output_path, file_format)
self.logger.info(f"Successfully downloaded {len(all_data)} articles to {output_path}")
return output_path
def _list_objects(self, prefix: Optional[str] = None) -> List[Dict]:
def _list_objects(self) -> List[Dict]:
"""List objects in S3 bucket"""
objects = []
paginator = self.s3.get_paginator('list_objects_v2')
paginator = self.s3.get_paginator('list_objects')
try:
for page in paginator.paginate(
Bucket=self.config['bucket'],
Prefix=prefix or self.config['prefix']
):
for page in paginator.paginate(Bucket=self.config['bucket']):
if 'Contents' in page:
objects.extend(page['Contents'])
return objects
except Exception as e:
self.logger.error(f"Error listing objects: {str(e)}")
raise
@@ -145,41 +124,31 @@ class S3BatchDownloader:
def _download_object(self, obj: Dict) -> Optional[Union[Dict, List[Dict]]]:
"""Download and parse single S3 object"""
try:
response = self.s3.get_object(
Bucket=self.config['bucket'],
Key=obj['Key']
)
response = self.s3.get_object(Bucket=self.config['bucket'], Key=obj['Key'])
content = response['Body'].read().decode('utf-8')
# Handle both single JSON objects and arrays
data = json.loads(content)
return data if isinstance(data, list) else [data]
metadata = response.get('Metadata', {})
if isinstance(data, dict):
data.update(metadata)
return [data]
elif isinstance(data, list):
for item in data:
item.update(metadata)
return data
except Exception as e:
self.logger.error(f"Error downloading {obj['Key']}: {str(e)}")
return None
def _is_in_date_range(self,
ts: datetime,
start: Optional[datetime],
end: Optional[datetime]) -> bool:
"""Check if timestamp is within date range"""
if start and ts < start:
return False
if end and ts > end:
return False
return True
def _is_in_date_range(self, ts: datetime, start: Optional[datetime], end: Optional[datetime]) -> bool:
"""Check if timestamp is within the date range"""
return (not start or ts >= start) and (not end or ts <= end)
def get_storage_stats(self) -> Dict[str, Union[int, float]]:
"""
Get storage statistics
Returns:
Dict containing total objects, total size, etc.
"""
objects = self._list_objects()
return {
'total_objects': len(objects),
'total_size_mb': sum(obj['Size'] for obj in objects) / (1024 * 1024),
'average_size_kb': sum(obj['Size'] for obj in objects) / len(objects) / 1024 if objects else 0
}
def _save_to_file(self, data: List[Dict], output_path: str, file_format: str) -> None:
"""Save data to file"""
df = pd.DataFrame(data)
if file_format == 'csv':
df.to_csv(output_path, index=False)
elif file_format == 'json':
df.to_json(output_path, orient='records', lines=True)
else:
raise ValueError(f"Unsupported file format: {file_format}")

View File

@@ -1,3 +0,0 @@
from .querier import ArticleQuerier
__all__ = ['ArticleQuerier']

View File

@@ -1,175 +0,0 @@
import boto3
import pandas as pd
from typing import Optional, Dict, List, Any
import json
import os
from datetime import datetime
from string import Template
import logging
class ArticleQuerier:
"""Class for querying RSS articles using Amazon Athena"""
DEFAULT_CONFIG = {
"region": "${AWS_REGION}",
"database": "${RSS_DATABASE_NAME}",
"table": "${RSS_TABLE_NAME}",
"output_location": "s3://${RSS_BUCKET_NAME}/athena-output/"
}
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the ArticleQuerier
Args:
config_path: Optional path to config file. If None, uses environment variables.
"""
self.logger = logging.getLogger(__name__)
self.config = self._load_config(config_path)
self._validate_config()
self.athena = boto3.client('athena', region_name=self.config['region'])
self.logger.info(f"Initialized ArticleQuerier with database: {self.config['database']}")
def _load_config(self, config_path: Optional[str]) -> Dict[str, str]:
"""Load and process configuration"""
if config_path and os.path.exists(config_path):
with open(config_path) as f:
template = Template(f.read())
else:
template = Template(json.dumps(self.DEFAULT_CONFIG))
env_vars = {
'AWS_REGION': os.getenv('AWS_REGION', 'eu-west-3'),
'RSS_DATABASE_NAME': os.getenv('RSS_DATABASE_NAME', 'rss_articles'),
'RSS_TABLE_NAME': os.getenv('RSS_TABLE_NAME', 'articles'),
'RSS_BUCKET_NAME': os.getenv('RSS_BUCKET_NAME', 'your-bucket'),
}
config_str = template.safe_substitute(env_vars)
try:
return json.loads(config_str)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON config after variable substitution: {str(e)}")
def _validate_config(self) -> None:
"""Validate the configuration"""
required_fields = ['region', 'database', 'table', 'output_location']
missing_fields = [field for field in required_fields if field not in self.config]
if missing_fields:
raise ValueError(f"Missing required config fields: {', '.join(missing_fields)}")
if not self.config['output_location'].startswith('s3://'):
raise ValueError("output_location must be an S3 URL (s3://...)")
def search(self,
title: Optional[str] = None,
content: Optional[str] = None,
source: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
limit: int = 100) -> pd.DataFrame:
"""
Search articles using various filters
Args:
title: Search in article titles
content: Search in article content
source: Filter by source
date_from: Start date (YYYY-MM-DD)
date_to: End date (YYYY-MM-DD)
limit: Maximum number of results
Returns:
DataFrame containing the results
"""
conditions = []
if title:
conditions.append(f"LOWER(title) LIKE LOWER('%{title}%')")
if content:
conditions.append(f"LOWER(content) LIKE LOWER('%{content}%')")
if source:
conditions.append(f"source = '{source}'")
if date_from:
conditions.append(f"published_date >= TIMESTAMP '{date_from}'")
if date_to:
conditions.append(f"published_date <= TIMESTAMP '{date_to}'")
where_clause = " AND ".join(conditions) if conditions else "1=1"
query = f"""
SELECT *
FROM {self.config['database']}.{self.config['table']}
WHERE {where_clause}
ORDER BY published_date DESC
LIMIT {limit}
"""
return self.query(query)
def query(self, query: str) -> pd.DataFrame:
"""
Execute custom SQL query
Args:
query: SQL query string
Returns:
DataFrame containing the results
"""
try:
self.logger.debug(f"Executing query: {query}")
response = self.athena.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': self.config['database']},
ResultConfiguration={'OutputLocation': self.config['output_location']}
)
return self._get_query_results(response['QueryExecutionId'])
except Exception as e:
self.logger.error(f"Query execution failed: {str(e)}")
raise
def get_sources(self) -> pd.DataFrame:
"""
Get list of sources and their article counts
Returns:
DataFrame with source statistics
"""
query = f"""
SELECT
source,
COUNT(*) as article_count,
MIN(published_date) as earliest_article,
MAX(published_date) as latest_article
FROM {self.config['database']}.{self.config['table']}
GROUP BY source
ORDER BY article_count DESC
"""
return self.query(query)
def _get_query_results(self, query_id: str) -> pd.DataFrame:
"""Helper method to get query results"""
while True:
status = self.athena.get_query_execution(QueryExecutionId=query_id)
state = status['QueryExecution']['Status']['State']
if state == 'SUCCEEDED':
break
elif state in ['FAILED', 'CANCELLED']:
error_message = status['QueryExecution']['Status'].get('StateChangeReason', 'Unknown error')
raise Exception(f"Query failed: {error_message}")
results = []
columns = None
paginator = self.athena.get_paginator('get_query_results')
for page in paginator.paginate(QueryExecutionId=query_id):
if not columns:
columns = [col['Name'] for col in page['ResultSet']['ResultSetMetadata']['ColumnInfo']]
for row in page['ResultSet']['Rows'][1:]:
results.append([field.get('VarCharValue', '') for field in row['Data']])
return pd.DataFrame(results, columns=columns)

24
todo.md
View File

@@ -1,18 +1,20 @@
# Before Public Launch
* Rethink Partitioning Strategy [Done]
* Implement New Partitioning Strategy. [Done]
- Incorporate: All the json data except for content and link.
Partitioning Strategy: {Year}/{Month}/{Day}/{article_id}
* API Tool - to Pull data that you have down.
* API Tool - to Pull data that you have down.
[x] Faster Batch Responses ( Parrallelization of requests. )
[x] Query API ( Technical Feasability / is S3 even good for this. )
* Test out Vector Databases at Small Scale.
* Test out Vector Databases at Scale.
* Test out LLM Summarizaiton At Small Scale.
* Test out LLM Summarization At Scall
* Text Processings & Text Cleaning of Content during ingestion step.
* Fix up ReadMe
* Publish RoadMap.
* Tips on where to gather RSS Feeds.
* Public Launch Posts
* Reddit
* Twitter
* Kaggle
* Test out Vector Databases at Small Scale.
* Test out Vector Databases at Scale.
* Test out LLM Summarizaiton At Small Scale
* Test out LLM Summarization At Scall
* Text Processings & Text Cleaning of Content during ingestion step.