Files
Chasing-Your-Tail-NG/input_validation.py
2025-07-23 14:48:04 -07:00

301 lines
11 KiB
Python

"""
Input validation and sanitization for CYT
Prevents injection attacks and ensures data integrity
"""
import re
import json
import logging
from typing import Any, Optional, Dict, List
from pathlib import Path
logger = logging.getLogger(__name__)
class InputValidator:
"""Comprehensive input validation for CYT"""
# Regex patterns for validation
MAC_PATTERN = re.compile(r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$')
SSID_PATTERN = re.compile(r'^[\x20-\x7E]{1,32}$') # Printable ASCII, max 32 chars
PATH_PATTERN = re.compile(r'^[a-zA-Z0-9._\-/\\:]+$')
FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9._\-]+$')
# Dangerous characters to filter
DANGEROUS_CHARS = ['<', '>', '"', "'", '&', ';', '|', '`', '$', '(', ')', '{', '}', '[', ']']
SQL_KEYWORDS = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'UNION', 'EXEC', 'SCRIPT']
@classmethod
def validate_mac_address(cls, mac: str) -> bool:
"""Validate MAC address format"""
if not isinstance(mac, str):
return False
if len(mac) > 17: # Max length for MAC address
return False
return bool(cls.MAC_PATTERN.match(mac))
@classmethod
def validate_ssid(cls, ssid: str) -> bool:
"""Validate SSID format and content"""
if not isinstance(ssid, str):
return False
if len(ssid) == 0 or len(ssid) > 32:
return False
# Check for null bytes and control characters
if '\x00' in ssid or any(ord(c) < 32 and c not in '\t\n\r' for c in ssid):
return False
# Check for dangerous characters
if any(char in ssid for char in cls.DANGEROUS_CHARS):
logger.warning(f"SSID contains dangerous characters: {ssid}")
return False
return True
@classmethod
def validate_file_path(cls, path: str) -> bool:
"""Validate file path is safe"""
if not isinstance(path, str):
return False
if len(path) > 4096: # Max reasonable path length
return False
# Check for path traversal attempts
if '..' in path or '~' in path:
logger.warning(f"Path traversal attempt detected: {path}")
return False
# Check for dangerous characters
if any(char in path for char in ['<', '>', '|', '&', ';', '`']):
logger.warning(f"Dangerous characters in path: {path}")
return False
return True
@classmethod
def validate_filename(cls, filename: str) -> bool:
"""Validate filename is safe"""
if not isinstance(filename, str):
return False
if len(filename) > 255: # Max filename length
return False
# Check for dangerous patterns
if filename in ['.', '..', ''] or filename.startswith('.'):
return False
return bool(cls.FILENAME_PATTERN.match(filename))
@classmethod
def sanitize_string(cls, input_str: str, max_length: int = 1000) -> str:
"""Sanitize string input by removing dangerous content"""
if not isinstance(input_str, str):
return ""
# Truncate if too long
if len(input_str) > max_length:
input_str = input_str[:max_length]
logger.warning(f"Input truncated to {max_length} characters")
# Remove null bytes and control characters (except whitespace)
sanitized = ''.join(c for c in input_str if ord(c) >= 32 or c in '\t\n\r')
# Remove dangerous characters
for char in cls.DANGEROUS_CHARS:
sanitized = sanitized.replace(char, '')
# Check for SQL injection attempts
upper_sanitized = sanitized.upper()
for keyword in cls.SQL_KEYWORDS:
if keyword in upper_sanitized:
logger.warning(f"Potential SQL injection attempt: {sanitized}")
sanitized = sanitized.replace(keyword, '')
sanitized = sanitized.replace(keyword.lower(), '')
return sanitized.strip()
@classmethod
def validate_config_structure(cls, config: Dict[str, Any]) -> bool:
"""Validate configuration file structure"""
required_keys = ['paths', 'timing']
if not isinstance(config, dict):
logger.error("Config must be a dictionary")
return False
for key in required_keys:
if key not in config:
logger.error(f"Missing required config key: {key}")
return False
# Validate paths section
paths = config['paths']
if not isinstance(paths, dict):
logger.error("Config 'paths' must be a dictionary")
return False
required_paths = ['log_dir', 'kismet_logs', 'ignore_lists']
for path_key in required_paths:
if path_key not in paths:
logger.error(f"Missing required path: {path_key}")
return False
path_value = paths[path_key]
if isinstance(path_value, str):
if not cls.validate_file_path(path_value):
logger.error(f"Invalid path format: {path_key}={path_value}")
return False
# Validate timing section
timing = config['timing']
if not isinstance(timing, dict):
logger.error("Config 'timing' must be a dictionary")
return False
timing_keys = ['check_interval', 'list_update_interval']
for timing_key in timing_keys:
if timing_key in timing:
value = timing[timing_key]
if not isinstance(value, (int, float)) or value <= 0:
logger.error(f"Invalid timing value: {timing_key}={value}")
return False
return True
@classmethod
def validate_ignore_list(cls, ignore_list: List[str], list_type: str) -> List[str]:
"""Validate and filter ignore list entries"""
if not isinstance(ignore_list, list):
logger.error(f"Ignore list must be a list, got {type(ignore_list)}")
return []
validated_list = []
validator_func = cls.validate_mac_address if list_type == 'mac' else cls.validate_ssid
for item in ignore_list:
if validator_func(item):
validated_list.append(item)
else:
logger.warning(f"Invalid {list_type} entry removed: {item}")
return validated_list
@classmethod
def validate_json_input(cls, json_str: str, max_size: int = 1024*1024) -> Optional[Dict]:
"""Safely parse and validate JSON input"""
if not isinstance(json_str, str):
logger.error("JSON input must be string")
return None
if len(json_str) > max_size:
logger.error(f"JSON input too large: {len(json_str)} > {max_size}")
return None
try:
# Parse JSON
data = json.loads(json_str)
# Basic structure validation
if isinstance(data, dict):
# Validate keys and values
for key, value in data.items():
if not isinstance(key, str) or len(key) > 100:
logger.warning(f"Invalid JSON key: {key}")
return None
# Recursively validate nested structures
if isinstance(value, (dict, list)):
continue # Could add deeper validation here
elif isinstance(value, str):
if len(value) > 10000: # Reasonable string limit
logger.warning(f"JSON string value too long: {key}")
return None
return data
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON: {e}")
return None
@classmethod
def validate_database_path(cls, db_path: str) -> bool:
"""Validate database path is safe and accessible"""
if not cls.validate_file_path(db_path):
return False
# Check if path exists (for globbed paths, check pattern)
if '*' in db_path:
# It's a glob pattern - validate the base directory
base_dir = db_path.split('*')[0]
if base_dir and not Path(base_dir).exists():
logger.warning(f"Database base directory does not exist: {base_dir}")
return False
else:
# It's a specific file
if not Path(db_path).exists():
logger.warning(f"Database file does not exist: {db_path}")
return False
return True
class SecureInputHandler:
"""Wrapper for handling all input validation in CYT"""
def __init__(self):
self.validator = InputValidator()
def safe_load_config(self, config_path: str) -> Optional[Dict[str, Any]]:
"""Safely load and validate configuration file"""
try:
if not self.validator.validate_file_path(config_path):
logger.error(f"Invalid config path: {config_path}")
return None
if not Path(config_path).exists():
logger.error(f"Config file not found: {config_path}")
return None
with open(config_path, 'r', encoding='utf-8') as f:
content = f.read()
# Validate JSON structure
config = self.validator.validate_json_input(content)
if not config:
return None
# Validate configuration structure
if not self.validator.validate_config_structure(config):
return None
logger.info(f"Configuration loaded and validated: {config_path}")
return config
except Exception as e:
logger.error(f"Error loading config: {e}")
return None
def safe_load_ignore_list(self, file_path: Path, list_type: str) -> List[str]:
"""Safely load ignore list with validation"""
try:
if not file_path.exists():
logger.info(f"Ignore list not found: {file_path}")
return []
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Try to parse as JSON first
try:
data = json.loads(content)
if isinstance(data, list):
return self.validator.validate_ignore_list(data, list_type)
except json.JSONDecodeError:
pass
# Fall back to Python variable parsing (but safer)
# This is for legacy compatibility
logger.warning(f"Using legacy ignore list format: {file_path}")
# We'll implement safer parsing if needed
return []
except Exception as e:
logger.error(f"Error loading ignore list {file_path}: {e}")
return []