mirror of
https://github.com/aljazceru/Auditor.git
synced 2025-12-17 03:24:18 +01:00
668 lines
26 KiB
Python
668 lines
26 KiB
Python
"""Unified orchestrator for dynamic rule discovery and execution.
|
|
|
|
This module provides a central orchestrator that:
|
|
1. Dynamically discovers ALL rules in the /rules directory
|
|
2. Analyzes their signatures to determine requirements
|
|
3. Executes them with appropriate parameters
|
|
4. Provides a unified interface for all detection systems
|
|
"""
|
|
|
|
import importlib
|
|
import inspect
|
|
import json
|
|
import os
|
|
import pkgutil
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Callable, Optional, Set
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
@dataclass
class RuleInfo:
    """Describe one discovered rule function and what it needs to run.

    The ``requires_*`` flags, ``param_count``, ``param_names`` and
    ``rule_type`` are derived from the rule function's signature by the
    orchestrator; the defaults below describe a rule with no parameters.
    """

    # Identity of the rule.
    name: str                      # function name (discovery keeps `find_*` names)
    module: str                    # dotted name of the module defining the rule
    function: Callable[..., Any]   # the callable that is invoked to run the rule
    signature: inspect.Signature   # signature of `function`
    category: str                  # category the rule is grouped under

    # Inputs the rule asks for, judged by its parameter names.
    requires_ast: bool = False
    requires_db: bool = False
    requires_file: bool = False
    requires_content: bool = False

    # Raw parameter information taken from the signature.
    param_count: int = 0
    param_names: List[str] = field(default_factory=list)

    # One of: standalone, discovery, taint-dependent
    rule_type: str = "standalone"
|
|
|
|
|
|
@dataclass
class RuleContext:
    """Bundle of optional inputs handed to a rule when it is executed.

    Every field defaults to ``None``; a rule only receives the values that
    match its parameter names.
    """

    # Per-file inputs.
    file_path: Optional[Path] = None   # file being analysed
    content: Optional[str] = None      # text of that file
    ast_tree: Optional[Any] = None     # parsed tree for that file
    language: Optional[str] = None     # language identifier for that file

    # Project-wide inputs.
    db_path: Optional[str] = None          # database path, as a string
    project_path: Optional[Path] = None    # root of the analysed project
