Files
Auditor/theauditor/vulnerability_scanner.py

420 lines
15 KiB
Python

"""Native vulnerability scanners wrapper for npm audit and pip-audit.
This module runs native security tools and reports their raw output,
following TheAuditor's philosophy of using industry-standard tools
without interpretation.
"""
import json
import subprocess
import shutil
import platform
from pathlib import Path
from typing import Dict, List, Any
from datetime import datetime, UTC
# Windows compatibility: run_npm_audit() looks for the sandboxed node/npm at
# different paths on Windows, and the subprocess calls in this module pass
# shell=IS_WINDOWS.
IS_WINDOWS: bool = platform.system() == "Windows"
def scan_dependencies(
    deps: List[Dict[str, Any]],
    offline: bool = False,
    cache_dir: str = "./.pf/vuln_cache",  # Kept for compatibility, unused
) -> List[Dict[str, Any]]:
    """Run the native vulnerability scanners that match the given dependencies.

    Only the ``"manager"`` key of each dependency dict is inspected here: the
    scanners themselves audit the project in the current working directory,
    not the individual entries of ``deps``.

    Args:
        deps: List of dependency dicts from deps.py.
        offline: If True, skip scanning and return an empty list (native
            tools manage their own offline capability).
        cache_dir: Unused, kept for backward compatibility.

    Returns:
        Findings from ``run_npm_audit()`` (when any dependency has manager
        ``"npm"``) followed by findings from ``run_pip_audit()`` (when any
        dependency has manager ``"py"``).
    """
    if offline:
        return []

    # .get() rather than ["manager"]: a dependency dict without a manager is
    # simply not a reason to run either scanner, not a KeyError.
    has_npm = any(dep.get("manager") == "npm" for dep in deps)
    has_python = any(dep.get("manager") == "py" for dep in deps)

    vulnerabilities: List[Dict[str, Any]] = []
    if has_npm:
        vulnerabilities.extend(run_npm_audit())
    if has_python:
        vulnerabilities.extend(run_pip_audit())
    return vulnerabilities
def run_npm_audit() -> List[Dict[str, Any]]:
    """Run the sandboxed ``npm audit --json`` in the current directory.

    The scan is best-effort. An empty list is returned when the project has
    no ``package.json`` or ``node_modules``, when the sandboxed node/npm is
    not installed, when the process cannot be started or times out (60 s),
    or when its output is not valid JSON.

    Returns:
        List of vulnerabilities in standard format.
    """
    project_root = Path.cwd()

    # npm audit needs both the manifest and an installed dependency tree.
    if not (project_root / "package.json").exists():
        return []
    if not (project_root / "node_modules").exists():
        return []

    npm_cmd = _sandboxed_npm_audit_command(project_root)
    if not npm_cmd:
        return []

    try:
        # npm audit exits with code 1 when vulnerabilities are found, so the
        # return code is deliberately not checked.
        result = subprocess.run(
            npm_cmd,
            cwd=str(project_root),
            capture_output=True,
            text=True,
            timeout=60,
            shell=IS_WINDOWS,
        )
    except (subprocess.SubprocessError, OSError):
        # SubprocessError covers the timeout; OSError covers a binary that
        # exists but cannot be executed (e.g. PermissionError).
        return []

    if not result.stdout:
        return []
    try:
        audit_data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    return _parse_npm_audit(audit_data)


def _sandboxed_npm_audit_command(project_root: Path) -> List[str]:
    """Return the argv for the sandboxed ``npm audit --json``, or [] if unavailable."""
    node_runtime = project_root / ".auditor_venv" / ".theauditor_tools" / "node-runtime"

    if IS_WINDOWS:
        node_exe = node_runtime / "node.exe"
        # On Windows, npm is a JavaScript file that is run with node.
        npm_cli = node_runtime / "node_modules" / "npm" / "bin" / "npm-cli.js"
        npm_cmd_path = node_runtime / "npm.cmd"
        if npm_cli.exists():
            command = [str(node_exe), str(npm_cli), "audit", "--json"]
        elif npm_cmd_path.exists():
            # Fallback: the npm.cmd wrapper.
            command = [str(npm_cmd_path), "audit", "--json"]
        else:
            return []
    else:
        node_exe = node_runtime / "bin" / "node"
        npm_exe = node_runtime / "bin" / "npm"
        if not npm_exe.exists():
            return []
        command = [str(npm_exe), "audit", "--json"]

    # Sandboxed node not installed - user needs to run 'aud setup-claude'.
    if not node_exe.exists():
        return []
    return command


