Auditor/theauditor/pipelines.py
TheAuditorTool c7a59e420b Fix: Critical Windows ProcessPoolExecutor hang and documentation drift
Fixed critical Windows compatibility issues and updated outdated documentation.

  CRITICAL WINDOWS HANG FIXES:
  1. ProcessPoolExecutor → ThreadPoolExecutor
     - Fixes PowerShell/terminal hang where Ctrl+C wouldn't work
     - Prevents .pf directory lock requiring Task Manager kill
     - Root cause: Nested ProcessPool + ThreadPool on Windows creates kernel deadlock

  2. Ctrl+C Interruption Support
     - Replaced subprocess.run with Popen+poll pattern (industry standard)
     - Poll subprocess every 100ms for interruption checking
     - Added global stop_event and signal handlers for graceful shutdown
     - Root cause: subprocess.run blocks threads with no signal propagation

  DOCUMENTATION DRIFT FIX:
  - Removed hardcoded "14 phases" references (the pipeline actually runs 19+ commands)
  - Updated to "multiple analysis phases" throughout all docs
  - Fixed CLI help text to be version-agnostic
  - Added missing "Summary generation" step in HOWTOUSE.md

  Changes:
  - pipelines.py: ProcessPoolExecutor → ThreadPoolExecutor, added Popen+poll pattern
  - Added signal handling and run_subprocess_with_interrupt() function
  - commands/full.py: Updated docstring to remove specific phase count
  - README.md: Changed "14 distinct phases" to "multiple analysis phases"
  - HOWTOUSE.md: Updated phase references, added missing summary step
  - CLAUDE.md & ARCHITECTURE.md: Removed hardcoded phase counts

  Impact: Critical UX fixes - Windows compatibility restored, pipeline interruptible
  Testing: Ctrl+C works, no PowerShell hangs, .pf directory deletable
2025-09-09 14:26:18 +07:00

1167 lines
53 KiB
Python

"""Pipeline execution module for TheAuditor."""
import json
import os
import platform
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Tuple
# Import our custom temp manager to avoid WSL2/Windows issues
try:
    from theauditor.utils.temp_manager import TempManager
except ImportError:
    # Fallback if not available yet
    TempManager = None

# Windows compatibility
IS_WINDOWS = platform.system() == "Windows"

# Global stop event for interrupt handling
stop_event = threading.Event()


def signal_handler(signum, frame):
    """Handle Ctrl+C by setting stop event."""
    print("\n[INFO] Interrupt received, stopping pipeline gracefully...", file=sys.stderr)
    stop_event.set()


# Register signal handler
signal.signal(signal.SIGINT, signal_handler)
if not IS_WINDOWS:
    signal.signal(signal.SIGTERM, signal_handler)
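# NOTE: SIGTERM is POSIX-only here by design. Windows has no real SIGTERM
# delivery (os.kill with SIGTERM unconditionally terminates the process via
# TerminateProcess), so a handler registered for it would, in practice,
# never fire; console interrupts arrive as SIGINT (Ctrl+C) instead.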
def run_subprocess_with_interrupt(cmd, stdout_fp, stderr_fp, cwd, shell=False, timeout=300):
    """Run subprocess with interrupt checking every 100ms.

    Args:
        cmd: Command to execute
        stdout_fp: File handle for stdout
        stderr_fp: File handle for stderr
        cwd: Working directory
        shell: Whether to use shell execution
        timeout: Maximum time to wait (seconds)

    Returns:
        subprocess.CompletedProcess-like object with returncode, stdout, stderr
    """
    process = subprocess.Popen(
        cmd,
        stdout=stdout_fp,
        stderr=stderr_fp,
        text=True,
        cwd=cwd,
        shell=shell,
    )

    # Poll process every 100ms to check for completion or interruption
    start_time = time.time()
    while process.poll() is None:
        if stop_event.is_set():
            # User interrupted - terminate subprocess
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            raise KeyboardInterrupt("Pipeline interrupted by user")

        # Check timeout
        if time.time() - start_time > timeout:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
            raise subprocess.TimeoutExpired(cmd, timeout)

        # Sleep briefly to avoid busy-waiting
        time.sleep(0.1)

    # Create a result object similar to subprocess.run
    class Result:
        def __init__(self, returncode):
            self.returncode = returncode
            self.stdout = None
            self.stderr = None

    return Result(process.returncode)
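# Illustrative usage (filenames and the command below are hypothetical): the
# caller opens the output files and passes the handles, so output streams to
# disk while the polling loop above keeps Ctrl+C responsive.
#
#     with open("step_stdout.txt", "w+", encoding="utf-8") as out_fp, \
#          open("step_stderr.txt", "w+", encoding="utf-8") as err_fp:
#         res = run_subprocess_with_interrupt(
#             [sys.executable, "-c", "print('ok')"],
#             stdout_fp=out_fp, stderr_fp=err_fp, cwd=".", timeout=30,
#         )
#     print(res.returncode)  # 0 on success; stdout/stderr live in the files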
def run_command_chain(commands: List[Tuple[str, List[str]]], root: str, chain_name: str) -> dict:
    """Execute a chain of commands sequentially and capture their output.

    Used for parallel execution of independent command tracks.

    Args:
        commands: List of (description, command_array) tuples
        root: Working directory
        chain_name: Name of this chain for logging

    Returns:
        Dict with chain results including success, output, and timing
    """
    chain_start = time.time()
    chain_output = []
    chain_errors = []
    failed = False

    # Write progress to a status file for monitoring
    status_dir = Path(root) / ".pf" / "status"
    try:
        status_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"[WARNING] Could not create status dir {status_dir}: {e}", file=sys.stderr)
    safe_name = chain_name.replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
    status_file = status_dir / f"{safe_name}.status"

    def write_status(message: str, completed: int = 0, total: int = 0):
        """Write current status to file for external monitoring."""
        try:
            with open(status_file, 'w', encoding='utf-8') as f:
                status_data = {
                    "track": chain_name,
                    "current": message,
                    "completed": completed,
                    "total": total,
                    "timestamp": time.time(),
                    "elapsed": time.time() - chain_start,
                }
                f.write(json.dumps(status_data) + "\n")
                f.flush()  # Force write to disk
            # Debug output to stderr (visible in subprocess)
            print(f"[STATUS] {chain_name}: {message} [{completed}/{total}]", file=sys.stderr)
        except Exception as e:
            print(f"[ERROR] Could not write status to {status_file}: {e}", file=sys.stderr)
    # Write initial status
    write_status("Starting", 0, len(commands))

    completed_count = 0
    for description, cmd in commands:
        # Update status before starting command
        write_status(f"Running: {description}", completed_count, len(commands))
        start_time = time.time()
        chain_output.append(f"\n{'='*60}")
        chain_output.append(f"[{chain_name}] {description}")
        chain_output.append('='*60)
        try:
            # Use temp files to capture output
            if TempManager:
                # Sanitize chain name and description for Windows paths
                safe_chain = chain_name.replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
                safe_desc = description[:20].replace(' ', '_').replace('(', '').replace(')', '').replace('/', '_')
                stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                    root, f"chain_{safe_chain}_{safe_desc}"
                )
            else:
                # Fallback to regular tempfile
                with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                     tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                    stdout_file = out_tmp.name
                    stderr_file = err_tmp.name

            with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                 open(stderr_file, 'w+', encoding='utf-8') as err_fp:
                result = run_subprocess_with_interrupt(
                    cmd,
                    stdout_fp=out_fp,
                    stderr_fp=err_fp,
                    cwd=root,
                    shell=IS_WINDOWS,  # Windows compatibility fix
                    timeout=300,  # 5 minutes per command in parallel tracks
                )

            # Read outputs
            with open(stdout_file, 'r', encoding='utf-8') as f:
                stdout = f.read()
            with open(stderr_file, 'r', encoding='utf-8') as f:
                stderr = f.read()

            # Clean up temp files
            try:
                os.unlink(stdout_file)
                os.unlink(stderr_file)
            except (OSError, PermissionError):
                pass  # Windows file locking

            elapsed = time.time() - start_time

            # Check for special exit codes (findings commands)
            is_findings_command = "taint-analyze" in cmd or ("deps" in cmd and "--vuln-scan" in cmd)
            if is_findings_command:
                success = result.returncode in [0, 1, 2]
            else:
                success = result.returncode == 0

            if success:
                completed_count += 1
                write_status(f"Completed: {description}", completed_count, len(commands))
                chain_output.append(f"[OK] {description} completed in {elapsed:.1f}s")
                if stdout:
                    lines = stdout.strip().split('\n')
                    # For parallel tracks, include all output (chains collect their own output)
                    if len(lines) <= 5:
                        for line in lines:
                            chain_output.append(f" {line}")
                    else:
                        # Show first 5 lines and indicate more in chain output
                        for line in lines[:5]:
                            chain_output.append(f" {line}")
                        chain_output.append(f" ... ({len(lines) - 5} more lines)")
                        # Add full output marker for later processing
                        chain_output.append(" [Full output available in pipeline.log]")
            else:
                failed = True
                write_status(f"FAILED: {description}", completed_count, len(commands))
                chain_output.append(f"[FAILED] {description} failed (exit code {result.returncode})")
                if stderr:
                    chain_errors.append(f"Error in {description}: {stderr}")
                break  # Stop chain on failure
        except KeyboardInterrupt:
            # User interrupted - clean up and exit
            failed = True
            write_status(f"INTERRUPTED: {description}", completed_count, len(commands))
            chain_output.append("[INTERRUPTED] Pipeline stopped by user")
            raise  # Re-raise to propagate up
        except Exception as e:
            failed = True
            write_status(f"ERROR: {description}", completed_count, len(commands))
            chain_output.append(f"[FAILED] {description} failed: {e}")
            chain_errors.append(f"Exception in {description}: {str(e)}")
            break

    # Final status
    if not failed:
        write_status(f"Completed all {len(commands)} tasks", len(commands), len(commands))

    chain_elapsed = time.time() - chain_start
    return {
        "success": not failed,
        "output": "\n".join(chain_output),
        "errors": "\n".join(chain_errors) if chain_errors else "",
        "elapsed": chain_elapsed,
        "name": chain_name,
    }
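# The dict returned above feeds the Stage 2 result handling in
# run_full_pipeline; a successful track looks roughly like (values illustrative):
#   {"success": True, "output": "...captured per-command log...", "errors": "",
#    "elapsed": 87.3, "name": "Track A (Network I/O)"}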
def run_full_pipeline(
    root: str = ".",
    quiet: bool = False,
    exclude_self: bool = False,
    offline: bool = False,
    log_callback: Callable[[str, bool], None] = None,
) -> dict[str, Any]:
    """Run the complete audit pipeline in the exact order specified in teamsop.md.

    Args:
        root: Root directory to analyze
        quiet: Whether to run in quiet mode (minimal output)
        exclude_self: Pass --exclude-self to commands that support it
        offline: Skip network-dependent phases (deps, docs)
        log_callback: Optional callback function for logging messages (message, is_error)

    Returns:
        Dict containing:
            - success: Whether all phases succeeded
            - failed_phases: Number of failed phases
            - total_phases: Total number of phases
            - elapsed_time: Total execution time in seconds
            - created_files: List of all created files
            - log_lines: List of all log lines
    """
    # CRITICAL: Archive previous run BEFORE any new artifacts are created
    # Import and call _archive function directly to avoid subprocess issues
    try:
        from theauditor.commands._archive import _archive
        # Call the function directly with appropriate parameters
        # Note: Click commands can be invoked as regular functions
        _archive.callback(run_type="full", diff_spec=None)
        print("[INFO] Previous run archived successfully", file=sys.stderr)
    except ImportError as e:
        print(f"[WARNING] Could not import archive command: {e}", file=sys.stderr)
    except Exception as e:
        print(f"[WARNING] Archive operation failed: {e}", file=sys.stderr)

    # Track all created files throughout execution
    all_created_files = []

    # CRITICAL FIX: Open log file immediately for real-time writing
    # This ensures we don't lose logs if the pipeline crashes
    # Write directly to .pf root, not in readthis (which gets recreated by extraction)
    pf_dir = Path(root) / ".pf"
    pf_dir.mkdir(parents=True, exist_ok=True)
    log_file_path = pf_dir / "pipeline.log"
    log_lines = []  # Keep for return value

    # Open log file in write mode with line buffering for immediate writes
    log_file = None
    try:
        log_file = open(log_file_path, 'w', encoding='utf-8', buffering=1)
    except Exception as e:
        print(f"[CRITICAL] Failed to open log file {log_file_path}: {e}", file=sys.stderr)
        # Fall back to memory-only logging if file can't be opened
        log_file = None

    # CRITICAL: Create the .pf/raw/ directory for ground truth preservation
    # This directory will store immutable copies of all analysis artifacts
    raw_dir = Path(root) / ".pf" / "raw"
    try:
        raw_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"[CRITICAL] Failed to create raw directory {raw_dir}: {e}", file=sys.stderr)
        # Continue execution - we'll handle missing directory during file moves

    # Ensure readthis directory exists for fresh chunks
    # Archive has already moved old content to history
    readthis_dir = Path(root) / ".pf" / "readthis"
    readthis_dir.mkdir(parents=True, exist_ok=True)

    def log_output(message, is_error=False):
        """Log message to callback, file (real-time), and memory."""
        if log_callback and not quiet:
            log_callback(message, is_error)
        # Always add to log list for return value
        log_lines.append(message)
        # CRITICAL: Write immediately to file and flush (if file is open)
        if log_file:
            try:
                log_file.write(message + '\n')
                log_file.flush()  # Force write to disk immediately
            except Exception as e:
                print(f"[CRITICAL] Failed to write to log file: {e}", file=sys.stderr)
                # Continue execution - logging failure shouldn't stop pipeline

    # Log header
    log_output("TheAuditor Full Pipeline Execution Log")
    log_output(f"Started: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    log_output(f"Working Directory: {Path(root).resolve()}")
    log_output("=" * 80)
    # Dynamically discover available commands from CLI registration (Courier principle)
    from theauditor.cli import cli

    # Get all registered commands, excluding internal (_) and special commands
    available_commands = sorted(cli.commands.keys())

    # Define execution order and arguments for known commands
    # This provides the order and arguments, but dynamically adapts to available commands
    command_order = [
        ("index", []),
        ("detect-frameworks", []),
        ("deps", ["--check-latest"]),
        ("docs", ["fetch"]),
        ("docs", ["summarize"]),
        ("workset", ["--all"]),
        ("lint", ["--workset"]),
        ("detect-patterns", []),
        ("graph", ["build"]),
        ("graph", ["analyze"]),
        ("graph", ["viz", "--view", "full", "--include-analysis"]),
        ("graph", ["viz", "--view", "cycles", "--include-analysis"]),
        ("graph", ["viz", "--view", "hotspots", "--include-analysis"]),
        ("graph", ["viz", "--view", "layers", "--include-analysis"]),
        ("taint-analyze", []),
        ("fce", []),
        ("report", []),
        ("summary", []),
    ]
    # Build command list from available commands in the defined order
    commands = []
    phase_num = 0
    for cmd_name, extra_args in command_order:
        # Check if command exists (dynamic discovery)
        if cmd_name in available_commands:
            phase_num += 1
            # Generate human-readable description from command name
            if cmd_name == "index":
                description = f"{phase_num}. Index repository"
                # Add --exclude-self flag if requested
                if exclude_self:
                    extra_args = extra_args + ["--exclude-self"]
            elif cmd_name == "detect-frameworks":
                description = f"{phase_num}. Detect frameworks"
            elif cmd_name == "deps" and "--check-latest" in extra_args:
                description = f"{phase_num}. Check dependencies"
            elif cmd_name == "docs" and "fetch" in extra_args:
                description = f"{phase_num}. Fetch documentation"
            elif cmd_name == "docs" and "summarize" in extra_args:
                description = f"{phase_num}. Summarize documentation"
            elif cmd_name == "workset":
                description = f"{phase_num}. Create workset (all files)"
            elif cmd_name == "lint":
                description = f"{phase_num}. Run linting"
            elif cmd_name == "detect-patterns":
                description = f"{phase_num}. Detect patterns"
                # Add --exclude-self flag if requested
                if exclude_self:
                    extra_args = extra_args + ["--exclude-self"]
            elif cmd_name == "graph" and "build" in extra_args:
                description = f"{phase_num}. Build graph"
            elif cmd_name == "graph" and "analyze" in extra_args:
                description = f"{phase_num}. Analyze graph"
            elif cmd_name == "graph" and "viz" in extra_args:
                # Extract view type from arguments
                if "--view" in extra_args:
                    view_idx = extra_args.index("--view")
                    if view_idx + 1 < len(extra_args):
                        view_type = extra_args[view_idx + 1]
                        description = f"{phase_num}. Visualize graph ({view_type})"
                    else:
                        description = f"{phase_num}. Visualize graph"
                else:
                    description = f"{phase_num}. Visualize graph"
            elif cmd_name == "taint-analyze":
                description = f"{phase_num}. Taint analysis"
            elif cmd_name == "fce":
                description = f"{phase_num}. Factual correlation engine"
            elif cmd_name == "report":
                description = f"{phase_num}. Generate report"
            elif cmd_name == "summary":
                description = f"{phase_num}. Generate audit summary"
            else:
                # Generic description for any new commands
                description = f"{phase_num}. Run {cmd_name.replace('-', ' ')}"

            # Build command array - use python module directly
            command_array = [sys.executable, "-m", "theauditor.cli", cmd_name] + extra_args
            commands.append((description, command_array))
        else:
            # Command not available, log warning but continue (resilient)
            log_output(f"[WARNING] Command '{cmd_name}' not available, skipping")
    total_phases = len(commands)
    current_phase = 0
    failed_phases = 0
    phases_with_warnings = 0  # Track phases that completed but had errors in output
    pipeline_start = time.time()

    def collect_created_files():
        """Collect all files created during execution."""
        files = []
        # Core files
        if (Path(root) / "manifest.json").exists():
            files.append("manifest.json")
        if (Path(root) / "repo_index.db").exists():
            files.append("repo_index.db")
        # .pf directory files
        pf_dir = Path(root) / ".pf"
        if pf_dir.exists():
            for item in pf_dir.rglob("*"):
                if item.is_file():
                    files.append(item.relative_to(Path(root)).as_posix())
        # docs directory files (in .pf/docs)
        docs_dir = Path(root) / ".pf" / "docs"
        if docs_dir.exists():
            for item in docs_dir.rglob("*"):
                if item.is_file():
                    files.append(item.relative_to(Path(root)).as_posix())
        return sorted(set(files))
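    # collect_created_files returns sorted, de-duplicated, POSIX-style paths
    # relative to the project root, e.g. (paths illustrative):
    #   [".pf/pipeline.log", ".pf/raw/patterns.json", "manifest.json"]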
    # PARALLEL PIPELINE IMPLEMENTATION
    # Reorganize commands into stages for parallel execution

    # Stage categorization
    foundation_commands = []  # Must run first sequentially
    track_a_commands = []     # Network I/O track (deps, docs)
    track_b_commands = []     # Code analysis track (workset, lint, patterns)
    track_c_commands = []     # Graph & taint analysis track
    final_commands = []       # Must run last sequentially

    # Categorize each command into appropriate stage/track
    for phase_name, cmd in commands:
        cmd_str = " ".join(cmd)
        # Stage 1: Foundation (must complete first)
        if "index" in cmd_str:
            foundation_commands.append((phase_name, cmd))
        elif "detect-frameworks" in cmd_str:
            foundation_commands.append((phase_name, cmd))
        # Stage 2: Parallel tracks
        elif "deps" in cmd_str:
            if not offline:  # Skip deps if offline mode
                track_a_commands.append((phase_name, cmd))
        elif "docs" in cmd_str:
            if not offline:  # Skip docs if offline mode
                track_a_commands.append((phase_name, cmd))
        elif "workset" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "lint" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "detect-patterns" in cmd_str:
            track_b_commands.append((phase_name, cmd))
        elif "graph build" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "graph analyze" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "graph viz" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        elif "taint" in cmd_str:
            track_c_commands.append((phase_name, cmd))
        # Stage 4: Final aggregation (must run last)
        elif "fce" in cmd_str:
            final_commands.append((phase_name, cmd))
        elif "report" in cmd_str:
            final_commands.append((phase_name, cmd))
        elif "summary" in cmd_str:
            final_commands.append((phase_name, cmd))
        else:
            # Default to final commands for safety
            final_commands.append((phase_name, cmd))
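    # Resulting stage layout (a sketch of the intended schedule; the tracks
    # in Stage 2 run concurrently):
    #
    #   Stage 1 (sequential): index -> detect-frameworks
    #   Stage 2 (parallel):   Track A: deps, docs (skipped in offline mode)
    #                         Track B: workset, lint, detect-patterns
    #                         Track C: graph build/analyze/viz, taint-analyze
    #   Stage 3 (sync):       wait for all tracks to finish
    #   Stage 4 (sequential): fce -> report -> summary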
    # STAGE 1: Foundation (Sequential)
    log_output("\n" + "="*60)
    log_output("[STAGE 1] FOUNDATION - Sequential Execution")
    log_output("="*60)
    for phase_name, cmd in foundation_commands:
        current_phase += 1
        log_output(f"\n[Phase {current_phase}/{total_phases}] {phase_name}")
        start_time = time.time()
        try:
            # Execute foundation command
            if TempManager:
                stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                    root, f"foundation_{phase_name.replace(' ', '_')}"
                )
            else:
                # Fallback to regular tempfile
                with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                     tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                    stdout_file = out_tmp.name
                    stderr_file = err_tmp.name

            with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                 open(stderr_file, 'w+', encoding='utf-8') as err_fp:
                result = run_subprocess_with_interrupt(
                    cmd,
                    stdout_fp=out_fp,
                    stderr_fp=err_fp,
                    cwd=root,
                    shell=IS_WINDOWS,  # Windows compatibility fix
                    timeout=300,  # 5 minutes per foundation command
                )

            # Read outputs
            with open(stdout_file, 'r', encoding='utf-8') as f:
                result.stdout = f.read()
            with open(stderr_file, 'r', encoding='utf-8') as f:
                result.stderr = f.read()

            # Clean up temp files
            try:
                os.unlink(stdout_file)
                os.unlink(stderr_file)
            except (OSError, PermissionError):
                pass

            elapsed = time.time() - start_time
            if result.returncode == 0:
                log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s")
                if result.stdout:
                    lines = result.stdout.strip().split('\n')
                    # Write FULL output to log file
                    if log_file and len(lines) > 3:
                        log_file.write(" [Full output below, truncated in terminal]\n")
                        for line in lines:
                            log_file.write(f" {line}\n")
                        log_file.flush()
                    # Special handling for framework detection to show actual results
                    if "Detect frameworks" in phase_name and len(lines) > 3:
                        # Check if this looks like table output (has header separator)
                        has_table = any("---" in line for line in lines[:5])
                        if has_table:
                            # Show more lines for table output: path info + table header
                            # + first few data rows
                            for line in lines[:6]:
                                if log_callback and not quiet:
                                    log_callback(f" {line}", False)
                                log_lines.append(f" {line}")
                            if len(lines) > 6:
                                truncate_msg = f" ... ({len(lines) - 6} more lines)"
                                if log_callback and not quiet:
                                    log_callback(truncate_msg, False)
                                log_lines.append(truncate_msg)
                        else:
                            # Regular truncation for non-table output
                            for line in lines[:3]:
                                if log_callback and not quiet:
                                    log_callback(f" {line}", False)
                                log_lines.append(f" {line}")
                            if len(lines) > 3:
                                truncate_msg = f" ... ({len(lines) - 3} more lines)"
                                if log_callback and not quiet:
                                    log_callback(truncate_msg, False)
                                log_lines.append(truncate_msg)
                    else:
                        # Regular truncation for other commands
                        for line in lines[:3]:
                            if log_callback and not quiet:
                                log_callback(f" {line}", False)
                            log_lines.append(f" {line}")
                        if len(lines) > 3:
                            truncate_msg = f" ... ({len(lines) - 3} more lines)"
                            if log_callback and not quiet:
                                log_callback(truncate_msg, False)
                            log_lines.append(truncate_msg)
            else:
                failed_phases += 1
                log_output(f"[FAILED] {phase_name} failed (exit code {result.returncode})", is_error=True)
                if result.stderr:
                    # Write FULL error to log file
                    if log_file:
                        log_file.write(" [Full error output]:\n")
                        log_file.write(f" {result.stderr}\n")
                        log_file.flush()
                    # Show truncated in terminal
                    error_msg = f" Error: {result.stderr[:200]}"
                    if len(result.stderr) > 200:
                        error_msg += "... [see pipeline.log for full error]"
                    if log_callback and not quiet:
                        log_callback(error_msg, True)
                    log_lines.append(error_msg)
                # Foundation failure stops pipeline
                log_output("[CRITICAL] Foundation stage failed - stopping pipeline", is_error=True)
                break
        except Exception as e:
            failed_phases += 1
            log_output(f"[FAILED] {phase_name} failed: {e}", is_error=True)
            break

    # Only proceed to parallel stage if foundation succeeded
    if failed_phases == 0 and (track_a_commands or track_b_commands or track_c_commands):
        # STAGE 2: Concurrent Analysis (Parallel Execution)
        log_output("\n" + "="*60)
        log_output("[STAGE 2] CONCURRENT ANALYSIS - Parallel Execution")
        log_output("="*60)
        if offline:
            log_output("[OFFLINE MODE] Skipping network operations")
            log_output("Launching 2 parallel tracks:")
        else:
            log_output("Launching 3 parallel tracks:")
            log_output(" Track A: Network I/O (deps, docs)")
        log_output(" Track B: Code Analysis (workset, lint, patterns)")
        log_output(" Track C: Graph & Taint Analysis")

        # Execute parallel tracks using ThreadPoolExecutor (Windows-safe)
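        # Threads (not processes) are intentional here: each track only spawns
        # subprocesses and blocks on them, so the GIL is not a bottleneck, and
        # the commit history above records a nested ProcessPoolExecutor as the
        # trigger for the Windows Ctrl+C hang this module fixes.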
        parallel_results = []
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = []

            # Submit Track A if it has commands
            if track_a_commands:
                future_a = executor.submit(run_command_chain, track_a_commands, root, "Track A (Network I/O)")
                futures.append(future_a)
                current_phase += len(track_a_commands)

            # Submit Track B if it has commands
            if track_b_commands:
                future_b = executor.submit(run_command_chain, track_b_commands, root, "Track B (Code Analysis)")
                futures.append(future_b)
                current_phase += len(track_b_commands)

            # Submit Track C if it has commands
            if track_c_commands:
                future_c = executor.submit(run_command_chain, track_c_commands, root, "Track C (Graph & Taint)")
                futures.append(future_c)
                current_phase += len(track_c_commands)

            # STAGE 3: Synchronization Point - Wait for all parallel tracks
            log_output("\n[SYNC] Waiting for parallel tracks to complete...")

            # Monitor progress while waiting
            status_dir = Path(root) / ".pf" / "status"
            last_status_check = 0
            status_check_interval = 2  # Check every 2 seconds

            # Process futures as they complete, but also check status periodically
            pending_futures = list(futures)
            while pending_futures:
                # Check for completed futures (with short timeout)
                done, pending_futures = wait(pending_futures, timeout=status_check_interval)

                # Read and display status if enough time has passed
                current_time = time.time()
                if current_time - last_status_check >= status_check_interval:
                    last_status_check = current_time
                    # Read all status files
                    if status_dir.exists():
                        status_summary = []
                        status_files = list(status_dir.glob("*.status"))
                        # Debug: show if we found any status files
                        if not status_files and not quiet:
                            log_output(f"[DEBUG] No status files found in {status_dir}")
                        for status_file in status_files:
                            try:
                                with open(status_file, 'r', encoding='utf-8') as f:
                                    status_data = json.loads(f.read().strip())
                                track = status_data.get("track", "Unknown")
                                completed = status_data.get("completed", 0)
                                total = status_data.get("total", 0)
                                current = status_data.get("current", "")
                                # Format progress
                                progress = f"[{completed}/{total}]" if total > 0 else ""
                                status_summary.append(f" {track}: {progress} {current[:50]}")
                            except Exception:
                                pass  # Ignore status read errors
                        if status_summary:
                            log_output("[PROGRESS] Track Status:")
                            for status_line in status_summary:
                                log_output(status_line)

                # Process completed futures
                for future in done:
                    try:
                        result = future.result()
                        parallel_results.append(result)
                        if result["success"]:
                            log_output(f"[OK] {result['name']} completed in {result['elapsed']:.1f}s")
                        else:
                            log_output(f"[FAILED] {result['name']} failed", is_error=True)
                            failed_phases += 1
                    except KeyboardInterrupt:
                        log_output("[INTERRUPTED] Pipeline stopped by user", is_error=True)
                        # Cancel remaining futures
                        for f in pending_futures:
                            f.cancel()
                        raise  # Re-raise to exit
                    except Exception as e:
                        log_output(f"[ERROR] Parallel track failed with exception: {e}", is_error=True)
                        failed_phases += 1

        # Print outputs from parallel tracks sequentially for clean logging
        log_output("\n" + "="*60)
        log_output("[STAGE 2 RESULTS] Parallel Track Outputs")
        log_output("="*60)
        for result in parallel_results:
            log_output(result["output"])
            if result["errors"]:
                log_output("[ERRORS]:")
                log_output(result["errors"])
    # STAGE 4: Final Aggregation (Sequential)
    if failed_phases == 0 and final_commands:
        log_output("\n" + "="*60)
        log_output("[STAGE 4] FINAL AGGREGATION - Sequential Execution")
        log_output("="*60)
        for phase_name, cmd in final_commands:
            current_phase += 1
            log_output(f"\n[Phase {current_phase}/{total_phases}] {phase_name}")
            start_time = time.time()
            try:
                # Execute final aggregation command
                if TempManager:
                    stdout_file, stderr_file = TempManager.create_temp_files_for_subprocess(
                        root, f"final_{phase_name.replace(' ', '_')}"
                    )
                else:
                    # Fallback to regular tempfile
                    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stdout.txt') as out_tmp, \
                         tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='_stderr.txt') as err_tmp:
                        stdout_file = out_tmp.name
                        stderr_file = err_tmp.name

                with open(stdout_file, 'w+', encoding='utf-8') as out_fp, \
                     open(stderr_file, 'w+', encoding='utf-8') as err_fp:
                    result = run_subprocess_with_interrupt(
                        cmd,
                        stdout_fp=out_fp,
                        stderr_fp=err_fp,
                        cwd=root,
                        shell=IS_WINDOWS,  # Windows compatibility fix
                        timeout=600,  # 10 minutes for final aggregation
                    )

                # Read outputs
                with open(stdout_file, 'r', encoding='utf-8') as f:
                    result.stdout = f.read()
                with open(stderr_file, 'r', encoding='utf-8') as f:
                    result.stderr = f.read()

                # Clean up temp files
                try:
                    os.unlink(stdout_file)
                    os.unlink(stderr_file)
                except (OSError, PermissionError):
                    pass

                elapsed = time.time() - start_time

                # Handle special exit codes for findings commands
                is_findings_command = "taint-analyze" in cmd or ("deps" in cmd and "--vuln-scan" in cmd)
                if is_findings_command:
                    success = result.returncode in [0, 1, 2]
                else:
                    success = result.returncode == 0

                if success:
                    if result.returncode == 2 and is_findings_command:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s - CRITICAL findings")
                    elif result.returncode == 1 and is_findings_command:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s - HIGH findings")
                    else:
                        log_output(f"[OK] {phase_name} completed in {elapsed:.1f}s")
                    if result.stdout:
                        lines = result.stdout.strip().split('\n')
                        # Write FULL output to log file
                        if log_file and len(lines) > 3:
                            log_file.write(" [Full output below, truncated in terminal]\n")
                            for line in lines:
                                log_file.write(f" {line}\n")
                            log_file.flush()
                        # Truncate terminal output (final commands are fce/report/summary,
                        # which never emit framework tables, so Stage 1's table-aware
                        # branch is not duplicated here)
                        for line in lines[:3]:
                            if log_callback and not quiet:
                                log_callback(f" {line}", False)
                            log_lines.append(f" {line}")
                        if len(lines) > 3:
                            truncate_msg = f" ... ({len(lines) - 3} more lines)"
                            if log_callback and not quiet:
                                log_callback(truncate_msg, False)
                            log_lines.append(truncate_msg)
                else:
                    failed_phases += 1
                    log_output(f"[FAILED] {phase_name} failed (exit code {result.returncode})", is_error=True)
                    if result.stderr:
                        # Write FULL error to log file
                        if log_file:
                            log_file.write(" [Full error output]:\n")
                            log_file.write(f" {result.stderr}\n")
                            log_file.flush()
                        # Show truncated in terminal
                        error_msg = f" Error: {result.stderr[:200]}"
                        if len(result.stderr) > 200:
                            error_msg += "... [see pipeline.log for full error]"
                        if log_callback and not quiet:
                            log_callback(error_msg, True)
                        log_lines.append(error_msg)

                # CRITICAL: Run extraction AFTER FCE and BEFORE report
                if "factual correlation" in phase_name.lower():
                    try:
                        from theauditor.extraction import extract_all_to_readthis
                        log_output("\n" + "="*60)
                        log_output("[EXTRACTION] Creating AI-consumable chunks from raw data")
                        log_output("="*60)
                        extraction_start = time.time()
                        extraction_success = extract_all_to_readthis(root)
                        extraction_elapsed = time.time() - extraction_start
                        if extraction_success:
                            log_output(f"[OK] Chunk extraction completed in {extraction_elapsed:.1f}s")
                            log_output("[INFO] AI-readable chunks available in .pf/readthis/")
                        else:
                            log_output(f"[WARNING] Chunk extraction completed with errors in {extraction_elapsed:.1f}s", is_error=True)
                            log_output("[WARNING] Some chunks may be incomplete", is_error=True)
                    except ImportError as e:
                        log_output(f"[ERROR] Could not import extraction module: {e}", is_error=True)
                        log_output("[ERROR] Chunks will not be generated", is_error=True)
                    except Exception as e:
                        log_output(f"[ERROR] Chunk extraction failed: {e}", is_error=True)
                        log_output("[ERROR] Raw data preserved in .pf/raw/ but no chunks created", is_error=True)
            except Exception as e:
                failed_phases += 1
                log_output(f"[FAILED] {phase_name} failed: {e}", is_error=True)
    # After all commands complete, collect all created files
    pipeline_elapsed = time.time() - pipeline_start
    all_created_files = collect_created_files()

    # Create allfiles.md in .pf root (not in readthis, which gets deleted/recreated)
    pf_dir = Path(root) / ".pf"
    allfiles_path = pf_dir / "allfiles.md"
    with open(allfiles_path, 'w', encoding='utf-8') as f:
        f.write("# All Files Created by `aud full` Command\n\n")
        f.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total files: {len(all_created_files)}\n\n")

        # Group files by directory
        files_by_dir = {}
        for file_path in all_created_files:
            dir_name = str(Path(file_path).parent)
            if dir_name not in files_by_dir:
                files_by_dir[dir_name] = []
            files_by_dir[dir_name].append(file_path)

        # Write files grouped by directory
        for dir_name in sorted(files_by_dir.keys()):
            f.write(f"\n## {dir_name}/\n\n")
            for file_path in sorted(files_by_dir[dir_name]):
                file_size = 0
                if Path(file_path).exists():
                    file_size = Path(file_path).stat().st_size
                f.write(f"- `{Path(file_path).name}` ({file_size:,} bytes)\n")

        f.write("\n---\n")
        f.write(f"Total execution time: {pipeline_elapsed:.1f} seconds ({pipeline_elapsed/60:.1f} minutes)\n")
        f.write(f"Commands executed: {total_phases}\n")
        f.write(f"Failed commands: {failed_phases}\n")

    # Display final summary
    log_output("\n" + "="*60)
    if failed_phases == 0 and phases_with_warnings == 0:
        log_output(f"[OK] AUDIT COMPLETE - All {total_phases} phases successful")
    elif phases_with_warnings > 0 and failed_phases == 0:
        log_output(f"[WARNING] AUDIT COMPLETE - {phases_with_warnings} phases completed with errors")
    else:
        log_output(f"[WARN] AUDIT COMPLETE - {failed_phases} phases failed, {phases_with_warnings} phases with errors")
    log_output(f"[TIME] Total time: {pipeline_elapsed:.1f}s ({pipeline_elapsed/60:.1f} minutes)")

    # Display all created files summary
    log_output("\n" + "="*60)
    log_output("[FILES] ALL CREATED FILES")
    log_output("="*60)

    # Count files by category
    pf_files = [f for f in all_created_files if f.startswith(".pf/")]
    readthis_files = [f for f in all_created_files if f.startswith(".pf/readthis/")]
    docs_files = [f for f in all_created_files if f.startswith(".pf/docs/")]
    root_files = [f for f in all_created_files if "/" not in f]

    log_output("\n[STATS] Summary:")
    log_output(f" Total files created: {len(all_created_files)}")
    log_output(f" .pf/ files: {len(pf_files)}")
    log_output(f" .pf/readthis/ files: {len(readthis_files)}")
    if docs_files:
        log_output(f" .pf/docs/ files: {len(docs_files)}")
    log_output(f" Root files: {len(root_files)}")
    log_output("\n[SAVED] Complete file list saved to: .pf/allfiles.md")
    log_output("\n[TIP] Key artifacts:")
    log_output(" * .pf/readthis/ - All AI-consumable chunks")
    log_output(" * .pf/allfiles.md - Complete file list")
    log_output(" * .pf/pipeline.log - Full execution log")
    log_output(" * .pf/findings.json - Pattern detection results")
    log_output(" * .pf/risk_scores.json - Risk analysis")
    log_output("\n" + "="*60)
    log_output("[COMPLETE] AUDIT SUITE EXECUTION COMPLETE")
    log_output("="*60)
    # Close the log file (already written throughout execution)
    if log_file:
        try:
            log_file.close()
            log_file = None
        except Exception as e:
            print(f"[CRITICAL] Failed to close log file: {e}", file=sys.stderr)

    # Move files from temp to readthis if needed
    temp_dir = Path(root) / ".pf" / "temp"
    readthis_final = Path(root) / ".pf" / "readthis"
    # Ensure readthis exists
    readthis_final.mkdir(parents=True, exist_ok=True)

    # Move pipeline.log if it's in temp
    temp_log = temp_dir / "pipeline.log"
    final_log = readthis_final / "pipeline.log"
    if temp_log.exists() and not final_log.exists():
        try:
            shutil.move(str(temp_log), str(final_log))
            log_file_path = final_log
        except Exception as e:
            print(f"[WARNING] Could not move log to final location: {e}", file=sys.stderr)

    # Move allfiles.md if it's in temp
    temp_allfiles = temp_dir / "allfiles.md"
    final_allfiles = readthis_final / "allfiles.md"
    if temp_allfiles.exists() and not final_allfiles.exists():
        try:
            shutil.move(str(temp_allfiles), str(final_allfiles))
            allfiles_path = final_allfiles
        except Exception as e:
            print(f"[WARNING] Could not move allfiles.md to final location: {e}", file=sys.stderr)

    print(f"\n[SAVED] Full pipeline log saved to: {log_file_path}")

    # Add allfiles.md and pipeline.log to the list of created files for completeness
    all_created_files.append(str(allfiles_path))
    all_created_files.append(str(log_file_path))

    # Clean up temporary files created during pipeline execution
    if TempManager:
        try:
            TempManager.cleanup_temp_dir(root)
            print("[INFO] Temporary files cleaned up", file=sys.stderr)
        except Exception as e:
            print(f"[WARNING] Could not clean temp files: {e}", file=sys.stderr)

    # Clean up status files
    status_dir = Path(root) / ".pf" / "status"
    if status_dir.exists():
        try:
            for status_file in status_dir.glob("*.status"):
                status_file.unlink()
            # Remove directory if empty
            if not list(status_dir.iterdir()):
                status_dir.rmdir()
        except Exception as e:
            print(f"[WARNING] Could not clean status files: {e}", file=sys.stderr)
    # Collect findings summary from generated reports
    critical_findings = 0
    high_findings = 0
    medium_findings = 0
    low_findings = 0
    total_vulnerabilities = 0

    # Try to read taint analysis results (json is already imported at module level)
    taint_path = Path(root) / ".pf" / "raw" / "taint_analysis.json"
    if taint_path.exists():
        try:
            with open(taint_path, encoding='utf-8') as f:
                taint_data = json.load(f)
            if taint_data.get("success"):
                summary = taint_data.get("summary", {})
                critical_findings += summary.get("critical_count", 0)
                high_findings += summary.get("high_count", 0)
                medium_findings += summary.get("medium_count", 0)
                low_findings += summary.get("low_count", 0)
                total_vulnerabilities = taint_data.get("total_vulnerabilities", 0)
        except Exception as e:
            print(f"[WARNING] Could not read taint analysis results from {taint_path}: {e}", file=sys.stderr)
            # Non-critical - continue without taint stats

    # Try to read vulnerability scan results
    vuln_path = Path(root) / ".pf" / "raw" / "vulnerabilities.json"
    if vuln_path.exists():
        try:
            with open(vuln_path, encoding='utf-8') as f:
                vuln_data = json.load(f)
            if vuln_data.get("vulnerabilities"):
                for vuln in vuln_data["vulnerabilities"]:
                    severity = vuln.get("severity", "").lower()
                    if severity == "critical":
                        critical_findings += 1
                    elif severity == "high":
                        high_findings += 1
                    elif severity == "medium":
                        medium_findings += 1
                    elif severity == "low":
                        low_findings += 1
        except Exception as e:
            print(f"[WARNING] Could not read vulnerability scan results from {vuln_path}: {e}", file=sys.stderr)
            # Non-critical - continue without vulnerability stats

    # Try to read pattern detection results
    patterns_path = Path(root) / ".pf" / "raw" / "patterns.json"
    if not patterns_path.exists():
        # Fallback to findings.json (alternate name)
        patterns_path = Path(root) / ".pf" / "raw" / "findings.json"
    if patterns_path.exists():
        try:
            with open(patterns_path, encoding='utf-8') as f:
                patterns_data = json.load(f)
            # Aggregate findings by severity
            for finding in patterns_data.get("findings", []):
                severity = finding.get("severity", "").lower()
                if severity == "critical":
                    critical_findings += 1
                elif severity == "high":
                    high_findings += 1
                elif severity == "medium":
                    medium_findings += 1
                elif severity == "low":
                    low_findings += 1
        except Exception as e:
            print(f"[WARNING] Could not read pattern results from {patterns_path}: {e}", file=sys.stderr)
            # Non-critical - continue without pattern stats

    return {
        "success": failed_phases == 0 and phases_with_warnings == 0,
        "failed_phases": failed_phases,
        "phases_with_warnings": phases_with_warnings,
        "total_phases": total_phases,
        "elapsed_time": pipeline_elapsed,
        "created_files": all_created_files,
        "log_lines": log_lines,
        "findings": {
            "critical": critical_findings,
            "high": high_findings,
            "medium": medium_findings,
            "low": low_findings,
            "total_vulnerabilities": total_vulnerabilities,
        },
    }
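

# Illustrative programmatic entry point (the `aud full` CLI command normally
# drives this; argument values below are examples, not defaults):
#
#     from theauditor.pipelines import run_full_pipeline
#
#     result = run_full_pipeline(root=".", quiet=True, offline=True)
#     if not result["success"]:
#         print(f"{result['failed_phases']}/{result['total_phases']} phases failed")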