|
|
|
|
|
|
class RulesOrchestrator:
    """Unified orchestrator for ALL rule execution.

    Rules are plain functions whose names start with ``find_``. They are
    discovered by importing every module of the ``theauditor.rules`` package,
    and the values passed to each rule are chosen purely from the names of
    its parameters (see the alias sets below).
    """

    # Parameter-name aliases. They are shared by rule analysis and by both
    # kwargs builders so the three places cannot drift apart.
    _AST_PARAMS = frozenset({'ast', 'tree', 'ast_tree', 'python_ast'})
    _DB_PARAMS = frozenset({'db_path', 'database'})
    _FILE_PARAMS = frozenset({'file_path', 'filepath', 'path', 'filename'})
    _CONTENT_PARAMS = frozenset({'content', 'source', 'code', 'text'})

    def __init__(self, project_path: Path, db_path: Optional[Path] = None):
        """Initialize the orchestrator and discover all rules.

        Args:
            project_path: Root path of the project being analyzed
            db_path: Optional path to the database (defaults to .pf/repo_index.db)
        """
        self.project_path = Path(project_path)
        self.db_path = Path(db_path) if db_path else self.project_path / ".pf" / "repo_index.db"
        self._debug = os.environ.get("THEAUDITOR_DEBUG", "").lower() == "true"
        self.rules = self._discover_all_rules()

        # Taint infrastructure is created lazily, only when a rule asks for
        # it; the taint modules are imported on demand to avoid circular
        # dependencies.
        self.taint_registry = None
        self._taint_trace_func = None
        self._taint_conn = None  # Lazy-load database connection
        self._taint_results = None  # Cached output of the taint analysis

        if self._debug:
            print(f"[ORCHESTRATOR] Discovered {sum(len(r) for r in self.rules.values())} rules across {len(self.rules)} categories")

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------

    def _discover_all_rules(self) -> Dict[str, List["RuleInfo"]]:
        """Dynamically discover ALL rules in /rules directory.

        Each non-dunder subdirectory of the rules package becomes a category
        (even when it ends up empty); modules directly inside the package go
        to the "general" category. Directories and files are visited in
        sorted order so the resulting rule order is deterministic.

        Returns:
            Dictionary mapping category name to list of RuleInfo objects
        """
        # Imported here rather than at module level, like the other
        # theauditor imports in this class.
        import theauditor.rules as rules_package

        rules_dir = Path(rules_package.__file__).parent
        rules_by_category: Dict[str, List["RuleInfo"]] = {}

        # Category subdirectories.
        for subdir in sorted(rules_dir.iterdir()):
            if not subdir.is_dir() or subdir.name.startswith('__'):
                continue

            category = subdir.name
            bucket = rules_by_category.setdefault(category, [])

            for py_file in sorted(subdir.glob("*.py")):
                if py_file.name.startswith('__'):
                    continue
                module_name = f"theauditor.rules.{category}.{py_file.stem}"
                bucket.extend(self._load_rules_from_module(module_name, category))

        # Top-level rule files (not in subdirectories) go in "general".
        for py_file in sorted(rules_dir.glob("*.py")):
            if py_file.name.startswith('__') or py_file.is_dir():
                continue
            module_name = f"theauditor.rules.{py_file.stem}"
            bucket = rules_by_category.setdefault("general", [])
            bucket.extend(self._load_rules_from_module(module_name, "general"))

        return rules_by_category

    def _load_rules_from_module(self, module_name: str, category: str) -> List["RuleInfo"]:
        """Import one module and collect the `find_*` functions it defines.

        Import or analysis errors never propagate: they are reported in debug
        mode and whatever was collected before the error is returned.

        Args:
            module_name: Dotted name of the module to import
            category: Category to record on every collected rule

        Returns:
            List of RuleInfo objects for the rules defined in the module
        """
        found: List["RuleInfo"] = []
        try:
            module = importlib.import_module(module_name)

            for name, obj in inspect.getmembers(module, inspect.isfunction):
                # Only functions defined in this module count; a `find_*`
                # helper imported from elsewhere is not a rule of this module.
                if not name.startswith('find_') or obj.__module__ != module_name:
                    continue

                rule_info = self._analyze_rule(name, obj, module_name, category)
                found.append(rule_info)

                if self._debug:
                    print(f"[ORCHESTRATOR] Found rule: {category}/{name} with {rule_info.param_count} params")

        except ImportError as e:
            if self._debug:
                print(f"[ORCHESTRATOR] Warning: Failed to import {module_name}: {e}")
        except Exception as e:
            if self._debug:
                print(f"[ORCHESTRATOR] Warning: Error processing {module_name}: {e}")

        return found

    def _analyze_rule(self, name: str, func: Callable, module: str, category: str) -> "RuleInfo":
        """Analyze a rule function to determine its requirements.

        Args:
            name: Function name
            func: The function object
            module: Module name
            category: Category name

        Returns:
            RuleInfo object with metadata about the rule
        """
        sig = inspect.signature(func)
        params = list(sig.parameters.keys())

        # Requirements are inferred from parameter names only.
        requires_ast = any(p in self._AST_PARAMS for p in params)
        # NOTE: 'conn' marks a rule as database-dependent, but neither kwargs
        # builder in this class supplies a value for it, so such a rule only
        # runs when `conn` has a default.
        requires_db = any(p in self._DB_PARAMS or p == 'conn' for p in params)
        requires_file = any(p in self._FILE_PARAMS for p in params)
        requires_content = any(p in self._CONTENT_PARAMS for p in params)

        if 'taint_registry' in params:
            # Discovery rules: register new sinks/sources to the registry
            rule_type = "discovery"
        elif 'taint_checker' in params or 'trace_taint' in params:
            # Taint-dependent rules: use taint analysis results
            rule_type = "taint-dependent"
        else:
            # Everything else doesn't need taint infrastructure
            rule_type = "standalone"

        return RuleInfo(
            name=name,
            module=module,
            function=func,
            signature=sig,
            category=category,
            requires_ast=requires_ast,
            requires_db=requires_db,
            requires_file=requires_file,
            requires_content=requires_content,
            param_count=len(params),
            param_names=params,
            rule_type=rule_type,
        )

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _default_context(self) -> "RuleContext":
        """Build the project-wide context (database and project path only)."""
        return RuleContext(
            db_path=str(self.db_path),
            project_path=self.project_path,
        )

    def _iter_rules(self):
        """Yield every discovered rule, category by category."""
        for rules in self.rules.values():
            yield from rules

    def _normalize_result(self, rule: "RuleInfo", result: Any) -> List[Dict[str, Any]]:
        """Turn whatever a rule returned into a list of findings.

        ``None`` becomes an empty list, a single dict becomes a one-element
        list, lists are returned unchanged, and tuples, sets and generators
        are copied into a list. Any other type is reported in debug mode and
        treated as "no findings".

        Args:
            rule: The rule that produced the result (used for the warning)
            result: Raw return value of the rule function

        Returns:
            List of findings
        """
        if result is None:
            return []
        if isinstance(result, list):
            return result
        if isinstance(result, dict):
            # A bare dict is one finding; extending a list with it directly
            # would add its keys instead of the finding.
            return [result]
        if isinstance(result, (tuple, set, frozenset)) or inspect.isgenerator(result):
            return list(result)

        if self._debug:
            print(f"[ORCHESTRATOR] Warning: Rule {rule.name} returned unexpected type: {type(result)}")
        return []

    # ------------------------------------------------------------------
    # Running rules
    # ------------------------------------------------------------------

    def run_all_rules(self, context: Optional["RuleContext"] = None) -> List[Dict[str, Any]]:
        """Execute ALL discovered rules with appropriate parameters.

        Args:
            context: Optional context with file, AST, database info. When
                omitted, a context holding only the database path and the
                project path is used.

        Returns:
            List of findings from all rules
        """
        if context is None:
            context = self._default_context()

        all_findings: List[Dict[str, Any]] = []
        total_executed = 0

        for category, rules in self.rules.items():
            if not rules:
                continue

            if self._debug:
                print(f"[ORCHESTRATOR] Running {len(rules)} rules in category: {category}")

            for rule in rules:
                try:
                    findings = self._execute_rule(rule, context)
                except Exception as e:
                    # _execute_rule guards the rule call itself; this catches
                    # failures while preparing its arguments.
                    if self._debug:
                        print(f"[ORCHESTRATOR] Warning: Rule {rule.name} failed: {e}")
                    continue

                # Count every rule that ran without raising, with or without
                # findings.
                total_executed += 1
                all_findings.extend(findings)

                if self._debug:
                    print(f"[ORCHESTRATOR] {rule.name}: {len(findings)} findings")

        if self._debug:
            print(f"[ORCHESTRATOR] Executed {total_executed} rules, found {len(all_findings)} issues")

        return all_findings

    def run_rules_for_file(self, context: "RuleContext") -> List[Dict[str, Any]]:
        """Run rules applicable to a specific file.

        Args:
            context: Context with file information

        Returns:
            List of findings for this file
        """
        findings: List[Dict[str, Any]] = []

        for rule in self._iter_rules():
            # Skip database-only rules when processing individual files
            needs_file_data = rule.requires_file or rule.requires_ast or rule.requires_content
            if rule.requires_db and not needs_file_data:
                continue

            # Skip rules that need AST if we don't have it
            if rule.requires_ast and not context.ast_tree:
                continue

            try:
                findings.extend(self._execute_rule(rule, context))
            except Exception as e:
                if self._debug:
                    print(f"[ORCHESTRATOR] Rule {rule.name} failed for file: {e}")

        return findings

    def get_rules_by_type(self, rule_type: str) -> List["RuleInfo"]:
        """Get all rules of a specific type.

        Args:
            rule_type: Type of rules to retrieve (standalone, discovery, taint-dependent)

        Returns:
            List of RuleInfo objects matching the type
        """
        return [rule for rule in self._iter_rules() if rule.rule_type == rule_type]

    def run_discovery_rules(self, registry) -> List[Dict[str, Any]]:
        """Run all discovery rules that populate the taint registry.

        Args:
            registry: TaintRegistry to populate with discovered patterns

        Returns:
            List of findings from discovery rules
        """
        context = self._default_context()
        findings: List[Dict[str, Any]] = []

        for rule in self.get_rules_by_type("discovery"):
            try:
                kwargs = self._build_rule_kwargs(rule, context)
                # The caller's registry is used, not the orchestrator's own.
                kwargs['taint_registry'] = registry

                rule_findings = self._normalize_result(rule, rule.function(**kwargs))
                findings.extend(rule_findings)

                if self._debug:
                    print(f"[ORCHESTRATOR] Discovery rule {rule.name}: {len(rule_findings)} findings")

            except Exception as e:
                if self._debug:
                    print(f"[ORCHESTRATOR] Discovery rule {rule.name} failed: {e}")

        return findings

    def run_standalone_rules(self) -> List[Dict[str, Any]]:
        """Run all standalone rules that don't need taint data.

        Returns:
            List of findings from standalone rules
        """
        context = self._default_context()
        findings: List[Dict[str, Any]] = []

        for rule in self.get_rules_by_type("standalone"):
            try:
                kwargs = self._build_rule_kwargs(rule, context)
                findings.extend(self._normalize_result(rule, rule.function(**kwargs)))
            except Exception as e:
                # Includes the TypeError raised when a required parameter
                # could not be filled from the project-wide context.
                if self._debug:
                    print(f"[ORCHESTRATOR] Standalone rule {rule.name} failed: {e}")

        return findings

    def run_taint_dependent_rules(self, taint_checker) -> List[Dict[str, Any]]:
        """Run all rules that depend on taint analysis results.

        Args:
            taint_checker: Function to check if a variable is tainted

        Returns:
            List of findings from taint-dependent rules
        """
        context = self._default_context()
        findings: List[Dict[str, Any]] = []

        for rule in self.get_rules_by_type("taint-dependent"):
            try:
                kwargs = self._build_rule_kwargs(rule, context)
                if 'taint_checker' in rule.param_names:
                    kwargs['taint_checker'] = taint_checker
                # NOTE(review): a rule whose only taint parameter is
                # `trace_taint` gets no tracer on this path (only
                # _execute_rule supplies one) - confirm whether intended.

                findings.extend(self._normalize_result(rule, rule.function(**kwargs)))

            except Exception as e:
                if self._debug:
                    print(f"[ORCHESTRATOR] Taint-dependent rule {rule.name} failed: {e}")

        return findings

    def _build_rule_kwargs(self, rule: "RuleInfo", context: "RuleContext") -> Dict[str, Any]:
        """Build keyword arguments for a rule based on its requirements.

        Parameters whose value is missing from the context, and parameters
        with unrecognised names, are simply left out; unlike _execute_rule
        this never decides to skip the rule.

        Args:
            rule: RuleInfo object
            context: RuleContext with available data

        Returns:
            Dictionary of keyword arguments for the rule
        """
        kwargs: Dict[str, Any] = {}

        for param_name in rule.param_names:
            if param_name in self._DB_PARAMS:
                kwargs[param_name] = context.db_path or str(self.db_path)
            elif param_name in self._FILE_PARAMS:
                if context.file_path:
                    kwargs[param_name] = str(context.file_path)
            elif param_name in self._CONTENT_PARAMS:
                if context.content:
                    kwargs[param_name] = context.content
            elif param_name in self._AST_PARAMS:
                if context.ast_tree:
                    kwargs[param_name] = context.ast_tree
            elif param_name == 'project_path':
                kwargs[param_name] = str(context.project_path or self.project_path)
            elif param_name == 'language':
                kwargs[param_name] = context.language

        return kwargs

    def run_database_rules(self) -> List[Dict[str, Any]]:
        """Run rules that operate on the database.

        Returns:
            List of findings from database rules
        """
        context = self._default_context()
        findings: List[Dict[str, Any]] = []

        for rule in self._iter_rules():
            if not rule.requires_db:
                continue
            try:
                findings.extend(self._execute_rule(rule, context))
            except Exception as e:
                if self._debug:
                    print(f"[ORCHESTRATOR] Database rule {rule.name} failed: {e}")

        return findings

    def _execute_rule(self, rule: "RuleInfo", context: "RuleContext") -> List[Dict[str, Any]]:
        """Execute a single rule with appropriate parameters.

        The rule is skipped (empty result) when it asks for an AST, file path
        or content the context does not provide, or when it has a required
        parameter this method does not know how to fill. Exceptions raised by
        the rule itself are reported in debug mode and also yield an empty
        result.

        Args:
            rule: RuleInfo object describing the rule
            context: RuleContext with available data

        Returns:
            List of findings from the rule
        """
        kwargs: Dict[str, Any] = {}

        for param_name in rule.param_names:
            # Taint infrastructure for rules that ask for it.
            if param_name == 'taint_registry':
                # Lazy-load taint registry only when needed
                if self.taint_registry is None:
                    from theauditor.taint.registry import TaintRegistry
                    self.taint_registry = TaintRegistry()
                kwargs['taint_registry'] = self.taint_registry

            elif param_name == 'taint_checker':
                # Function that checks if a variable is tainted
                kwargs['taint_checker'] = self._create_taint_checker(context)

            elif param_name == 'trace_taint':
                # Inter-procedural tracking function
                kwargs['trace_taint'] = self._get_taint_tracer()

            # Context-backed parameters.
            elif param_name in self._AST_PARAMS:
                if not context.ast_tree:
                    return []  # Skip if AST required but not available
                kwargs[param_name] = context.ast_tree

            elif param_name in self._DB_PARAMS:
                kwargs[param_name] = context.db_path or str(self.db_path)

            elif param_name in self._FILE_PARAMS:
                if not context.file_path:
                    return []  # Skip if file required but not available
                kwargs[param_name] = str(context.file_path)

            elif param_name in self._CONTENT_PARAMS:
                if not context.content:
                    return []  # Skip if content required but not available
                kwargs[param_name] = context.content

            elif param_name == 'project_path':
                kwargs[param_name] = str(context.project_path or self.project_path)

            elif param_name == 'language':
                kwargs[param_name] = context.language

            else:
                param = rule.signature.parameters[param_name]

                # *args / **kwargs never need a value.
                if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                                  inspect.Parameter.VAR_KEYWORD):
                    continue

                # Identity check: `empty` is a sentinel, and a default value
                # may define an unusual __ne__.
                if param.default is not inspect.Parameter.empty:
                    continue  # Has default, can skip

                # Required parameter we don't know how to fill
                if self._debug:
                    print(f"[ORCHESTRATOR] Warning: Don't know how to fill parameter '{param_name}' for rule {rule.name}")
                return []

        try:
            # Normalisation stays inside the try so a failing generator
            # result is handled like a failing rule.
            return self._normalize_result(rule, rule.function(**kwargs))
        except Exception as e:
            if self._debug:
                print(f"[ORCHESTRATOR] Error executing rule {rule.name}: {e}")
            return []

    def get_rule_stats(self) -> Dict[str, Any]:
        """Get statistics about discovered rules.

        Returns:
            Dictionary with the total rule count, the category names, the
            rule count per category and the rule count per requirement flag
        """
        all_rules = list(self._iter_rules())
        return {
            'total_rules': len(all_rules),
            'categories': list(self.rules.keys()),
            'by_category': {cat: len(rules) for cat, rules in self.rules.items()},
            'by_requirements': {
                'ast_rules': sum(1 for r in all_rules if r.requires_ast),
                'db_rules': sum(1 for r in all_rules if r.requires_db),
                'file_rules': sum(1 for r in all_rules if r.requires_file),
                'content_rules': sum(1 for r in all_rules if r.requires_content),
            },
        }

    # ------------------------------------------------------------------
    # Taint support
    # ------------------------------------------------------------------

    def _load_taint_results(self) -> Dict[str, Any]:
        """Run the taint analysis once and cache its result on the instance.

        Returns:
            The cached result of ``trace_taint`` for this orchestrator's
            database; the code here reads its "taint_paths" entry
        """
        # getattr keeps this safe for instances whose __init__ did not run.
        if getattr(self, '_taint_results', None) is None:
            from theauditor.taint import trace_taint

            self._taint_results = trace_taint(str(self.db_path), max_depth=5)
            if self._debug:
                total = len(self._taint_results.get("taint_paths", []))
                print(f"[ORCHESTRATOR] Cached {total} taint paths for rules", file=sys.stderr)
        return self._taint_results

    def _create_taint_checker(self, context: "RuleContext"):
        """Check taint using REAL taint analysis results.

        This provides rules with a way to check if variables are tainted
        using the main taint analyzer's cached results.

        Args:
            context: The rule execution context

        Returns:
            A function that checks if a variable is tainted
        """
        self._load_taint_results()

        def is_tainted(var_name: str, line: int) -> bool:
            """Check if variable is in any taint path.

            A path is considered only when its source is in the context's
            file and within 10 lines of `line`; the variable matches when its
            name occurs in the string form of any step of that path.

            Args:
                var_name: Name of the variable to check
                line: Line number where the check is happening

            Returns:
                True if the variable is tainted, False otherwise
            """
            for path in self._taint_results.get("taint_paths", []):
                source = path.get("source", {})
                if source.get("file", "") != str(context.file_path):
                    continue
                # `or 0` guards against an explicit None line.
                if abs((source.get("line") or 0) - line) >= 10:
                    continue
                if any(var_name in str(step) for step in path.get("path", [])):
                    return True
            return False

        return is_tainted

    def _get_taint_tracer(self):
        """Get cached taint analysis results for rules to query.

        The tracer is built once per orchestrator and reused on later calls.

        Returns:
            A function that returns relevant taint paths
        """
        if self._taint_trace_func is None:
            # Run FULL taint analysis ONCE and cache it
            self._load_taint_results()

            def get_taint_for_location(source_var: str, source_file: str, source_line: int, source_function: str = "unknown"):
                """Return cached taint paths relevant to location.

                Args:
                    source_var: The variable to trace
                    source_file: File containing the variable
                    source_line: Line where the variable is defined
                    source_function: Function containing the variable
                        (accepted but not used for matching)

                Returns:
                    List of relevant taint paths from cached results
                """
                relevant_paths = []
                for path in self._taint_results.get("taint_paths", []):
                    source = path.get("source", {})
                    # Match by file suffix and approximate line
                    if not source.get("file", "").endswith(source_file):
                        continue
                    if abs((source.get("line") or 0) - source_line) >= 10:
                        continue

                    for step in path.get("path", []):
                        # Dict steps are matched on "var"; any other step is
                        # matched on its string form instead of crashing on
                        # `.get`.
                        var_text = step.get("var", "") if isinstance(step, dict) else step
                        if source_var in str(var_text):
                            relevant_paths.append(path)
                            break
                return relevant_paths

            self._taint_trace_func = get_taint_for_location

        return self._taint_trace_func
|
|
|
|
|
|
# Convenience function for backward compatibility
|
|
def run_all_rules(project_path: str, db_path: Optional[str] = None) -> List[Dict[str, Any]]:
    """Run all rules for a project.

    Args:
        project_path: Root path of the project
        db_path: Optional database path (defaults to .pf/repo_index.db)

    Returns:
        List of all findings
    """
    root = Path(project_path)

    # Hand the custom database to the orchestrator as well as to the context;
    # otherwise the orchestrator's own db_path (used for taint analysis and
    # as a fallback) would still point at the default database.
    orchestrator = RulesOrchestrator(root, Path(db_path) if db_path else None)

    context = RuleContext(
        # The caller's string is passed through unchanged when given.
        db_path=db_path or str(orchestrator.db_path),
        project_path=root,
    )

    return orchestrator.run_all_rules(context)