# Source file: Auditor/theauditor/taint/propagation.py
# Viewer metadata: 633 lines, 28 KiB, Python

"""Taint propagation through assignments and data flow.
This module implements the worklist algorithm for tracking taint through
variable assignments and function calls within a single function scope.
"""
import os
import sys
import sqlite3
import json
from typing import Dict, List, Set, Any, Optional
from collections import deque
from .sources import SANITIZERS, TAINT_SOURCES
from .database import get_containing_function, get_function_boundaries, get_code_snippet
from .interprocedural import trace_inter_procedural_flow
def is_sanitizer(function_name: str) -> bool:
    """Report whether ``function_name`` matches any entry in ``SANITIZERS``.

    The comparison is case-insensitive and is a substring test in BOTH
    directions: a call matches when a known sanitizer name occurs inside
    it, or when it occurs inside a known sanitizer name. Very short names
    therefore match liberally.

    Args:
        function_name: Name of the called function; may be empty or None.

    Returns:
        True on the first match, False for a falsy name or no match.
    """
    if not function_name:
        return False

    needle = function_name.lower()

    # Every category of SANITIZERS is searched; categories are not
    # distinguished from one another here.
    return any(
        known in needle or needle in known
        for group in SANITIZERS.values()
        for known in (entry.lower() for entry in group)
    )
def has_sanitizer_between(cursor: sqlite3.Cursor, source: Dict[str, Any], sink: Dict[str, Any]) -> bool:
    """Tell whether a sanitizer call sits on a line strictly between source and sink.

    Only ``symbols`` rows of type ``'call'`` in the source's file are
    considered, and only lines greater than the source line and less than
    the sink line (both endpoints excluded).

    Args:
        cursor: Cursor on a database that has a ``symbols`` table.
        source: Mapping providing ``"file"`` and ``"line"``.
        sink: Mapping providing ``"file"`` and ``"line"``.

    Returns:
        True if any such call is recognised by ``is_sanitizer``; False
        otherwise, including whenever source and sink are in different files.
    """
    same_file = source["file"] == sink["file"]
    if not same_file:
        return False

    query_args = (source["file"], source["line"], sink["line"])
    cursor.execute("""
SELECT name, line
FROM symbols
WHERE path = ?
AND type = 'call'
AND line > ?
AND line < ?
ORDER BY line
""", query_args)
    calls_in_window = cursor.fetchall()

    # The line number is fetched for ordering only; the name decides.
    return any(is_sanitizer(called) for called, _lineno in calls_in_window)
def is_external_source(cursor: sqlite3.Cursor, source: Dict[str, Any]) -> bool:
    """Decide whether a taint source brings in untrusted external data.

    The source's ``"pattern"`` is compared by exact equality against fixed
    groups of names:

    * web-scraping / HTTP-client patterns   -> always external
    * web-framework request inputs          -> always external
    * file / data loaders                   -> external only when the
      ``symbols`` table holds a network-looking name within 50 lines
      either side of the source in the same file
    * environment variables, CLI args, input -> always external

    Anything else is treated as internal, so unknown patterns are not flagged.

    Args:
        cursor: Cursor on a database that has a ``symbols`` table.
        source: Mapping with an optional ``"pattern"``; ``"file"`` and
            ``"line"`` are read only for the file-loader group.

    Returns:
        True when the source is considered external, False otherwise.
    """
    scraping_sources = (
        "requests.get", "requests.post", "requests.put", "requests.patch", "requests.delete",
        "response.text", "response.content", "response.json",
        "BeautifulSoup", "soup.find", "soup.find_all", "soup.select",
        "page.content", "page.inner_text", "page.inner_html",
        "driver.page_source", "element.text", "element.get_attribute",
        "urlopen", "urllib.request.urlopen",
    )
    framework_inputs = (
        "req.body", "req.query", "req.params", "req.headers",
        "request.args", "request.form", "request.json", "request.data",
        "request.GET", "request.POST", "request.FILES",
    )
    file_loaders = ("open", "json.load", "json.loads", "pd.read_csv", "pd.read_json", "pd.read_excel")
    process_inputs = ("os.getenv", "os.environ.get", "sys.argv", "input", "click.argument")

    pattern = source.get("pattern", "")

    # Scraped content and framework request data need no further evidence.
    if pattern in scraping_sources or pattern in framework_inputs:
        return True

    # A file read counts as external only if something network-like is
    # referenced nearby in the same file.
    if pattern in file_loaders:
        window = (source["file"], source["line"] - 50, source["line"] + 50)
        cursor.execute("""
SELECT COUNT(*) FROM symbols
WHERE path = ? AND line BETWEEN ? AND ?
AND (name LIKE '%request%' OR name LIKE '%download%'
OR name LIKE '%fetch%' OR name LIKE '%scrape%'
OR name LIKE '%BeautifulSoup%' OR name LIKE '%urlopen%')
""", window)
        network_hits = cursor.fetchone()[0]
        return network_hits > 0

    # Environment/CLI inputs are external; everything left over is not.
    return pattern in process_inputs
