mirror of https://github.com/aljazceru/Auditor.git (synced 2025-12-17 03:24:18 +01:00)
Fix: Create .pf directory before writing files in indexer

Commit Description:
Fixed critical bug where `aud init` would fail on clean projects with:
"Failed to write manifest: [Errno 2] No such file or directory"

Cause:
- The indexer tried to write to .pf/manifest.json without creating the parent directory
- `aud init` calls the indexer directly, which had no directory-creation logic
- `aud full` worked because pipelines.py creates .pf/ before calling the indexer

Effect:
- Users running `aud init` on fresh projects would get an immediate failure
- Only affected first-time users following the README instructions
- Did not affect `aud full` users or existing projects with a .pf/ directory

Solution:
- Added Path().parent.mkdir(parents=True, exist_ok=True) before writing the manifest
- Added the same logic before creating the database file
- Ensures the .pf directory structure is created regardless of entry point

This fix ensures both `aud init` and `aud full` work on clean projects.
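
The fix is the standard guard of creating the parent directory before writing. A minimal sketch of the pattern (paths illustrative; the actual call sites are in build_index below):

    from pathlib import Path

    manifest_path = ".pf/manifest.json"
    # Create .pf/ (and any missing ancestors) without failing if it already exists
    Path(manifest_path).parent.mkdir(parents=True, exist_ok=True)
    with open(manifest_path, "w", encoding="utf-8") as f:
        f.write("[]")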
326 lines
12 KiB
Python
"""Repository indexer - Backward Compatibility Shim.
|
|
|
|
This module provides backward compatibility for code that imports from indexer.py.
|
|
All functionality has been refactored into the theauditor.indexer package.
|
|
|
|
IMPORTANT: New code should import from theauditor.indexer package directly:
|
|
from theauditor.indexer import IndexerOrchestrator
|
|
"""

import json
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

# Import from the new package structure
from theauditor.indexer import IndexerOrchestrator
from theauditor.indexer.config import (
    SKIP_DIRS, IMPORT_PATTERNS, ROUTE_PATTERNS, SQL_PATTERNS,
    SQL_QUERY_PATTERNS, DEFAULT_BATCH_SIZE
)
from theauditor.indexer.core import (
    FileWalker, is_text_file, get_first_lines, load_gitignore_patterns
)
from theauditor.indexer.database import create_database_schema
from theauditor.config_runtime import load_runtime_config

# Re-export commonly used items for backward compatibility
__all__ = [
    'build_index',
    'walk_directory',
    'populate_database',
    'create_database_schema',
    'SKIP_DIRS',
    'extract_imports',
    'extract_routes',
    'extract_sql_objects',
    'extract_sql_queries'
]


def extract_imports(content: str, file_ext: str) -> List[tuple]:
    """Extract import statements - backward compatibility wrapper."""
    imports = []
    for pattern in IMPORT_PATTERNS:
        for match in pattern.finditer(content):
            value = match.group(1) if match.lastindex else match.group(0)
            # Determine kind based on pattern
            if "require" in pattern.pattern:
                kind = "require"
            elif "from" in pattern.pattern and "import" in pattern.pattern:
                kind = "from"
            elif "package" in pattern.pattern:
                kind = "package"
            else:
                kind = "import"
            imports.append((kind, value))
    return imports
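
# Illustrative call (hypothetical output -- the exact tuples depend on the
# regexes defined in theauditor.indexer.config.IMPORT_PATTERNS):
#
#   extract_imports("from os import path\nimport sys\n", ".py")
#   # -> e.g. [("from", "os"), ("import", "sys")]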


def extract_routes(content: str) -> List[tuple]:
    """Extract route definitions - backward compatibility wrapper."""
    routes = []
    for pattern in ROUTE_PATTERNS:
        for match in pattern.finditer(content):
            if match.lastindex == 2:
                method = match.group(1).upper()
                path = match.group(2)
            else:
                method = "ANY"
                path = match.group(1) if match.lastindex else match.group(0)
            routes.append((method, path))
    return routes
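
# Illustrative call (hypothetical output -- depends on ROUTE_PATTERNS):
#
#   extract_routes('app.get("/users/:id", handler)')
#   # -> e.g. [("GET", "/users/:id")]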


def extract_sql_objects(content: str) -> List[tuple]:
    """Extract SQL object definitions - backward compatibility wrapper."""
    objects = []
    for pattern in SQL_PATTERNS:
        for match in pattern.finditer(content):
            name = match.group(1)
            # Determine kind from pattern
            pattern_text = pattern.pattern.lower()
            if "table" in pattern_text:
                kind = "table"
            elif "index" in pattern_text:
                kind = "index"
            elif "view" in pattern_text:
                kind = "view"
            elif "function" in pattern_text:
                kind = "function"
            elif "policy" in pattern_text:
                kind = "policy"
            elif "constraint" in pattern_text:
                kind = "constraint"
            else:
                kind = "unknown"
            objects.append((kind, name))
    return objects
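
# Illustrative call (hypothetical output -- depends on SQL_PATTERNS):
#
#   extract_sql_objects("CREATE TABLE users (id INTEGER PRIMARY KEY);")
#   # -> e.g. [("table", "users")]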


def extract_sql_queries(content: str) -> List[dict]:
    """Extract SQL queries - backward compatibility wrapper.

    Note: This requires sqlparse to be installed for full functionality.
    """
    try:
        import sqlparse
    except ImportError:
        return []

    queries = []
    for pattern in SQL_QUERY_PATTERNS:
        for match in pattern.finditer(content):
            query_text = match.group(1) if match.lastindex else match.group(0)

            # Calculate line number
            line = content[:match.start()].count('\n') + 1

            # Clean up the query text
            query_text = query_text.strip()
            if not query_text:
                continue

            try:
                # Parse the SQL query
                parsed = sqlparse.parse(query_text)
                if not parsed:
                    continue

                for statement in parsed:
                    # Extract command type
                    command = statement.get_type()
                    if not command:
                        # Try to extract manually from first token
                        tokens = statement.tokens
                        for token in tokens:
                            if not token.is_whitespace:
                                command = str(token).upper()
                                break

                    # Extract table names
                    tables = []
                    tokens = list(statement.flatten())
                    for i, token in enumerate(tokens):
                        if token.ttype is None and token.value.upper() in ['FROM', 'INTO', 'UPDATE', 'TABLE', 'JOIN']:
                            # Look for the next non-whitespace token
                            for j in range(i + 1, len(tokens)):
                                next_token = tokens[j]
                                if not next_token.is_whitespace:
                                    if next_token.ttype in [None, sqlparse.tokens.Name]:
                                        table_name = next_token.value
                                        # Clean up table name
                                        table_name = table_name.strip('"\'`')
                                        if '.' in table_name:
                                            table_name = table_name.split('.')[-1]
                                        if table_name and table_name.upper() not in ['SELECT', 'WHERE', 'SET', 'VALUES']:
                                            tables.append(table_name)
                                    break

                    queries.append({
                        'line': line,
                        'query_text': query_text[:1000],  # Limit length
                        'command': command or 'UNKNOWN',
                        'tables': tables
                    })
            except Exception:
                # Skip queries that can't be parsed
                continue

    return queries
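
# Illustrative call (hypothetical output -- depends on SQL_QUERY_PATTERNS
# and on sqlparse being installed):
#
#   extract_sql_queries('cursor.execute("SELECT id FROM users")')
#   # -> e.g. [{"line": 1, "query_text": "SELECT id FROM users",
#   #           "command": "SELECT", "tables": ["users"]}]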


def walk_directory(
    root_path: Path,
    follow_symlinks: bool = False,
    exclude_patterns: Optional[List[str]] = None
) -> tuple[List[dict], Dict[str, Any]]:
    """Walk directory and collect file information - backward compatibility wrapper.

    Args:
        root_path: Root directory to walk
        follow_symlinks: Whether to follow symbolic links
        exclude_patterns: Additional patterns to exclude

    Returns:
        Tuple of (files_list, statistics)
    """
    config = load_runtime_config(str(root_path))
    walker = FileWalker(root_path, config, follow_symlinks, exclude_patterns)
    return walker.walk()
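
# Illustrative call (exclude pattern hypothetical):
#
#   files, stats = walk_directory(Path("."), exclude_patterns=["*.min.js"])
#   print(stats["total_files"], "files scanned")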


def populate_database(
    conn: sqlite3.Connection,
    files: List[dict],
    root_path: Path,
    batch_size: int = DEFAULT_BATCH_SIZE
) -> Dict[str, int]:
    """Populate SQLite database - backward compatibility wrapper.

    Args:
        conn: SQLite connection
        files: List of file dictionaries
        root_path: Project root path
        batch_size: Batch size for database operations

    Returns:
        Dictionary of extraction counts
    """
    # Create orchestrator with the existing connection's path
    db_path = conn.execute("PRAGMA database_list").fetchone()[2]
    orchestrator = IndexerOrchestrator(root_path, db_path, batch_size)

    # Close the passed connection as orchestrator creates its own
    conn.close()

    # Run the indexing
    counts, _ = orchestrator.index()
    return counts
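
# Illustrative call (paths hypothetical). Note that this wrapper closes the
# connection it is given, so the caller must not reuse it afterwards:
#
#   conn = sqlite3.connect("repo_index.db")
#   counts = populate_database(conn, files, Path("."))
#   # conn is now closed; counts maps extractor names to row counts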


def build_index(
    root_path: str = ".",
    manifest_path: str = "manifest.json",
    db_path: str = "repo_index.db",
    print_stats: bool = False,
    dry_run: bool = False,
    follow_symlinks: bool = False,
    exclude_patterns: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build repository index - main entry point for backward compatibility.

    Args:
        root_path: Root directory to index
        manifest_path: Path to write manifest JSON
        db_path: Path to SQLite database
        print_stats: Whether to print statistics
        dry_run: If True, only scan files without creating database
        follow_symlinks: Whether to follow symbolic links
        exclude_patterns: Patterns to exclude from indexing

    Returns:
        Dictionary with success status and statistics
    """
    start_time = time.time()
    root = Path(root_path).resolve()

    if not root.exists():
        return {"error": f"Root path does not exist: {root_path}"}

    # Walk directory and collect files
    config = load_runtime_config(str(root))
    walker = FileWalker(root, config, follow_symlinks, exclude_patterns)
    files, walk_stats = walker.walk()

    if dry_run:
        if print_stats:
            elapsed_ms = int((time.time() - start_time) * 1000)
            print(f"Files scanned: {walk_stats['total_files']}")
            print(f"Text files indexed: {walk_stats['text_files']}")
            print(f"Binary files skipped: {walk_stats['binary_files']}")
            print(f"Large files skipped: {walk_stats['large_files']}")
            print(f"Elapsed: {elapsed_ms}ms")
        return {"success": True, "dry_run": True, "stats": walk_stats}

    # Write manifest
    try:
        # Ensure parent directory exists before writing
        Path(manifest_path).parent.mkdir(parents=True, exist_ok=True)
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(files, f, indent=2, sort_keys=True)
    except Exception as e:
        return {"error": f"Failed to write manifest: {e}"}

    # Create and populate database
    try:
        # Ensure parent directory exists for database
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)

        # Check if database already exists
        db_exists = Path(db_path).exists()

        # Create database schema
        conn = sqlite3.connect(db_path)
        conn.execute("BEGIN IMMEDIATE")
        create_database_schema(conn)
        conn.commit()
        conn.close()

        # Report database creation if new
        if not db_exists:
            print(f"[Indexer] Created database: {db_path}")

        # Use orchestrator to populate the database
        orchestrator = IndexerOrchestrator(root, db_path, DEFAULT_BATCH_SIZE,
                                           follow_symlinks, exclude_patterns)

        # Clear existing data to avoid UNIQUE constraint errors
        orchestrator.db_manager.clear_tables()

        extract_counts, _ = orchestrator.index()

    except Exception as e:
        return {"error": f"Failed to create database: {e}"}

    if print_stats:
        elapsed_ms = int((time.time() - start_time) * 1000)
        print(f"Files scanned: {walk_stats['total_files']}")
        print(f"Text files indexed: {walk_stats['text_files']}")
        print(f"Binary files skipped: {walk_stats['binary_files']}")
        print(f"Large files skipped: {walk_stats['large_files']}")
        print(f"Refs extracted: {extract_counts['refs']}")
        print(f"Routes extracted: {extract_counts['routes']}")
        print(f"SQL objects extracted: {extract_counts['sql']}")
        print(f"SQL queries extracted: {extract_counts['sql_queries']}")
        print(f"Docker images analyzed: {extract_counts['docker']}")
        print(f"Symbols extracted: {extract_counts['symbols']}")
        print(f"Elapsed: {elapsed_ms}ms")

    return {
        "success": True,
        "stats": walk_stats,
        "extract_counts": extract_counts,
        "elapsed_ms": int((time.time() - start_time) * 1000),
    }
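
# Illustrative end-to-end invocation of the legacy entry point (the .pf/
# paths follow the layout described in the commit message above; adjust
# as needed):
#
#   result = build_index(
#       root_path=".",
#       manifest_path=".pf/manifest.json",
#       db_path=".pf/repo_index.db",
#       print_stats=True,
#   )
#   assert result.get("success")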