def _parse_npm_audit(audit_data: Any) -> List[Dict[str, Any]]:
    """Convert parsed ``npm audit --json`` output into standard-format findings."""
    if not isinstance(audit_data, dict):
        return []
    packages = audit_data.get("vulnerabilities")
    if not isinstance(packages, dict):
        return []

    findings: List[Dict[str, Any]] = []
    for pkg_name, pkg_data in packages.items():
        if not isinstance(pkg_data, dict):
            continue

        # Per-package values shared by every advisory listed under "via".
        affected_range = pkg_data.get("range", "")
        # Derived from the first token of the affected range with comparison
        # operators stripped (e.g. "<4.17.21" -> "4.17.21").
        current_version = (
            affected_range.split(" ")[0].lstrip("<>=") if affected_range else ""
        )
        fixed_version = None
        fix_info = pkg_data.get("fixAvailable")
        # fixAvailable only carries a version when it is a dict.
        if isinstance(fix_info, dict) and "version" in fix_info:
            fixed_version = fix_info["version"]

        for via_item in pkg_data.get("via") or []:
            # A plain string in "via" is just the name of another package
            # (transitive); only dict entries describe an advisory.
            if not isinstance(via_item, dict):
                continue

            cve = via_item.get("cve")
            ghsa = via_item.get("ghsa")
            # Prefer CVE, then GHSA, then npm's own "source" value (passed
            # through unchanged), then a synthetic per-package id.
            vuln_id = cve or ghsa or via_item.get("source", f"npm-audit-{pkg_name}")
            url = via_item.get("url")

            findings.append({
                "package": pkg_name,
                "version": current_version,
                "manager": "npm",
                "vulnerability_id": vuln_id,
                "severity": via_item.get("severity", ""),
                "summary": via_item.get("title", "No summary available"),
                "details": via_item.get("overview", ""),
                "aliases": [alias for alias in (cve, ghsa) if alias],
                "published": via_item.get("created", ""),
                "modified": via_item.get("updated", ""),
                "references": [{"type": "ADVISORY", "url": url}] if url else [],
                "affected_ranges": [affected_range] if affected_range else [],
                "fixed_version": fixed_version,
                "source": "npm audit",
            })
    return findings
def run_pip_audit() -> List[Dict[str, Any]]:
    """Run ``pip-audit --format json`` in the current directory and parse it.

    The scan is best-effort. An empty list is returned when ``pip-audit`` is
    not on PATH, when the project has neither ``requirements.txt`` nor
    ``pyproject.toml``, when the process cannot be started or times out
    (60 s), or when its output is not valid JSON.

    Returns:
        List of vulnerabilities in standard format.
    """
    if not shutil.which("pip-audit"):
        # pip-audit not installed, skip
        return []

    project_root = Path.cwd()
    has_requirements = (project_root / "requirements.txt").exists()
    has_pyproject = (project_root / "pyproject.toml").exists()
    if not has_requirements and not has_pyproject:
        return []

    cmd = ["pip-audit", "--format", "json"]
    if has_requirements:
        cmd.extend(["-r", "requirements.txt"])

    try:
        # The return code is not checked; only stdout is parsed.
        result = subprocess.run(
            cmd,
            cwd=str(project_root),
            capture_output=True,
            text=True,
            timeout=60,
            shell=IS_WINDOWS,
        )
    except (subprocess.SubprocessError, OSError):
        # SubprocessError covers the timeout; OSError covers a failure to
        # start the process at all.
        return []

    if not result.stdout:
        return []
    try:
        audit_data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    return _parse_pip_audit(audit_data)


def _parse_pip_audit(audit_data: Any) -> List[Dict[str, Any]]:
    """Convert parsed ``pip-audit --format json`` output into standard-format findings."""
    # pip-audit wraps its results as {"dependencies": [...], "fixes": [...]};
    # a bare list of the same dependency objects is also accepted
    # (NOTE(review): believed to be the older output shape - confirm).
    if isinstance(audit_data, dict):
        dependencies = audit_data.get("dependencies", [])
    else:
        dependencies = audit_data
    if not isinstance(dependencies, list):
        return []

    findings: List[Dict[str, Any]] = []
    for dep in dependencies:
        if not isinstance(dep, dict):
            continue
        pkg_name = dep.get("name", "")
        pkg_version = dep.get("version", "")

        if "vulns" in dep:
            # Each dependency carries its own list of advisories; an empty
            # list means the package is clean.
            advisories = dep.get("vulns") or []
        elif "id" in dep:
            # Tolerate a flat record that is itself the advisory.
            advisories = [dep]
        else:
            # e.g. a skipped dependency: nothing to report.
            advisories = []

        for advisory in advisories:
            if not isinstance(advisory, dict):
                continue
            fix_versions = advisory.get("fix_versions") or []
            findings.append({
                "package": pkg_name,
                "version": pkg_version,
                "manager": "py",
                "vulnerability_id": advisory.get("id", f"pip-audit-{pkg_name}"),
                "severity": "",  # pip-audit doesn't provide severity
                "summary": advisory.get("description", "No summary available"),
                "details": advisory.get("description", ""),
                "aliases": list(advisory.get("aliases") or []),
                "published": "",  # pip-audit doesn't provide dates
                "modified": "",
                "references": [],  # pip-audit doesn't provide references in JSON
                "affected_ranges": [],
                "fixed_version": fix_versions[0] if fix_versions else None,
                "source": "pip-audit",
            })
    return findings
