mirror of
https://github.com/aljazceru/Chasing-Your-Tail-NG.git
synced 2025-12-17 23:34:19 +01:00
301 lines
11 KiB
Python
301 lines
11 KiB
Python
"""
|
|
Input validation and sanitization for CYT
|
|
Prevents injection attacks and ensures data integrity
|
|
"""
|
|
import re
|
|
import json
|
|
import logging
|
|
from typing import Any, Optional, Dict, List
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class InputValidator:
|
|
"""Comprehensive input validation for CYT"""
|
|
|
|
# Regex patterns for validation
|
|
MAC_PATTERN = re.compile(r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$')
|
|
SSID_PATTERN = re.compile(r'^[\x20-\x7E]{1,32}$') # Printable ASCII, max 32 chars
|
|
PATH_PATTERN = re.compile(r'^[a-zA-Z0-9._\-/\\:]+$')
|
|
FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9._\-]+$')
|
|
|
|
# Dangerous characters to filter
|
|
DANGEROUS_CHARS = ['<', '>', '"', "'", '&', ';', '|', '`', '$', '(', ')', '{', '}', '[', ']']
|
|
SQL_KEYWORDS = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'UNION', 'EXEC', 'SCRIPT']
|
|
|
|
@classmethod
|
|
def validate_mac_address(cls, mac: str) -> bool:
|
|
"""Validate MAC address format"""
|
|
if not isinstance(mac, str):
|
|
return False
|
|
if len(mac) > 17: # Max length for MAC address
|
|
return False
|
|
return bool(cls.MAC_PATTERN.match(mac))
|
|
|
|
@classmethod
|
|
def validate_ssid(cls, ssid: str) -> bool:
|
|
"""Validate SSID format and content"""
|
|
if not isinstance(ssid, str):
|
|
return False
|
|
if len(ssid) == 0 or len(ssid) > 32:
|
|
return False
|
|
# Check for null bytes and control characters
|
|
if '\x00' in ssid or any(ord(c) < 32 and c not in '\t\n\r' for c in ssid):
|
|
return False
|
|
# Check for dangerous characters
|
|
if any(char in ssid for char in cls.DANGEROUS_CHARS):
|
|
logger.warning(f"SSID contains dangerous characters: {ssid}")
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def validate_file_path(cls, path: str) -> bool:
|
|
"""Validate file path is safe"""
|
|
if not isinstance(path, str):
|
|
return False
|
|
if len(path) > 4096: # Max reasonable path length
|
|
return False
|
|
|
|
# Check for path traversal attempts
|
|
if '..' in path or '~' in path:
|
|
logger.warning(f"Path traversal attempt detected: {path}")
|
|
return False
|
|
|
|
# Check for dangerous characters
|
|
if any(char in path for char in ['<', '>', '|', '&', ';', '`']):
|
|
logger.warning(f"Dangerous characters in path: {path}")
|
|
return False
|
|
|
|
return True
|
|
|
|
@classmethod
|
|
def validate_filename(cls, filename: str) -> bool:
|
|
"""Validate filename is safe"""
|
|
if not isinstance(filename, str):
|
|
return False
|
|
if len(filename) > 255: # Max filename length
|
|
return False
|
|
|
|
# Check for dangerous patterns
|
|
if filename in ['.', '..', ''] or filename.startswith('.'):
|
|
return False
|
|
|
|
return bool(cls.FILENAME_PATTERN.match(filename))
|
|
|
|
@classmethod
|
|
def sanitize_string(cls, input_str: str, max_length: int = 1000) -> str:
|
|
"""Sanitize string input by removing dangerous content"""
|
|
if not isinstance(input_str, str):
|
|
return ""
|
|
|
|
# Truncate if too long
|
|
if len(input_str) > max_length:
|
|
input_str = input_str[:max_length]
|
|
logger.warning(f"Input truncated to {max_length} characters")
|
|
|
|
# Remove null bytes and control characters (except whitespace)
|
|
sanitized = ''.join(c for c in input_str if ord(c) >= 32 or c in '\t\n\r')
|
|
|
|
# Remove dangerous characters
|
|
for char in cls.DANGEROUS_CHARS:
|
|
sanitized = sanitized.replace(char, '')
|
|
|
|
# Check for SQL injection attempts
|
|
upper_sanitized = sanitized.upper()
|
|
for keyword in cls.SQL_KEYWORDS:
|
|
if keyword in upper_sanitized:
|
|
logger.warning(f"Potential SQL injection attempt: {sanitized}")
|
|
sanitized = sanitized.replace(keyword, '')
|
|
sanitized = sanitized.replace(keyword.lower(), '')
|
|
|
|
return sanitized.strip()
|
|
|
|
@classmethod
|
|
def validate_config_structure(cls, config: Dict[str, Any]) -> bool:
|
|
"""Validate configuration file structure"""
|
|
required_keys = ['paths', 'timing']
|
|
|
|
if not isinstance(config, dict):
|
|
logger.error("Config must be a dictionary")
|
|
return False
|
|
|
|
for key in required_keys:
|
|
if key not in config:
|
|
logger.error(f"Missing required config key: {key}")
|
|
return False
|
|
|
|
# Validate paths section
|
|
paths = config['paths']
|
|
if not isinstance(paths, dict):
|
|
logger.error("Config 'paths' must be a dictionary")
|
|
return False
|
|
|
|
required_paths = ['log_dir', 'kismet_logs', 'ignore_lists']
|
|
for path_key in required_paths:
|
|
if path_key not in paths:
|
|
logger.error(f"Missing required path: {path_key}")
|
|
return False
|
|
|
|
path_value = paths[path_key]
|
|
if isinstance(path_value, str):
|
|
if not cls.validate_file_path(path_value):
|
|
logger.error(f"Invalid path format: {path_key}={path_value}")
|
|
return False
|
|
|
|
# Validate timing section
|
|
timing = config['timing']
|
|
if not isinstance(timing, dict):
|
|
logger.error("Config 'timing' must be a dictionary")
|
|
return False
|
|
|
|
timing_keys = ['check_interval', 'list_update_interval']
|
|
for timing_key in timing_keys:
|
|
if timing_key in timing:
|
|
value = timing[timing_key]
|
|
if not isinstance(value, (int, float)) or value <= 0:
|
|
logger.error(f"Invalid timing value: {timing_key}={value}")
|
|
return False
|
|
|
|
return True
|
|
|
|
@classmethod
|
|
def validate_ignore_list(cls, ignore_list: List[str], list_type: str) -> List[str]:
|
|
"""Validate and filter ignore list entries"""
|
|
if not isinstance(ignore_list, list):
|
|
logger.error(f"Ignore list must be a list, got {type(ignore_list)}")
|
|
return []
|
|
|
|
validated_list = []
|
|
validator_func = cls.validate_mac_address if list_type == 'mac' else cls.validate_ssid
|
|
|
|
for item in ignore_list:
|
|
if validator_func(item):
|
|
validated_list.append(item)
|
|
else:
|
|
logger.warning(f"Invalid {list_type} entry removed: {item}")
|
|
|
|
return validated_list
|
|
|
|
@classmethod
|
|
def validate_json_input(cls, json_str: str, max_size: int = 1024*1024) -> Optional[Dict]:
|
|
"""Safely parse and validate JSON input"""
|
|
if not isinstance(json_str, str):
|
|
logger.error("JSON input must be string")
|
|
return None
|
|
|
|
if len(json_str) > max_size:
|
|
logger.error(f"JSON input too large: {len(json_str)} > {max_size}")
|
|
return None
|
|
|
|
try:
|
|
# Parse JSON
|
|
data = json.loads(json_str)
|
|
|
|
# Basic structure validation
|
|
if isinstance(data, dict):
|
|
# Validate keys and values
|
|
for key, value in data.items():
|
|
if not isinstance(key, str) or len(key) > 100:
|
|
logger.warning(f"Invalid JSON key: {key}")
|
|
return None
|
|
|
|
# Recursively validate nested structures
|
|
if isinstance(value, (dict, list)):
|
|
continue # Could add deeper validation here
|
|
elif isinstance(value, str):
|
|
if len(value) > 10000: # Reasonable string limit
|
|
logger.warning(f"JSON string value too long: {key}")
|
|
return None
|
|
|
|
return data
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Invalid JSON: {e}")
|
|
return None
|
|
|
|
@classmethod
|
|
def validate_database_path(cls, db_path: str) -> bool:
|
|
"""Validate database path is safe and accessible"""
|
|
if not cls.validate_file_path(db_path):
|
|
return False
|
|
|
|
# Check if path exists (for globbed paths, check pattern)
|
|
if '*' in db_path:
|
|
# It's a glob pattern - validate the base directory
|
|
base_dir = db_path.split('*')[0]
|
|
if base_dir and not Path(base_dir).exists():
|
|
logger.warning(f"Database base directory does not exist: {base_dir}")
|
|
return False
|
|
else:
|
|
# It's a specific file
|
|
if not Path(db_path).exists():
|
|
logger.warning(f"Database file does not exist: {db_path}")
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class SecureInputHandler:
|
|
"""Wrapper for handling all input validation in CYT"""
|
|
|
|
def __init__(self):
|
|
self.validator = InputValidator()
|
|
|
|
def safe_load_config(self, config_path: str) -> Optional[Dict[str, Any]]:
|
|
"""Safely load and validate configuration file"""
|
|
try:
|
|
if not self.validator.validate_file_path(config_path):
|
|
logger.error(f"Invalid config path: {config_path}")
|
|
return None
|
|
|
|
if not Path(config_path).exists():
|
|
logger.error(f"Config file not found: {config_path}")
|
|
return None
|
|
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Validate JSON structure
|
|
config = self.validator.validate_json_input(content)
|
|
if not config:
|
|
return None
|
|
|
|
# Validate configuration structure
|
|
if not self.validator.validate_config_structure(config):
|
|
return None
|
|
|
|
logger.info(f"Configuration loaded and validated: {config_path}")
|
|
return config
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading config: {e}")
|
|
return None
|
|
|
|
def safe_load_ignore_list(self, file_path: Path, list_type: str) -> List[str]:
|
|
"""Safely load ignore list with validation"""
|
|
try:
|
|
if not file_path.exists():
|
|
logger.info(f"Ignore list not found: {file_path}")
|
|
return []
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Try to parse as JSON first
|
|
try:
|
|
data = json.loads(content)
|
|
if isinstance(data, list):
|
|
return self.validator.validate_ignore_list(data, list_type)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Fall back to Python variable parsing (but safer)
|
|
# This is for legacy compatibility
|
|
logger.warning(f"Using legacy ignore list format: {file_path}")
|
|
# We'll implement safer parsing if needed
|
|
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading ignore list {file_path}: {e}")
|
|
return [] |