"""Native vulnerability scanners wrapper for npm audit and pip-audit. This module runs native security tools and reports their raw output, following TheAuditor's philosophy of using industry-standard tools without interpretation. """ import json import subprocess import shutil import platform from pathlib import Path from typing import Dict, List, Any from datetime import datetime, UTC # Windows compatibility IS_WINDOWS = platform.system() == "Windows" def scan_dependencies( deps: List[Dict[str, Any]], offline: bool = False, cache_dir: str = "./.pf/vuln_cache" # Kept for compatibility, unused ) -> List[Dict[str, Any]]: """ Run native vulnerability scanners (npm audit, pip-audit) on dependencies. Args: deps: List of dependency dicts from deps.py offline: If True, skip scanning (native tools need their own cache) cache_dir: Unused, kept for backward compatibility Returns: List of vulnerability findings from native tools """ if offline: # In offline mode, return empty - native tools manage their own offline capability return [] vulnerabilities = [] # Check which package managers we have has_npm = any(d["manager"] == "npm" for d in deps) has_python = any(d["manager"] == "py" for d in deps) # Run npm audit for Node.js packages if has_npm: npm_vulns = run_npm_audit() vulnerabilities.extend(npm_vulns) # Run pip-audit for Python packages if has_python: pip_vulns = run_pip_audit() vulnerabilities.extend(pip_vulns) return vulnerabilities def run_npm_audit() -> List[Dict[str, Any]]: """ Run npm audit and parse its output. Returns: List of vulnerabilities in standard format """ vulnerabilities = [] # Check if package.json exists project_root = Path.cwd() package_json = project_root / "package.json" if not package_json.exists(): return vulnerabilities # Check if node_modules exists (npm audit needs it) node_modules = project_root / "node_modules" if not node_modules.exists(): # No node_modules = nothing to audit return vulnerabilities # CRITICAL FIX: Use sandboxed npm from TheAuditor's tools # Find the sandboxed node and npm sandbox_base = project_root / ".auditor_venv" / ".theauditor_tools" node_runtime = sandbox_base / "node-runtime" if IS_WINDOWS: node_exe = node_runtime / "node.exe" # On Windows, npm is a JavaScript file we run with node npm_cli = node_runtime / "node_modules" / "npm" / "bin" / "npm-cli.js" if npm_cli.exists(): npm_cmd = [str(node_exe), str(npm_cli), "audit", "--json"] else: # Fallback: npm.cmd might exist npm_cmd_path = node_runtime / "npm.cmd" if npm_cmd_path.exists(): npm_cmd = [str(npm_cmd_path), "audit", "--json"] else: # No sandboxed npm found return vulnerabilities else: node_exe = node_runtime / "bin" / "node" npm_exe = node_runtime / "bin" / "npm" if npm_exe.exists(): npm_cmd = [str(npm_exe), "audit", "--json"] else: # No sandboxed npm found return vulnerabilities # Verify node exists before proceeding if not node_exe.exists(): # Sandboxed node not installed - user needs to run 'aud setup-claude' return vulnerabilities try: # Run npm audit --json using sandboxed npm # Note: npm audit exits with code 1 if vulnerabilities found, which is expected result = subprocess.run( npm_cmd, cwd=str(project_root), capture_output=True, text=True, timeout=60, shell=IS_WINDOWS ) if result.stdout: audit_data = json.loads(result.stdout) # Parse npm audit output format if "vulnerabilities" in audit_data: for pkg_name, pkg_data in audit_data["vulnerabilities"].items(): # Skip if no actual vulnerability info if not pkg_data.get("via"): continue # Extract vulnerability details from via field for via_item in pkg_data.get("via", []): # Skip if via is just a dependency name (transitive) if isinstance(via_item, str): continue if isinstance(via_item, dict): # Extract raw severity from npm severity = via_item.get("severity", "") # Extract IDs vuln_id = via_item.get("cve") if not vuln_id: vuln_id = via_item.get("ghsa") if not vuln_id: vuln_id = via_item.get("source", f"npm-audit-{pkg_name}") # Build aliases list aliases = [] if via_item.get("cve"): aliases.append(via_item["cve"]) if via_item.get("ghsa"): aliases.append(via_item["ghsa"]) # Extract fixed version if available fixed_version = None if pkg_data.get("fixAvailable"): fix_info = pkg_data["fixAvailable"] if isinstance(fix_info, dict) and "version" in fix_info: fixed_version = fix_info["version"] # Get current version affected_range = pkg_data.get("range", "") current_version = affected_range.split(" ")[0].lstrip("<>=") if affected_range else "" vulnerability = { "package": pkg_name, "version": current_version, "manager": "npm", "vulnerability_id": vuln_id, "severity": severity, "summary": via_item.get("title", "No summary available"), "details": via_item.get("overview", ""), "aliases": aliases, "published": via_item.get("created", ""), "modified": via_item.get("updated", ""), "references": [{ "type": "ADVISORY", "url": via_item.get("url", "") }] if via_item.get("url") else [], "affected_ranges": [pkg_data.get("range", "")] if pkg_data.get("range") else [], "fixed_version": fixed_version, "source": "npm audit" } vulnerabilities.append(vulnerability) except subprocess.TimeoutExpired: # Timeout after 60 seconds pass except (subprocess.SubprocessError, json.JSONDecodeError): # npm audit failed or returned invalid JSON pass return vulnerabilities def run_pip_audit() -> List[Dict[str, Any]]: """ Run pip-audit and parse its output. Returns: List of vulnerabilities in standard format """ vulnerabilities = [] # Check if pip-audit is available if not shutil.which("pip-audit"): # pip-audit not installed, skip return vulnerabilities # Check if we have Python dependencies to audit # Look for requirements.txt or pyproject.toml project_root = Path.cwd() has_requirements = (project_root / "requirements.txt").exists() has_pyproject = (project_root / "pyproject.toml").exists() if not has_requirements and not has_pyproject: return vulnerabilities try: # Build pip-audit command cmd = ["pip-audit", "--format", "json"] # Add requirements file if it exists if has_requirements: cmd.extend(["-r", "requirements.txt"]) # Run pip-audit result = subprocess.run( cmd, cwd=str(project_root), capture_output=True, text=True, timeout=60, shell=IS_WINDOWS ) if result.stdout: audit_data = json.loads(result.stdout) # Parse pip-audit output format # pip-audit returns an array of vulnerability objects for vuln in audit_data: # Extract package info pkg_name = vuln.get("name", "") pkg_version = vuln.get("version", "") # Extract vulnerability info vuln_id = vuln.get("id", f"pip-audit-{pkg_name}") # Build aliases from different ID fields aliases = [] if vuln.get("aliases"): aliases.extend(vuln["aliases"]) vulnerability = { "package": pkg_name, "version": pkg_version, "manager": "py", "vulnerability_id": vuln_id, "severity": vuln.get("fix_versions", [""])[0] if vuln.get("fix_versions") else "", # pip-audit doesn't provide severity "summary": vuln.get("description", "No summary available"), "details": vuln.get("description", ""), "aliases": aliases, "published": "", # pip-audit doesn't provide dates "modified": "", "references": [], # pip-audit doesn't provide references in JSON "affected_ranges": [], "fixed_version": vuln.get("fix_versions", [""])[0] if vuln.get("fix_versions") else None, "source": "pip-audit" } vulnerabilities.append(vulnerability) except subprocess.TimeoutExpired: # Timeout after 60 seconds pass except (subprocess.SubprocessError, json.JSONDecodeError): # pip-audit failed or returned invalid JSON pass return vulnerabilities def write_vulnerabilities_json( vulnerabilities: List[Dict[str, Any]], output_path: str = "./.pf/vulnerabilities.json" ) -> None: """ Write vulnerability findings to JSON file. Args: vulnerabilities: List of vulnerability dictionaries output_path: Path to output JSON file """ output = Path(output_path) output.parent.mkdir(parents=True, exist_ok=True) # Count by severity severity_counts = { "critical": 0, "high": 0, "medium": 0, "low": 0 } for vuln in vulnerabilities: severity = vuln.get("severity", "").lower() if severity in severity_counts: severity_counts[severity] += 1 else: severity_counts["low"] += 1 # Build report structure report = { "vulnerabilities": vulnerabilities, "scan_metadata": { "timestamp": datetime.now(UTC).isoformat(), "packages_scanned": len(set(v["package"] for v in vulnerabilities)) if vulnerabilities else 0, "vulnerabilities_found": len(vulnerabilities), "critical_count": severity_counts["critical"], "high_count": severity_counts["high"], "medium_count": severity_counts["medium"], "low_count": severity_counts["low"], "sources_used": list(set(v.get("source", "unknown") for v in vulnerabilities)) } } with open(output, "w", encoding="utf-8") as f: json.dump(report, f, indent=2, sort_keys=True) def format_vulnerability_report(vulnerabilities: List[Dict[str, Any]]) -> str: """ Format vulnerabilities as human-readable text report. Args: vulnerabilities: List of vulnerability dictionaries Returns: Formatted text report """ if not vulnerabilities: return "[OK] No known vulnerabilities found in dependencies\n" lines = [] # Count by severity severity_counts = { "critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0 } for vuln in vulnerabilities: severity = vuln.get("severity", "").lower() if severity in ["critical", "high", "medium", "low"]: severity_counts[severity] += 1 else: severity_counts["unknown"] += 1 # Summary lines.append("[FACT] Native tool vulnerability scan results\n") lines.append("=" * 60) lines.append(f"Total: {len(vulnerabilities)} vulnerabilities reported\n") if severity_counts["critical"] > 0: lines.append(f"CRITICAL: {severity_counts['critical']}") if severity_counts["high"] > 0: lines.append(f"HIGH: {severity_counts['high']}") if severity_counts["medium"] > 0: lines.append(f"MEDIUM: {severity_counts['medium']}") if severity_counts["low"] > 0: lines.append(f"LOW: {severity_counts['low']}") if severity_counts["unknown"] > 0: lines.append(f"UNSPECIFIED: {severity_counts['unknown']}") lines.append("") lines.append("=" * 60) # Group by source tool npm_vulns = [v for v in vulnerabilities if v.get("source") == "npm audit"] pip_vulns = [v for v in vulnerabilities if v.get("source") == "pip-audit"] if npm_vulns: lines.append(f"\n[npm audit reported {len(npm_vulns)} issues]") lines.append("-" * 40) for vuln in npm_vulns[:5]: # Show first 5 lines.append(f" {vuln['package']} v{vuln['version']}") lines.append(f" {vuln.get('severity', 'UNSPECIFIED').upper()}: {vuln['summary']}") if vuln.get("fixed_version"): lines.append(f" Fix available: v{vuln['fixed_version']}") if len(npm_vulns) > 5: lines.append(f" ... and {len(npm_vulns) - 5} more") if pip_vulns: lines.append(f"\n[pip-audit reported {len(pip_vulns)} issues]") lines.append("-" * 40) for vuln in pip_vulns[:5]: # Show first 5 lines.append(f" {vuln['package']} v{vuln['version']}") lines.append(f" {vuln['summary']}") if vuln.get("fixed_version"): lines.append(f" Fix available: v{vuln['fixed_version']}") if len(pip_vulns) > 5: lines.append(f" ... and {len(pip_vulns) - 5} more") lines.append("") lines.append("=" * 60) lines.append("\nNative tool commands you can run:") lines.append(" npm audit fix # Auto-fix npm vulnerabilities") lines.append(" pip-audit --fix # Auto-fix Python vulnerabilities") return "\n".join(lines)