mirror of
https://github.com/aljazceru/Auditor.git
synced 2025-12-17 03:24:18 +01:00
420 lines
15 KiB
Python
420 lines
15 KiB
Python
"""Native vulnerability scanners wrapper for npm audit and pip-audit.
|
|
|
|
This module runs native security tools and reports their raw output,
|
|
following TheAuditor's philosophy of using industry-standard tools
|
|
without interpretation.
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import shutil
|
|
import platform
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any
|
|
from datetime import datetime, UTC
|
|
|
|
# Windows compatibility
|
|
# True when the interpreter runs on Windows. The scanners below use it to pick
# the Windows layout of the sandboxed node runtime and as the `shell=` argument
# of their subprocess calls.
IS_WINDOWS = platform.system() == "Windows"
|
|
|
|
|
|
def scan_dependencies(
    deps: List[Dict[str, Any]],
    offline: bool = False,
    cache_dir: str = "./.pf/vuln_cache"  # Kept for compatibility, unused
) -> List[Dict[str, Any]]:
    """
    Run native vulnerability scanners (npm audit, pip-audit) on dependencies.

    Only the ``"manager"`` value of each dependency is inspected: ``"npm"``
    triggers ``run_npm_audit()`` and ``"py"`` triggers ``run_pip_audit()``.
    Entries that are not dicts or that lack a ``"manager"`` key are ignored.

    Args:
        deps: List of dependency dicts from deps.py
        offline: If True, skip scanning (native tools need their own cache)
        cache_dir: Unused, kept for backward compatibility

    Returns:
        List of vulnerability findings from native tools (npm findings first,
        then Python findings); empty when offline or when no dependency uses
        a supported package manager.
    """
    if offline:
        # In offline mode, return empty - native tools manage their own offline capability
        return []

    # .get() instead of indexing: one malformed dependency entry must not
    # abort the whole scan with a KeyError.
    managers = [d.get("manager") for d in (deps or []) if isinstance(d, dict)]

    vulnerabilities: List[Dict[str, Any]] = []

    # Run npm audit for Node.js packages
    if "npm" in managers:
        vulnerabilities.extend(run_npm_audit())

    # Run pip-audit for Python packages
    if "py" in managers:
        vulnerabilities.extend(run_pip_audit())

    return vulnerabilities
|
|
|
|
|
|
def run_npm_audit() -> List[Dict[str, Any]]:
    """
    Run the sandboxed ``npm audit --json`` in the current directory and parse it.

    The scan is best-effort. An empty list is returned when the working
    directory has no ``package.json`` or no ``node_modules``, when the
    sandboxed node/npm under ``.auditor_venv/.theauditor_tools/node-runtime``
    is missing, when npm cannot be started or exceeds the 60 second timeout,
    or when its output cannot be decoded as JSON.

    Returns:
        List of vulnerabilities in standard format, one entry per advisory
        object found in a package's ``via`` list.
    """
    project_root = Path.cwd()

    # npm audit needs both the manifest and an installed dependency tree.
    if not (project_root / "package.json").exists():
        return []
    if not (project_root / "node_modules").exists():
        return []

    npm_cmd = _sandboxed_npm_audit_command(project_root)
    if npm_cmd is None:
        # Sandboxed node/npm not installed - user needs to run 'aud setup-claude'
        return []

    try:
        # Note: npm audit exits with code 1 if vulnerabilities are found, which
        # is expected, so the return code is deliberately not checked.
        result = subprocess.run(
            npm_cmd,
            cwd=str(project_root),
            capture_output=True,
            text=True,
            timeout=60,
            shell=IS_WINDOWS,
        )
        if not result.stdout:
            return []
        audit_data = json.loads(result.stdout)
    except (subprocess.SubprocessError, OSError, ValueError):
        # SubprocessError covers the timeout; OSError covers an npm that exists
        # but cannot be executed; ValueError covers invalid JSON or undecodable
        # output. All of them mean "no usable report" for this best-effort scan.
        return []

    return _parse_npm_audit_report(audit_data)


def _sandboxed_npm_audit_command(project_root: Path):
    """Return the argv list for the sandboxed ``npm audit --json``, or None if unavailable."""
    node_runtime = project_root / ".auditor_venv" / ".theauditor_tools" / "node-runtime"

    if IS_WINDOWS:
        node_exe = node_runtime / "node.exe"
        # On Windows, npm is a JavaScript file we run with node
        npm_cli = node_runtime / "node_modules" / "npm" / "bin" / "npm-cli.js"
        npm_cmd_path = node_runtime / "npm.cmd"
        if npm_cli.exists():
            command = [str(node_exe), str(npm_cli), "audit", "--json"]
        elif npm_cmd_path.exists():
            # Fallback: npm.cmd might exist
            command = [str(npm_cmd_path), "audit", "--json"]
        else:
            return None
    else:
        node_exe = node_runtime / "bin" / "node"
        npm_exe = node_runtime / "bin" / "npm"
        if not npm_exe.exists():
            return None
        command = [str(npm_exe), "audit", "--json"]

    # npm is useless without the node binary next to it.
    if not node_exe.exists():
        return None
    return command


def _parse_npm_audit_report(audit_data: Any) -> List[Dict[str, Any]]:
    """Convert a decoded ``npm audit --json`` document into standard-format findings."""
    findings: List[Dict[str, Any]] = []

    if not isinstance(audit_data, dict):
        return findings
    packages = audit_data.get("vulnerabilities")
    if not isinstance(packages, dict):
        return findings

    for pkg_name, pkg_data in packages.items():
        if not isinstance(pkg_data, dict):
            continue

        # Skip if no actual vulnerability info
        via = pkg_data.get("via")
        if not via:
            continue

        # Package-level fields are the same for every advisory of the package,
        # so they are computed once rather than per ``via`` item.
        fix_info = pkg_data.get("fixAvailable")
        fixed_version = fix_info.get("version") if isinstance(fix_info, dict) else None

        affected_range = pkg_data.get("range") or ""
        # The reported "version" is the first token of the affected range with
        # comparison operators stripped; it is derived from the range only.
        current_version = (
            affected_range.split(" ")[0].lstrip("<>=") if affected_range else ""
        )
        affected_ranges = [affected_range] if affected_range else []

        for via_item in via:
            # String items only name another (transitive) package; advisory
            # details live in dict items.
            if not isinstance(via_item, dict):
                continue

            cve = via_item.get("cve")
            ghsa = via_item.get("ghsa")
            # Prefer CVE, then GHSA, then npm's own source id.
            vuln_id = cve or ghsa or via_item.get("source", f"npm-audit-{pkg_name}")

            url = via_item.get("url")

            findings.append({
                "package": pkg_name,
                "version": current_version,
                "manager": "npm",
                "vulnerability_id": vuln_id,
                "severity": via_item.get("severity", ""),  # raw severity from npm
                "summary": via_item.get("title", "No summary available"),
                "details": via_item.get("overview", ""),
                "aliases": [alias for alias in (cve, ghsa) if alias],
                "published": via_item.get("created", ""),
                "modified": via_item.get("updated", ""),
                "references": [{"type": "ADVISORY", "url": url}] if url else [],
                "affected_ranges": affected_ranges,
                "fixed_version": fixed_version,
                "source": "npm audit",
            })

    return findings
|
|
|
|
|
|
def run_pip_audit() -> List[Dict[str, Any]]:
    """
    Run ``pip-audit --format json`` in the current directory and parse it.

    The scan is best-effort. An empty list is returned when ``pip-audit`` is
    not on PATH, when the working directory has neither ``requirements.txt``
    nor ``pyproject.toml``, when the tool cannot be started or exceeds the
    60 second timeout, or when its output cannot be decoded as JSON.
    ``-r requirements.txt`` is passed only when that file exists.

    Returns:
        List of vulnerabilities in standard format, one entry per advisory.
        ``severity`` is always an empty string because the parsed report
        carries no severity field.
    """
    # pip-audit not installed, skip
    if not shutil.which("pip-audit"):
        return []

    project_root = Path.cwd()
    has_requirements = (project_root / "requirements.txt").exists()
    has_pyproject = (project_root / "pyproject.toml").exists()
    if not has_requirements and not has_pyproject:
        return []

    cmd = ["pip-audit", "--format", "json"]
    if has_requirements:
        cmd.extend(["-r", "requirements.txt"])

    try:
        # The return code is not checked: only the JSON on stdout matters.
        result = subprocess.run(
            cmd,
            cwd=str(project_root),
            capture_output=True,
            text=True,
            timeout=60,
            shell=IS_WINDOWS,
        )
        if not result.stdout:
            return []
        audit_data = json.loads(result.stdout)
    except (subprocess.SubprocessError, OSError, ValueError):
        # SubprocessError covers the timeout; OSError covers a tool that cannot
        # be executed; ValueError covers invalid JSON or undecodable output.
        return []

    return _parse_pip_audit_report(audit_data)


def _parse_pip_audit_report(audit_data: Any) -> List[Dict[str, Any]]:
    """Convert a decoded ``pip-audit --format json`` document into standard-format findings."""
    # Two layouts are accepted: {"dependencies": [...]} and a bare list of
    # dependency objects.
    if isinstance(audit_data, dict):
        entries = audit_data.get("dependencies") or []
    elif isinstance(audit_data, list):
        entries = audit_data
    else:
        return []

    findings: List[Dict[str, Any]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue

        pkg_name = entry.get("name", "")
        pkg_version = entry.get("version", "")

        if "vulns" in entry:
            # Advisories are nested under each dependency.
            advisories = entry.get("vulns") or []
        elif entry.get("id"):
            # Flat entry that already is an advisory (the shape this parser
            # originally assumed); kept for backward compatibility.
            advisories = [entry]
        else:
            # Dependency without advisories (e.g. a skipped package).
            advisories = []

        for advisory in advisories:
            if not isinstance(advisory, dict):
                continue

            fix_versions = advisory.get("fix_versions") or []

            findings.append({
                "package": pkg_name,
                "version": pkg_version,
                "manager": "py",
                "vulnerability_id": advisory.get("id", f"pip-audit-{pkg_name}"),
                "severity": "",  # pip-audit doesn't provide severity
                "summary": advisory.get("description", "No summary available"),
                "details": advisory.get("description", ""),
                "aliases": list(advisory.get("aliases") or []),
                "published": "",  # pip-audit doesn't provide dates
                "modified": "",
                "references": [],  # pip-audit doesn't provide references in JSON
                "affected_ranges": [],
                "fixed_version": fix_versions[0] if fix_versions else None,
                "source": "pip-audit",
            })

    return findings
|
|
|
|
|
|
def write_vulnerabilities_json(
|
|
vulnerabilities: List[Dict[str, Any]],
|
|
output_path: str = "./.pf/vulnerabilities.json"
|
|
) -> None:
|
|
"""
|
|
Write vulnerability findings to JSON file.
|
|
|
|
Args:
|
|
vulnerabilities: List of vulnerability dictionaries
|
|
output_path: Path to output JSON file
|
|
"""
|
|
output = Path(output_path)
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Count by severity
|
|
severity_counts = {
|
|
"critical": 0,
|
|
"high": 0,
|
|
"medium": 0,
|
|
"low": 0
|
|
}
|
|
|
|
for vuln in vulnerabilities:
|
|
severity = vuln.get("severity", "").lower()
|
|
if severity in severity_counts:
|
|
severity_counts[severity] += 1
|
|
else:
|
|
severity_counts["low"] += 1
|
|
|
|
# Build report structure
|
|
report = {
|
|
"vulnerabilities": vulnerabilities,
|
|
"scan_metadata": {
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"packages_scanned": len(set(v["package"] for v in vulnerabilities)) if vulnerabilities else 0,
|
|
"vulnerabilities_found": len(vulnerabilities),
|
|
"critical_count": severity_counts["critical"],
|
|
"high_count": severity_counts["high"],
|
|
"medium_count": severity_counts["medium"],
|
|
"low_count": severity_counts["low"],
|
|
"sources_used": list(set(v.get("source", "unknown") for v in vulnerabilities))
|
|
}
|
|
}
|
|
|
|
with open(output, "w", encoding="utf-8") as f:
|
|
json.dump(report, f, indent=2, sort_keys=True)
|
|
|
|
|
|
def format_vulnerability_report(vulnerabilities: List[Dict[str, Any]]) -> str:
    """
    Format vulnerabilities as human-readable text report.

    The report starts with totals per severity (npm's ``moderate`` is counted
    as MEDIUM; empty, missing or unrecognised severities as UNSPECIFIED),
    followed by one section per native tool showing at most the first five
    findings, and ends with the native fix commands. Findings whose
    ``"source"`` is neither ``"npm audit"`` nor ``"pip-audit"`` are counted in
    the totals but not listed.

    Args:
        vulnerabilities: List of vulnerability dictionaries; listed entries
            must provide ``"package"``, ``"version"`` and ``"summary"``.

    Returns:
        Formatted text report
    """
    if not vulnerabilities:
        return "[OK] No known vulnerabilities found in dependencies\n"

    max_shown = 5  # findings listed per tool before summarising the rest

    # Count by severity
    severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
    for vuln in vulnerabilities:
        # `or ""` guards against an explicit None severity.
        severity = str(vuln.get("severity") or "").lower()
        if severity == "moderate":
            severity = "medium"
        if severity not in ("critical", "high", "medium", "low"):
            severity = "unknown"
        severity_counts[severity] += 1

    # Summary
    lines = [
        "[FACT] Native tool vulnerability scan results\n",
        "=" * 60,
        f"Total: {len(vulnerabilities)} vulnerabilities reported\n",
    ]
    summary_labels = (
        ("critical", "CRITICAL"),
        ("high", "HIGH"),
        ("medium", "MEDIUM"),
        ("low", "LOW"),
        ("unknown", "UNSPECIFIED"),
    )
    for key, label in summary_labels:
        if severity_counts[key] > 0:
            lines.append(f"{label}: {severity_counts[key]}")

    lines.append("")
    lines.append("=" * 60)

    # Group by source tool. The severity label is only printed for npm
    # findings; the pip-audit section lists the summary alone.
    for source, show_severity in (("npm audit", True), ("pip-audit", False)):
        reported = [v for v in vulnerabilities if v.get("source") == source]
        if not reported:
            continue

        lines.append(f"\n[{source} reported {len(reported)} issues]")
        lines.append("-" * 40)
        for vuln in reported[:max_shown]:
            lines.append(f" {vuln['package']} v{vuln['version']}")
            if show_severity:
                # An empty or None severity falls back to UNSPECIFIED, not
                # only a missing key.
                label = str(vuln.get("severity") or "UNSPECIFIED").upper()
                lines.append(f" {label}: {vuln['summary']}")
            else:
                lines.append(f" {vuln['summary']}")
            if vuln.get("fixed_version"):
                lines.append(f" Fix available: v{vuln['fixed_version']}")
        if len(reported) > max_shown:
            lines.append(f" ... and {len(reported) - max_shown} more")

    lines.append("")
    lines.append("=" * 60)
    lines.append("\nNative tool commands you can run:")
    lines.append(" npm audit fix # Auto-fix npm vulnerabilities")
    lines.append(" pip-audit --fix # Auto-fix Python vulnerabilities")

    return "\n".join(lines)