"""Core taint analysis engine.
|
|
|
|
This module contains the main taint analysis function and TaintPath class.
|
|
"""
|
|
|
|

import json
import sqlite3
from pathlib import Path
from typing import Dict, List, Any, Optional
from collections import defaultdict

from .sources import TAINT_SOURCES, SECURITY_SINKS, SANITIZERS
from .database import (
    find_taint_sources,
    find_security_sinks,
    build_call_graph,
    get_containing_function,
)
from .propagation import trace_from_source, deduplicate_paths


# Human-readable vulnerability names per sink category (shared by both
# classification branches below).
_VULN_NAMES = {
    "sql": "SQL Injection",
    "command": "Command Injection",
    "xss": "Cross-Site Scripting (XSS)",
    "path": "Path Traversal",
    "ldap": "LDAP Injection",
    "nosql": "NoSQL Injection",
}


class TaintPath:
    """Represents a taint flow path from source to sink."""

    def __init__(self, source: Dict[str, Any], sink: Dict[str, Any], path: List[Dict[str, Any]]):
        self.source = source
        self.sink = sink
        self.path = path
        self.vulnerability_type = self._classify_vulnerability()

    def _classify_vulnerability(self) -> str:
        """Classify the vulnerability from the sink type (factual categorization)."""
        sink_name = self.sink.get("name", "").lower()
        sink_category = self.sink.get("category", "")

        # Prefer the explicit category when available.
        if sink_category:
            return _VULN_NAMES.get(sink_category, "Data Exposure")

        # Fallback: infer the category from sink name patterns.
        for vuln_type, sinks in SECURITY_SINKS.items():
            if any(s.lower() in sink_name for s in sinks):
                return _VULN_NAMES.get(vuln_type, "Data Exposure")

        return "Data Exposure"

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for JSON serialization with a guaranteed structure."""
        # Copy before filling defaults so the caller's dicts are not mutated.
        source_dict = dict(self.source) if self.source else {}
        source_dict.setdefault("name", "unknown_source")
        source_dict.setdefault("file", "unknown_file")
        source_dict.setdefault("line", 0)
        source_dict.setdefault("pattern", "unknown_pattern")

        sink_dict = dict(self.sink) if self.sink else {}
        sink_dict.setdefault("name", "unknown_sink")
        sink_dict.setdefault("file", "unknown_file")
        sink_dict.setdefault("line", 0)
        sink_dict.setdefault("pattern", "unknown_pattern")

        return {
            "source": source_dict,
            "sink": sink_dict,
            "path": self.path or [],
            "path_length": len(self.path) if self.path else 0,
            "vulnerability_type": self.vulnerability_type,
        }
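

# Illustrative sketch, not used by the analyzer itself: how a TaintPath is
# built and serialized. The source/sink values below are hypothetical.
def _example_taint_path() -> Dict[str, Any]:
    example = TaintPath(
        source={"name": "request.GET", "file": "app/views.py", "line": 12},
        sink={"name": "cursor.execute", "file": "app/views.py", "line": 40, "category": "sql"},
        path=[{"function": "get_user", "file": "app/views.py", "line": 20}],
    )
    # Category "sql" maps straight to "SQL Injection" via _VULN_NAMES.
    assert example.vulnerability_type == "SQL Injection"
    return example.to_dict()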


def trace_taint(db_path: str, max_depth: int = 5, registry=None) -> Dict[str, Any]:
    """
    Perform taint analysis by tracing data flow from sources to sinks.

    Args:
        db_path: Path to the SQLite database.
        max_depth: Maximum depth to trace taint propagation.
        registry: Optional TaintRegistry with enriched patterns from rules.

    Returns:
        Dictionary containing (alongside compatibility aliases):
        - taint_paths: List of source-to-sink vulnerability paths
        - sources_found: Number of taint sources identified
        - sinks_found: Number of security sinks identified
        - vulnerabilities_by_type: Path counts per vulnerability type
    """

    # Temporarily rebind this module's TAINT_SOURCES and SECURITY_SINKS so the
    # dynamically extended pattern sets are used below; the originals are
    # restored in the finally block.
    global TAINT_SOURCES, SECURITY_SINKS
    original_sources = TAINT_SOURCES
    original_sinks = SECURITY_SINKS

    # Load detected-framework data (if present) to enhance the analysis.
    frameworks = []
    frameworks_path = Path(".pf/frameworks.json")
    if frameworks_path.exists():
        try:
            with open(frameworks_path, "r") as f:
                frameworks = json.load(f)
        except (json.JSONDecodeError, IOError):
            # Gracefully continue without framework data.
            pass
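
    # Expected shape of .pf/frameworks.json (assumed from the lookups below):
    #     [{"framework": "django", "language": "python"}, ...]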

    # Use the registry's enriched patterns (from rules) when provided;
    # otherwise fall back to the framework-based enhancement in the else branch.
    if registry:
        dynamic_sources = {}
        for category, patterns in registry.sources.items():
            dynamic_sources[category] = [p.pattern for p in patterns]

        dynamic_sinks = {}
        for category, patterns in registry.sinks.items():
            dynamic_sinks[category] = [p.pattern for p in patterns]
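
        # Assumed registry shape: ``registry.sources`` and ``registry.sinks``
        # each map a category name to pattern objects exposing ``.pattern``.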

        # The registry already carries all framework patterns from rules, so
        # the framework enhancement below is skipped.
    else:
        # Original framework-enhancement logic. Copy the per-category lists as
        # well as the dicts so the appends below never mutate the module-level
        # constants.
        dynamic_sources = {lang: list(patterns) for lang, patterns in TAINT_SOURCES.items()}
        dynamic_sinks = {category: list(patterns) for category, patterns in SECURITY_SINKS.items()}

        # Add framework-specific patterns.
        for fw_info in frameworks:
            framework = fw_info.get("framework", "").lower()
            language = fw_info.get("language", "").lower()

            # Django-specific sources (note the uppercase attribute names).
            if framework == "django" and language == "python":
                if "python" not in dynamic_sources:
                    dynamic_sources["python"] = []
                django_sources = [
                    "request.GET",
                    "request.POST",
                    "request.FILES",
                    "request.META",
                    "request.session",
                    "request.COOKIES",
                    "request.user",
                    "request.path",
                    "request.path_info",
                    "request.method",
                ]
                # Add Django sources if not already present.
                for source in django_sources:
                    if source not in dynamic_sources["python"]:
                        dynamic_sources["python"].append(source)

            # Flask-specific sources (mostly covered already; ensure completeness).
            elif framework == "flask" and language == "python":
                if "python" not in dynamic_sources:
                    dynamic_sources["python"] = []
                flask_sources = [
                    "request.args",
                    "request.form",
                    "request.json",
                    "request.data",
                    "request.values",
                    "request.files",
                    "request.cookies",
                    "request.headers",
                    "request.get_json",
                    "request.get_data",
                    "request.environ",
                    "request.view_args",
                ]
                for source in flask_sources:
                    if source not in dynamic_sources["python"]:
                        dynamic_sources["python"].append(source)

            # FastAPI-specific sources.
            elif framework == "fastapi" and language == "python":
                if "python" not in dynamic_sources:
                    dynamic_sources["python"] = []
                fastapi_sources = [
                    # Starlette Request object (used by FastAPI)
                    "Request",
                    "request.url",
                    "request.headers",
                    "request.cookies",
                    "request.query_params",
                    "request.path_params",
                    "request.client",
                    "request.session",
                    "request.auth",
                    "request.user",
                    "request.state",
                    # FastAPI dependency-injection parameters
                    "Query(",
                    "Path(",
                    "Body(",
                    "Header(",
                    "Cookie(",
                    "Form(",
                    "File(",
                    "UploadFile(",
                    "Depends(",
                    # FastAPI security helpers
                    "HTTPBearer",
                    "HTTPBasic",
                    "OAuth2PasswordBearer",
                    "APIKeyHeader",
                    "APIKeyCookie",
                    "APIKeyQuery",
                ]
                for source in fastapi_sources:
                    if source not in dynamic_sources["python"]:
                        dynamic_sources["python"].append(source)

            # Express/Fastify/Koa (Node.js) sources.
            elif framework in ["express", "fastify", "koa"] and language == "javascript":
                if "js" not in dynamic_sources:
                    dynamic_sources["js"] = []
                node_sources = [
                    "req.body",
                    "req.query",
                    "req.params",
                    "req.headers",
                    "req.cookies",
                    "req.ip",
                    "req.hostname",
                    "req.path",
                    "req.url",
                ]
                for source in node_sources:
                    if source not in dynamic_sources["js"]:
                        dynamic_sources["js"].append(source)

                # Express-specific XSS/response sinks. The per-category lists
                # were copied above, so they can be appended to safely.
                xss_sinks = dynamic_sinks.setdefault("xss", [])
                express_xss_sinks = [
                    # Express response methods with a chained status call
                    "res.status().json",
                    "res.status().send",
                    "res.status().jsonp",
                    "res.status().end",
                    # Other Express response methods
                    "res.redirect",
                    "res.cookie",
                    "res.header",
                    "res.set",
                    "res.jsonp",
                    "res.sendFile",  # Path traversal risk
                    "res.download",  # Path traversal risk
                    "res.sendStatus",
                    "res.format",
                    "res.attachment",
                    "res.append",
                    "res.location",
                ]
                for sink in express_xss_sinks:
                    if sink not in xss_sinks:
                        xss_sinks.append(sink)

                # Express SQL sinks for ORMs and drivers commonly used with Express.
                sql_sinks = dynamic_sinks.setdefault("sql", [])
                express_sql_sinks = [
                    "models.sequelize.query",  # Sequelize raw queries
                    "sequelize.query",
                    "knex.raw",  # Knex.js raw queries
                    "db.raw",
                    "db.query",
                    "pool.query",  # Direct pg pool queries
                    "client.query",  # Direct database client queries
                ]
                for sink in express_sql_sinks:
                    if sink not in sql_sinks:
                        sql_sinks.append(sink)

                # Path traversal sinks specific to Express/Node.js.
                path_sinks = dynamic_sinks.setdefault("path", [])
                express_path_sinks = [
                    "res.sendFile",
                    "res.download",
                    "fs.promises.readFile",
                    "fs.promises.writeFile",
                    "fs.promises.unlink",
                    "fs.promises.rmdir",
                    "fs.promises.mkdir",
                    "require",  # Dynamic require with user input
                ]
                for sink in express_path_sinks:
                    if sink not in path_sinks:
                        path_sinks.append(sink)

    # Rebind this module's TAINT_SOURCES and SECURITY_SINKS to the dynamic
    # versions; the originals are restored in the finally block.
    TAINT_SOURCES = dynamic_sources
    SECURITY_SINKS = dynamic_sinks

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Step 1: Find all taint sources in the codebase, passing the
        # dynamically extended source patterns.
        sources = find_taint_sources(cursor, TAINT_SOURCES)

        # Step 2: Find all security sinks in the codebase.
        sinks = find_security_sinks(cursor, SECURITY_SINKS)

        # Step 3: Build a call graph for efficient traversal.
        call_graph = build_call_graph(cursor)

        # Step 4: Trace taint flow from each source.
        taint_paths = []
        for source in sources:
            # Find the function that contains this source.
            source_function = get_containing_function(cursor, source)
            if not source_function:
                continue

            # Trace taint propagation from this source.
            paths = trace_from_source(
                cursor, source, source_function, sinks, call_graph, max_depth
            )
            taint_paths.extend(paths)

        # Step 5: Deduplicate paths.
        unique_paths = deduplicate_paths(taint_paths)

        # Step 6: Build a factual summary, counting paths per vulnerability
        # type (categorization only, not interpretation).
        vulnerabilities_by_type = defaultdict(int)
        for path in unique_paths:
            vulnerabilities_by_type[path.vulnerability_type] += 1

        # Convert paths to dictionaries.
        path_dicts = [p.to_dict() for p in unique_paths]

        # Create the summary consumed by the pipeline.
        summary = {
            "total_count": len(unique_paths),
            "by_type": dict(vulnerabilities_by_type),
            # Basic counts only: the base analyzer does not assign severity.
            "critical_count": 0,
            "high_count": 0,
            "medium_count": 0,
            "low_count": 0,
        }

        return {
            "success": True,
            "taint_paths": path_dicts,  # Original key, kept for backward compatibility
            "vulnerabilities": path_dicts,  # Key expected by the pipeline
            "paths": path_dicts,  # Key expected by report generation
            "sources_found": len(sources),
            "sinks_found": len(sinks),
            "total_vulnerabilities": len(unique_paths),  # Expected field name
            "total_flows": len(unique_paths),  # Kept for compatibility
            "vulnerabilities_by_type": dict(vulnerabilities_by_type),
            "summary": summary,
        }

    except Exception as e:
        # Every failure mode returns the same empty-but-complete structure so
        # downstream consumers never hit missing keys; only the message varies.
        message = str(e)
        if isinstance(e, sqlite3.OperationalError) and "no such table" in message:
            message = ("Database is corrupted or incomplete. "
                       "Run 'aud index' to rebuild the repository index.")
        return {
            "success": False,
            "error": message,
            "taint_paths": [],
            "vulnerabilities": [],
            "paths": [],  # Both keys included for compatibility
            "sources_found": 0,
            "sinks_found": 0,
            "total_vulnerabilities": 0,
            "total_flows": 0,
            "vulnerabilities_by_type": {},
            "summary": {
                "total_count": 0,
                "by_type": {},
                "critical_count": 0,
                "high_count": 0,
                "medium_count": 0,
                "low_count": 0,
            },
        }
    finally:
        conn.close()
        # Restore the original TAINT_SOURCES and SECURITY_SINKS.
        TAINT_SOURCES = original_sources
        SECURITY_SINKS = original_sinks
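

# Illustrative sketch, not wired into the pipeline: run the analyzer against
# an index database and print the factual summary. The database path below is
# a hypothetical example.
def _example_trace_taint() -> None:
    result = trace_taint(".pf/repo_index.db", max_depth=5)
    if not result["success"]:
        print(f"taint analysis failed: {result['error']}")
        return
    print(f"{result['total_vulnerabilities']} flows from "
          f"{result['sources_found']} sources to {result['sinks_found']} sinks")
    for vuln_type, count in result["vulnerabilities_by_type"].items():
        print(f"  {vuln_type}: {count}")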


def save_taint_analysis(analysis_result: Dict[str, Any], output_path: str = "./.pf/taint_analysis.json"):
    """Save taint analysis results to a JSON file with a normalized structure."""
    output = Path(output_path)
    output.parent.mkdir(parents=True, exist_ok=True)

    # Normalize all paths (in place) before saving.
    if "taint_paths" in analysis_result:
        analysis_result["taint_paths"] = [
            normalize_taint_path(p) for p in analysis_result.get("taint_paths", [])
        ]
    if "paths" in analysis_result:
        analysis_result["paths"] = [
            normalize_taint_path(p) for p in analysis_result.get("paths", [])
        ]

    with open(output, "w") as f:
        json.dump(analysis_result, f, indent=2, sort_keys=True)
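

# Illustrative sketch: persisting results for downstream stages. The database
# path is hypothetical; the output path is this module's default.
def _example_save() -> None:
    result = trace_taint(".pf/repo_index.db")
    save_taint_analysis(result)  # writes ./.pf/taint_analysis.json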


def normalize_taint_path(path: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a taint path dictionary in place so all required keys exist."""
    # Ensure top-level keys. vulnerability_type and severity are deliberately
    # not added here: the base analyzer reports facts and does not classify.
    path.setdefault("path_length", 0)
    path.setdefault("path", [])

    # Ensure source structure.
    source = path.setdefault("source", {})
    source.setdefault("name", "unknown_source")
    source.setdefault("file", "unknown_file")
    source.setdefault("line", 0)
    source.setdefault("pattern", "unknown_pattern")

    # Ensure sink structure.
    sink = path.setdefault("sink", {})
    sink.setdefault("name", "unknown_sink")
    sink.setdefault("file", "unknown_file")
    sink.setdefault("line", 0)
    sink.setdefault("pattern", "unknown_pattern")

    return path
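

# Illustrative sketch: normalization guarantees the keys downstream consumers
# rely on, even for a minimal hand-built path dict (hypothetical values).
def _example_normalize() -> None:
    partial = {"source": {"name": "req.body", "file": "app.js", "line": 3}}
    normalized = normalize_taint_path(partial)
    assert normalized["sink"]["name"] == "unknown_sink"
    assert normalized["path_length"] == 0
    assert normalized["source"]["pattern"] == "unknown_pattern"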