Auditor/theauditor/indexer_compat.py
TheAuditorTool 8ffacca419 Critical bug in aud init.
Fix: Create .pf directory before writing files in indexer

  Commit Description:

  Fixed critical bug where `aud init` would fail on clean projects with:
  "Failed to write manifest: [Errno 2] No such file or directory"

  Cause:
  - The indexer tried to write to .pf/manifest.json without creating the parent directory
  - `aud init` calls the indexer directly, which had no directory-creation logic
  - `aud full` worked because pipelines.py creates .pf/ before calling the indexer

  Effect:
  - Users running `aud init` on fresh projects hit an immediate failure
  - Only affected first-time users following the README instructions
  - Did not affect `aud full` users or existing projects that already had a .pf/ directory

  Solution:
  - Added Path().parent.mkdir(parents=True, exist_ok=True) before writing the manifest
  - Added the same logic before creating the database file
  - Ensures the .pf directory structure is created regardless of entry point (see the sketch below)

  This fix ensures both `aud init` and `aud full` work on clean projects.
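  A minimal sketch of the pattern (paths illustrative, not the exact diff):

      from pathlib import Path

      manifest_path = Path(".pf/manifest.json")
      # Create .pf/ (and any missing parents) up front; no error if it already exists
      manifest_path.parent.mkdir(parents=True, exist_ok=True)
      manifest_path.write_text("{}", encoding="utf-8")  # real code dumps the file manifest here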
2025-09-08 14:15:45 +07:00


"""Repository indexer - Backward Compatibility Shim.
This module provides backward compatibility for code that imports from indexer.py.
All functionality has been refactored into the theauditor.indexer package.
IMPORTANT: New code should import from theauditor.indexer package directly:
from theauditor.indexer import IndexerOrchestrator
"""
import json
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

# Import from the new package structure
from theauditor.indexer import IndexerOrchestrator
from theauditor.indexer.config import (
    SKIP_DIRS, IMPORT_PATTERNS, ROUTE_PATTERNS, SQL_PATTERNS,
    SQL_QUERY_PATTERNS, DEFAULT_BATCH_SIZE
)
from theauditor.indexer.core import (
    FileWalker, is_text_file, get_first_lines, load_gitignore_patterns
)
from theauditor.indexer.database import create_database_schema
from theauditor.config_runtime import load_runtime_config

# Re-export commonly used items for backward compatibility
__all__ = [
    'build_index',
    'walk_directory',
    'populate_database',
    'create_database_schema',
    'SKIP_DIRS',
    'extract_imports',
    'extract_routes',
    'extract_sql_objects',
    'extract_sql_queries'
]


def extract_imports(content: str, file_ext: str) -> List[tuple]:
    """Extract import statements - backward compatibility wrapper."""
    imports = []
    for pattern in IMPORT_PATTERNS:
        for match in pattern.finditer(content):
            value = match.group(1) if match.lastindex else match.group(0)
            # Determine kind based on pattern
            if "require" in pattern.pattern:
                kind = "require"
            elif "from" in pattern.pattern and "import" in pattern.pattern:
                kind = "from"
            elif "package" in pattern.pattern:
                kind = "package"
            else:
                kind = "import"
            imports.append((kind, value))
    return imports
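# Example (illustrative; the actual matches depend on IMPORT_PATTERNS from
# theauditor.indexer.config, so the exact kind/value pairs may differ):
#     extract_imports("from os import path", ".py")
#     # -> [("from", "os")]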


def extract_routes(content: str) -> List[tuple]:
    """Extract route definitions - backward compatibility wrapper."""
    routes = []
    for pattern in ROUTE_PATTERNS:
        for match in pattern.finditer(content):
            if match.lastindex == 2:
                method = match.group(1).upper()
                path = match.group(2)
            else:
                method = "ANY"
                path = match.group(1) if match.lastindex else match.group(0)
            routes.append((method, path))
    return routes
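# Example (illustrative; depends on ROUTE_PATTERNS, shown here assuming an
# Express-style route matcher is among the configured patterns):
#     extract_routes('app.get("/users/:id", handler)')
#     # -> [("GET", "/users/:id")]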


def extract_sql_objects(content: str) -> List[tuple]:
    """Extract SQL object definitions - backward compatibility wrapper."""
    objects = []
    for pattern in SQL_PATTERNS:
        for match in pattern.finditer(content):
            name = match.group(1)
            # Determine kind from pattern
            pattern_text = pattern.pattern.lower()
            if "table" in pattern_text:
                kind = "table"
            elif "index" in pattern_text:
                kind = "index"
            elif "view" in pattern_text:
                kind = "view"
            elif "function" in pattern_text:
                kind = "function"
            elif "policy" in pattern_text:
                kind = "policy"
            elif "constraint" in pattern_text:
                kind = "constraint"
            else:
                kind = "unknown"
            objects.append((kind, name))
    return objects
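# Example (illustrative; depends on SQL_PATTERNS capturing the object name):
#     extract_sql_objects("CREATE TABLE users (id INTEGER PRIMARY KEY);")
#     # -> [("table", "users")]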


def extract_sql_queries(content: str) -> List[dict]:
    """Extract SQL queries - backward compatibility wrapper.

    Note: This requires sqlparse to be installed for full functionality.
    """
    try:
        import sqlparse
    except ImportError:
        return []
    queries = []
    for pattern in SQL_QUERY_PATTERNS:
        for match in pattern.finditer(content):
            query_text = match.group(1) if match.lastindex else match.group(0)
            # Calculate line number
            line = content[:match.start()].count('\n') + 1
            # Clean up the query text
            query_text = query_text.strip()
            if not query_text:
                continue
            try:
                # Parse the SQL query
                parsed = sqlparse.parse(query_text)
                if not parsed:
                    continue
                for statement in parsed:
                    # Extract command type
                    command = statement.get_type()
                    if not command:
                        # Try to extract manually from first token
                        tokens = statement.tokens
                        for token in tokens:
                            if not token.is_whitespace:
                                command = str(token).upper()
                                break
                    # Extract table names
                    tables = []
                    tokens = list(statement.flatten())
                    for i, token in enumerate(tokens):
                        # After flatten(), keywords carry a Keyword ttype, so accept both
                        if token.ttype in (None, sqlparse.tokens.Keyword) and token.value.upper() in ['FROM', 'INTO', 'UPDATE', 'TABLE', 'JOIN']:
                            # Look for the next non-whitespace token
                            for j in range(i + 1, len(tokens)):
                                next_token = tokens[j]
                                if not next_token.is_whitespace:
                                    if next_token.ttype in [None, sqlparse.tokens.Name]:
                                        table_name = next_token.value
                                        # Clean up table name
                                        table_name = table_name.strip('"\'`')
                                        if '.' in table_name:
                                            table_name = table_name.split('.')[-1]
                                        if table_name and table_name.upper() not in ['SELECT', 'WHERE', 'SET', 'VALUES']:
                                            tables.append(table_name)
                                    break
                    queries.append({
                        'line': line,
                        'query_text': query_text[:1000],  # Limit length
                        'command': command or 'UNKNOWN',
                        'tables': tables
                    })
            except Exception:
                # Skip queries that can't be parsed
                continue
    return queries
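# Example (illustrative; requires sqlparse and assumes SQL_QUERY_PATTERNS matches
# the embedded string literal):
#     extract_sql_queries('cursor.execute("SELECT id FROM users WHERE active = 1")')
#     # -> [{'line': 1, 'query_text': 'SELECT id FROM users WHERE active = 1',
#     #      'command': 'SELECT', 'tables': ['users']}]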


def walk_directory(
    root_path: Path,
    follow_symlinks: bool = False,
    exclude_patterns: Optional[List[str]] = None
) -> tuple[List[dict], Dict[str, Any]]:
    """Walk directory and collect file information - backward compatibility wrapper.

    Args:
        root_path: Root directory to walk
        follow_symlinks: Whether to follow symbolic links
        exclude_patterns: Additional patterns to exclude

    Returns:
        Tuple of (files_list, statistics)
    """
    config = load_runtime_config(str(root_path))
    walker = FileWalker(root_path, config, follow_symlinks, exclude_patterns)
    return walker.walk()
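# Example (illustrative); the stats dict uses the same keys printed by build_index,
# e.g. 'total_files', 'text_files', 'binary_files', 'large_files':
#     files, stats = walk_directory(Path("."), exclude_patterns=["*.min.js"])
#     print(stats['total_files'])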


def populate_database(
    conn: sqlite3.Connection,
    files: List[dict],
    root_path: Path,
    batch_size: int = DEFAULT_BATCH_SIZE
) -> Dict[str, int]:
    """Populate SQLite database - backward compatibility wrapper.

    Args:
        conn: SQLite connection
        files: List of file dictionaries
        root_path: Project root path
        batch_size: Batch size for database operations

    Returns:
        Dictionary of extraction counts
    """
    # Create orchestrator with the existing connection's path
    db_path = conn.execute("PRAGMA database_list").fetchone()[2]
    orchestrator = IndexerOrchestrator(root_path, db_path, batch_size)
    # Close the passed connection as orchestrator creates its own
    conn.close()
    # Run the indexing
    counts, _ = orchestrator.index()
    return counts
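# Example (illustrative); the passed connection is only used to discover the
# database path and is closed before the orchestrator reopens that file itself:
#     conn = sqlite3.connect("repo_index.db")
#     counts = populate_database(conn, files, Path("."))
#     print(counts.get('refs', 0))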


def build_index(
    root_path: str = ".",
    manifest_path: str = "manifest.json",
    db_path: str = "repo_index.db",
    print_stats: bool = False,
    dry_run: bool = False,
    follow_symlinks: bool = False,
    exclude_patterns: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build repository index - main entry point for backward compatibility.

    Args:
        root_path: Root directory to index
        manifest_path: Path to write manifest JSON
        db_path: Path to SQLite database
        print_stats: Whether to print statistics
        dry_run: If True, only scan files without creating database
        follow_symlinks: Whether to follow symbolic links
        exclude_patterns: Patterns to exclude from indexing

    Returns:
        Dictionary with success status and statistics
    """
    start_time = time.time()
    root = Path(root_path).resolve()
    if not root.exists():
        return {"error": f"Root path does not exist: {root_path}"}

    # Walk directory and collect files
    config = load_runtime_config(str(root))
    walker = FileWalker(root, config, follow_symlinks, exclude_patterns)
    files, walk_stats = walker.walk()

    if dry_run:
        if print_stats:
            elapsed_ms = int((time.time() - start_time) * 1000)
            print(f"Files scanned: {walk_stats['total_files']}")
            print(f"Text files indexed: {walk_stats['text_files']}")
            print(f"Binary files skipped: {walk_stats['binary_files']}")
            print(f"Large files skipped: {walk_stats['large_files']}")
            print(f"Elapsed: {elapsed_ms}ms")
        return {"success": True, "dry_run": True, "stats": walk_stats}

    # Write manifest
    try:
        # Ensure parent directory exists before writing
        Path(manifest_path).parent.mkdir(parents=True, exist_ok=True)
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(files, f, indent=2, sort_keys=True)
    except Exception as e:
        return {"error": f"Failed to write manifest: {e}"}

    # Create and populate database
    try:
        # Ensure parent directory exists for database
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        # Check if database already exists
        db_exists = Path(db_path).exists()
        # Create database schema
        conn = sqlite3.connect(db_path)
        conn.execute("BEGIN IMMEDIATE")
        create_database_schema(conn)
        conn.commit()
        conn.close()
        # Report database creation if new
        if not db_exists:
            print(f"[Indexer] Created database: {db_path}")
        # Use orchestrator to populate the database
        orchestrator = IndexerOrchestrator(root, db_path, DEFAULT_BATCH_SIZE,
                                           follow_symlinks, exclude_patterns)
        # Clear existing data to avoid UNIQUE constraint errors
        orchestrator.db_manager.clear_tables()
        extract_counts, _ = orchestrator.index()
    except Exception as e:
        return {"error": f"Failed to create database: {e}"}

    if print_stats:
        elapsed_ms = int((time.time() - start_time) * 1000)
        print(f"Files scanned: {walk_stats['total_files']}")
        print(f"Text files indexed: {walk_stats['text_files']}")
        print(f"Binary files skipped: {walk_stats['binary_files']}")
        print(f"Large files skipped: {walk_stats['large_files']}")
        print(f"Refs extracted: {extract_counts['refs']}")
        print(f"Routes extracted: {extract_counts['routes']}")
        print(f"SQL objects extracted: {extract_counts['sql']}")
        print(f"SQL queries extracted: {extract_counts['sql_queries']}")
        print(f"Docker images analyzed: {extract_counts['docker']}")
        print(f"Symbols extracted: {extract_counts['symbols']}")
        print(f"Elapsed: {elapsed_ms}ms")

    return {
        "success": True,
        "stats": walk_stats,
        "extract_counts": extract_counts,
        "elapsed_ms": int((time.time() - start_time) * 1000),
    }
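

# Example (illustrative) end-to-end call; `aud init` and `aud full` ultimately drive
# this same entry point, with the manifest and database written under .pf/:
#     result = build_index(
#         root_path=".",
#         manifest_path=".pf/manifest.json",
#         db_path=".pf/repo_index.db",
#         print_stats=True,
#     )
#     if "error" in result:
#         raise SystemExit(result["error"])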