# Mirror of https://github.com/aljazceru/Auditor.git
# Synced 2025-12-17 03:24:18 +01:00 (446 lines, 15 KiB, Python)
"""Journal system for tracking audit execution history.
|
|
|
|
This module provides functionality to write and read execution journals in NDJSON format.
|
|
The journal tracks all pipeline events, file touches, and results for ML training.
|
|
"""
|
|
|
|
import functools
import json
import os
from datetime import datetime, UTC
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
|
|
|
|
|
|
class JournalWriter:
|
|
"""Writes execution events to journal.ndjson file."""
|
|
|
|
def __init__(self, journal_path: str = "./.pf/journal.ndjson", history_dir: Optional[str] = None):
|
|
"""Initialize journal writer.
|
|
|
|
Args:
|
|
journal_path: Path to the journal file
|
|
history_dir: Optional history directory for archival copies
|
|
"""
|
|
self.journal_path = Path(journal_path)
|
|
self.history_dir = Path(history_dir) if history_dir else None
|
|
self.session_id = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
|
|
|
|
# Ensure parent directory exists
|
|
self.journal_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Open file in append mode for continuous writing
|
|
self.file_handle = None
|
|
self._open_journal()
|
|
|
|
def _open_journal(self):
|
|
"""Open journal file for writing."""
|
|
try:
|
|
self.file_handle = open(self.journal_path, 'a', encoding='utf-8', buffering=1)
|
|
except Exception as e:
|
|
print(f"[WARNING] Could not open journal file {self.journal_path}: {e}")
|
|
self.file_handle = None
|
|
|
|
def write_event(self, event_type: str, data: Dict[str, Any]) -> bool:
|
|
"""Write an event to the journal.
|
|
|
|
Args:
|
|
event_type: Type of event (phase, file_touch, result, error, etc.)
|
|
data: Event data dictionary
|
|
|
|
Returns:
|
|
True if written successfully, False otherwise
|
|
"""
|
|
if not self.file_handle:
|
|
return False
|
|
|
|
try:
|
|
event = {
|
|
"timestamp": datetime.now(UTC).isoformat(),
|
|
"session_id": self.session_id,
|
|
"event_type": event_type,
|
|
**data
|
|
}
|
|
|
|
# Write as NDJSON (one JSON object per line)
|
|
json.dump(event, self.file_handle)
|
|
self.file_handle.write('\n')
|
|
self.file_handle.flush() # Force write to disk
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[WARNING] Failed to write journal event: {e}")
|
|
return False
|
|
|
|
def phase_start(self, phase_name: str, command: str, phase_num: int = 0) -> bool:
|
|
"""Record the start of a pipeline phase.
|
|
|
|
Args:
|
|
phase_name: Human-readable phase name
|
|
command: Command being executed
|
|
phase_num: Phase number in sequence
|
|
"""
|
|
return self.write_event("phase_start", {
|
|
"phase": phase_name,
|
|
"command": command,
|
|
"phase_num": phase_num
|
|
})
|
|
|
|
def phase_end(self, phase_name: str, success: bool, elapsed: float,
|
|
exit_code: int = 0, error_msg: Optional[str] = None) -> bool:
|
|
"""Record the end of a pipeline phase.
|
|
|
|
Args:
|
|
phase_name: Human-readable phase name
|
|
success: Whether phase succeeded
|
|
elapsed: Execution time in seconds
|
|
exit_code: Process exit code
|
|
error_msg: Optional error message
|
|
"""
|
|
return self.write_event("phase_end", {
|
|
"phase": phase_name,
|
|
"result": "success" if success else "fail",
|
|
"elapsed": elapsed,
|
|
"exit_code": exit_code,
|
|
"error": error_msg
|
|
})
|
|
|
|
def file_touch(self, file_path: str, operation: str = "analyze",
|
|
success: bool = True, findings: int = 0) -> bool:
|
|
"""Record a file being touched/analyzed.
|
|
|
|
Args:
|
|
file_path: Path to the file
|
|
operation: Type of operation (analyze, modify, create, etc.)
|
|
success: Whether operation succeeded
|
|
findings: Number of findings/issues found
|
|
"""
|
|
return self.write_event("file_touch", {
|
|
"file": file_path,
|
|
"operation": operation,
|
|
"result": "success" if success else "fail",
|
|
"findings": findings
|
|
})
|
|
|
|
def finding(self, file_path: str, severity: str, category: str,
|
|
message: str, line: Optional[int] = None) -> bool:
|
|
"""Record a specific finding/issue.
|
|
|
|
Args:
|
|
file_path: File where finding was detected
|
|
severity: Severity level (critical, high, medium, low)
|
|
category: Category of finding
|
|
message: Finding message
|
|
line: Optional line number
|
|
"""
|
|
return self.write_event("finding", {
|
|
"file": file_path,
|
|
"severity": severity,
|
|
"category": category,
|
|
"message": message,
|
|
"line": line
|
|
})
|
|
|
|
def apply_patch(self, file_path: str, success: bool,
|
|
patch_type: str = "fix", error_msg: Optional[str] = None) -> bool:
|
|
"""Record a patch/fix being applied to a file.
|
|
|
|
Args:
|
|
file_path: File being patched
|
|
success: Whether patch succeeded
|
|
patch_type: Type of patch (fix, refactor, update, etc.)
|
|
error_msg: Optional error message
|
|
"""
|
|
return self.write_event("apply_patch", {
|
|
"file": file_path,
|
|
"result": "success" if success else "fail",
|
|
"patch_type": patch_type,
|
|
"error": error_msg
|
|
})
|
|
|
|
def pipeline_summary(self, total_phases: int, failed_phases: int,
|
|
total_files: int, total_findings: int,
|
|
elapsed: float, status: str = "complete") -> bool:
|
|
"""Record pipeline execution summary.
|
|
|
|
Args:
|
|
total_phases: Total number of phases executed
|
|
failed_phases: Number of failed phases
|
|
total_files: Total files analyzed
|
|
total_findings: Total findings detected
|
|
elapsed: Total execution time
|
|
status: Overall status (complete, partial, failed)
|
|
"""
|
|
return self.write_event("pipeline_summary", {
|
|
"total_phases": total_phases,
|
|
"failed_phases": failed_phases,
|
|
"total_files": total_files,
|
|
"total_findings": total_findings,
|
|
"elapsed": elapsed,
|
|
"status": status
|
|
})
|
|
|
|
def close(self, copy_to_history: bool = True):
|
|
"""Close the journal file and optionally copy to history.
|
|
|
|
Args:
|
|
copy_to_history: Whether to copy journal to history directory
|
|
"""
|
|
if self.file_handle:
|
|
try:
|
|
self.file_handle.close()
|
|
except:
|
|
pass
|
|
self.file_handle = None
|
|
|
|
# Copy to history if requested and history_dir is set
|
|
if copy_to_history and self.history_dir and self.journal_path.exists():
|
|
try:
|
|
import shutil
|
|
self.history_dir.mkdir(parents=True, exist_ok=True)
|
|
dest_path = self.history_dir / f"journal_{self.session_id}.ndjson"
|
|
shutil.copy2(self.journal_path, dest_path)
|
|
print(f"[INFO] Journal copied to history: {dest_path}")
|
|
except Exception as e:
|
|
print(f"[WARNING] Could not copy journal to history: {e}")
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit - close journal."""
|
|
self.close()
|
|
|
|
|
|
class JournalReader:
|
|
"""Reads and queries journal.ndjson files."""
|
|
|
|
def __init__(self, journal_path: str = "./.pf/journal.ndjson"):
|
|
"""Initialize journal reader.
|
|
|
|
Args:
|
|
journal_path: Path to the journal file
|
|
"""
|
|
self.journal_path = Path(journal_path)
|
|
|
|
def read_events(self, event_type: Optional[str] = None,
|
|
since: Optional[datetime] = None,
|
|
session_id: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Read events from journal with optional filtering.
|
|
|
|
Args:
|
|
event_type: Filter by event type
|
|
since: Only events after this timestamp
|
|
session_id: Filter by session ID
|
|
|
|
Returns:
|
|
List of matching events
|
|
"""
|
|
if not self.journal_path.exists():
|
|
return []
|
|
|
|
events = []
|
|
try:
|
|
with open(self.journal_path, 'r', encoding='utf-8') as f:
|
|
for line_num, line in enumerate(f, 1):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
try:
|
|
event = json.loads(line)
|
|
|
|
# Apply filters
|
|
if event_type and event.get("event_type") != event_type:
|
|
continue
|
|
|
|
if session_id and event.get("session_id") != session_id:
|
|
continue
|
|
|
|
if since:
|
|
event_time = datetime.fromisoformat(event.get("timestamp", ""))
|
|
if event_time < since:
|
|
continue
|
|
|
|
events.append(event)
|
|
|
|
except json.JSONDecodeError:
|
|
print(f"[WARNING] Skipping malformed JSON at line {line_num}")
|
|
continue
|
|
|
|
except Exception as e:
|
|
print(f"[WARNING] Error reading journal: {e}")
|
|
|
|
return events
|
|
|
|
def get_file_stats(self) -> Dict[str, Dict[str, int]]:
|
|
"""Get statistics for file touches and failures.
|
|
|
|
Returns:
|
|
Dict mapping file paths to stats (touches, failures, successes)
|
|
"""
|
|
stats = {}
|
|
|
|
for event in self.read_events(event_type="file_touch"):
|
|
file_path = event.get("file", "")
|
|
if not file_path:
|
|
continue
|
|
|
|
if file_path not in stats:
|
|
stats[file_path] = {
|
|
"touches": 0,
|
|
"failures": 0,
|
|
"successes": 0,
|
|
"findings": 0
|
|
}
|
|
|
|
stats[file_path]["touches"] += 1
|
|
|
|
if event.get("result") == "fail":
|
|
stats[file_path]["failures"] += 1
|
|
else:
|
|
stats[file_path]["successes"] += 1
|
|
|
|
stats[file_path]["findings"] += event.get("findings", 0)
|
|
|
|
# Also count apply_patch events
|
|
for event in self.read_events(event_type="apply_patch"):
|
|
file_path = event.get("file", "")
|
|
if not file_path:
|
|
continue
|
|
|
|
if file_path not in stats:
|
|
stats[file_path] = {
|
|
"touches": 0,
|
|
"failures": 0,
|
|
"successes": 0,
|
|
"findings": 0
|
|
}
|
|
|
|
stats[file_path]["touches"] += 1
|
|
|
|
if event.get("result") == "fail":
|
|
stats[file_path]["failures"] += 1
|
|
else:
|
|
stats[file_path]["successes"] += 1
|
|
|
|
return stats
|
|
|
|
def get_phase_stats(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Get statistics for pipeline phases.
|
|
|
|
Returns:
|
|
Dict mapping phase names to execution stats
|
|
"""
|
|
stats = {}
|
|
|
|
# Track phase starts
|
|
for event in self.read_events(event_type="phase_start"):
|
|
phase = event.get("phase", "")
|
|
if not phase:
|
|
continue
|
|
|
|
if phase not in stats:
|
|
stats[phase] = {
|
|
"executions": 0,
|
|
"failures": 0,
|
|
"total_elapsed": 0.0,
|
|
"last_executed": None
|
|
}
|
|
|
|
stats[phase]["executions"] += 1
|
|
stats[phase]["last_executed"] = event.get("timestamp")
|
|
|
|
# Track phase ends
|
|
for event in self.read_events(event_type="phase_end"):
|
|
phase = event.get("phase", "")
|
|
if not phase or phase not in stats:
|
|
continue
|
|
|
|
if event.get("result") == "fail":
|
|
stats[phase]["failures"] += 1
|
|
|
|
stats[phase]["total_elapsed"] += event.get("elapsed", 0.0)
|
|
|
|
return stats
|
|
|
|
def get_recent_failures(self, limit: int = 10) -> List[Dict[str, Any]]:
|
|
"""Get recent failure events.
|
|
|
|
Args:
|
|
limit: Maximum number of failures to return
|
|
|
|
Returns:
|
|
List of recent failure events
|
|
"""
|
|
failures = []
|
|
|
|
# Get all failure events
|
|
for event in self.read_events():
|
|
if event.get("result") == "fail" or event.get("event_type") == "error":
|
|
failures.append(event)
|
|
|
|
# Sort by timestamp (most recent first)
|
|
failures.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
|
|
|
|
return failures[:limit]
|
|
|
|
|
|
# Integration functions for pipeline
|
|
def get_journal_writer(run_type: str = "full") -> JournalWriter:
    """Build the journal writer used for the current run.

    The writer appends to ``./.pf/journal.ndjson`` and is given a history
    directory of the form ``./.pf/history/<run_type>/<UTC timestamp>``.

    Args:
        run_type: Type of run (full, diff, etc.); used as a path component
            of the history directory

    Returns:
        JournalWriter instance
    """
    # Second-resolution UTC stamp naming this run's history folder.
    stamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
    archive_dir = Path("./.pf/history") / run_type / stamp

    return JournalWriter(journal_path="./.pf/journal.ndjson", history_dir=str(archive_dir))
|
|
|
|
|
|
def integrate_with_pipeline(pipeline_func):
    """Decorator to integrate journal writing with pipeline execution.

    The wrapped function always receives a ``journal`` keyword argument.  If
    the caller supplies one it is passed through and left open; otherwise a
    writer is created via ``get_journal_writer`` (using the ``run_type``
    keyword argument, default ``'full'``) and closed when the call ends,
    whether it returned or raised.

    When the pipeline returns a dict, a ``pipeline_summary`` event is
    written from its ``total_phases``, ``failed_phases``, ``created_files``,
    ``findings['total_vulnerabilities']``, ``elapsed_time`` and ``success``
    entries.

    Args:
        pipeline_func: Pipeline callable accepting a ``journal`` keyword

    Returns:
        The wrapping callable; it returns whatever ``pipeline_func`` returns
    """
    # functools.wraps keeps the pipeline's __name__/__doc__ on the wrapper.
    @functools.wraps(pipeline_func)
    def wrapper(*args, **kwargs):
        # Get or create journal writer
        journal = kwargs.pop('journal', None)
        close_journal = journal is None  # only close a writer we created

        if close_journal:
            journal = get_journal_writer(kwargs.get('run_type', 'full'))

        try:
            # Inject journal into kwargs
            kwargs['journal'] = journal

            # Execute pipeline
            result = pipeline_func(*args, **kwargs)

            # Write summary if available
            if isinstance(result, dict):
                # Tolerate keys that are present but None / not a dict so the
                # summary cannot turn a finished run into an exception.
                findings = result.get('findings')
                if not isinstance(findings, dict):
                    findings = {}
                created_files = result.get('created_files') or []

                journal.pipeline_summary(
                    total_phases=result.get('total_phases', 0),
                    failed_phases=result.get('failed_phases', 0),
                    total_files=len(created_files),
                    total_findings=findings.get('total_vulnerabilities', 0),
                    elapsed=result.get('elapsed_time', 0.0),
                    status='complete' if result.get('success') else 'failed',
                )

            return result

        finally:
            if close_journal:
                journal.close()

    return wrapper