Files
enclava/backend/tests/unit/core/test_security.py
2025-08-25 17:13:15 +02:00

662 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Security & Authentication Tests - Phase 1 Critical Business Logic
Priority: app/core/security.py (23% → 75% coverage)
Tests comprehensive security functionality:
- JWT token generation/validation
- Password hashing/verification
- API key validation
- Rate limiting logic
- Permission checking
- Authentication flows
"""
import pytest
import jwt
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, AsyncMock
from app.core.security import SecurityService, get_current_user, verify_api_key
from app.models.user import User
from app.models.api_key import APIKey
from app.core.config import get_settings
class TestSecurityService:
    """Comprehensive test suite for Security Service.

    The tests are grouped into sections: JWT tokens, password hashing,
    API keys, rate limiting, permissions, authentication flows and
    security edge cases. Collaborators reached through the service
    (``db_session``, ``redis_client``) are replaced with mocks via
    ``patch.object`` inside each test.
    """

    @pytest.fixture
    def security_service(self):
        """Create security service instance"""
        return SecurityService()

    @pytest.fixture
    def sample_user(self):
        """Sample user for testing"""
        return User(
            id=1,
            username="testuser",
            email="test@example.com",
            password_hash="$2b$12$hashed_password_here",
            is_active=True,
            created_at=datetime.utcnow()
        )

    @pytest.fixture
    def sample_api_key(self, sample_user):
        """Sample API key for testing"""
        return APIKey(
            id=1,
            user_id=sample_user.id,
            name="Test API Key",
            key_prefix="ce_test",
            hashed_key="$2b$12$hashed_api_key_here",
            is_active=True,
            created_at=datetime.utcnow(),
            last_used_at=None
        )

    # === JWT TOKEN GENERATION AND VALIDATION ===

    @pytest.mark.asyncio
    async def test_create_access_token_success(self, security_service, sample_user):
        """Test successful JWT access token creation"""
        token_data = {"sub": str(sample_user.id), "username": sample_user.username}
        expires_delta = timedelta(minutes=30)

        token = await security_service.create_access_token(
            data=token_data,
            expires_delta=expires_delta
        )

        assert token is not None
        assert isinstance(token, str)

        # Decode token to verify contents
        settings = get_settings()
        decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        assert decoded["sub"] == str(sample_user.id)
        assert decoded["username"] == sample_user.username
        assert "exp" in decoded
        assert "iat" in decoded

    @pytest.mark.asyncio
    async def test_create_access_token_with_custom_expiry(self, security_service):
        """Test token creation with custom expiration time"""
        token_data = {"sub": "123", "username": "testuser"}
        custom_expiry = timedelta(hours=2)

        token = await security_service.create_access_token(
            data=token_data,
            expires_delta=custom_expiry
        )

        # Decode and check expiration
        settings = get_settings()
        decoded = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])

        # Subtract the numeric claims directly. Converting them to naive
        # local datetimes first would skew the difference by the DST offset
        # whenever a clock change falls between "iat" and "exp".
        actual_lifetime_seconds = decoded["exp"] - decoded["iat"]

        # Should be approximately 2 hours (within 1 minute tolerance)
        assert abs(actual_lifetime_seconds - 7200) < 60

    @pytest.mark.asyncio
    async def test_verify_token_success(self, security_service, sample_user):
        """Test successful token verification"""
        # Create a valid token
        token_data = {"sub": str(sample_user.id), "username": sample_user.username}
        token = await security_service.create_access_token(token_data)

        # Verify the token
        payload = await security_service.verify_token(token)

        assert payload is not None
        assert payload["sub"] == str(sample_user.id)
        assert payload["username"] == sample_user.username

    @pytest.mark.asyncio
    async def test_verify_expired_token(self, security_service):
        """Test verification of expired token"""
        # Create token with very short expiry
        token_data = {"sub": "123", "username": "testuser"}
        short_expiry = timedelta(seconds=-1)  # Already expired

        token = await security_service.create_access_token(
            token_data,
            expires_delta=short_expiry
        )

        # Should raise exception for expired token
        with pytest.raises(jwt.ExpiredSignatureError):
            await security_service.verify_token(token)

    @pytest.mark.asyncio
    async def test_verify_invalid_token(self, security_service):
        """Test verification of malformed/invalid tokens"""
        invalid_tokens = [
            "invalid.token.here",
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.invalid",
            "",
            None,
            "Bearer invalid_token"
        ]

        for invalid_token in invalid_tokens:
            if invalid_token is not None:
                with pytest.raises((jwt.InvalidTokenError, jwt.DecodeError, ValueError)):
                    await security_service.verify_token(invalid_token)
            else:
                # A missing token is expected to be rejected as a bad
                # argument rather than as an undecodable string.
                with pytest.raises((TypeError, ValueError)):
                    await security_service.verify_token(invalid_token)

    @pytest.mark.asyncio
    async def test_verify_token_wrong_secret(self, security_service):
        """Test token verification with wrong secret key"""
        # Create token with different secret
        wrong_secret = "wrong_secret_key_here"
        token_data = {"sub": "123", "username": "testuser"}

        # Create token with wrong secret
        token = jwt.encode(
            payload=token_data,
            key=wrong_secret,
            algorithm="HS256"
        )

        # Should fail verification
        with pytest.raises(jwt.InvalidSignatureError):
            await security_service.verify_token(token)

    # === PASSWORD HASHING AND VERIFICATION ===

    @pytest.mark.asyncio
    async def test_hash_password_success(self, security_service):
        """Test successful password hashing"""
        password = "SecurePassword123!"

        hashed = await security_service.hash_password(password)

        assert hashed is not None
        assert hashed != password  # Should be hashed, not plain text
        assert hashed.startswith("$2b$")  # bcrypt hash format
        assert len(hashed) > 50  # Reasonable hash length

    @pytest.mark.asyncio
    async def test_hash_password_different_hashes(self, security_service):
        """Test that same password produces different hashes (due to salt)"""
        password = "TestPassword123"

        hash1 = await security_service.hash_password(password)
        hash2 = await security_service.hash_password(password)

        # Should be different due to random salt
        assert hash1 != hash2

        # But both should verify correctly
        assert await security_service.verify_password(password, hash1)
        assert await security_service.verify_password(password, hash2)

    @pytest.mark.asyncio
    async def test_verify_password_success(self, security_service):
        """Test successful password verification"""
        password = "CorrectPassword123"
        hashed = await security_service.hash_password(password)

        is_valid = await security_service.verify_password(password, hashed)

        assert is_valid is True

    @pytest.mark.asyncio
    async def test_verify_password_failure(self, security_service):
        """Test password verification failure"""
        correct_password = "CorrectPassword123"
        wrong_password = "WrongPassword456"
        hashed = await security_service.hash_password(correct_password)

        is_valid = await security_service.verify_password(wrong_password, hashed)

        assert is_valid is False

    @pytest.mark.asyncio
    async def test_password_hash_security(self, security_service):
        """Test password hash security properties"""
        password = "TestSecurityPassword"

        hashed = await security_service.hash_password(password)

        # Hash should not contain the original password
        assert password not in hashed

        # Hash should be bcrypt with a cost factor of 12 or 10
        assert hashed.startswith(("$2b$12$", "$2b$10$"))

        # Hashing the same password again must give a different hash,
        # because each hash is expected to carry its own random salt
        hash2 = await security_service.hash_password(password)
        assert hashed != hash2

    # === API KEY VALIDATION ===

    @pytest.mark.asyncio
    async def test_verify_api_key_success(self, security_service, sample_api_key):
        """Test successful API key verification"""
        raw_key = "ce_test123456789abcdef"  # Sample raw key

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_api_key

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = True

                api_key = await security_service.verify_api_key(raw_key)

                assert api_key is not None
                assert api_key.id == sample_api_key.id
                assert api_key.is_active is True
                mock_verify.assert_called_once()

    @pytest.mark.asyncio
    async def test_verify_api_key_invalid_format(self, security_service):
        """Test API key validation with invalid format"""
        invalid_keys = [
            "invalid_format",
            "short",
            "",
            None,
            "wrongprefix_1234567890abcdef",
            "ce_tooshort"
        ]

        for invalid_key in invalid_keys:
            with pytest.raises((ValueError, TypeError)):
                await security_service.verify_api_key(invalid_key)

    @pytest.mark.asyncio
    async def test_verify_api_key_not_found(self, security_service):
        """Test API key verification when key not found"""
        nonexistent_key = "ce_nonexistent1234567890"

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = None

            with pytest.raises(ValueError) as exc_info:
                await security_service.verify_api_key(nonexistent_key)

        # Checked after the raises-block: statements placed after the
        # raising call inside that block would never run.
        message = str(exc_info.value).lower()
        assert "not found" in message or "invalid" in message

    @pytest.mark.asyncio
    async def test_verify_api_key_inactive(self, security_service, sample_api_key):
        """Test API key verification when key is inactive"""
        raw_key = "ce_test123456789abcdef"
        sample_api_key.is_active = False  # Deactivated key

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_api_key

            with pytest.raises(ValueError) as exc_info:
                await security_service.verify_api_key(raw_key)

        # Checked after the raises-block so the assertion actually executes.
        message = str(exc_info.value).lower()
        assert "inactive" in message or "disabled" in message

    @pytest.mark.asyncio
    async def test_api_key_usage_tracking(self, security_service, sample_api_key):
        """Test that API key usage is tracked"""
        raw_key = "ce_test123456789abcdef"
        original_last_used = sample_api_key.last_used_at

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_api_key
            mock_session.commit.return_value = None

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = True

                # Only the side effects on the key and the session matter
                # here, so the returned object is not kept.
                await security_service.verify_api_key(raw_key)

                # last_used_at should be updated
                assert sample_api_key.last_used_at != original_last_used
                assert sample_api_key.last_used_at is not None
                mock_session.commit.assert_called()

    # === RATE LIMITING LOGIC ===

    @pytest.mark.asyncio
    async def test_rate_limit_check_within_limit(self, security_service):
        """Test rate limiting when within allowed limits"""
        user_id = "123"
        endpoint = "/api/v1/chat/completions"

        with patch.object(security_service, 'redis_client') as mock_redis:
            mock_redis.get.return_value = "5"  # 5 requests in current window

            is_allowed = await security_service.check_rate_limit(
                identifier=user_id,
                endpoint=endpoint,
                limit=100,  # 100 requests per window
                window=3600  # 1 hour window
            )

            assert is_allowed is True

    @pytest.mark.asyncio
    async def test_rate_limit_check_exceeded(self, security_service):
        """Test rate limiting when limit is exceeded"""
        user_id = "123"
        endpoint = "/api/v1/chat/completions"

        with patch.object(security_service, 'redis_client') as mock_redis:
            mock_redis.get.return_value = "150"  # 150 requests in current window

            is_allowed = await security_service.check_rate_limit(
                identifier=user_id,
                endpoint=endpoint,
                limit=100,  # 100 requests per window
                window=3600  # 1 hour window
            )

            assert is_allowed is False

    @pytest.mark.asyncio
    async def test_rate_limit_increment(self, security_service):
        """Test rate limit counter increment"""
        user_id = "456"
        endpoint = "/api/v1/embeddings"

        with patch.object(security_service, 'redis_client') as mock_redis:
            mock_redis.incr.return_value = 1
            mock_redis.expire.return_value = True

            await security_service.increment_rate_limit(
                identifier=user_id,
                endpoint=endpoint,
                window=3600
            )

            # Verify Redis operations
            mock_redis.incr.assert_called_once()
            mock_redis.expire.assert_called_once()

    @pytest.mark.asyncio
    async def test_rate_limit_different_tiers(self, security_service):
        """Test different rate limits for different user tiers"""
        regular_user = "regular_123"
        premium_user = "premium_456"

        with patch.object(security_service, 'redis_client') as mock_redis:
            # side_effect hands out one value per call, in call order:
            # first the regular user's usage, then the premium user's.
            mock_redis.get.side_effect = ["50", "500"]  # Different usage levels

            # Regular user - should be blocked at 50 requests (limit 30)
            regular_allowed = await security_service.check_rate_limit(
                identifier=regular_user,
                endpoint="/api/v1/chat",
                limit=30,
                window=3600
            )

            # Premium user - should be allowed at 500 requests (limit 1000)
            premium_allowed = await security_service.check_rate_limit(
                identifier=premium_user,
                endpoint="/api/v1/chat",
                limit=1000,
                window=3600
            )

            assert regular_allowed is False
            assert premium_allowed is True

    # === PERMISSION CHECKING ===

    @pytest.mark.asyncio
    async def test_check_user_permissions_success(self, security_service, sample_user):
        """Test successful permission checking"""
        sample_user.permissions = ["read", "write", "admin"]

        has_read = await security_service.check_permission(sample_user, "read")
        has_write = await security_service.check_permission(sample_user, "write")
        has_admin = await security_service.check_permission(sample_user, "admin")

        assert has_read is True
        assert has_write is True
        assert has_admin is True

    @pytest.mark.asyncio
    async def test_check_user_permissions_failure(self, security_service, sample_user):
        """Test permission checking failure"""
        sample_user.permissions = ["read"]  # Only read permission

        has_read = await security_service.check_permission(sample_user, "read")
        has_write = await security_service.check_permission(sample_user, "write")
        has_admin = await security_service.check_permission(sample_user, "admin")

        assert has_read is True
        assert has_write is False
        assert has_admin is False

    @pytest.mark.asyncio
    async def test_check_role_based_permissions(self, security_service, sample_user):
        """Test role-based permission checking"""
        sample_user.role = "admin"

        with patch.object(security_service, 'get_role_permissions') as mock_role_perms:
            mock_role_perms.return_value = ["read", "write", "admin", "manage_users"]

            has_admin = await security_service.check_role_permission(sample_user, "admin")
            has_manage_users = await security_service.check_role_permission(sample_user, "manage_users")
            has_super_admin = await security_service.check_role_permission(sample_user, "super_admin")

            assert has_admin is True
            assert has_manage_users is True
            assert has_super_admin is False

    @pytest.mark.asyncio
    async def test_check_resource_ownership(self, security_service, sample_user):
        """Test resource ownership validation"""
        resource_id = 123
        resource_type = "document"

        with patch.object(security_service, 'db_session') as mock_session:
            # Mock resource owned by user
            mock_resource = Mock()
            mock_resource.user_id = sample_user.id
            mock_resource.id = resource_id
            mock_session.query.return_value.filter.return_value.first.return_value = mock_resource

            is_owner = await security_service.check_resource_ownership(
                user=sample_user,
                resource_type=resource_type,
                resource_id=resource_id
            )

            assert is_owner is True

    @pytest.mark.asyncio
    async def test_check_resource_ownership_denied(self, security_service, sample_user):
        """Test resource ownership validation denied"""
        resource_id = 123
        resource_type = "document"
        other_user_id = 999

        with patch.object(security_service, 'db_session') as mock_session:
            # Mock resource owned by different user
            mock_resource = Mock()
            mock_resource.user_id = other_user_id
            mock_resource.id = resource_id
            mock_session.query.return_value.filter.return_value.first.return_value = mock_resource

            is_owner = await security_service.check_resource_ownership(
                user=sample_user,
                resource_type=resource_type,
                resource_id=resource_id
            )

            assert is_owner is False

    # === AUTHENTICATION FLOWS ===

    @pytest.mark.asyncio
    async def test_authenticate_user_success(self, security_service, sample_user):
        """Test successful user authentication"""
        username = "testuser"
        password = "correctpassword"

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_user

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = True

                authenticated_user = await security_service.authenticate_user(username, password)

                assert authenticated_user is not None
                assert authenticated_user.id == sample_user.id
                assert authenticated_user.username == sample_user.username
                mock_verify.assert_called_once()

    @pytest.mark.asyncio
    async def test_authenticate_user_wrong_password(self, security_service, sample_user):
        """Test user authentication with wrong password"""
        username = "testuser"
        password = "wrongpassword"

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_user

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = False

                authenticated_user = await security_service.authenticate_user(username, password)

                assert authenticated_user is None

    @pytest.mark.asyncio
    async def test_authenticate_user_not_found(self, security_service):
        """Test user authentication when user doesn't exist"""
        username = "nonexistentuser"
        password = "anypassword"

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = None

            authenticated_user = await security_service.authenticate_user(username, password)

            assert authenticated_user is None

    @pytest.mark.asyncio
    async def test_authenticate_inactive_user(self, security_service, sample_user):
        """Test authentication of inactive user"""
        username = "testuser"
        password = "correctpassword"
        sample_user.is_active = False  # Deactivated user

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_user

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = True

                authenticated_user = await security_service.authenticate_user(username, password)

                # Should not authenticate inactive users
                assert authenticated_user is None

    # === SECURITY EDGE CASES ===

    @pytest.mark.asyncio
    async def test_token_with_invalid_user_id(self, security_service):
        """Test token validation with invalid user ID"""
        token_data = {"sub": "invalid_user_id", "username": "testuser"}
        token = await security_service.create_access_token(token_data)

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = None

            # Should handle gracefully when user doesn't exist
            try:
                current_user = await get_current_user(token, mock_session)
            except Exception as e:
                # Should raise appropriate authentication error
                assert "user" in str(e).lower() or "authentication" in str(e).lower()
            else:
                # Kept out of the try body: AssertionError is an Exception,
                # so asserting there would let the handler above swallow a
                # genuine failure and re-check it against its own message.
                assert current_user is None

    @pytest.mark.asyncio
    async def test_concurrent_api_key_validation(self, security_service, sample_api_key):
        """Test concurrent API key validation (race condition handling)"""
        import asyncio

        raw_key = "ce_test123456789abcdef"

        with patch.object(security_service, 'db_session') as mock_session:
            mock_session.query.return_value.filter.return_value.first.return_value = sample_api_key

            with patch.object(security_service, 'verify_password') as mock_verify:
                mock_verify.return_value = True

                # Simulate concurrent API key validations
                tasks = [
                    security_service.verify_api_key(raw_key)
                    for _ in range(5)
                ]

                # return_exceptions=True turns failures into result values
                # so they can be counted instead of aborting the gather.
                results = await asyncio.gather(*tasks, return_exceptions=True)

                # All should succeed or handle gracefully
                successful_validations = [r for r in results if not isinstance(r, Exception)]
                assert len(successful_validations) >= 4  # Most should succeed
