"""Impact analysis engine for tracing code dependencies and change blast radius."""
import re
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Any, Set, Tuple
def analyze_impact(
db_path: str,
target_file: str,
target_line: int,
trace_to_backend: bool = False
) -> Dict[str, Any]:
"""
Analyze the impact of changing code at a specific file and line.
Traces both upstream dependencies (who calls this) and downstream
dependencies (what this calls) to understand the blast radius of changes.
Args:
db_path: Path to the SQLite database
target_file: Path to the file containing the target code
target_line: Line number of the target code
Returns:
Dictionary containing:
- target_symbol: Name and type of the symbol at target location
- upstream: List of symbols that call the target (callers)
- downstream: List of symbols called by the target (callees)
- impact_summary: Statistics about the blast radius
"""
# Connect to database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Normalize the target file path to match database format
target_file = Path(target_file).as_posix()
if target_file.startswith("./"):
target_file = target_file[2:]
# Check if cross-stack analysis is requested
if trace_to_backend and target_file.endswith(('.js', '.jsx', '.ts', '.tsx', '.mjs', '.cjs')):
# Attempt cross-stack tracing
cross_stack_trace = trace_frontend_to_backend(cursor, target_file, target_line)
if cross_stack_trace:
# Found a backend endpoint - analyze its downstream impact
backend_file = cross_stack_trace["backend"]["file"]
backend_line = cross_stack_trace["backend"]["line"]
# Find the backend function/class at the traced location
cursor.execute("""
SELECT name, type, line, col
FROM symbols
WHERE path = ?
AND type IN ('function', 'class')
AND line <= ?
ORDER BY line DESC
LIMIT 1
""", (backend_file, backend_line))
backend_result = cursor.fetchone()
if backend_result:
backend_name, backend_type, backend_def_line, backend_col = backend_result
# Only get downstream dependencies from backend (not upstream)
downstream = find_downstream_dependencies(cursor, backend_file, backend_def_line, backend_name)
downstream_transitive = calculate_transitive_impact(cursor, downstream, "downstream")
# Build cross-stack response
return {
"cross_stack_trace": cross_stack_trace,
"target_symbol": {
"name": f"API Call to {cross_stack_trace['frontend']['url']}",
"type": "api_call",
"file": target_file,
"line": target_line,
"column": 0
},
"backend_symbol": {
"name": backend_name,
"type": backend_type,
"file": backend_file,
"line": backend_def_line,
"column": backend_col
},
"upstream": [], # Frontend has no upstream in this context
"upstream_transitive": [],
"downstream": downstream,
"downstream_transitive": downstream_transitive,
"impact_summary": {
"direct_upstream": 0,
"direct_downstream": len(downstream),
"total_upstream": 0,
"total_downstream": len(downstream) + len(downstream_transitive),
"total_impact": len(downstream) + len(downstream_transitive),
"affected_files": len(set(
[d["file"] for d in downstream] +
[d["file"] for d in downstream_transitive]
)),
"cross_stack": True
}
}
# Step 1: Find the target symbol at the specified location
# Look for function or class definition at or near the target line
cursor.execute("""
SELECT name, type, line, col
FROM symbols
WHERE path = ?
AND type IN ('function', 'class')
AND line <= ?
ORDER BY line DESC
LIMIT 1
""", (target_file, target_line))
target_result = cursor.fetchone()
if not target_result:
# No function/class found, return empty analysis
return {
"target_symbol": None,
"error": f"No function or class found at {target_file}:{target_line}",
"upstream": [],
"downstream": [],
"impact_summary": {
"total_upstream": 0,
"total_downstream": 0,
"total_impact": 0
}
}
target_name, target_type, target_def_line, target_col = target_result
# Step 2: Find upstream dependencies (who calls this symbol)
upstream = find_upstream_dependencies(cursor, target_file, target_name, target_type)
# Step 3: Find downstream dependencies (what this symbol calls)
downstream = find_downstream_dependencies(cursor, target_file, target_def_line, target_name)
# Step 4: Calculate transitive impact (recursive dependencies)
upstream_transitive = calculate_transitive_impact(cursor, upstream, "upstream")
downstream_transitive = calculate_transitive_impact(cursor, downstream, "downstream")
# Build response
return {
"target_symbol": {
"name": target_name,
"type": target_type,
"file": target_file,
"line": target_def_line,
"column": target_col
},
"upstream": upstream,
"upstream_transitive": upstream_transitive,
"downstream": downstream,
"downstream_transitive": downstream_transitive,
"impact_summary": {
"direct_upstream": len(upstream),
"direct_downstream": len(downstream),
"total_upstream": len(upstream) + len(upstream_transitive),
"total_downstream": len(downstream) + len(downstream_transitive),
"total_impact": len(upstream) + len(downstream) + len(upstream_transitive) + len(downstream_transitive),
"affected_files": len(set(
[u["file"] for u in upstream] +
[d["file"] for d in downstream] +
[u["file"] for u in upstream_transitive] +
[d["file"] for d in downstream_transitive]
))
}
}
finally:
conn.close()
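
# Illustrative usage sketch (not part of the analyzer itself): the database
# path and target location below are hypothetical placeholders and assume the
# repository has already been indexed into a SQLite database with a populated
# `symbols` table.
def _example_list_callers(db_path: str = "repo_index.db") -> List[str]:
    result = analyze_impact(db_path, "src/services/users.py", 120)  # hypothetical target
    if result.get("error"):
        return []
    # Direct callers first, then transitive callers, as "file:symbol" strings.
    callers = result["upstream"] + result["upstream_transitive"]
    return [f"{dep['file']}:{dep['symbol']}" for dep in callers]
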
def find_upstream_dependencies(
cursor: sqlite3.Cursor,
target_file: str,
target_name: str,
target_type: str
) -> List[Dict[str, Any]]:
"""
Find all symbols that call the target symbol (upstream dependencies).
Args:
cursor: Database cursor
target_file: File containing the target symbol
target_name: Name of the target symbol
target_type: Type of the target symbol (function/class)
Returns:
List of upstream dependency dictionaries
"""
upstream = []
# Find all calls to this symbol
# Match by name (simple matching, could be enhanced with qualified names)
cursor.execute("""
SELECT DISTINCT s1.path, s1.name, s1.type, s1.line, s1.col
FROM symbols s1
WHERE s1.type = 'call'
AND s1.name = ?
AND EXISTS (
SELECT 1 FROM symbols s2
WHERE s2.path = s1.path
AND s2.type IN ('function', 'class')
AND s2.line <= s1.line
AND s2.name != ?
)
ORDER BY s1.path, s1.line
""", (target_name, target_name))
for row in cursor.fetchall():
call_file, call_name, call_type, call_line, call_col = row
# Find the containing function/class for this call
cursor.execute("""
SELECT name, type, line
FROM symbols
WHERE path = ?
AND type IN ('function', 'class')
AND line <= ?
ORDER BY line DESC
LIMIT 1
""", (call_file, call_line))
container = cursor.fetchone()
if container:
container_name, container_type, container_line = container
upstream.append({
"file": call_file,
"symbol": container_name,
"type": container_type,
"line": container_line,
"call_line": call_line,
"calls": target_name
})
# Deduplicate by file+symbol combination
seen = set()
unique_upstream = []
for dep in upstream:
key = (dep["file"], dep["symbol"])
if key not in seen:
seen.add(key)
unique_upstream.append(dep)
return unique_upstream
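
# Illustrative sketch (hypothetical file and symbol names): each upstream record
# identifies the function or class that contains a call to the target, plus the
# line of the call itself, e.g.
#   {"file": "routes/users.py", "symbol": "create_user", "type": "function",
#    "line": 18, "call_line": 25, "calls": "validate_email"}
def _example_find_upstream(cursor: sqlite3.Cursor) -> List[Dict[str, Any]]:
    # Who calls validate_email, assumed here to be defined in utils/validation.py?
    return find_upstream_dependencies(cursor, "utils/validation.py", "validate_email", "function")
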
def find_downstream_dependencies(
cursor: sqlite3.Cursor,
target_file: str,
target_line: int,
target_name: str
) -> List[Dict[str, Any]]:
"""
Find all symbols called by the target symbol (downstream dependencies).
Args:
cursor: Database cursor
target_file: File containing the target symbol
target_line: Line where target symbol is defined
target_name: Name of the target symbol
Returns:
List of downstream dependency dictionaries
"""
downstream = []
# Find the end line of the target function/class
# Look for the next function/class definition in the same file
cursor.execute("""
SELECT line
FROM symbols
WHERE path = ?
AND type IN ('function', 'class')
AND line > ?
ORDER BY line
LIMIT 1
""", (target_file, target_line))
next_symbol = cursor.fetchone()
end_line = next_symbol[0] if next_symbol else 999999
# Find all calls within the target function/class body
cursor.execute("""
SELECT DISTINCT name, line, col
FROM symbols
WHERE path = ?
AND type = 'call'
AND line > ?
AND line < ?
ORDER BY line
""", (target_file, target_line, end_line))
for row in cursor.fetchall():
called_name, call_line, call_col = row
# Skip recursive calls
if called_name == target_name:
continue
# Try to find the definition of the called symbol
cursor.execute("""
SELECT path, type, line
FROM symbols
WHERE name = ?
AND type IN ('function', 'class')
LIMIT 1
""", (called_name,))
definition = cursor.fetchone()
if definition:
def_file, def_type, def_line = definition
downstream.append({
"file": def_file,
"symbol": called_name,
"type": def_type,
"line": def_line,
"called_from_line": call_line,
"called_by": target_name
})
else:
# External or built-in function
downstream.append({
"file": "external",
"symbol": called_name,
"type": "unknown",
"line": 0,
"called_from_line": call_line,
"called_by": target_name
})
# Deduplicate by symbol name
seen = set()
unique_downstream = []
for dep in downstream:
if dep["symbol"] not in seen:
seen.add(dep["symbol"])
unique_downstream.append(dep)
return unique_downstream
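
# Illustrative sketch (hypothetical location): downstream records point at the
# definition of each called symbol when the index can resolve it; calls that
# cannot be resolved (built-ins, third-party code) come back with
# file="external", type="unknown", and line=0, and are skipped during
# transitive expansion.
def _example_find_downstream(cursor: sqlite3.Cursor) -> List[Dict[str, Any]]:
    # What does the function defined at utils/validation.py:42 call?
    return find_downstream_dependencies(cursor, "utils/validation.py", 42, "validate_email")
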
def calculate_transitive_impact(
cursor: sqlite3.Cursor,
direct_deps: List[Dict[str, Any]],
direction: str,
max_depth: int = 2,
visited: Optional[Set[Tuple[str, str]]] = None
) -> List[Dict[str, Any]]:
"""
Calculate transitive dependencies up to max_depth.
Args:
cursor: Database cursor
direct_deps: Direct dependencies to expand
direction: "upstream" or "downstream"
max_depth: Maximum recursion depth
visited: Set of already visited (file, symbol) pairs
Returns:
List of transitive dependencies
"""
if max_depth <= 0 or not direct_deps:
return []
if visited is None:
visited = set()
transitive = []
for dep in direct_deps:
# Skip external dependencies
if dep["file"] == "external":
continue
dep_key = (dep["file"], dep["symbol"])
if dep_key in visited:
continue
visited.add(dep_key)
if direction == "upstream":
# Find who calls this dependency
next_level = find_upstream_dependencies(
cursor, dep["file"], dep["symbol"], dep["type"]
)
else:
# Find what this dependency calls
next_level = find_downstream_dependencies(
cursor, dep["file"], dep["line"], dep["symbol"]
)
# Add current level
for next_dep in next_level:
next_dep["depth"] = max_depth
transitive.append(next_dep)
# Recurse
recursive_deps = calculate_transitive_impact(
cursor, next_level, direction, max_depth - 1, visited
)
transitive.extend(recursive_deps)
return transitive
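
# Note on depth bookkeeping: with the default max_depth=2, dependencies found
# while expanding the direct layer are tagged depth=2 and the next layer
# depth=1, so larger depth values are closer to the original target. A minimal
# sketch of expanding one layer further than analyze_impact does (the deeper
# limit is an assumption for illustration, not a tuned default):
def _example_deeper_transitive(
    cursor: sqlite3.Cursor,
    direct_downstream: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    return calculate_transitive_impact(cursor, direct_downstream, "downstream", max_depth=3)
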
def trace_frontend_to_backend(
cursor: sqlite3.Cursor,
target_file: str,
target_line: int
) -> Optional[Dict[str, Any]]:
"""
Trace a frontend API call to its corresponding backend endpoint.
Args:
cursor: Database cursor
target_file: Frontend file containing API call
target_line: Line number of the API call
Returns:
Dictionary with cross-stack trace information or None if not found
"""
# Read the target file to extract API call details
try:
        file_path = Path(target_file)
        if not file_path.exists():
            # Try relative path
            file_path = Path(".") / target_file
            if not file_path.exists():
                return None
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# Get context around the target line (5 lines before and after)
start_idx = max(0, target_line - 6) # -6 because line numbers are 1-based
end_idx = min(len(lines), target_line + 5)
context_lines = lines[start_idx:end_idx]
context = ''.join(context_lines)
# Extract API call patterns
# Common patterns: axios.get('/api/users'), fetch('/api/users'), http.post('/api/items')
api_patterns = [
# axios patterns
r'axios\.(get|post|put|patch|delete)\s*\(\s*[\'"`]([^\'"`]+)[\'"`]',
# fetch patterns
r'fetch\s*\(\s*[\'"`]([^\'"`]+)[\'"`].*method:\s*[\'"`](GET|POST|PUT|PATCH|DELETE)[\'"`]',
# fetch with default GET
r'fetch\s*\(\s*[\'"`]([^\'"`]+)[\'"`]',
# http/request patterns
r'(http|request)\.(get|post|put|patch|delete)\s*\(\s*[\'"`]([^\'"`]+)[\'"`]',
# jQuery ajax
r'\$\.(ajax|get|post)\s*\(\s*\{[^}]*url:\s*[\'"`]([^\'"`]+)[\'"`]',
]
method = None
url_path = None
for pattern in api_patterns:
match = re.search(pattern, context, re.IGNORECASE | re.MULTILINE)
if match:
groups = match.groups()
if 'fetch' in pattern and len(groups) == 2:
# fetch with explicit method
url_path = groups[0]
method = groups[1].upper()
elif 'fetch' in pattern and len(groups) == 1:
# fetch defaults to GET
url_path = groups[0]
method = 'GET'
elif len(groups) >= 2:
# axios, http, request patterns
if pattern.startswith(r'axios'):
method = groups[0].upper()
url_path = groups[1]
elif pattern.startswith(r'(http|request)'):
method = groups[1].upper()
url_path = groups[2]
elif pattern.startswith(r'\$'):
# jQuery
url_path = groups[1]
if groups[0] == 'ajax':
# Look for method in context
method_match = re.search(r'type:\s*[\'"`](GET|POST|PUT|PATCH|DELETE)[\'"`]', context)
method = method_match.group(1).upper() if method_match else 'GET'
elif groups[0] == 'get':
method = 'GET'
elif groups[0] == 'post':
method = 'POST'
break
if not url_path or not method:
return None
# Clean up the URL path
# Remove query parameters and fragments
url_path = url_path.split('?')[0].split('#')[0]
# Remove any template literals (${...})
url_path = re.sub(r'\$\{[^}]+\}', '*', url_path)
# Query the api_endpoints table to find matching backend endpoint
# Try exact match first
cursor.execute("""
SELECT file, method, pattern, controls
FROM api_endpoints
WHERE pattern = ? AND method = ?
LIMIT 1
""", (url_path, method))
backend_match = cursor.fetchone()
if not backend_match:
            # Fall back to fuzzy matching: collapse path-parameter placeholders
            # (e.g., :id) in stored patterns so a normalized call such as
            # /api/users/* can match /api/users/:id
cursor.execute("""
SELECT file, method, pattern, controls
FROM api_endpoints
WHERE ? LIKE REPLACE(REPLACE(pattern, ':id', '%'), ':{param}', '%')
AND method = ?
LIMIT 1
""", (url_path, method))
backend_match = cursor.fetchone()
if not backend_match:
# No matching backend endpoint found
return None
backend_file, backend_method, backend_pattern, backend_controls = backend_match
# Find the exact line number of the backend endpoint
cursor.execute("""
SELECT line
FROM symbols
WHERE path = ? AND type = 'function'
ORDER BY line
LIMIT 1
""", (backend_file,))
line_result = cursor.fetchone()
backend_line = line_result[0] if line_result else 1
return {
"frontend": {
"file": target_file,
"line": target_line,
"method": method,
"url": url_path
},
"backend": {
"file": backend_file,
"line": backend_line,
"method": backend_method,
"pattern": backend_pattern,
"controls": backend_controls
}
}
    except Exception:
        # Could not read the file or parse the API call details
        return None
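
# Self-contained illustrative sketch of trace_frontend_to_backend using an
# in-memory database and a temporary frontend file. The schema below is a
# minimal stand-in containing only the columns this function reads; the real
# repo index built by TheAuditor may define additional columns.
def _example_trace_frontend_to_backend() -> Optional[Dict[str, Any]]:
    import tempfile

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE api_endpoints (file TEXT, method TEXT, pattern TEXT, controls TEXT)")
    cur.execute("CREATE TABLE symbols (path TEXT, name TEXT, type TEXT, line INTEGER, col INTEGER)")
    cur.execute("INSERT INTO api_endpoints VALUES ('api/users.py', 'GET', '/api/users', '[]')")
    cur.execute("INSERT INTO symbols VALUES ('api/users.py', 'list_users', 'function', 12, 0)")
    conn.commit()

    # A one-line frontend snippet whose fetch() call should resolve to GET /api/users.
    with tempfile.NamedTemporaryFile("w", suffix=".js", delete=False) as tmp:
        tmp.write("const users = await fetch('/api/users');\n")
        frontend_file = tmp.name

    try:
        return trace_frontend_to_backend(cur, frontend_file, 1)
    finally:
        conn.close()
        Path(frontend_file).unlink(missing_ok=True)
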
def format_impact_report(impact_data: Dict[str, Any]) -> str:
"""
Format impact analysis results into a human-readable report.
Args:
impact_data: Results from analyze_impact
Returns:
Formatted string report
"""
lines = []
# Header
lines.append("=" * 60)
lines.append("IMPACT ANALYSIS REPORT")
lines.append("=" * 60)
# Target symbol
if impact_data.get("error"):
lines.append(f"\nError: {impact_data['error']}")
return "\n".join(lines)
# Check for cross-stack trace
if impact_data.get("cross_stack_trace"):
trace = impact_data["cross_stack_trace"]
lines.append(f"\n{'' * 40}")
lines.append("FRONTEND TO BACKEND TRACE")
lines.append(f"{'' * 40}")
lines.append(f"Frontend API Call:")
lines.append(f" File: {trace['frontend']['file']}:{trace['frontend']['line']}")
lines.append(f" Method: {trace['frontend']['method']}")
lines.append(f" URL: {trace['frontend']['url']}")
lines.append(f"\nBackend Endpoint:")
lines.append(f" File: {trace['backend']['file']}:{trace['backend']['line']}")
lines.append(f" Method: {trace['backend']['method']}")
lines.append(f" Pattern: {trace['backend']['pattern']}")
if trace['backend'].get('controls') and trace['backend']['controls'] != '[]':
lines.append(f" Security Controls: {trace['backend']['controls']}")
# Show backend symbol as the primary target
if impact_data.get("backend_symbol"):
backend = impact_data["backend_symbol"]
lines.append(f"\nBackend Function: {backend['name']} ({backend['type']})")
lines.append(f"Location: {backend['file']}:{backend['line']}")
else:
target = impact_data["target_symbol"]
lines.append(f"\nTarget Symbol: {target['name']} ({target['type']})")
lines.append(f"Location: {target['file']}:{target['line']}")
# Impact summary
summary = impact_data["impact_summary"]
lines.append(f"\n{'' * 40}")
lines.append("IMPACT SUMMARY")
lines.append(f"{'' * 40}")
lines.append(f"Direct Upstream Dependencies: {summary['direct_upstream']}")
lines.append(f"Direct Downstream Dependencies: {summary['direct_downstream']}")
lines.append(f"Total Upstream (including transitive): {summary['total_upstream']}")
lines.append(f"Total Downstream (including transitive): {summary['total_downstream']}")
lines.append(f"Total Impact Radius: {summary['total_impact']} symbols")
lines.append(f"Affected Files: {summary['affected_files']}")
# Upstream dependencies
if impact_data["upstream"]:
lines.append(f"\n{'' * 40}")
lines.append("UPSTREAM DEPENDENCIES (Who calls this)")
lines.append(f"{'' * 40}")
for dep in impact_data["upstream"][:10]: # Limit to first 10
lines.append(f"{dep['symbol']} ({dep['type']}) in {dep['file']}:{dep['line']}")
if len(impact_data["upstream"]) > 10:
lines.append(f" ... and {len(impact_data['upstream']) - 10} more")
# Downstream dependencies
if impact_data["downstream"]:
lines.append(f"\n{'' * 40}")
lines.append("DOWNSTREAM DEPENDENCIES (What this calls)")
lines.append(f"{'' * 40}")
for dep in impact_data["downstream"][:10]: # Limit to first 10
if dep["file"] != "external":
lines.append(f"{dep['symbol']} ({dep['type']}) in {dep['file']}:{dep['line']}")
else:
lines.append(f"{dep['symbol']} (external/built-in)")
if len(impact_data["downstream"]) > 10:
lines.append(f" ... and {len(impact_data['downstream']) - 10} more")
# Risk assessment
lines.append(f"\n{'' * 40}")
lines.append("RISK ASSESSMENT")
lines.append(f"{'' * 40}")
risk_level = "LOW"
if summary["total_impact"] > 20:
risk_level = "HIGH"
elif summary["total_impact"] > 10:
risk_level = "MEDIUM"
lines.append(f"Change Risk Level: {risk_level}")
if risk_level == "HIGH":
lines.append("⚠ WARNING: This change has a large blast radius!")
lines.append(" Consider:")
lines.append(" - Breaking the change into smaller, incremental steps")
lines.append(" - Adding comprehensive tests before refactoring")
lines.append(" - Reviewing all upstream dependencies for compatibility")
elif risk_level == "MEDIUM":
lines.append("⚠ CAUTION: This change affects multiple components")
lines.append(" Ensure all callers are updated if the interface changes")
lines.append("=" * 60)
return "\n".join(lines)