def write_vulnerabilities_json(
vulnerabilities: List[Dict[str, Any]],
output_path: str = "./.pf/vulnerabilities.json"
) -> None:
"""
Write vulnerability findings to JSON file.
Args:
vulnerabilities: List of vulnerability dictionaries
output_path: Path to output JSON file
"""
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
# Count by severity
severity_counts = {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
}
for vuln in vulnerabilities:
severity = vuln.get("severity", "").lower()
if severity in severity_counts:
severity_counts[severity] += 1
else:
severity_counts["low"] += 1
# Build report structure
report = {
"vulnerabilities": vulnerabilities,
"scan_metadata": {
"timestamp": datetime.now(UTC).isoformat(),
"packages_scanned": len(set(v["package"] for v in vulnerabilities)) if vulnerabilities else 0,
"vulnerabilities_found": len(vulnerabilities),
"critical_count": severity_counts["critical"],
"high_count": severity_counts["high"],
"medium_count": severity_counts["medium"],
"low_count": severity_counts["low"],
"sources_used": list(set(v.get("source", "unknown") for v in vulnerabilities))
}
}
with open(output, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, sort_keys=True)
def format_vulnerability_report(vulnerabilities: List[Dict[str, Any]]) -> str:
    """Format vulnerabilities as a human-readable text report.

    The report has a severity summary, then up to five findings per source
    tool ("npm audit", "pip-audit"), then the native fix commands.

    Args:
        vulnerabilities: List of vulnerability dictionaries.

    Returns:
        Formatted text report, or a one-line "[OK]" message when the list is
        empty.
    """
    if not vulnerabilities:
        return "[OK] No known vulnerabilities found in dependencies\n"

    severity_counts = {
        "critical": 0,
        "high": 0,
        "medium": 0,
        "low": 0,
        "unknown": 0,
    }
    for vuln in vulnerabilities:
        # `or ""` guards against an explicit None severity.
        severity = str(vuln.get("severity") or "").lower()
        # npm audit names its middle level "moderate"; count it as medium.
        if severity == "moderate":
            severity = "medium"
        if severity in ("critical", "high", "medium", "low"):
            severity_counts[severity] += 1
        else:
            severity_counts["unknown"] += 1

    # Summary
    lines = [
        "[FACT] Native tool vulnerability scan results\n",
        "=" * 60,
        f"Total: {len(vulnerabilities)} vulnerabilities reported\n",
    ]
    summary_labels = (
        ("critical", "CRITICAL"),
        ("high", "HIGH"),
        ("medium", "MEDIUM"),
        ("low", "LOW"),
        ("unknown", "UNSPECIFIED"),
    )
    for bucket, label in summary_labels:
        if severity_counts[bucket] > 0:
            lines.append(f"{label}: {severity_counts[bucket]}")
    lines.append("")
    lines.append("=" * 60)

    # Group by source tool
    npm_vulns = [v for v in vulnerabilities if v.get("source") == "npm audit"]
    pip_vulns = [v for v in vulnerabilities if v.get("source") == "pip-audit"]

    if npm_vulns:
        lines.append(f"\n[npm audit reported {len(npm_vulns)} issues]")
        lines.append("-" * 40)
        for vuln in npm_vulns[:5]:  # Show first 5
            # The tool's own severity word is shown as-is; an empty or
            # missing one is labelled UNSPECIFIED instead of printing a
            # bare ": summary" line.
            severity_label = str(vuln.get("severity") or "UNSPECIFIED").upper()
            lines.append(f" {vuln['package']} v{vuln['version']}")
            lines.append(f" {severity_label}: {vuln['summary']}")
            if vuln.get("fixed_version"):
                lines.append(f" Fix available: v{vuln['fixed_version']}")
        if len(npm_vulns) > 5:
            lines.append(f" ... and {len(npm_vulns) - 5} more")

    if pip_vulns:
        lines.append(f"\n[pip-audit reported {len(pip_vulns)} issues]")
        lines.append("-" * 40)
        for vuln in pip_vulns[:5]:  # Show first 5
            lines.append(f" {vuln['package']} v{vuln['version']}")
            lines.append(f" {vuln['summary']}")
            if vuln.get("fixed_version"):
                lines.append(f" Fix available: v{vuln['fixed_version']}")
        if len(pip_vulns) > 5:
            lines.append(f" ... and {len(pip_vulns) - 5} more")

    lines.append("")
    lines.append("=" * 60)
    lines.append("\nNative tool commands you can run:")
    lines.append(" npm audit fix # Auto-fix npm vulnerabilities")
    lines.append(" pip-audit --fix # Auto-fix Python vulnerabilities")
    return "\n".join(lines)