def trace_from_source(
    cursor: sqlite3.Cursor,
    source: Dict[str, Any],
    source_function: Dict[str, Any],
    sinks: List[Dict[str, Any]],
    call_graph: Dict[str, List[str]],
    max_depth: int
) -> List[Any]:  # Returns List[TaintPath]
    """
    Trace taint propagation from a source to potential sinks using data flow analysis.

    Stages, in order:
    0. Reject sources that ``is_external_source`` does not accept (returns []).
    1. Direct use: report every sink that lies inside the same function
       boundaries as the source when no sanitizer call sits between them.
    2. If the ``assignments`` table is missing, return the direct-use paths
       if any, otherwise delegate to ``trace_from_source_legacy``.
    3. Seed a set of tainted elements ("function:variable", or
       "function:__return__" for return values) from assignments near the
       source line, with several progressively looser fallbacks.
    4. Propagate taint through assignments, call arguments and returns with
       a worklist capped at 100 iterations.
    5. For each sink in the source's file, report a path when the source
       pattern or a tainted variable shows up at the sink and no sanitizer
       sits in between; tainted variables living in another function are
       handed to ``trace_inter_procedural_flow``.

    Args:
        cursor: Cursor on the analysis database. Queries touch
            ``sqlite_master``, ``assignments``, ``function_call_args`` and
            ``function_returns`` here, plus whatever the helpers query.
        source: Mapping with "file", "line" and "pattern".
        source_function: Mapping with "file", "line" and "name" for the
            function containing the source. "name" is read with ``.get`` in
            some places and by direct indexing in others.
        sinks: Candidate sinks; each needs "file", "line" and "pattern".
        call_graph: Only forwarded to the legacy fallback.
        max_depth: Only forwarded to the legacy fallback; the
            inter-procedural call below uses a hard-coded depth of 3.

    Returns:
        A list of TaintPath objects. The same source/sink pair can appear
        more than once (e.g. from stage 1 and again from stage 5).

    Debug output is enabled by the THEAUDITOR_DEBUG or
    THEAUDITOR_TAINT_DEBUG environment variables.
    """
    # Import TaintPath here to avoid circular dependency
    from .core import TaintPath

    # Validate source is truly external
    if not is_external_source(cursor, source):
        return []  # Skip internal sources

    paths = []

    # CRITICAL FIX: Check for direct-use vulnerabilities FIRST
    # This handles cases like res.send(req.body) where tainted data flows directly to sink
    # without intermediate variable assignment
    for sink in sinks:
        # Check if source and sink are in the same function
        if sink["file"] == source_function["file"]:
            # Use actual function boundaries
            # (the lookup arguments do not depend on the sink, so every
            # iteration that reaches this point repeats the same query)
            source_start, source_end = get_function_boundaries(
                cursor, source["file"], source_function["line"]
            )

            # Verify BOTH source and sink are within same function scope
            if (source_start <= source["line"] <= source_end and
                    source_start <= sink["line"] <= source_end):
                # Both lines fall inside the same function's line range.
                # Check if there's a sanitizer between source and sink
                if not has_sanitizer_between(cursor, source, sink):
                    # Direct vulnerability found - source flows directly to sink
                    path = TaintPath(
                        source=source,
                        sink=sink,
                        path=[
                            {
                                "type": "direct_use",
                                "location": f"{source['file']}:{source['line']}",
                                "code": get_code_snippet(source['file'], source['line'])
                            },
                            {
                                "type": "sink",
                                "location": f"{sink['file']}:{sink['line']}",
                                "code": get_code_snippet(sink['file'], sink['line'])
                            }
                        ]
                    )
                    paths.append(path)

    # Check if the new data flow tables exist for assignment-based tracing
    # (only the `assignments` table is probed; the other tables queried
    # further down are not checked here)
    cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='assignments'
