diff --git a/monthly_ingestion.ipynb b/monthly_ingestion.ipynb index 454e9bf..2a3d187 100644 --- a/monthly_ingestion.ipynb +++ b/monthly_ingestion.ipynb @@ -166,15 +166,39 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Found 23857 objects to process\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|██████████| 23857/23857 [02:37<00:00, 151.22object/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Downloaded data to November-3.csv in 219.71 seconds\n" + ] + } + ], "source": [ "start = time()\n", "output_path = \"November-3.csv\" # or \"consolidated_data.json\"\n", "\n", + "\n", + "\n", "# Define date range\n", - "start_date = \"2024-11-8\" # FIXME: Fix the error where data can't be collected before the date you started collecting.\n", + "start_date = \"2024-11-28\" # FIXME: Fix the error where data can't be collected before the date you started collecting.\n", "end_date = \"2024-11-30\"\n", "\n", "# Start downloading\n", @@ -195,6 +219,82 @@ "# Aggregating" ] }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Aggregated DataFrame:\n", + " link \\\n", + "0 https://www.npr.org/2022/11/04/1134434712/plan... \n", + "1 https://www.communitysignal.com/embodying-work... \n", + "2 https://identityweek.net/tech5-awarded-a-15-ye... \n", + "3 https://edtechmagazine.com/k12/article/2024/11... \n", + "4 https://www.ufc.com/news/updates-ufc-309-jones... \n", + "\n", + " rss \\\n", + "0 https://www.npr.org/rss/podcast.php?id=510289 \n", + "1 https://www.communitysignal.com/feed/ \n", + "2 https://www.planetbiometrics.com/rss/ \n", + "3 https://www.edtechmagazine.com/k12/rss.xml?tax... 
\n", + "4 https://www.ufc.com/rss/news \n", + "\n", + " title \\\n", + "0 The crisis pollsters face ahead of the midterm... \n", + "1 Embodying Work-Life Balance as a Community Pro... \n", + "2 TECH5 awarded a 15-year contract with Virginia... \n", + "3 How Can You Prepare Your Cloud to Safely Imple... \n", + "4 Updates To UFC 309: Jones vs Miocic \n", + "\n", + " content unixTime rss_id \\\n", + "0 Planet Money tries election polling\\n\\nEnlarge... 1731884660 8181d7a585 \n", + "1 Are you able to step away from your community ... 1731883672 b6cef58d91 \n", + "2 TECH5 has announced a major contract win with ... 1731883632 084b136c50 \n", + "3 Many K–12 schools that operate in the cloud ha... 1731883653 7827152faf \n", + "4 UFC 309: JONES vs MIOCIC Updates:\\n\\nDue to me... 1731883662 2c774cd014 \n", + "\n", + " article_id llm_summary embedding unixtime \n", + "0 05dfb6bb11 NaN NaN 1731884660 \n", + "1 0418489c55 NaN NaN 1731883672 \n", + "2 0bb2ec0554 NaN NaN 1731883632 \n", + "3 07d39400e9 NaN NaN 1731883653 \n", + "4 03f31af6e2 NaN NaN 1731883662 \n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "import glob\n", + "\n", + "# List to hold dataframes\n", + "dataframes = []\n", + "\n", + "# Loop through the files November-1.csv to November-3.csv\n", + "for i in range(1, 4):\n", + " filename = f\"November-{i}.csv\"\n", + " try:\n", + " # Read each CSV and append to the list\n", + " df = pd.read_csv(filename)\n", + " dataframes.append(df)\n", + " except FileNotFoundError:\n", + " print(f\"File {filename} not found, skipping.\")\n", + "\n", + "# Aggregate the dataframes\n", + "if dataframes:\n", + " aggregated_df = pd.concat(dataframes, ignore_index=True)\n", + " print(\"Aggregated DataFrame:\")\n", + " print(aggregated_df.head())\n", + "else:\n", + " print(\"No files were aggregated.\")\n", + "\n", + "# If you want to save the result to a new CSV\n", + "aggregated_df.to_csv(\"Aggregated_November.csv\", index=False)" + ] + }, { "cell_type": "code", 
"execution_count": null, diff --git a/src/infra/deploy_infrastructure.py b/src/infra/deploy_infrastructure.py index 6dae220..25f7575 100644 --- a/src/infra/deploy_infrastructure.py +++ b/src/infra/deploy_infrastructure.py @@ -207,7 +207,7 @@ def deploy_infrastructure(): metric=vector_search_metric, spec = ServerlessSpec( cloud="aws", - region=os.getenv("AWS_REGION"), + region="us-east-1", ), ) diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py index b54b759..0281b44 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/analytics/embeddings/vector_db.py @@ -24,8 +24,8 @@ def get_index(): def vectorize(article:str) -> list[float]: response = client.embeddings.create( - input=article, - model=os.getenv('OPENAI_EMBEDDING_MODEL', 'text') + input=article, # FIXME: This fails when article is something else, find what the 'something else' is and implement fix. 
+ model=os.getenv('OPENAI_EMBEDDING_MODEL', 'text-embedding-3-small') ) return response.data[0].embedding diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py index a5b8a4a..18850fc 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/data_storage.py @@ -29,16 +29,21 @@ def save_article(article:dict, strategy:str): def pinecone_save_article(article:dict): logger.info("Saving article to Pinecone") index = get_index() + # Expected Keys from Pinecone *MUST* include 'id' and 'values' - article["id"] = article["article_id"] - article["values"] = vectorize(article["content"]) + data = dict() + logger.info("Article ID into Pinecone") + data["id"] = article["article_id"] + logger.info("Article content into Pinecone") + data["values"] = vectorize(article=article["content"]) + namespace = os.getenv('PINECONE_NAMESPACE') logger.info("Upserting article to Pinecone") - upsert_vectors(index, [article], namespace) - logger.info(f"Successfully upserted article w/ article-id: {article["article_id"]} to Pinecone index {index.name} with namespace {namespace}") + upsert_vectors(index, [data], namespace) + logger.info(f"Successfully upserted article w/ article-id: {article['article_id']} to Pinecone with namespace {namespace}") def dynamodb_save_article(article:dict): pass diff --git a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py b/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py index 779f687..ee9c56e 100644 --- a/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py +++ b/src/infra/lambdas/RSSFeedProcessorLambda/src/lambda_function.py @@ -29,8 +29,6 @@ def lambda_handler(event, context): feed = event["Records"][0]["body"] logger.info(f"Received message from SQS: {feed}") feed = json.loads(feed) - - receipt_handle = event["Records"][0]['receiptHandle'] diff --git 
a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py index 61485d0..8936636 100644 --- a/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py +++ b/src/infra/lambdas/lambda_utils/update_lambda_env_vars.py @@ -65,7 +65,8 @@ def update_env_vars(function_name): 'VECTOR_SEARCH_METRIC': os.environ.get('VECTOR_SEARCH_METRIC'), # OpenAI Configuration - 'OPENAI_API_KEY': os.environ.get('OPENAI_API_KEY') + 'OPENAI_API_KEY': os.environ.get('OPENAI_API_KEY'), + "OPENAI_EMBEDDING_MODEL": os.environ.get('OPENAI_EMBEDDING_MODEL'), } return lambda_client.update_function_configuration( diff --git a/todo.md b/todo.md index febac37..4d1cd47 100644 --- a/todo.md +++ b/todo.md @@ -4,7 +4,9 @@ * Vector Database Initialization at earlier phase. [ Done ] * Test out Vector Databases at Small Scale. * [ ] Testing - * [ ] Fix OpenAI Error. + * [x] Fix OpenAI Error. + * [x] Fix Pinecone Error + * [ ] Fix input error. * [ ] Let it run for a day * Test out Vector Databases at Scale.