mirror of
https://github.com/aljazceru/Auditor.git
synced 2025-12-17 03:24:18 +01:00
239 lines
9.6 KiB
Python
239 lines
9.6 KiB
Python
"""Inter-procedural taint tracking - the 'Toss the Salad' algorithm.
|
|
|
|
This module implements cross-function taint tracking by following
|
|
data flow through function arguments and return values.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import sqlite3
|
|
from typing import Dict, List, Any, Optional, Set
|
|
|
|
from .database import get_containing_function, get_code_snippet
|
|
|
|
|
|
def _like_contains(term: str) -> str:
    """Build a LIKE pattern that matches *term* literally anywhere in a column.

    ``%`` and ``_`` are LIKE wildcards, and identifiers routinely contain
    ``_``; without escaping, ``user_id`` would also match ``userXid``.
    Queries using the returned pattern must carry a backslash ESCAPE clause.
    """
    escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    return f"%{escaped}%"


def _sinks_reached(
    cursor: sqlite3.Cursor,
    sinks: List[Dict[str, Any]],
    sink_owners: Dict[int, Any],
    file: str,
    func: str,
    var: str,
) -> List[Dict[str, Any]]:
    """Return the sinks in *file*, located inside *func*, whose call arguments mention *var*.

    *sink_owners* memoises ``get_containing_function`` results by sink index so
    each sink is resolved at most once per trace.
    NOTE(review): this assumes get_containing_function is a read-only lookup
    whose result does not change during a trace - confirm against .database.
    """
    pattern = _like_contains(var)
    reached = []
    for index, sink in enumerate(sinks):
        if sink["file"] != file:
            continue

        if index not in sink_owners:
            sink_owners[index] = get_containing_function(cursor, sink)
        owner = sink_owners[index]
        if not owner or owner["name"] != func:
            continue

        # A sink counts as reached when any argument recorded on the sink's
        # line textually contains the variable name.
        cursor.execute("""
            SELECT COUNT(*)
            FROM function_call_args
            WHERE file = ?
              AND line = ?
              AND argument_expr LIKE ? ESCAPE '\\'
        """, (sink["file"], sink["line"], pattern))

        if cursor.fetchone()[0] > 0:
            reached.append(sink)
    return reached


def trace_inter_procedural_flow(
    cursor: sqlite3.Cursor,
    source_var: str,
    source_file: str,
    source_line: int,
    source_function: str,
    sinks: List[Dict[str, Any]],
    max_depth: int = 5
) -> List[Any]:  # Returns List[TaintPath]
    """
    The 'Toss the Salad' algorithm for inter-procedural taint tracking.

    This function traces taint flow across function boundaries by:
    1. Following variables passed as function arguments
    2. Mapping arguments to function parameters inside callees
    3. Tracking taint through return values
    4. Mapping return values back to variables in the caller

    The search is breadth-first over (file, function, variable) states. Each
    state is expanded once, at the shallowest depth it is reached, so cyclic
    or recursive call chains terminate without producing repeated findings.
    Callees and callers are looked up in the same file as the current state;
    flows that cross files are not followed by this function.

    Args:
        cursor: Database cursor
        source_var: The tainted variable to track
        source_file: File containing the source
        source_line: Line where taint originates
        source_function: Function containing the source
        sinks: List of potential sinks to check; each entry is read via its
            "file", "line" and "pattern" keys
        max_depth: Maximum call depth to trace

    Returns:
        List of TaintPath objects showing inter-procedural vulnerabilities
    """
    from collections import deque

    # Import TaintPath here to avoid circular dependency
    from .core import TaintPath

    paths = []
    debug = os.environ.get("THEAUDITOR_TAINT_DEBUG") or os.environ.get("THEAUDITOR_DEBUG")

    if debug:
        print("\n[INTER-PROCEDURAL] Starting toss-the-salad tracking:", file=sys.stderr)
        print(f" Source var: {source_var} in {source_function} at {source_file}:{source_line}", file=sys.stderr)
        print(f" Max depth: {max_depth}", file=sys.stderr)
        print(f" Checking {len(sinks)} sinks", file=sys.stderr)

    source_info = {"file": source_file, "line": source_line, "pattern": source_var, "name": source_var}

    # States already expanded. Depth is deliberately not part of the key:
    # the queue is FIFO, so the first visit is the shallowest one, and
    # including depth would let a cycle be re-walked at every level.
    visited = set()

    # Memo of sink index -> containing-function record (see _sinks_reached).
    sink_owners: Dict[int, Any] = {}

    # Worklist: (current_var, current_function, current_file, depth, path_so_far)
    worklist = deque([(source_var, source_function, source_file, 0, [])])

    while worklist:
        current_var, current_func, current_file, depth, path = worklist.popleft()

        if depth > max_depth:
            if debug:
                print(f"[INTER-PROCEDURAL] Max depth {max_depth} reached", file=sys.stderr)
            continue

        state_key = (current_file, current_func, current_var)
        if state_key in visited:
            continue
        visited.add(state_key)

        if debug:
            print(f"\n[INTER-PROCEDURAL] Depth {depth}: Tracking {current_var} in {current_func}", file=sys.stderr)

        var_pattern = _like_contains(current_var)

        # Step 1: Check if current variable is passed as argument to other functions
        cursor.execute("""
            SELECT callee_function, param_name, line
            FROM function_call_args
            WHERE file = ?
              AND caller_function = ?
              AND (argument_expr = ? OR argument_expr LIKE ? ESCAPE '\\')
        """, (current_file, current_func, current_var, var_pattern))

        calls = cursor.fetchall()
        if debug and calls:
            print(f"[INTER-PROCEDURAL] Found {len(calls)} function calls passing {current_var}", file=sys.stderr)

        for callee_func, param_name, call_line in calls:
            if not param_name:
                # Without a parameter name there is nothing to track inside
                # the callee (same guard as for target_var below).
                continue

            if debug:
                print(f" -> {current_var} passed to {callee_func}({param_name}) at line {call_line}", file=sys.stderr)

            # Track the parameter in the callee function
            new_path = path + [{
                "type": "argument_pass",
                "from_func": current_func,
                "to_func": callee_func,
                "var": current_var,
                "param": param_name,
                "line": call_line
            }]

            # Add to worklist to continue tracking in callee
            worklist.append((param_name, callee_func, current_file, depth + 1, new_path))

            # Step 2: Check if callee function contains any sinks using this parameter
            for sink in _sinks_reached(cursor, sinks, sink_owners, current_file, callee_func, param_name):
                # Found inter-procedural vulnerability!
                if debug:
                    print("[INTER-PROCEDURAL] VULNERABILITY FOUND!", file=sys.stderr)
                    print(f" {source_var} -> {param_name} -> {sink['pattern']}", file=sys.stderr)

                vuln_path = new_path + [{
                    "type": "sink_reached",
                    "func": callee_func,
                    "var": param_name,
                    "sink": sink["pattern"],
                    "line": sink["line"]
                }]

                paths.append(TaintPath(source=dict(source_info), sink=sink, path=vuln_path))

        # Step 3: Check if current variable is returned by current function
        cursor.execute("""
            SELECT return_expr, line
            FROM function_returns
            WHERE file = ?
              AND function_name = ?
              AND (return_expr = ?
                   OR return_expr LIKE ? ESCAPE '\\'
                   OR return_vars LIKE ? ESCAPE '\\')
        """, (current_file, current_func, current_var, var_pattern, _like_contains(f'"{current_var}"')))

        returns = cursor.fetchall()
        if returns:
            if debug:
                print(f"[INTER-PROCEDURAL] {current_func} returns {current_var} in {len(returns)} places", file=sys.stderr)

            # Find where this function is called and its return value is used.
            # The lookup does not depend on which return statement matched,
            # so it runs once rather than once per return row.
            cursor.execute("""
                SELECT caller_function, target_var, line
                FROM function_call_args
                WHERE file = ?
                  AND callee_function = ?
                  AND target_var IS NOT NULL
            """, (current_file, current_func))

            call_sites = cursor.fetchall()
            if debug and call_sites:
                print(f"[INTER-PROCEDURAL] {current_func} called from {len(call_sites)} locations", file=sys.stderr)

            for caller_func, target_var, call_line in call_sites:
                if not target_var:
                    continue

                if debug:
                    print(f" <- Return value assigned to {target_var} in {caller_func}", file=sys.stderr)

                # The return value is now tainted in the caller
                new_path = path + [{
                    "type": "return_flow",
                    "from_func": current_func,
                    "to_func": caller_func,
                    "return_var": current_var,
                    "target_var": target_var,
                    "line": call_line
                }]

                # Add to worklist to continue tracking in caller
                worklist.append((target_var, caller_func, current_file, depth + 1, new_path))

        # Step 4: Check if current variable directly reaches a sink in current function
        for sink in _sinks_reached(cursor, sinks, sink_owners, current_file, current_func, current_var):
            # Direct vulnerability in current function
            if debug:
                print(f"[INTER-PROCEDURAL] Direct sink reached in {current_func}", file=sys.stderr)

            vuln_path = path + [{
                "type": "direct_sink",
                "func": current_func,
                "var": current_var,
                "sink": sink["pattern"],
                "line": sink["line"]
            }]

            paths.append(TaintPath(source=dict(source_info), sink=sink, path=vuln_path))

    if debug:
        print(f"\n[INTER-PROCEDURAL] Completed. Found {len(paths)} vulnerabilities", file=sys.stderr)

    return paths