""")
    has_data_flow_tables = cursor.fetchone() is not None

    if not has_data_flow_tables:
        # Fall back to old proximity-based approach if tables don't exist
        # This maintains backward compatibility
        if paths:  # Return direct-use paths if found
            return paths
        return trace_from_source_legacy(cursor, source, source_function, sinks, call_graph, max_depth)

    # Initialize the set of tainted elements for assignment-based tracing
    # Format: "function:variable" or "function:__return__" for return values
    tainted_elements = set()

    # CRITICAL AMENDMENT: Check assignments table for taint source instantiation
    # Find initial tainted variables from assignments that match ANY taint source
    # (window: one line either side of the source line)
    cursor.execute("""
SELECT target_var, in_function, source_expr
FROM assignments
WHERE file = ? AND line BETWEEN ? AND ?
""", (source["file"], source["line"] - 1, source["line"] + 1))
    initial_assignments = cursor.fetchall()

    # Get all taint source patterns for comparison
    all_taint_sources = []
    for source_list in TAINT_SOURCES.values():
        all_taint_sources.extend(source_list)

    # Check each assignment to see if it contains a taint source
    for target_var, in_function, source_expr in initial_assignments:
        # Check if the source expression contains any known taint source
        # (plain substring test, not limited to this source's own pattern)
        for source_pattern in all_taint_sources:
            if source_pattern in source_expr:
                # Add this variable as initially tainted
                tainted_elements.add(f"{in_function}:{target_var}")
                break  # Move to the next assignment

    # DEBUG: Log what we're looking for
    debug_mode = os.environ.get("THEAUDITOR_DEBUG") or os.environ.get("THEAUDITOR_TAINT_DEBUG")
    if debug_mode:
        print(f"\n{'='*60}", file=sys.stderr)
        print(f"[TAINT] Processing source: {source['pattern']} at {source['file']}:{source['line']}", file=sys.stderr)
        print(f"[TAINT] Source function: {source_function.get('name', 'unknown')} ({source_function['file']}:{source_function['line']})", file=sys.stderr)
        print(f"[TAINT] Initial tainted variables: {tainted_elements}", file=sys.stderr)
        print(f"[TAINT] Found {len(sinks)} potential sinks to check", file=sys.stderr)

    # Step 1: Also check for direct assignment matching the specific source pattern
    # Check if the source directly taints a variable through assignment
    cursor.execute("""
SELECT target_var, in_function FROM assignments
WHERE file = ? AND line = ? AND source_expr LIKE ?
""", (source["file"], source["line"], f"%{source['pattern']}%"))
    initial_taints = cursor.fetchall()

    # DEBUG: Log what we found
    if debug_mode:
        print(f"[TAINT] Found {len(initial_taints)} initial taints from direct assignment", file=sys.stderr)
        for taint in initial_taints[:3]:  # Show first 3
            print(f"[TAINT] - {taint[0]} in {taint[1]}", file=sys.stderr)

    if not initial_taints:
        # Try to find assignments near the source
        # (window: 1 line before through 3 lines after the source line)
        cursor.execute("""
SELECT target_var, in_function, line, source_expr FROM assignments
WHERE file = ? AND line BETWEEN ? AND ? AND source_expr LIKE ?
""", (source["file"], source["line"] - 1, source["line"] + 3, f"%{source['pattern']}%"))
        initial_taints = cursor.fetchall()

    # Add initially tainted variables to the worklist
    # (rows may have 2 or 4 columns depending on which query ran; only the
    # first two are used)
    for row in initial_taints:
        target_var = row[0]
        in_function = row[1]
        tainted_elements.add(f"{in_function}:{target_var}")

    # If no direct assignment found, check if source is in a property access
    if not tainted_elements:
        # For sources like req.body, req.query, treat the entire expression as tainted
        if "." in source["pattern"]:
            # Find where this property is used (anywhere in the file)
            cursor.execute("""
