- Replace hardcoded 300s (5 min) timeout that was killing taint-analyze and detect-patterns
- Add COMMAND_TIMEOUTS dictionary with appropriate timeouts per command type:
* taint-analyze, detect-patterns: 2 hours (can take 10-60 min on large codebases)
* lint: 15 minutes (ESLint/ruff on monorepos)
* fce: 30 minutes (correlation analysis)
* graph operations: 10 minutes
* Quick operations (deps, docs, workset): 5 minutes
- Add get_command_timeout() function to determine timeout based on command name
- Support environment variable overrides:
* Global: THEAUDITOR_TIMEOUT_SECONDS (default 30 min)
* Per-command: THEAUDITOR_TIMEOUT_TAINT_ANALYZE_SECONDS, etc.
- Update all 3 run_subprocess_with_interrupt calls to use adaptive timeouts
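
For example (illustrative invocation; `aud` is the CLI entry point referenced elsewhere in this file), one slow run can be granted a larger taint-analysis budget without code changes:

    THEAUDITOR_TIMEOUT_TAINT_ANALYZE_SECONDS=10800 aud full

Commands that match no COMMAND_TIMEOUTS entry fall back to THEAUDITOR_TIMEOUT_SECONDS (default 30 minutes).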
"""Pipeline execution module for TheAuditor."""
|
|
|
|
import json
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Callable, List, Tuple
|
|
|
|
# Import our custom temp manager to avoid WSL2/Windows issues
|
|
try:
|
|
from theauditor.utils.temp_manager import TempManager
|
|
except ImportError:
|
|
# Fallback if not available yet
|
|
TempManager = None
|
|
|
|
# Windows compatibility
|
|
IS_WINDOWS = platform.system() == "Windows"
|
|
|
|
# Command-specific timeout configuration (in seconds)
|
|
# Based on empirical testing and user reports of 10-60 minute analysis times
|
|
COMMAND_TIMEOUTS = {
|
|
"index": 600, # 10 minutes - AST parsing can be slow on large codebases
|
|
"detect-frameworks": 300, # 5 minutes - Quick scan of config files
|
|
"deps": 300, # 5 minutes - Network I/O but usually fast
|
|
"docs": 300, # 5 minutes - Network I/O for fetching docs
|
|
"workset": 300, # 5 minutes - File system traversal
|
|
"lint": 900, # 15 minutes - ESLint/ruff on large codebases
|
|
"detect-patterns": 7200, # 2 hours - 100+ security patterns on all files
|
|
"graph": 600, # 10 minutes - Building dependency graphs
|
|
"taint-analyze": 7200, # 2 hours - Data flow analysis is expensive
|
|
"taint": 7200, # 2 hours - Alias for taint-analyze
|
|
"fce": 1800, # 30 minutes - Correlation analysis
|
|
"report": 600, # 10 minutes - Report generation
|
|
"summary": 300, # 5 minutes - Quick summary generation
|
|
}
|
|
|
|
# Allow environment variable override for all timeouts
|
|
DEFAULT_TIMEOUT = int(os.environ.get('THEAUDITOR_TIMEOUT_SECONDS', '1800')) # Default 30 minutes
|
|
|
|
def get_command_timeout(cmd: List[str]) -> int:
    """
    Determine appropriate timeout for a command based on its name.

    Args:
        cmd: Command array to execute

    Returns:
        Timeout in seconds
    """
    # Extract command name from the command array
    # Format: [python, -m, theauditor.cli, COMMAND_NAME, ...]
    cmd_str = " ".join(cmd)

    # Check for specific command patterns
    for cmd_name, timeout in COMMAND_TIMEOUTS.items():
        if cmd_name in cmd_str:
            # Check for environment variable override for specific command
            env_key = f'THEAUDITOR_TIMEOUT_{cmd_name.upper().replace("-", "_")}_SECONDS'
            return int(os.environ.get(env_key, timeout))

    # Default timeout if command not recognized
    return DEFAULT_TIMEOUT

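# Illustrative example (comment only, not executed): with no overrides set,
#   get_command_timeout([sys.executable, "-m", "theauditor.cli", "taint-analyze"])
# returns 7200; after `export THEAUDITOR_TIMEOUT_TAINT_ANALYZE_SECONDS=3600`
# the same call returns 3600. Matching is by substring, so the first
# COMMAND_TIMEOUTS entry found in the joined command string wins.
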
# Global stop event for interrupt handling
stop_event = threading.Event()


def signal_handler(signum, frame):
    """Handle Ctrl+C by setting stop event."""
    print("\n[INFO] Interrupt received, stopping pipeline gracefully...", file=sys.stderr)
    stop_event.set()


# Register signal handler
signal.signal(signal.SIGINT, signal_handler)
if not IS_WINDOWS:
    signal.signal(signal.SIGTERM, signal_handler)

def run_subprocess_with_interrupt(cmd, stdout_fp, stderr_fp, cwd, shell=False, timeout=300):
    """
    Run subprocess with interrupt checking every 100ms.

    Args:
        cmd: Command to execute
        stdout_fp: File handle for stdout
        stderr_fp: File handle for stderr
        cwd: Working directory
        shell: Whether to use shell execution
        timeout: Maximum time to wait (seconds)

    Returns:
        subprocess.CompletedProcess-like object with returncode, stdout, stderr
    """
    process = subprocess.Popen(
        cmd,
        stdout=stdout_fp,
        stderr=stderr_fp,
        text=True,
        cwd=cwd,
        shell=shell,
    )

    # Poll process every 100ms to check for completion or interruption
    start_time = time.time()
    while process.poll() is None:
        if stop_event.is_set():
            # User interrupted - terminate subprocess
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            raise KeyboardInterrupt("Pipeline interrupted by user")

        # Check timeout
        if time.time() - start_time > timeout:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            raise subprocess.TimeoutExpired(cmd, timeout)

        # Sleep briefly to avoid busy-waiting
        time.sleep(0.1)

    # Create result object similar to subprocess.run
    class Result:
        def __init__(self, returncode):
            self.returncode = returncode
            self.stdout = None
            self.stderr = None

    return Result(process.returncode)

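# Illustrative caller pattern (comment only; mirrors the three call sites below):
#   with open(out_path, 'w', encoding='utf-8') as out_fp, \
#        open(err_path, 'w', encoding='utf-8') as err_fp:
#       result = run_subprocess_with_interrupt(
#           cmd, stdout_fp=out_fp, stderr_fp=err_fp, cwd=root,
#           shell=IS_WINDOWS, timeout=get_command_timeout(cmd))
# Callers read stdout/stderr back from the files afterwards; the Result
# object itself carries only the return code until the caller fills them in.

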
def run_command_chain(commands: List[Tuple[str, List[str]]], root: str, chain_name: str) -> dict:
    """
    Execute a chain of commands sequentially and capture their output.
    Used for parallel execution of independent command tracks.

    Args:
        commands: List of (description, command_array) tuples
        root: Working directory
        chain_name: Name of this chain for logging

    Returns:
        Dict with chain results including success, output, and timing
    """
    chain_start = time.time()
    chain_output = []
    chain_errors = []
    failed = False

    # Write progress to a status file for monitoring
    status_dir = Path(root) / ".pf" / "status"
    try:
        status_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"[WARNING] Could not create status dir {status_dir}: {e}", file=sys.stderr)
    safe_name = chain_name.replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
    status_file = status_dir / f"{safe_name}.status"

    def write_status(message: str, completed: int = 0, total: int = 0):
        """Write current status to file for external monitoring."""
        try:
            with open(status_file, 'w', encoding='utf-8') as f:
                status_data = {
                    "track": chain_name,
                    "current": message,
                    "completed": completed,
                    "total": total,
                    "timestamp": time.time(),
                    "elapsed": time.time() - chain_start,
                }
                f.write(json.dumps(status_data) + "\n")
                f.flush()  # Force write to disk
            # Debug output to stderr (visible in subprocess)
            print(f"[STATUS] {chain_name}: {message} [{completed}/{total}]", file=sys.stderr)
        except Exception as e:
            print(f"[ERROR] Could not write status to {status_file}: {e}", file=sys.stderr)

    # Write initial status
    write_status("Starting", 0, len(commands))

    completed_count = 0
    for description, cmd in commands:
        # Update status before starting command
        write_status(f"Running: {description}", completed_count, len(commands))

        start_time = time.time()
        chain_output.append(f"\n{'='*60}")
        chain_output.append(f"[{chain_name}] {description}")
        chain_output.append('='*60)

        try:
            # Use temp files to capture output
            if TempManager:
                # Sanitize chain name and description for Windows paths
                safe_chain = chain_name.replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
                safe_desc = description[:20].replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
                stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                    root, f"chain_{safe_chain}_{safe_desc}"
                )
            else:
                # Fallback to regular tempfile
                with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                     tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                    stdout_file = out_tmp.name
                    stderr_file = err_tmp.name

            with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                 open(stderr_file, 'w+', encoding='utf-8') as err_fp:

                # Determine appropriate timeout for this command
                cmd_timeout = get_command_timeout(cmd)

                result = run_subprocess_with_interrupt(
                    cmd,
                    stdout_fp=out_fp,
                    stderr_fp=err_fp,
                    cwd=root,
                    shell=IS_WINDOWS,     # Windows compatibility fix
                    timeout=cmd_timeout,  # Adaptive timeout based on command type
                )

            # Read outputs
            with open(stdout_file, 'r', encoding='utf-8') as f:
                stdout = f.read()
            with open(stderr_file, 'r', encoding='utf-8') as f:
                stderr = f.read()

            # Clean up temp files
            try:
                os.unlink(stdout_file)
                os.unlink(stderr_file)
            except (OSError, PermissionError):
                pass  # Windows file locking

            elapsed = time.time() - start_time

            # Check for special exit codes (findings commands)
            is_findings_command = "taint-analyze" in cmd or ("deps" in cmd and "--vuln-scan" in cmd)
            if is_findings_command:
                success = result.returncode in [0, 1, 2]
            else:
                success = result.returncode == 0

            if success:
                completed_count += 1
                write_status(f"Completed: {description}", completed_count, len(commands))
                chain_output.append(f"[OK] {description} completed in {elapsed:.1f}s")
                if stdout:
                    lines = stdout.strip().split('\n')
                    # For parallel tracks, include all output (chains collect their own output)
                    if len(lines) <= 5:
                        for line in lines:
                            chain_output.append(f" {line}")
                    else:
                        # Show first 5 lines and indicate more in chain output
                        for line in lines[:5]:
                            chain_output.append(f" {line}")
                        chain_output.append(f" ... ({len(lines) - 5} more lines)")
                        # Add full output marker for later processing
                        chain_output.append(" [Full output available in pipeline.log]")
            else:
                failed = True
                write_status(f"FAILED: {description}", completed_count, len(commands))
                chain_output.append(f"[FAILED] {description} failed (exit code {result.returncode})")
                if stderr:
                    chain_errors.append(f"Error in {description}: {stderr}")
                break  # Stop chain on failure

        except KeyboardInterrupt:
            # User interrupted - clean up and exit
            failed = True
            write_status(f"INTERRUPTED: {description}", completed_count, len(commands))
            chain_output.append("[INTERRUPTED] Pipeline stopped by user")
            raise  # Re-raise to propagate up
        except Exception as e:
            failed = True
            write_status(f"ERROR: {description}", completed_count, len(commands))
            chain_output.append(f"[FAILED] {description} failed: {e}")
            chain_errors.append(f"Exception in {description}: {str(e)}")
            break

    # Final status
    if not failed:
        write_status(f"Completed all {len(commands)} tasks", len(commands), len(commands))

    chain_elapsed = time.time() - chain_start
    return {
        "success": not failed,
        "output": "\n".join(chain_output),
        "errors": "\n".join(chain_errors) if chain_errors else "",
        "elapsed": chain_elapsed,
        "name": chain_name,
    }

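# Example status-file payload written by write_status above (illustrative values):
#   {"track": "Track B (Code Analysis)", "current": "Running: 8. Detect patterns",
#    "completed": 2, "total": 3, "timestamp": 1734400000.0, "elapsed": 42.5}
# The monitoring loop in run_full_pipeline polls these files roughly every
# 2 seconds while the parallel tracks run.

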
def run_full_pipeline(
    root: str = ".",
    quiet: bool = False,
    exclude_self: bool = False,
    offline: bool = False,
    log_callback: Callable[[str, bool], None] = None,
) -> dict[str, Any]:
    """
    Run complete audit pipeline in exact order specified in teamsop.md.

    Args:
        root: Root directory to analyze
        quiet: Whether to run in quiet mode (minimal output)
        exclude_self: Whether to pass --exclude-self to index and detect-patterns
        offline: Whether to skip network operations (deps, docs)
        log_callback: Optional callback function for logging messages (message, is_error)

    Returns:
        Dict containing:
        - success: Whether all phases succeeded
        - failed_phases: Number of failed phases
        - phases_with_warnings: Number of phases that completed with errors in output
        - total_phases: Total number of phases
        - elapsed_time: Total execution time in seconds
        - created_files: List of all created files
        - log_lines: List of all log lines
        - findings: Severity counts aggregated from the generated reports
    """
    # CRITICAL: Archive previous run BEFORE any new artifacts are created
    # Import and call _archive function directly to avoid subprocess issues
    try:
        from theauditor.commands._archive import _archive
        # Call the function directly with appropriate parameters
        # Note: Click commands can be invoked as regular functions
        _archive.callback(run_type="full", diff_spec=None)
        print("[INFO] Previous run archived successfully", file=sys.stderr)
    except ImportError as e:
        print(f"[WARNING] Could not import archive command: {e}", file=sys.stderr)
    except Exception as e:
        print(f"[WARNING] Archive operation failed: {e}", file=sys.stderr)

    # Track all created files throughout execution
    all_created_files = []

    # CRITICAL FIX: Open log file immediately for real-time writing
    # This ensures we don't lose logs if the pipeline crashes
    # Write directly to .pf root, not in readthis (which gets recreated by extraction)
    pf_dir = Path(root) / ".pf"
    pf_dir.mkdir(parents=True, exist_ok=True)
    log_file_path = pf_dir / "pipeline.log"
    log_lines = []  # Keep for return value

    # Open log file in write mode with line buffering for immediate writes
    log_file = None
    try:
        log_file = open(log_file_path, 'w', encoding='utf-8', buffering=1)
    except Exception as e:
        print(f"[CRITICAL] Failed to open log file {log_file_path}: {e}", file=sys.stderr)
        # Fall back to memory-only logging if file can't be opened
        log_file = None

    # CRITICAL: Create the .pf/raw/ directory for ground truth preservation
    # This directory will store immutable copies of all analysis artifacts
    raw_dir = Path(root) / ".pf" / "raw"
    try:
        raw_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"[CRITICAL] Failed to create raw directory {raw_dir}: {e}", file=sys.stderr)
        # Continue execution - we'll handle missing directory during file moves

    # Ensure readthis directory exists for fresh chunks
    # Archive has already moved old content to history
    readthis_dir = Path(root) / ".pf" / "readthis"
    readthis_dir.mkdir(parents=True, exist_ok=True)

    def log_output(message, is_error=False):
        """Log message to callback, file (real-time), and memory."""
        if log_callback and not quiet:
            log_callback(message, is_error)
        # Always add to log list for return value
        log_lines.append(message)
        # CRITICAL: Write immediately to file and flush (if file is open)
        if log_file:
            try:
                log_file.write(message + '\n')
                log_file.flush()  # Force write to disk immediately
            except Exception as e:
                print(f"[CRITICAL] Failed to write to log file: {e}", file=sys.stderr)
                # Continue execution - logging failure shouldn't stop pipeline

    # Log header
    log_output("TheAuditor Full Pipeline Execution Log")
    log_output(f"Started: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    log_output(f"Working Directory: {Path(root).resolve()}")
    log_output("=" * 80)

    # Dynamically discover available commands from CLI registration (Courier principle)
    from theauditor.cli import cli

    # Get all registered commands, excluding internal (_) and special commands
    available_commands = sorted(cli.commands.keys())

    # Define execution order and arguments for known commands
    # This provides the order and arguments, but dynamically adapts to available commands
    command_order = [
        ("index", []),
        ("detect-frameworks", []),
        ("deps", ["--check-latest"]),
        ("docs", ["fetch", "--deps", "./.pf/raw/deps.json"]),
        ("docs", ["summarize"]),
        ("workset", ["--all"]),
        ("lint", ["--workset"]),
        ("detect-patterns", []),
        ("graph", ["build"]),
        ("graph", ["analyze"]),
        ("graph", ["viz", "--view", "full", "--include-analysis"]),
        ("graph", ["viz", "--view", "cycles", "--include-analysis"]),
        ("graph", ["viz", "--view", "hotspots", "--include-analysis"]),
        ("graph", ["viz", "--view", "layers", "--include-analysis"]),
        ("taint-analyze", []),
        ("fce", []),
        ("report", []),
        ("summary", []),
    ]

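    # Illustrative expansion (comment only): the ("index", []) entry above becomes
    #   [sys.executable, "-m", "theauditor.cli", "index"]
    # with "--exclude-self" appended when exclude_self is set.
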
    # Build command list from available commands in the defined order
    commands = []
    phase_num = 0

    for cmd_name, extra_args in command_order:
        # Check if command exists (dynamic discovery)
        if cmd_name in available_commands:
            phase_num += 1
            # Generate human-readable description from command name
            if cmd_name == "index":
                description = f"{phase_num}. Index repository"
                # Add --exclude-self flag if requested
                if exclude_self:
                    extra_args = extra_args + ["--exclude-self"]
            elif cmd_name == "detect-frameworks":
                description = f"{phase_num}. Detect frameworks"
            elif cmd_name == "deps" and "--check-latest" in extra_args:
                description = f"{phase_num}. Check dependencies"
            elif cmd_name == "docs" and "fetch" in extra_args:
                description = f"{phase_num}. Fetch documentation"
            elif cmd_name == "docs" and "summarize" in extra_args:
                description = f"{phase_num}. Summarize documentation"
            elif cmd_name == "workset":
                description = f"{phase_num}. Create workset (all files)"
            elif cmd_name == "lint":
                description = f"{phase_num}. Run linting"
            elif cmd_name == "detect-patterns":
                description = f"{phase_num}. Detect patterns"
                # Add --exclude-self flag if requested
                if exclude_self:
                    extra_args = extra_args + ["--exclude-self"]
            elif cmd_name == "graph" and "build" in extra_args:
                description = f"{phase_num}. Build graph"
            elif cmd_name == "graph" and "analyze" in extra_args:
                description = f"{phase_num}. Analyze graph"
            elif cmd_name == "graph" and "viz" in extra_args:
                # Extract view type from arguments
                if "--view" in extra_args:
                    view_idx = extra_args.index("--view")
                    if view_idx + 1 < len(extra_args):
                        view_type = extra_args[view_idx + 1]
                        description = f"{phase_num}. Visualize graph ({view_type})"
                    else:
                        description = f"{phase_num}. Visualize graph"
                else:
                    description = f"{phase_num}. Visualize graph"
            elif cmd_name == "taint-analyze":
                description = f"{phase_num}. Taint analysis"
            elif cmd_name == "fce":
                description = f"{phase_num}. Factual correlation engine"
            elif cmd_name == "report":
                description = f"{phase_num}. Generate report"
            elif cmd_name == "summary":
                description = f"{phase_num}. Generate audit summary"
            else:
                # Generic description for any new commands
                description = f"{phase_num}. Run {cmd_name.replace('-', ' ')}"

            # Build command array - use python module directly
            command_array = [sys.executable, "-m", "theauditor.cli", cmd_name] + extra_args
            commands.append((description, command_array))
        else:
            # Command not available, log warning but continue (resilient)
            log_output(f"[WARNING] Command '{cmd_name}' not available, skipping")

    total_phases = len(commands)
    current_phase = 0
    failed_phases = 0
    phases_with_warnings = 0  # Track phases that completed but had errors in output
    pipeline_start = time.time()

    def collect_created_files():
        """Collect all files created during execution."""
        files = []

        # Core files
        if (Path(root) / "manifest.json").exists():
            files.append("manifest.json")
        if (Path(root) / "repo_index.db").exists():
            files.append("repo_index.db")

        # .pf directory files
        pf_dir = Path(root) / ".pf"
        if pf_dir.exists():
            for item in pf_dir.rglob("*"):
                if item.is_file():
                    files.append(item.relative_to(Path(root)).as_posix())

        # docs directory files (in .pf/docs)
        docs_dir = Path(root) / ".pf" / "docs"
        if docs_dir.exists():
            for item in docs_dir.rglob("*"):
                if item.is_file():
                    files.append(item.relative_to(Path(root)).as_posix())

        return sorted(set(files))

    # PARALLEL PIPELINE IMPLEMENTATION
    # Reorganize commands into stages for parallel execution

    # Stage categorization
    foundation_commands = []  # Must run first sequentially
    track_a_commands = []     # Network I/O track (deps, docs)
    track_b_commands = []     # Code analysis track (workset, lint, patterns)
    track_c_commands = []     # Graph & taint analysis track
    final_commands = []       # Must run last sequentially

    # Categorize each command into appropriate stage/track
    for phase_name, cmd in commands:
        cmd_str = " ".join(cmd)

        # Stage 1: Foundation (must complete first)
        if "index" in cmd_str:
            foundation_commands.append((phase_name, cmd))
        elif "detect-frameworks" in cmd_str:
            foundation_commands.append((phase_name, cmd))

        # Stage 2: Parallel tracks
        elif "deps" in cmd_str:
            if not offline:  # Skip deps if offline mode
                track_a_commands.append((phase_name, cmd))
        elif "docs" in cmd_str:
            if not offline:  # Skip docs if offline mode
                track_a_commands.append((phase_name, cmd))
        elif "workset" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "lint" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "detect-patterns" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "graph build" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "graph analyze" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "graph viz" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "taint" in cmd_str:
            track_c_commands.append((phase_name, cmd))

        # Stage 4: Final aggregation (must run last)
        elif "fce" in cmd_str:
            final_commands.append((phase_name, cmd))
        elif "report" in cmd_str:
            final_commands.append((phase_name, cmd))
        elif "summary" in cmd_str:
            final_commands.append((phase_name, cmd))
        else:
            # Default to final commands for safety
            final_commands.append((phase_name, cmd))

    # STAGE 1: Foundation (Sequential)
    log_output("\n" + "="*60)
    log_output("[STAGE 1] FOUNDATION - Sequential Execution")
    log_output("="*60)

    for phase_name, cmd in foundation_commands:
        current_phase += 1
        log_output(f"\n[Phase {current_phase}/{total_phases}] {phase_name}")
        start_time = time.time()

        try:
            # Execute foundation command
            if TempManager:
                stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                    root, f"foundation_{phase_name.replace(' ', '_')}"
                )
            else:
                # Fallback to regular tempfile
                with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                     tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                    stdout_file = out_tmp.name
                    stderr_file = err_tmp.name

            with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                 open(stderr_file, 'w+', encoding='utf-8') as err_fp:

                # Determine appropriate timeout for this command
                cmd_timeout = get_command_timeout(cmd)

                result = run_subprocess_with_interrupt(
                    cmd,
                    stdout_fp=out_fp,
                    stderr_fp=err_fp,
                    cwd=root,
                    shell=IS_WINDOWS,     # Windows compatibility fix
                    timeout=cmd_timeout,  # Adaptive timeout based on command type
                )

            # Read outputs
            with open(stdout_file, 'r', encoding='utf-8') as f:
                result.stdout = f.read()
            with open(stderr_file, 'r', encoding='utf-8') as f:
                result.stderr = f.read()

            # Clean up temp files
            try:
                os.unlink(stdout_file)
                os.unlink(stderr_file)
            except (OSError, PermissionError):
                pass

            elapsed = time.time() - start_time

            if result.returncode == 0:
                log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s")
                if result.stdout:
                    lines = result.stdout.strip().split('\n')
                    # Write FULL output to log file
                    if log_file and len(lines) > 3:
                        log_file.write(" [Full output below, truncated in terminal]\n")
                        for line in lines:
                            log_file.write(f" {line}\n")
                        log_file.flush()

                    # Special handling for framework detection to show actual results
                    if "Detect frameworks" in phase_name and len(lines) > 3:
                        # Check if this looks like table output (has header separator)
                        has_table = any("---" in line for line in lines[:5])
                        if has_table:
                            # Show more lines for table output: first line (path info),
                            # table header, and first few data rows
                            for line in lines[:6]:
                                if log_callback and not quiet:
                                    log_callback(f" {line}", False)
                                log_lines.append(f" {line}")
                            if len(lines) > 6:
                                truncate_msg = f" ... ({len(lines) - 6} more lines)"
                                if log_callback and not quiet:
                                    log_callback(truncate_msg, False)
                                log_lines.append(truncate_msg)
                        else:
                            # Regular truncation for non-table output
                            for line in lines[:3]:
                                if log_callback and not quiet:
                                    log_callback(f" {line}", False)
                                log_lines.append(f" {line}")
                            if len(lines) > 3:
                                truncate_msg = f" ... ({len(lines) - 3} more lines)"
                                if log_callback and not quiet:
                                    log_callback(truncate_msg, False)
                                log_lines.append(truncate_msg)
                    else:
                        # Regular truncation for other commands
                        for line in lines[:3]:
                            if log_callback and not quiet:
                                log_callback(f" {line}", False)
                            log_lines.append(f" {line}")
                        if len(lines) > 3:
                            truncate_msg = f" ... ({len(lines) - 3} more lines)"
                            if log_callback and not quiet:
                                log_callback(truncate_msg, False)
                            log_lines.append(truncate_msg)
            else:
                failed_phases += 1
                log_output(f"[FAILED] {phase_name} failed (exit code {result.returncode})", is_error=True)
                if result.stderr:
                    # Write FULL error to log file
                    if log_file:
                        log_file.write(" [Full error output]:\n")
                        log_file.write(f" {result.stderr}\n")
                        log_file.flush()
                    # Show truncated in terminal
                    error_msg = f" Error: {result.stderr[:200]}"
                    if len(result.stderr) > 200:
                        error_msg += "... [see pipeline.log for full error]"
                    if log_callback and not quiet:
                        log_callback(error_msg, True)
                    log_lines.append(error_msg)
                # Foundation failure stops pipeline
                log_output("[CRITICAL] Foundation stage failed - stopping pipeline", is_error=True)
                break

        except Exception as e:
            failed_phases += 1
            log_output(f"[FAILED] {phase_name} failed: {e}", is_error=True)
            break

    # Only proceed to parallel stage if foundation succeeded
    if failed_phases == 0 and (track_a_commands or track_b_commands or track_c_commands):
        # STAGE 2: Concurrent Analysis (Parallel Execution)
        log_output("\n" + "="*60)
        log_output("[STAGE 2] CONCURRENT ANALYSIS - Parallel Execution")
        log_output("="*60)
        if offline:
            log_output("[OFFLINE MODE] Skipping network operations")
            log_output("Launching 2 parallel tracks:")
        else:
            log_output("Launching 3 parallel tracks:")
            log_output(" Track A: Network I/O (deps, docs)")
        log_output(" Track B: Code Analysis (workset, lint, patterns)")
        log_output(" Track C: Graph & Taint Analysis")

        # Execute parallel tracks using ThreadPoolExecutor (Windows-safe)
        parallel_results = []
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = []

            # Submit Track A if it has commands
            if track_a_commands:
                future_a = executor.submit(run_command_chain, track_a_commands, root, "Track A (Network I/O)")
                futures.append(future_a)
                current_phase += len(track_a_commands)

            # Submit Track B if it has commands
            if track_b_commands:
                future_b = executor.submit(run_command_chain, track_b_commands, root, "Track B (Code Analysis)")
                futures.append(future_b)
                current_phase += len(track_b_commands)

            # Submit Track C if it has commands
            if track_c_commands:
                future_c = executor.submit(run_command_chain, track_c_commands, root, "Track C (Graph & Taint)")
                futures.append(future_c)
                current_phase += len(track_c_commands)

            # STAGE 3: Synchronization Point - Wait for all parallel tracks
            log_output("\n[SYNC] Waiting for parallel tracks to complete...")

            # Monitor progress while waiting
            status_dir = Path(root) / ".pf" / "status"
            last_status_check = 0
            status_check_interval = 2  # Check every 2 seconds

            # Process futures as they complete, but also check status periodically
            pending_futures = list(futures)
            while pending_futures:
                # Check for completed futures (with short timeout)
                done, pending_futures = wait(pending_futures, timeout=status_check_interval)

                # Read and display status if enough time has passed
                current_time = time.time()
                if current_time - last_status_check >= status_check_interval:
                    last_status_check = current_time

                    # Read all status files
                    if status_dir.exists():
                        status_summary = []
                        status_files = list(status_dir.glob("*.status"))
                        # Debug: show if we found any status files
                        if not status_files and not quiet:
                            log_output(f"[DEBUG] No status files found in {status_dir}")
                        for status_file in status_files:
                            try:
                                with open(status_file, 'r', encoding='utf-8') as f:
                                    status_data = json.loads(f.read().strip())
                                track = status_data.get("track", "Unknown")
                                completed = status_data.get("completed", 0)
                                total = status_data.get("total", 0)
                                current = status_data.get("current", "")

                                # Format progress
                                if total > 0:
                                    progress = f"[{completed}/{total}]"
                                else:
                                    progress = ""

                                status_summary.append(f" {track}: {progress} {current[:50]}")
                            except Exception:
                                pass  # Ignore status read errors

                        if status_summary:
                            log_output("[PROGRESS] Track Status:")
                            for status_line in status_summary:
                                log_output(status_line)

                # Process completed futures
                for future in done:
                    try:
                        result = future.result()
                        parallel_results.append(result)
                        if result["success"]:
                            log_output(f"[OK] {result['name']} completed in {result['elapsed']:.1f}s")
                        else:
                            log_output(f"[FAILED] {result['name']} failed", is_error=True)
                            failed_phases += 1
                    except KeyboardInterrupt:
                        log_output("[INTERRUPTED] Pipeline stopped by user", is_error=True)
                        # Cancel remaining futures
                        for f in pending_futures:
                            f.cancel()
                        raise  # Re-raise to exit
                    except Exception as e:
                        log_output(f"[ERROR] Parallel track failed with exception: {e}", is_error=True)
                        failed_phases += 1

        # Print outputs from parallel tracks sequentially for clean logging
        log_output("\n" + "="*60)
        log_output("[STAGE 2 RESULTS] Parallel Track Outputs")
        log_output("="*60)

        for result in parallel_results:
            log_output(result["output"])
            if result["errors"]:
                log_output("[ERRORS]:")
                log_output(result["errors"])

    # STAGE 4: Final Aggregation (Sequential)
    if failed_phases == 0 and final_commands:
        log_output("\n" + "="*60)
        log_output("[STAGE 4] FINAL AGGREGATION - Sequential Execution")
        log_output("="*60)

        for phase_name, cmd in final_commands:
            current_phase += 1
            log_output(f"\n[Phase {current_phase}/{total_phases}] {phase_name}")
            start_time = time.time()

            try:
                # Execute final aggregation command
                if TempManager:
                    stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                        root, f"final_{phase_name.replace(' ', '_')}"
                    )
                else:
                    # Fallback to regular tempfile
                    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                         tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                        stdout_file = out_tmp.name
                        stderr_file = err_tmp.name

                with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                     open(stderr_file, 'w+', encoding='utf-8') as err_fp:

                    # Determine appropriate timeout for this command
                    cmd_timeout = get_command_timeout(cmd)

                    result = run_subprocess_with_interrupt(
                        cmd,
                        stdout_fp=out_fp,
                        stderr_fp=err_fp,
                        cwd=root,
                        shell=IS_WINDOWS,     # Windows compatibility fix
                        timeout=cmd_timeout,  # Adaptive timeout based on command type
                    )

                # Read outputs
                with open(stdout_file, 'r', encoding='utf-8') as f:
                    result.stdout = f.read()
                with open(stderr_file, 'r', encoding='utf-8') as f:
                    result.stderr = f.read()

                # Clean up temp files
                try:
                    os.unlink(stdout_file)
                    os.unlink(stderr_file)
                except (OSError, PermissionError):
                    pass

                elapsed = time.time() - start_time

                # Handle special exit codes for findings commands
                is_findings_command = "taint-analyze" in cmd or ("deps" in cmd and "--vuln-scan" in cmd)
                if is_findings_command:
                    success = result.returncode in [0, 1, 2]
                else:
                    success = result.returncode == 0

                if success:
                    if result.returncode == 2 and is_findings_command:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s - CRITICAL findings")
                    elif result.returncode == 1 and is_findings_command:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s - HIGH findings")
                    else:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s")

                    if result.stdout:
                        lines = result.stdout.strip().split('\n')
                        # Write FULL output to log file
                        if log_file and len(lines) > 3:
                            log_file.write(" [Full output below, truncated in terminal]\n")
                            for line in lines:
                                log_file.write(f" {line}\n")
                            log_file.flush()

                        # Special handling for framework detection to show actual results
                        if "Detect frameworks" in phase_name and len(lines) > 3:
                            # Check if this looks like table output (has header separator)
                            has_table = any("---" in line for line in lines[:5])
                            if has_table:
                                # Show more lines for table output: first line (path info),
                                # table header, and first few data rows
                                for line in lines[:6]:
                                    if log_callback and not quiet:
                                        log_callback(f" {line}", False)
                                    log_lines.append(f" {line}")
                                if len(lines) > 6:
                                    truncate_msg = f" ... ({len(lines) - 6} more lines)"
                                    if log_callback and not quiet:
                                        log_callback(truncate_msg, False)
                                    log_lines.append(truncate_msg)
                            else:
                                # Regular truncation for non-table output
                                for line in lines[:3]:
                                    if log_callback and not quiet:
                                        log_callback(f" {line}", False)
                                    log_lines.append(f" {line}")
                                if len(lines) > 3:
                                    truncate_msg = f" ... ({len(lines) - 3} more lines)"
                                    if log_callback and not quiet:
                                        log_callback(truncate_msg, False)
                                    log_lines.append(truncate_msg)
                        else:
                            # Regular truncation for other commands
                            for line in lines[:3]:
                                if log_callback and not quiet:
                                    log_callback(f" {line}", False)
                                log_lines.append(f" {line}")
                            if len(lines) > 3:
                                truncate_msg = f" ... ({len(lines) - 3} more lines)"
                                if log_callback and not quiet:
                                    log_callback(truncate_msg, False)
                                log_lines.append(truncate_msg)
                else:
                    failed_phases += 1
                    log_output(f"[FAILED] {phase_name} failed (exit code {result.returncode})", is_error=True)
                    if result.stderr:
                        # Write FULL error to log file
                        if log_file:
                            log_file.write(" [Full error output]:\n")
                            log_file.write(f" {result.stderr}\n")
                            log_file.flush()
                        # Show truncated in terminal
                        error_msg = f" Error: {result.stderr[:200]}"
                        if len(result.stderr) > 200:
                            error_msg += "... [see pipeline.log for full error]"
                        if log_callback and not quiet:
                            log_callback(error_msg, True)
                        log_lines.append(error_msg)

                # CRITICAL: Run extraction AFTER FCE and BEFORE report
                if "factual correlation" in phase_name.lower():
                    try:
                        from theauditor.extraction import extract_all_to_readthis

                        log_output("\n" + "="*60)
                        log_output("[EXTRACTION] Creating AI-consumable chunks from raw data")
                        log_output("="*60)

                        extraction_start = time.time()
                        extraction_success = extract_all_to_readthis(root)
                        extraction_elapsed = time.time() - extraction_start

                        if extraction_success:
                            log_output(f"[OK] Chunk extraction completed in {extraction_elapsed:.1f}s")
                            log_output("[INFO] AI-readable chunks available in .pf/readthis/")
                        else:
                            log_output(f"[WARNING] Chunk extraction completed with errors in {extraction_elapsed:.1f}s", is_error=True)
                            log_output("[WARNING] Some chunks may be incomplete", is_error=True)

                    except ImportError as e:
                        log_output(f"[ERROR] Could not import extraction module: {e}", is_error=True)
                        log_output("[ERROR] Chunks will not be generated", is_error=True)
                    except Exception as e:
                        log_output(f"[ERROR] Chunk extraction failed: {e}", is_error=True)
                        log_output("[ERROR] Raw data preserved in .pf/raw/ but no chunks created", is_error=True)

            except Exception as e:
                failed_phases += 1
                log_output(f"[FAILED] {phase_name} failed: {e}", is_error=True)

    # After all commands complete, collect all created files
    pipeline_elapsed = time.time() - pipeline_start
    all_created_files = collect_created_files()

    # Create allfiles.md in .pf root (not in readthis which gets deleted/recreated)
    pf_dir = Path(root) / ".pf"
    allfiles_path = pf_dir / "allfiles.md"
    with open(allfiles_path, 'w', encoding='utf-8') as f:
        f.write("# All Files Created by `aud full` Command\n\n")
        f.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total files: {len(all_created_files)}\n\n")

        # Group files by directory
        files_by_dir = {}
        for file_path in all_created_files:
            dir_name = str(Path(file_path).parent)
            if dir_name not in files_by_dir:
                files_by_dir[dir_name] = []
            files_by_dir[dir_name].append(file_path)

        # Write files grouped by directory
        for dir_name in sorted(files_by_dir.keys()):
            f.write(f"\n## {dir_name}/\n\n")
            for file_path in sorted(files_by_dir[dir_name]):
                # Paths in all_created_files are relative to root, so resolve
                # against root before stat'ing
                full_path = Path(root) / file_path
                file_size = full_path.stat().st_size if full_path.exists() else 0
                f.write(f"- `{Path(file_path).name}` ({file_size:,} bytes)\n")

        f.write("\n---\n")
        f.write(f"Total execution time: {pipeline_elapsed:.1f} seconds ({pipeline_elapsed/60:.1f} minutes)\n")
        f.write(f"Commands executed: {total_phases}\n")
        f.write(f"Failed commands: {failed_phases}\n")

    # Display final summary
    log_output("\n" + "="*60)
    if failed_phases == 0 and phases_with_warnings == 0:
        log_output(f"[OK] AUDIT COMPLETE - All {total_phases} phases successful")
    elif phases_with_warnings > 0 and failed_phases == 0:
        log_output(f"[WARNING] AUDIT COMPLETE - {phases_with_warnings} phases completed with errors")
    else:
        log_output(f"[WARN] AUDIT COMPLETE - {failed_phases} phases failed, {phases_with_warnings} phases with errors")
    log_output(f"[TIME] Total time: {pipeline_elapsed:.1f}s ({pipeline_elapsed/60:.1f} minutes)")

    # Display all created files summary
    log_output("\n" + "="*60)
    log_output("[FILES] ALL CREATED FILES")
    log_output("="*60)

    # Count files by category
    pf_files = [f for f in all_created_files if f.startswith(".pf/")]
    readthis_files = [f for f in all_created_files if f.startswith(".pf/readthis/")]
    docs_files = [f for f in all_created_files if f.startswith(".pf/docs/")]
    root_files = [f for f in all_created_files if "/" not in f]

    log_output("\n[STATS] Summary:")
    log_output(f" Total files created: {len(all_created_files)}")
    log_output(f" .pf/ files: {len(pf_files)}")
    log_output(f" .pf/readthis/ files: {len(readthis_files)}")
    if docs_files:
        log_output(f" .pf/docs/ files: {len(docs_files)}")
    log_output(f" Root files: {len(root_files)}")

    log_output("\n[SAVED] Complete file list saved to: .pf/allfiles.md")
    log_output("\n[TIP] Key artifacts:")
    log_output(" * .pf/readthis/ - All AI-consumable chunks")
    log_output(" * .pf/allfiles.md - Complete file list")
    log_output(" * .pf/pipeline.log - Full execution log")
    log_output(" * .pf/findings.json - Pattern detection results")
    log_output(" * .pf/risk_scores.json - Risk analysis")

    log_output("\n" + "="*60)
    log_output("[COMPLETE] AUDIT SUITE EXECUTION COMPLETE")
    log_output("="*60)

    # Close the log file (already written throughout execution)
    if log_file:
        try:
            log_file.close()
            log_file = None
        except Exception as e:
            print(f"[CRITICAL] Failed to close log file: {e}", file=sys.stderr)

    # Move files from temp to readthis if needed
    temp_dir = Path(root) / ".pf" / "temp"
    readthis_final = Path(root) / ".pf" / "readthis"

    # Ensure readthis exists
    readthis_final.mkdir(parents=True, exist_ok=True)

    # Move pipeline.log if it's in temp
    temp_log = temp_dir / "pipeline.log"
    final_log = readthis_final / "pipeline.log"

    if temp_log.exists() and not final_log.exists():
        try:
            shutil.move(str(temp_log), str(final_log))
            log_file_path = final_log
        except Exception as e:
            print(f"[WARNING] Could not move log to final location: {e}", file=sys.stderr)

    # Move allfiles.md if it's in temp
    temp_allfiles = temp_dir / "allfiles.md"
    final_allfiles = readthis_final / "allfiles.md"

    if temp_allfiles.exists() and not final_allfiles.exists():
        try:
            shutil.move(str(temp_allfiles), str(final_allfiles))
            allfiles_path = final_allfiles
        except Exception as e:
            print(f"[WARNING] Could not move allfiles.md to final location: {e}", file=sys.stderr)

    print(f"\n[SAVED] Full pipeline log saved to: {log_file_path}")

    # Add allfiles.md and pipeline.log to the list of created files for completeness
    all_created_files.append(str(allfiles_path))
    all_created_files.append(str(log_file_path))

    # Clean up temporary files created during pipeline execution
    if TempManager:
        try:
            TempManager.cleanup_temp_dir(root)
            print("[INFO] Temporary files cleaned up", file=sys.stderr)
        except Exception as e:
            print(f"[WARNING] Could not clean temp files: {e}", file=sys.stderr)

    # Clean up status files
    status_dir = Path(root) / ".pf" / "status"
    if status_dir.exists():
        try:
            for status_file in status_dir.glob("*.status"):
                status_file.unlink()
            # Remove directory if empty
            if not list(status_dir.iterdir()):
                status_dir.rmdir()
        except Exception as e:
            print(f"[WARNING] Could not clean status files: {e}", file=sys.stderr)

    # Collect findings summary from generated reports
    critical_findings = 0
    high_findings = 0
    medium_findings = 0
    low_findings = 0
    total_vulnerabilities = 0

    # Try to read taint analysis results
    taint_path = Path(root) / ".pf" / "raw" / "taint_analysis.json"
    if taint_path.exists():
        try:
            with open(taint_path, encoding='utf-8') as f:
                taint_data = json.load(f)
            if taint_data.get("success"):
                summary = taint_data.get("summary", {})
                critical_findings += summary.get("critical_count", 0)
                high_findings += summary.get("high_count", 0)
                medium_findings += summary.get("medium_count", 0)
                low_findings += summary.get("low_count", 0)
                total_vulnerabilities = taint_data.get("total_vulnerabilities", 0)
        except Exception as e:
            print(f"[WARNING] Could not read taint analysis results from {taint_path}: {e}", file=sys.stderr)
            # Non-critical - continue without taint stats

    # Try to read vulnerability scan results
    vuln_path = Path(root) / ".pf" / "raw" / "vulnerabilities.json"
    if vuln_path.exists():
        try:
            with open(vuln_path, encoding='utf-8') as f:
                vuln_data = json.load(f)
            if vuln_data.get("vulnerabilities"):
                for vuln in vuln_data["vulnerabilities"]:
                    severity = vuln.get("severity", "").lower()
                    if severity == "critical":
                        critical_findings += 1
                    elif severity == "high":
                        high_findings += 1
                    elif severity == "medium":
                        medium_findings += 1
                    elif severity == "low":
                        low_findings += 1
        except Exception as e:
            print(f"[WARNING] Could not read vulnerability scan results from {vuln_path}: {e}", file=sys.stderr)
            # Non-critical - continue without vulnerability stats

    # Try to read pattern detection results
    patterns_path = Path(root) / ".pf" / "raw" / "patterns.json"
    if not patterns_path.exists():
        # Fallback to findings.json (alternate name)
        patterns_path = Path(root) / ".pf" / "raw" / "findings.json"

    if patterns_path.exists():
        try:
            with open(patterns_path, encoding='utf-8') as f:
                patterns_data = json.load(f)
            # Aggregate findings by severity
            for finding in patterns_data.get("findings", []):
                severity = finding.get("severity", "").lower()
                if severity == "critical":
                    critical_findings += 1
                elif severity == "high":
                    high_findings += 1
                elif severity == "medium":
                    medium_findings += 1
                elif severity == "low":
                    low_findings += 1
        except Exception as e:
            print(f"[WARNING] Could not read pattern results from {patterns_path}: {e}", file=sys.stderr)
            # Non-critical - continue without pattern stats

    return {
        "success": failed_phases == 0 and phases_with_warnings == 0,
        "failed_phases": failed_phases,
        "phases_with_warnings": phases_with_warnings,
        "total_phases": total_phases,
        "elapsed_time": pipeline_elapsed,
        "created_files": all_created_files,
        "log_lines": log_lines,
        "findings": {
            "critical": critical_findings,
            "high": high_findings,
            "medium": medium_findings,
            "low": low_findings,
            "total_vulnerabilities": total_vulnerabilities,
        },
    }