Files
enclava/backend/tests/integration/test_week1_optimizations.py
2025-08-19 09:50:15 +02:00

236 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Performance test specifically for Week 1 optimizations:
1. Database connection pooling
2. Models endpoint caching
3. Async audit logging
This test measures the impact of these optimizations on API response times.
"""
import asyncio
import aiohttp
import time
import json
import statistics
from typing import List, Dict, Any
# Test configuration
PLATFORM_BASE_URL = "http://localhost:58000"
LITELLM_BASE_URL = "http://localhost:54000"
TEST_ITERATIONS = 10
class PerformanceTest:
def __init__(self):
self.results = {}
async def time_request(self, session: aiohttp.ClientSession, method: str, url: str,
headers: Dict = None, json_data: Dict = None) -> float:
"""Time a single HTTP request"""
start_time = time.perf_counter()
try:
async with session.request(method, url, headers=headers, json=json_data) as response:
await response.read() # Ensure we read the full response
end_time = time.perf_counter()
return (end_time - start_time) * 1000 # Return milliseconds
except Exception as e:
print(f"Request failed: {e}")
return -1
async def test_models_endpoint_caching(self):
"""Test the models endpoint caching optimization"""
print("Testing models endpoint caching...")
async with aiohttp.ClientSession() as session:
# Test platform models endpoint (should benefit from caching)
platform_times = []
litellm_times = []
# Test LiteLLM direct access first (baseline)
for i in range(TEST_ITERATIONS):
try:
duration = await self.time_request(
session, "GET", f"{LITELLM_BASE_URL}/v1/models"
)
if duration > 0:
litellm_times.append(duration)
print(f"LiteLLM models #{i+1}: {duration:.2f}ms")
except Exception as e:
print(f"LiteLLM test #{i+1} failed: {e}")
await asyncio.sleep(0.1) # Small delay between requests
# Test platform models endpoint (with caching)
for i in range(TEST_ITERATIONS):
try:
duration = await self.time_request(
session, "GET", f"{PLATFORM_BASE_URL}/api/v1/llm/models",
headers={"Authorization": "Bearer dummy_jwt_token"} # Will fail auth but should still test routing
)
if duration > 0:
platform_times.append(duration)
print(f"Platform models #{i+1}: {duration:.2f}ms")
except Exception as e:
print(f"Platform test #{i+1} failed: {e}")
await asyncio.sleep(0.1)
return {
"litellm_avg": statistics.mean(litellm_times) if litellm_times else 0,
"litellm_min": min(litellm_times) if litellm_times else 0,
"litellm_max": max(litellm_times) if litellm_times else 0,
"platform_avg": statistics.mean(platform_times) if platform_times else 0,
"platform_min": min(platform_times) if platform_times else 0,
"platform_max": max(platform_times) if platform_times else 0,
"overhead_ms": (statistics.mean(platform_times) - statistics.mean(litellm_times)) if platform_times and litellm_times else 0,
"iterations": len(platform_times)
}
async def test_health_endpoints(self):
"""Test basic health endpoints to measure database connection performance"""
print("Testing health endpoints...")
async with aiohttp.ClientSession() as session:
platform_health_times = []
# Test platform health endpoint (uses database connection)
for i in range(TEST_ITERATIONS):
try:
duration = await self.time_request(
session, "GET", f"{PLATFORM_BASE_URL}/health"
)
if duration > 0:
platform_health_times.append(duration)
print(f"Platform health #{i+1}: {duration:.2f}ms")
except Exception as e:
print(f"Health test #{i+1} failed: {e}")
await asyncio.sleep(0.1)
return {
"platform_health_avg": statistics.mean(platform_health_times) if platform_health_times else 0,
"platform_health_min": min(platform_health_times) if platform_health_times else 0,
"platform_health_max": max(platform_health_times) if platform_health_times else 0,
"iterations": len(platform_health_times)
}
async def test_concurrent_requests(self):
"""Test concurrent request handling (benefits from connection pooling)"""
print("Testing concurrent request handling...")
async def make_concurrent_requests(session, num_concurrent=5):
tasks = []
for i in range(num_concurrent):
task = self.time_request(session, "GET", f"{PLATFORM_BASE_URL}/health")
tasks.append(task)
start_time = time.perf_counter()
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.perf_counter()
successful_results = [r for r in results if isinstance(r, (int, float)) and r > 0]
total_time = (end_time - start_time) * 1000
return {
"total_time_ms": total_time,
"successful_requests": len(successful_results),
"average_individual_time": statistics.mean(successful_results) if successful_results else 0
}
async with aiohttp.ClientSession() as session:
# Test sequential requests
sequential_start = time.perf_counter()
sequential_times = []
for i in range(5):
duration = await self.time_request(session, "GET", f"{PLATFORM_BASE_URL}/health")
if duration > 0:
sequential_times.append(duration)
sequential_end = time.perf_counter()
sequential_total = (sequential_end - sequential_start) * 1000
# Test concurrent requests
concurrent_result = await make_concurrent_requests(session, 5)
return {
"sequential_total_ms": sequential_total,
"sequential_avg_individual": statistics.mean(sequential_times) if sequential_times else 0,
"concurrent_total_ms": concurrent_result["total_time_ms"],
"concurrent_avg_individual": concurrent_result["average_individual_time"],
"concurrency_improvement_pct": ((sequential_total - concurrent_result["total_time_ms"]) / sequential_total * 100) if sequential_total > 0 else 0
}
async def run_all_tests(self):
"""Run all performance tests"""
print("=" * 60)
print("Week 1 Optimization Performance Test")
print("=" * 60)
# Test 1: Models endpoint caching
models_results = await self.test_models_endpoint_caching()
self.results["models_caching"] = models_results
print(f"\nModels Endpoint Results:")
print(f" LiteLLM Direct: {models_results['litellm_avg']:.2f}ms avg ({models_results['litellm_min']:.2f}-{models_results['litellm_max']:.2f}ms)")
print(f" Platform API: {models_results['platform_avg']:.2f}ms avg ({models_results['platform_min']:.2f}-{models_results['platform_max']:.2f}ms)")
print(f" Overhead: {models_results['overhead_ms']:.2f}ms")
# Test 2: Health endpoints (database connection pooling)
health_results = await self.test_health_endpoints()
self.results["health_endpoints"] = health_results
print(f"\nHealth Endpoint Results:")
print(f" Platform Health: {health_results['platform_health_avg']:.2f}ms avg ({health_results['platform_health_min']:.2f}-{health_results['platform_health_max']:.2f}ms)")
# Test 3: Concurrent requests (connection pooling benefit)
concurrent_results = await self.test_concurrent_requests()
self.results["concurrent_requests"] = concurrent_results
print(f"\nConcurrent Request Results:")
print(f" Sequential (5 requests): {concurrent_results['sequential_total_ms']:.2f}ms total")
print(f" Concurrent (5 requests): {concurrent_results['concurrent_total_ms']:.2f}ms total")
print(f" Concurrency improvement: {concurrent_results['concurrency_improvement_pct']:.1f}%")
# Save results
timestamp = int(time.time())
results_file = f"week1_optimization_results_{timestamp}.json"
with open(results_file, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"\nResults saved to: {results_file}")
print("=" * 60)
return self.results
async def main():
test = PerformanceTest()
results = await test.run_all_tests()
# Summary
print("\nSUMMARY:")
print("=" * 60)
models_overhead = results["models_caching"]["overhead_ms"]
health_avg = results["health_endpoints"]["platform_health_avg"]
concurrent_improvement = results["concurrent_requests"]["concurrency_improvement_pct"]
print(f"Models endpoint overhead: {models_overhead:.2f}ms")
print(f"Health endpoint average: {health_avg:.2f}ms")
print(f"Concurrency improvement: {concurrent_improvement:.1f}%")
if models_overhead < 200:
print("✅ Models endpoint overhead is reasonable")
else:
print("⚠️ Models endpoint overhead is high - may need further optimization")
if health_avg < 50:
print("✅ Health endpoint response is fast")
else:
print("⚠️ Health endpoint response could be faster")
if concurrent_improvement > 30:
print("✅ Good concurrency improvement from connection pooling")
else:
print("⚠️ Concurrency improvement is modest")
if __name__ == "__main__":
asyncio.run(main())