SELECT target_var, in_function FROM assignments
WHERE file = ? AND source_expr LIKE ?
""", (source["file"], f"%{source['pattern']}%"))

            for target_var, in_function in cursor.fetchall():
                tainted_elements.add(f"{in_function}:{target_var}")

    # ENHANCEMENT: If still no tainted elements, check for source usage in expressions
    # This helps catch cases where source is used in expressions without assignment
    if not tainted_elements:
        # Look for any usage of the source pattern in expressions
        cursor.execute("""
SELECT DISTINCT in_function FROM assignments
WHERE file = ? AND (source_expr LIKE ? OR source_vars LIKE ?)
LIMIT 1
""", (source["file"], f"%{source['pattern']}%", f'%"{source["pattern"]}"%'))

        result = cursor.fetchone()
        if result:
            # Mark the source pattern itself as tainted in this function
            tainted_elements.add(f"{result[0]}:{source['pattern']}")

    # DEBUG: Log tainted elements before propagation
    if debug_mode:
        print(f"[TAINT] Tainted elements before propagation: {tainted_elements}", file=sys.stderr)
        if not tainted_elements:
            print(f"[TAINT] WARNING: No tainted elements found for source {source['pattern']}", file=sys.stderr)
            print(f"[TAINT] This means taint will be LOST here!", file=sys.stderr)

    # CRITICAL FIX: For JavaScript, ensure source patterns create initial taint
    if source["file"].endswith(('.js', '.jsx', '.ts', '.tsx')):
        # If no tainted elements found yet for common JS sources, create one
        if not tainted_elements and source["pattern"] in ["req.body", "req.query", "req.params", "req.headers", "req.cookies"]:
            # Treat the source itself as tainted within its function scope
            func_name = source_function.get("name", "unknown")
            tainted_elements.add(f"{func_name}:{source['pattern']}")
            if debug_mode:
                print(f"[TAINT] Created initial taint for JS source: {func_name}:{source['pattern']}", file=sys.stderr)

    # ENHANCEMENT: Apply JavaScript-specific taint tracking
    if source["file"].endswith(('.js', '.jsx', '.ts', '.tsx')):
        from .javascript import enhance_javascript_tracking
        # The helper's return value replaces the tainted set.
        tainted_elements = enhance_javascript_tracking(
            cursor, source, tainted_elements, source["file"]
        )
        if debug_mode and tainted_elements:
            print(f"[TAINT] JavaScript enhancement added: {tainted_elements}", file=sys.stderr)

    # Step 2: Propagate taint through assignments (worklist algorithm)
    processed = set()
    iterations = 0
    max_iterations = 100  # Prevent infinite loops

    while tainted_elements - processed and iterations < max_iterations:
        iterations += 1
        new_taints = set()

        # The set difference builds a fresh set, so adding to `processed`
        # inside the loop does not disturb the iteration.
        for element in tainted_elements - processed:
            processed.add(element)

            # Parse the element (format: "function:variable")
            if ":" in element:
                func_name, var_name = element.split(":", 1)
            else:
                func_name = "global"
                var_name = element

            # Find assignments where this tainted variable is used as source
            # (LIKE '%var%' is a substring match, so a short variable name
            # also matches longer identifiers that contain it)
            cursor.execute("""
SELECT target_var, in_function, line FROM assignments
WHERE file = ? AND in_function = ? AND
(source_expr LIKE ? OR source_vars LIKE ?)
""", (source["file"], func_name, f"%{var_name}%", f'%"{var_name}"%'))

            for target_var, in_function, line in cursor.fetchall():
                new_element = f"{in_function}:{target_var}"
                if new_element not in processed:
                    # CRITICAL DEBUG: Log taint propagation through assignments
                    # NOTE(review): this print (and the three similar ones
                    # below) goes to stdout and keys only on
                    # THEAUDITOR_TAINT_DEBUG, unlike the `debug_mode` prints
                    # that use stderr - confirm whether that is intended.
                    if os.environ.get("THEAUDITOR_TAINT_DEBUG"):
                        print(f"[TAINT] Propagating through assignment: {var_name} -> {target_var} in {in_function} at line {line}")
                    new_taints.add(new_element)

            # Track taint through function calls
            # Check if tainted variable is passed as argument
            cursor.execute("""