"""
COVERAGE ANALYSIS FOR SECURITY SERVICE:
✅ JWT Token Management (6+ tests):
- Token creation with custom expiry
- Token verification success/failure
- Expired token handling
- Invalid token handling
- Wrong secret key detection
✅ Password Security (5+ tests):
- Password hashing with salt randomization
- Password verification success/failure
- Hash security properties
- Different hashes for same password
✅ API Key Validation (5+ tests):
- Valid API key verification
- Invalid format handling
- Non-existent key handling
- Inactive key handling
- Usage tracking
- Format validation
✅ Rate Limiting (4+ tests):
- Within limit checks
- Exceeded limit checks
- Counter increment
- Different user tiers
✅ Permission System (5+ tests):
- User permission checking
- Role-based permissions
- Resource ownership validation
- Permission failure handling
✅ Authentication Flows (4+ tests):
- User authentication success/failure
- Wrong password handling
- Non-existent user handling
- Inactive user handling
✅ Security Edge Cases (2+ tests):
- Invalid user ID in token
- Concurrent API key validation
ESTIMATED COVERAGE IMPROVEMENT:
- Current: 23% → Target: 75%
- Test Count: 30+ comprehensive tests
- Business Impact: Critical (platform security)
- Implementation: Authentication and authorization validation
"""