"""Inter-procedural taint tracking - the 'Toss the Salad' algorithm. This module implements cross-function taint tracking by following data flow through function arguments and return values. """ import os import sys import sqlite3 from typing import Dict, List, Any, Optional, Set from .database import get_containing_function, get_code_snippet def trace_inter_procedural_flow( cursor: sqlite3.Cursor, source_var: str, source_file: str, source_line: int, source_function: str, sinks: List[Dict[str, Any]], max_depth: int = 5 ) -> List[Any]: # Returns List[TaintPath] """ The 'Toss the Salad' algorithm for inter-procedural taint tracking. This function traces taint flow across function boundaries by: 1. Following variables passed as function arguments 2. Mapping arguments to function parameters inside callees 3. Tracking taint through return values 4. Mapping return values back to variables in the caller Args: cursor: Database cursor source_var: The tainted variable to track source_file: File containing the source source_line: Line where taint originates source_function: Function containing the source sinks: List of potential sinks to check max_depth: Maximum call depth to trace Returns: List of TaintPath objects showing inter-procedural vulnerabilities """ # Import TaintPath here to avoid circular dependency from .core import TaintPath paths = [] debug = os.environ.get("THEAUDITOR_TAINT_DEBUG") or os.environ.get("THEAUDITOR_DEBUG") if debug: print(f"\n[INTER-PROCEDURAL] Starting toss-the-salad tracking:", file=sys.stderr) print(f" Source var: {source_var} in {source_function} at {source_file}:{source_line}", file=sys.stderr) print(f" Max depth: {max_depth}", file=sys.stderr) print(f" Checking {len(sinks)} sinks", file=sys.stderr) # Track visited functions to avoid cycles visited = set() # Worklist: (current_var, current_function, current_file, depth, path_so_far) worklist = [(source_var, source_function, source_file, 0, [])] while worklist: current_var, current_func, current_file, depth, path = worklist.pop(0) if depth > max_depth: if debug: print(f"[INTER-PROCEDURAL] Max depth {max_depth} reached", file=sys.stderr) continue # Create unique key for this state state_key = f"{current_file}:{current_func}:{current_var}:{depth}" if state_key in visited: continue visited.add(state_key) if debug: print(f"\n[INTER-PROCEDURAL] Depth {depth}: Tracking {current_var} in {current_func}", file=sys.stderr) # Step 1: Check if current variable is passed as argument to other functions cursor.execute(""" SELECT callee_function, param_name, line FROM function_call_args WHERE file = ? AND caller_function = ? AND (argument_expr = ? OR argument_expr LIKE ?) """, (current_file, current_func, current_var, f"%{current_var}%")) calls = cursor.fetchall() if debug and calls: print(f"[INTER-PROCEDURAL] Found {len(calls)} function calls passing {current_var}", file=sys.stderr) for callee_func, param_name, call_line in calls: if debug: print(f" -> {current_var} passed to {callee_func}({param_name}) at line {call_line}", file=sys.stderr) # Track the parameter in the callee function new_path = path + [{ "type": "argument_pass", "from_func": current_func, "to_func": callee_func, "var": current_var, "param": param_name, "line": call_line }] # Add to worklist to continue tracking in callee worklist.append((param_name, callee_func, current_file, depth + 1, new_path)) # Step 2: Check if callee function contains any sinks using this parameter for sink in sinks: if sink["file"] != current_file: continue # Get function containing the sink sink_function = get_containing_function(cursor, sink) if not sink_function or sink_function["name"] != callee_func: continue # Check if parameter flows to sink cursor.execute(""" SELECT COUNT(*) FROM function_call_args WHERE file = ? AND line = ? AND argument_expr LIKE ? """, (sink["file"], sink["line"], f"%{param_name}%")) if cursor.fetchone()[0] > 0: # Found inter-procedural vulnerability! if debug: print(f"[INTER-PROCEDURAL] VULNERABILITY FOUND!", file=sys.stderr) print(f" {source_var} -> {param_name} -> {sink['pattern']}", file=sys.stderr) vuln_path = new_path + [{ "type": "sink_reached", "func": callee_func, "var": param_name, "sink": sink["pattern"], "line": sink["line"] }] path_obj = TaintPath( source={"file": source_file, "line": source_line, "pattern": source_var, "name": source_var}, sink=sink, path=vuln_path ) paths.append(path_obj) # Step 3: Check if current variable is returned by current function cursor.execute(""" SELECT return_expr, line FROM function_returns WHERE file = ? AND function_name = ? AND (return_expr = ? OR return_expr LIKE ? OR return_vars LIKE ?) """, (current_file, current_func, current_var, f"%{current_var}%", f'%"{current_var}"%')) returns = cursor.fetchall() if debug and returns: print(f"[INTER-PROCEDURAL] {current_func} returns {current_var} in {len(returns)} places", file=sys.stderr) for return_expr, return_line in returns: # Find where this function is called and its return value is used cursor.execute(""" SELECT caller_function, target_var, line FROM function_call_args WHERE file = ? AND callee_function = ? AND target_var IS NOT NULL """, (current_file, current_func)) call_sites = cursor.fetchall() if debug and call_sites: print(f"[INTER-PROCEDURAL] {current_func} called from {len(call_sites)} locations", file=sys.stderr) for caller_func, target_var, call_line in call_sites: if not target_var: continue if debug: print(f" <- Return value assigned to {target_var} in {caller_func}", file=sys.stderr) # The return value is now tainted in the caller new_path = path + [{ "type": "return_flow", "from_func": current_func, "to_func": caller_func, "return_var": current_var, "target_var": target_var, "line": call_line }] # Add to worklist to continue tracking in caller worklist.append((target_var, caller_func, current_file, depth + 1, new_path)) # Step 4: Check if current variable directly reaches a sink in current function for sink in sinks: if sink["file"] != current_file: continue # Get function containing the sink sink_function = get_containing_function(cursor, sink) if not sink_function or sink_function["name"] != current_func: continue # Check if current variable is used in sink cursor.execute(""" SELECT COUNT(*) FROM function_call_args WHERE file = ? AND line = ? AND argument_expr LIKE ? """, (sink["file"], sink["line"], f"%{current_var}%")) if cursor.fetchone()[0] > 0: # Direct vulnerability in current function if debug: print(f"[INTER-PROCEDURAL] Direct sink reached in {current_func}", file=sys.stderr) vuln_path = path + [{ "type": "direct_sink", "func": current_func, "var": current_var, "sink": sink["pattern"], "line": sink["line"] }] path_obj = TaintPath( source={"file": source_file, "line": source_line, "pattern": source_var, "name": source_var}, sink=sink, path=vuln_path ) paths.append(path_obj) if debug: print(f"\n[INTER-PROCEDURAL] Completed. Found {len(paths)} vulnerabilities", file=sys.stderr) return paths