SELECT callee_function, param_name, line FROM function_call_args
WHERE file = ? AND caller_function = ? AND argument_expr LIKE ?
""", (source["file"], func_name, f"%{var_name}%"))

            # fetchall() materialises the rows first, so re-using the same
            # cursor inside this loop is safe.
            for callee_function, param_name, line in cursor.fetchall():
                # The parameter in the callee function is now tainted
                new_element = f"{callee_function}:{param_name}"
                if new_element not in processed:
                    # CRITICAL DEBUG: Log taint propagation through function calls
                    if os.environ.get("THEAUDITOR_TAINT_DEBUG"):
                        print(f"[TAINT] Propagating through function call: {var_name} in {func_name} -> {param_name} in {callee_function} at line {line}")
                    new_taints.add(new_element)

                # Check if the callee function returns the tainted parameter
                cursor.execute("""
SELECT return_expr FROM function_returns
WHERE file = ? AND function_name = ? AND
(return_expr LIKE ? OR return_vars LIKE ?)
""", (source["file"], callee_function, f"%{param_name}%", f'%"{param_name}"%'))

                if cursor.fetchone():
                    # Function returns tainted data
                    new_element = f"{callee_function}:__return__"
                    if new_element not in processed:
                        new_taints.add(new_element)

        tainted_elements.update(new_taints)

    # DEBUG: Log final tainted elements
    if debug_mode:
        print(f"[TAINT] Propagation completed after {iterations} iterations", file=sys.stderr)
        print(f"[TAINT] Final tainted elements: {tainted_elements}", file=sys.stderr)
        print(f"[TAINT] Checking {len(sinks)} sinks for vulnerabilities", file=sys.stderr)

    # Step 3: Check if any tainted element reaches a sink
    for sink in sinks:
        # Only check sinks in the same file for now (can be extended)
        if sink["file"] != source["file"]:
            continue

        # Get the function containing the sink
        sink_function = get_containing_function(cursor, sink)
        if not sink_function:
            continue

        # ENHANCEMENT: Also check for direct use of source pattern in sink arguments
        # This catches cases where source is used directly without variable assignment
        # (functions are compared by name only)
        if sink_function["name"] == source_function["name"]:
            # Check if source pattern appears directly in sink's arguments
            cursor.execute("""
SELECT COUNT(*) FROM function_call_args
WHERE file = ? AND line = ? AND argument_expr LIKE ?
""", (sink["file"], sink["line"], f"%{source['pattern']}%"))

            if cursor.fetchone()[0] > 0:
                # Direct use of source in sink arguments
                if not has_sanitizer_between(cursor, source, sink):
                    path = TaintPath(
                        source=source,
                        sink=sink,
                        path=[
                            {
                                "type": "direct_argument",
                                "location": f"{source['file']}:{source['line']}",
                                "pattern": source['pattern']
                            },
                            {
                                "type": "sink",
                                "location": f"{sink['file']}:{sink['line']}",
                                "pattern": sink['pattern']
                            }
                        ]
                    )
                    paths.append(path)
                    # NOTE(review): nesting of this `continue` was
                    # reconstructed from a flattened listing - confirm it
                    # belongs with the append above.
                    continue  # Move to next sink

        # Check if any tainted variable is used in the sink
        for element in tainted_elements:
            if ":" in element:
                func_name, var_name = element.split(":", 1)
            else:
                func_name = "global"
                var_name = element

            # Skip if not in the same function as the sink - BUT try inter-procedural tracking
            if func_name != sink_function["name"]:
                # CRITICAL: Attempt inter-procedural tracking
                if debug_mode:
                    print(f"[TAINT] Attempting inter-procedural tracking: {var_name} in {func_name} to sink in {sink_function['name']}", file=sys.stderr)

                # Try to trace inter-procedural flow from this tainted variable to the sink
                # (runs once per tainted element outside the sink's function,
                # so the same inter-procedural path can be appended repeatedly)
                inter_paths = trace_inter_procedural_flow(
                    cursor=cursor,
                    source_var=var_name,
                    source_file=source["file"],
                    source_line=source["line"],
                    source_function=func_name,
                    sinks=[sink],  # Check just this specific sink
                    max_depth=3  # Limited depth for performance
                )

                if inter_paths:
                    # Found inter-procedural vulnerability!
                    if debug_mode:
                        print(f"[TAINT] INTER-PROCEDURAL VULNERABILITY FOUND via toss-the-salad!", file=sys.stderr)
                    paths.extend(inter_paths)
                elif debug_mode:
                    print(f"[TAINT] No inter-procedural path found from {var_name} to sink", file=sys.stderr)

                continue

            # Check if the tainted variable appears in the sink's context
            # This is a simplified check - ideally we'd parse the sink expression
            sink_context_found = False

            # CRITICAL DEBUG: Log sink checking
            if os.environ.get("THEAUDITOR_TAINT_DEBUG"):
                print(f"[TAINT] Checking if tainted var {var_name} in {func_name} reaches sink at {sink['file']}:{sink['line']}")

            # Check in function call arguments at the sink line
            cursor.execute("""
SELECT argument_expr FROM function_call_args
WHERE file = ? AND line = ? AND argument_expr LIKE ?
""", (sink["file"], sink["line"], f"%{var_name}%"))

            if cursor.fetchone():
                sink_context_found = True
                if os.environ.get("THEAUDITOR_TAINT_DEBUG"):
                    print(f"[TAINT] FOUND: Tainted var {var_name} reaches sink at line {sink['line']}!")

            # Also check if sink pattern matches and variable is in scope
            if not sink_context_found and var_name != "__return__":
                # Check if there's an assignment or usage near the sink
                # (proximity heuristic: within 5 lines either side)
                cursor.execute("""
SELECT COUNT(*) FROM assignments
WHERE file = ? AND in_function = ? AND
line BETWEEN ? AND ? AND
(target_var = ? OR source_expr LIKE ?)
""", (sink["file"], func_name, sink["line"] - 5, sink["line"] + 5,
                      var_name, f"%{var_name}%"))

                if cursor.fetchone()[0] > 0:
                    sink_context_found = True

            if sink_context_found:
                # Check for sanitizers between source and sink
                if not has_sanitizer_between(cursor, source, sink):
                    # We found a real taint path!
                    if debug_mode:
                        print(f"[TAINT] VULNERABILITY FOUND!", file=sys.stderr)
                        print(f"[TAINT] Source: {source['pattern']} at line {source['line']}", file=sys.stderr)
                        print(f"[TAINT] Sink: {sink['pattern']} at line {sink['line']}", file=sys.stderr)
                        print(f"[TAINT] Via variable: {var_name}", file=sys.stderr)

                    path = TaintPath(
                        source=source,
                        sink=sink,
                        path=[
                            {
                                "type": "source",
                                "location": f"{source['file']}:{source['line']}",
                                "var": var_name,
                                "code": get_code_snippet(source['file'], source['line'])
                            },
                            {
                                "type": "propagation",
                                "tainted_vars": list(tainted_elements)[:5],  # Limit for readability
                                "transformations": len(tainted_elements)
                            },
                            {
                                "type": "sink",
                                "location": f"{sink['file']}:{sink['line']}",
                                "var": var_name,
                                "code": get_code_snippet(sink['file'], sink['line'])
                            }
                        ]
                    )
                    paths.append(path)
                    break  # One path per sink is enough

    return paths
def trace_from_source_legacy(
    cursor: sqlite3.Cursor,
    source: Dict[str, Any],
    source_function: Dict[str, Any],
    sinks: List[Dict[str, Any]],
    call_graph: Dict[str, List[str]],
    max_depth: int
) -> List[Any]:  # Returns List[TaintPath]
    """Proximity-based taint tracing kept for databases without data-flow tables.

    Two passes feed one result list:

    1. Every sink located inside the source function (same file, inside its
       line boundaries, same containing-function name) is reported unless a
       sanitizer call lies between source and sink.
    2. A breadth-first walk over ``call_graph`` starting at the source
       function reports each callee whose name matches a sink, as long as
       no sanitizer call has been seen earlier on that walk.

    Args:
        cursor: Cursor on a database that has a ``symbols`` table.
        source: The taint source mapping.
        source_function: Mapping with "file", "name" and "line".
        sinks: Candidate sinks; "file", "line", "name" and "pattern" are read.
        call_graph: Maps "file:function" keys to lists of called names.
        max_depth: Nodes at this depth or deeper are not expanded.

    Returns:
        A list of TaintPath objects, possibly containing duplicates.
    """
    # Import TaintPath here to avoid circular dependency
    from .core import TaintPath

    found = []

    # Pass 1: sinks that live in the source function itself.
    for candidate in sinks:
        if candidate["file"] != source_function["file"]:
            continue

        fn_first, fn_last = get_function_boundaries(
            cursor, source["file"], source_function["line"]
        )
        if not (fn_first <= candidate["line"] <= fn_last):
            continue

        enclosing = get_containing_function(cursor, candidate)
        if not enclosing or enclosing["name"] != source_function["name"]:
            continue

        if has_sanitizer_between(cursor, source, candidate):
            continue

        found.append(
            TaintPath(
                source=source,
                sink=candidate,
                path=[source_function]
            )
        )

    # Pass 2: breadth-first walk of the call graph.
    seen_keys = set()
    sanitized_paths = set()  # keys of functions in which a sanitizer call was seen
    pending = deque([(source_function, [source_function], 0, False)])

    while pending:
        func, trail, level, cleaned = pending.popleft()

        if level >= max_depth:
            continue

        key = f"{func['file']}:{func['name']}"
        if key in seen_keys:
            continue
        seen_keys.add(key)

        for callee in call_graph.get(key, []):
            # Once a sanitizer call is seen, the flag stays set for the
            # remaining callees of this function and for everything queued
            # from here on.
            if is_sanitizer(callee):
                cleaned = True
                sanitized_paths.add(key)

            # Does this call look like one of the sinks?
            for candidate in sinks:
                name_hit = callee in candidate["name"] or candidate["pattern"] in callee
                if name_hit and not cleaned:
                    hop = {"name": callee, "type": "call", "file": candidate["file"], "line": candidate["line"]}
                    found.append(
                        TaintPath(
                            source=source,
                            sink=candidate,
                            path=trail + [hop]
                        )
                    )

            # Resolve the callee to a definition; only the last dotted
            # segment is looked up so method calls resolve by method name.
            cursor.execute("""
SELECT path, line
FROM symbols
WHERE name = ?
AND type = 'function'
LIMIT 1
""", (callee.split(".")[-1],))
            definition = cursor.fetchone()

            if definition:
                successor = {
                    "file": definition[0],
                    "name": callee,
                    "line": definition[1]
                }
                pending.append((successor, trail + [successor], level + 1, cleaned))

    return found
def deduplicate_paths(paths: List[Any]) -> List[Any]:  # Accepts/returns List[TaintPath]
    """Collapse taint paths so each source/sink location pair appears once.

    Paths are grouped by the "file:line" of their source and of their sink.
    Within a group the path with the fewest steps is kept; on a tie the one
    seen first stays. Groups come back in order of first appearance.

    Args:
        paths: Objects exposing ``source`` and ``sink`` mappings (with
            "file" and "line") and a sized ``path`` attribute.

    Returns:
        A new list with one path per source/sink pair.
    """
    shortest = {}

    for candidate in paths:
        src, dst = candidate.source, candidate.sink
        pair = (f"{src['file']}:{src['line']}", f"{dst['file']}:{dst['line']}")

        if pair not in shortest:
            shortest[pair] = candidate
            continue

        # Replace only on a strictly shorter path so earlier entries win ties.
        if len(candidate.path) < len(shortest[pair].path):
            shortest[pair] = candidate

    return list(shortest.values())