diff --git a/crates/goose/src/session/storage.rs b/crates/goose/src/session/storage.rs index 786b09b4..77ac9b94 100644 --- a/crates/goose/src/session/storage.rs +++ b/crates/goose/src/session/storage.rs @@ -1,10 +1,18 @@ +// IMPORTANT: This file includes session recovery functionality to handle corrupted session files. +// Only essential logging is included with the [SESSION] prefix to track: +// - Total message counts +// - Corruption detection and recovery +// - Backup creation +// Additional debug logging can be added if needed for troubleshooting. + use crate::message::Message; use crate::providers::base::Provider; use anyhow::Result; use chrono::Local; use etcetera::{choose_app_strategy, AppStrategy, AppStrategyArgs}; +use regex::Regex; use serde::{Deserialize, Serialize}; -use std::fs::{self, File}; +use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -207,24 +215,56 @@ pub fn generate_session_id() -> String { Local::now().format("%Y%m%d_%H%M%S").to_string() } -/// Read messages from a session file +/// Read messages from a session file with corruption recovery /// /// Creates the file if it doesn't exist, reads and deserializes all messages if it does. /// The first line of the file is expected to be metadata, and the rest are messages. /// Large messages are automatically truncated to prevent memory issues. +/// Includes recovery mechanisms for corrupted files. pub fn read_messages(session_file: &Path) -> Result> { - read_messages_with_truncation(session_file, Some(50000)) // 50KB limit per message content + let result = read_messages_with_truncation(session_file, Some(50000)); // 50KB limit per message content + match &result { + Ok(messages) => println!( + "[SESSION] Successfully read {} messages from: {:?}", + messages.len(), + session_file + ), + Err(e) => println!( + "[SESSION] Failed to read messages from {:?}: {}", + session_file, e + ), + } + result } -/// Read messages from a session file with optional content truncation +/// Read messages from a session file with optional content truncation and corruption recovery /// /// Creates the file if it doesn't exist, reads and deserializes all messages if it does. /// The first line of the file is expected to be metadata, and the rest are messages. /// If max_content_size is Some, large message content will be truncated during loading. +/// Includes robust error handling and corruption recovery mechanisms. pub fn read_messages_with_truncation( session_file: &Path, max_content_size: Option, ) -> Result> { + // Check if there's a backup file we should restore from + let backup_file = session_file.with_extension("backup"); + if !session_file.exists() && backup_file.exists() { + println!( + "[SESSION] Session file missing but backup exists, restoring from backup: {:?}", + backup_file + ); + tracing::warn!( + "[SESSION] Session file missing but backup exists, restoring from backup: {:?}", + backup_file + ); + if let Err(e) = fs::copy(&backup_file, session_file) { + println!("[SESSION] Failed to restore from backup: {}", e); + tracing::error!("Failed to restore from backup: {}", e); + } + } + + // Open the file with appropriate options let file = fs::OpenOptions::new() .read(true) .write(true) @@ -235,27 +275,137 @@ pub fn read_messages_with_truncation( let reader = io::BufReader::new(file); let mut lines = reader.lines(); let mut messages = Vec::new(); + let mut corrupted_lines = Vec::new(); + let mut line_number = 1; // Read the first line as metadata or create default if empty/missing - if let Some(line) = lines.next() { - let line = line?; - // Try to parse as metadata, but if it fails, treat it as a message - if let Ok(_metadata) = serde_json::from_str::(&line) { - // Metadata successfully parsed, continue with the rest of the lines as messages - } else { - // This is not metadata, it's a message - let message = parse_message_with_truncation(&line, max_content_size)?; - messages.push(message); + if let Some(line_result) = lines.next() { + match line_result { + Ok(line) => { + // Try to parse as metadata, but if it fails, treat it as a message + if let Ok(_metadata) = serde_json::from_str::(&line) { + // Metadata successfully parsed, continue with the rest of the lines as messages + } else { + // This is not metadata, it's a message + match parse_message_with_truncation(&line, max_content_size) { + Ok(message) => { + messages.push(message); + } + Err(e) => { + println!("[SESSION] Failed to parse first line as message: {}", e); + println!("[SESSION] Attempting to recover corrupted first line..."); + tracing::warn!("Failed to parse first line as message: {}", e); + + // Try to recover the corrupted line + match attempt_corruption_recovery(&line, max_content_size) { + Ok(recovered) => { + println!( + "[SESSION] Successfully recovered corrupted first line!" + ); + messages.push(recovered); + } + Err(recovery_err) => { + println!( + "[SESSION] Failed to recover corrupted first line: {}", + recovery_err + ); + corrupted_lines.push((line_number, line)); + } + } + } + } + } + } + Err(e) => { + println!("[SESSION] Failed to read first line: {}", e); + tracing::error!("Failed to read first line: {}", e); + corrupted_lines.push((line_number, "[Unreadable line]".to_string())); + } } + line_number += 1; } // Read the rest of the lines as messages - for line in lines { - let line = line?; - let message = parse_message_with_truncation(&line, max_content_size)?; - messages.push(message); + for line_result in lines { + match line_result { + Ok(line) => match parse_message_with_truncation(&line, max_content_size) { + Ok(message) => { + messages.push(message); + } + Err(e) => { + println!("[SESSION] Failed to parse line {}: {}", line_number, e); + println!( + "[SESSION] Attempting to recover corrupted line {}...", + line_number + ); + tracing::warn!("Failed to parse line {}: {}", line_number, e); + + // Try to recover the corrupted line + match attempt_corruption_recovery(&line, max_content_size) { + Ok(recovered) => { + println!( + "[SESSION] Successfully recovered corrupted line {}!", + line_number + ); + messages.push(recovered); + } + Err(recovery_err) => { + println!( + "[SESSION] Failed to recover corrupted line {}: {}", + line_number, recovery_err + ); + corrupted_lines.push((line_number, line)); + } + } + } + }, + Err(e) => { + println!("[SESSION] Failed to read line {}: {}", line_number, e); + tracing::error!("Failed to read line {}: {}", line_number, e); + corrupted_lines.push((line_number, "[Unreadable line]".to_string())); + } + } + line_number += 1; } + // If we found corrupted lines, create a backup and log the issues + if !corrupted_lines.is_empty() { + println!( + "[SESSION] Found {} corrupted lines, creating backup", + corrupted_lines.len() + ); + tracing::warn!( + "[SESSION] Found {} corrupted lines in session file, creating backup", + corrupted_lines.len() + ); + + // Create a backup of the original file + if !backup_file.exists() { + if let Err(e) = fs::copy(session_file, &backup_file) { + println!("[SESSION] Failed to create backup file: {}", e); + tracing::error!("Failed to create backup file: {}", e); + } else { + println!("[SESSION] Created backup file: {:?}", backup_file); + tracing::info!("Created backup file: {:?}", backup_file); + } + } + + // Log details about corrupted lines + for (num, line) in &corrupted_lines { + let preview = if line.len() > 50 { + format!("{}... (truncated)", &line[..50]) + } else { + line.clone() + }; + tracing::debug!("Corrupted line {}: {}", num, preview); + } + } + + println!( + "[SESSION] Finished reading session file. Total messages: {}, corrupted lines: {}", + messages.len(), + corrupted_lines.len() + ); Ok(messages) } @@ -273,9 +423,13 @@ fn parse_message_with_truncation( } Ok(message) } - Err(e) => { + Err(_e) => { // If parsing fails and the string is very long, it might be due to size if json_str.len() > 100000 { + println!( + "[SESSION] Very large message detected ({}KB), attempting truncation", + json_str.len() / 1024 + ); tracing::warn!( "Failed to parse very large message ({}KB), attempting truncation", json_str.len() / 1024 @@ -290,18 +444,21 @@ fn parse_message_with_truncation( match serde_json::from_str::(&truncated_json) { Ok(message) => { + println!("[SESSION] Successfully parsed message after truncation"); tracing::info!("Successfully parsed message after JSON truncation"); Ok(message) } Err(_) => { - tracing::error!("Failed to parse message even after truncation, skipping"); - // Return a placeholder message indicating the issue - Ok(Message::user() - .with_text("[Message too large to load - content truncated]")) + println!( + "[SESSION] Failed to parse even after truncation, attempting recovery" + ); + tracing::error!("Failed to parse message even after truncation"); + attempt_corruption_recovery(json_str, max_content_size) } } } else { - Err(e.into()) + // Try intelligent corruption recovery + attempt_corruption_recovery(json_str, max_content_size) } } } @@ -365,6 +522,235 @@ fn truncate_message_content_in_place(message: &mut Message, max_content_size: us } } +/// Attempt to recover corrupted JSON lines using various strategies +fn attempt_corruption_recovery(json_str: &str, max_content_size: Option) -> Result { + // Strategy 1: Try to fix common JSON corruption issues + if let Ok(message) = try_fix_json_corruption(json_str, max_content_size) { + println!("[SESSION] Recovered using JSON corruption fix"); + return Ok(message); + } + + // Strategy 2: Try to extract partial content if it looks like a message + if let Ok(message) = try_extract_partial_message(json_str) { + println!("[SESSION] Recovered using partial message extraction"); + return Ok(message); + } + + // Strategy 3: Try to fix truncated JSON + if let Ok(message) = try_fix_truncated_json(json_str, max_content_size) { + println!("[SESSION] Recovered using truncated JSON fix"); + return Ok(message); + } + + // Strategy 4: Create a placeholder message with the raw content + println!("[SESSION] All recovery strategies failed, creating placeholder message"); + let preview = if json_str.len() > 200 { + format!("{}...", &json_str[..200]) + } else { + json_str.to_string() + }; + + Ok(Message::user().with_text(format!( + "[RECOVERED FROM CORRUPTED LINE]\nOriginal content preview: {}\n\n[This message was recovered from a corrupted session file line. The original data may be incomplete.]", + preview + ))) +} + +/// Try to fix common JSON corruption patterns +fn try_fix_json_corruption(json_str: &str, max_content_size: Option) -> Result { + let mut fixed_json = json_str.to_string(); + let mut fixes_applied = Vec::new(); + + // Fix 1: Remove trailing commas before closing braces/brackets + if fixed_json.contains(",}") || fixed_json.contains(",]") { + fixed_json = fixed_json.replace(",}", "}").replace(",]", "]"); + fixes_applied.push("trailing commas"); + } + + // Fix 2: Try to close unclosed quotes in text fields + if let Some(text_start) = fixed_json.find("\"text\":\"") { + let content_start = text_start + 8; + if let Some(remaining) = fixed_json.get(content_start..) { + // Count quotes to see if we have an odd number (unclosed quote) + let quote_count = remaining.matches('"').count(); + if quote_count % 2 == 1 { + // Find the last quote and see if we need to close it + if let Some(last_quote_pos) = remaining.rfind('"') { + let after_last_quote = &remaining[last_quote_pos + 1..]; + if !after_last_quote.trim_start().starts_with(',') + && !after_last_quote.trim_start().starts_with('}') + { + // Insert a closing quote before the next field or end + if let Some(next_field) = after_last_quote.find(',') { + fixed_json.insert(content_start + last_quote_pos + 1 + next_field, '"'); + fixes_applied.push("unclosed quotes"); + } else if after_last_quote.contains('}') { + if let Some(brace_pos) = after_last_quote.find('}') { + fixed_json + .insert(content_start + last_quote_pos + 1 + brace_pos, '"'); + fixes_applied.push("unclosed quotes"); + } + } + } + } + } + } + } + + // Fix 3: Try to close unclosed JSON objects/arrays + let open_braces = fixed_json.matches('{').count(); + let close_braces = fixed_json.matches('}').count(); + let open_brackets = fixed_json.matches('[').count(); + let close_brackets = fixed_json.matches(']').count(); + + if open_braces > close_braces { + for _ in 0..(open_braces - close_braces) { + fixed_json.push('}'); + } + fixes_applied.push("unclosed braces"); + } + + if open_brackets > close_brackets { + for _ in 0..(open_brackets - close_brackets) { + fixed_json.push(']'); + } + fixes_applied.push("unclosed brackets"); + } + + // Fix 4: Remove control characters that might break JSON parsing + let original_len = fixed_json.len(); + fixed_json = fixed_json + .chars() + .filter(|c| !c.is_control() || *c == '\n' || *c == '\r' || *c == '\t') + .collect(); + if fixed_json.len() != original_len { + fixes_applied.push("control characters"); + } + + if !fixes_applied.is_empty() { + println!("[SESSION] Applied JSON fixes: {}", fixes_applied.join(", ")); + + match serde_json::from_str::(&fixed_json) { + Ok(mut message) => { + if let Some(max_size) = max_content_size { + truncate_message_content_in_place(&mut message, max_size); + } + return Ok(message); + } + Err(e) => { + println!("[SESSION] JSON fixes didn't work: {}", e); + } + } + } + + Err(anyhow::anyhow!("JSON corruption fixes failed")) +} + +/// Try to extract a partial message from corrupted JSON +fn try_extract_partial_message(json_str: &str) -> Result { + // Look for recognizable patterns that indicate this was a message + + // Try to extract role + let role = if json_str.contains("\"role\":\"user\"") { + mcp_core::role::Role::User + } else if json_str.contains("\"role\":\"assistant\"") { + mcp_core::role::Role::Assistant + } else { + mcp_core::role::Role::User // Default fallback + }; + + // Try to extract text content + let mut extracted_text = String::new(); + + // Look for text field content + if let Some(text_start) = json_str.find("\"text\":\"") { + let content_start = text_start + 8; + if let Some(content_end) = json_str[content_start..].find("\",") { + extracted_text = json_str[content_start..content_start + content_end].to_string(); + } else if let Some(content_end) = json_str[content_start..].find("\"") { + extracted_text = json_str[content_start..content_start + content_end].to_string(); + } else { + // Take everything after "text":" until we hit a likely end + let remaining = &json_str[content_start..]; + if let Some(end_pos) = remaining.find('}') { + extracted_text = remaining[..end_pos].trim_end_matches('"').to_string(); + } else { + extracted_text = remaining.to_string(); + } + } + } + + // If we couldn't extract text, try to find any readable content + if extracted_text.is_empty() { + // Look for any quoted strings that might be content + let quote_pattern = Regex::new(r#""([^"]{10,})""#).unwrap(); + if let Some(captures) = quote_pattern.find(json_str) { + extracted_text = captures.as_str().trim_matches('"').to_string(); + } + } + + if !extracted_text.is_empty() { + println!( + "[SESSION] Extracted text content: {}", + if extracted_text.len() > 50 { + &extracted_text[..50] + } else { + &extracted_text + } + ); + + let message = match role { + mcp_core::role::Role::User => Message::user(), + mcp_core::role::Role::Assistant => Message::assistant(), + }; + + return Ok(message.with_text(format!("[PARTIALLY RECOVERED] {}", extracted_text))); + } + + Err(anyhow::anyhow!("Could not extract partial message")) +} + +/// Try to fix truncated JSON by completing it +fn try_fix_truncated_json(json_str: &str, max_content_size: Option) -> Result { + let mut completed_json = json_str.to_string(); + + // If the JSON appears to be cut off mid-field, try to complete it + if !completed_json.trim().ends_with('}') && !completed_json.trim().ends_with(']') { + // Try to find where it was likely cut off + if let Some(last_quote) = completed_json.rfind('"') { + let after_quote = &completed_json[last_quote + 1..]; + if !after_quote.contains('"') && !after_quote.contains('}') { + // Looks like it was cut off in the middle of a string value + completed_json.push('"'); + + // Try to close the JSON structure + let open_braces = completed_json.matches('{').count(); + let close_braces = completed_json.matches('}').count(); + + for _ in 0..(open_braces - close_braces) { + completed_json.push('}'); + } + + println!("[SESSION] Attempting to complete truncated JSON"); + + match serde_json::from_str::(&completed_json) { + Ok(mut message) => { + if let Some(max_size) = max_content_size { + truncate_message_content_in_place(&mut message, max_size); + } + return Ok(message); + } + Err(e) => { + println!("[SESSION] Truncation fix didn't work: {}", e); + } + } + } + } + } + + Err(anyhow::anyhow!("Truncation fix failed")) +} + /// Attempt to truncate a JSON string by finding and truncating large text values fn truncate_json_string(json_str: &str, max_content_size: usize) -> String { // This is a heuristic approach - look for large text values in the JSON @@ -405,7 +791,10 @@ fn truncate_json_string(json_str: &str, max_content_size: usize) -> String { /// /// Returns default empty metadata if the file doesn't exist or has no metadata. pub fn read_metadata(session_file: &Path) -> Result { + println!("[SESSION] Reading metadata from: {:?}", session_file); + if !session_file.exists() { + println!("[SESSION] Session file doesn't exist, returning default metadata"); return Ok(SessionMetadata::default()); } @@ -415,16 +804,28 @@ pub fn read_metadata(session_file: &Path) -> Result { // Read just the first line if reader.read_line(&mut first_line)? > 0 { + println!("[SESSION] Read first line, attempting to parse as metadata..."); // Try to parse as metadata match serde_json::from_str::(&first_line) { - Ok(metadata) => Ok(metadata), - Err(_) => { + Ok(metadata) => { + println!( + "[SESSION] Successfully parsed metadata: description='{}'", + metadata.description + ); + Ok(metadata) + } + Err(e) => { // If the first line isn't metadata, return default + println!( + "[SESSION] First line is not valid metadata ({}), returning default", + e + ); Ok(SessionMetadata::default()) } } } else { // Empty file, return default + println!("[SESSION] File is empty, returning default metadata"); Ok(SessionMetadata::default()) } } @@ -438,7 +839,17 @@ pub async fn persist_messages( messages: &[Message], provider: Option>, ) -> Result<()> { - persist_messages_with_schedule_id(session_file, messages, provider, None).await + println!( + "[SESSION] persist_messages called with {} messages to: {:?}", + messages.len(), + session_file + ); + let result = persist_messages_with_schedule_id(session_file, messages, provider, None).await; + match &result { + Ok(_) => println!("[SESSION] persist_messages completed successfully"), + Err(e) => println!("[SESSION] persist_messages failed: {}", e), + } + result } /// Write messages to a session file with metadata, including an optional scheduled job ID @@ -477,28 +888,103 @@ pub async fn persist_messages_with_schedule_id( } } -/// Write messages to a session file with the provided metadata +/// Write messages to a session file with the provided metadata using atomic operations /// -/// Overwrites the file with metadata as the first line, followed by all messages in JSONL format. +/// This function uses atomic file operations to prevent corruption: +/// 1. Writes to a temporary file first +/// 2. Uses fs2 file locking to prevent concurrent writes +/// 3. Atomically moves the temp file to the final location +/// 4. Includes comprehensive error handling and recovery pub fn save_messages_with_metadata( session_file: &Path, metadata: &SessionMetadata, messages: &[Message], ) -> Result<()> { - let file = File::create(session_file).expect("The path specified does not exist"); - let mut writer = io::BufWriter::new(file); + use fs2::FileExt; - // Write metadata as the first line - serde_json::to_writer(&mut writer, &metadata)?; - writeln!(writer)?; + println!( + "[SESSION] Starting to save {} messages to: {:?}", + messages.len(), + session_file + ); - // Write all messages - for message in messages { - serde_json::to_writer(&mut writer, &message)?; - writeln!(writer)?; + // Create a temporary file in the same directory to ensure atomic move + let temp_file = session_file.with_extension("tmp"); + println!("[SESSION] Using temporary file: {:?}", temp_file); + + // Ensure the parent directory exists + if let Some(parent) = session_file.parent() { + println!("[SESSION] Ensuring parent directory exists: {:?}", parent); + fs::create_dir_all(parent)?; } - writer.flush()?; + // Create and lock the temporary file + println!("[SESSION] Creating and locking temporary file..."); + let file = fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&temp_file) + .map_err(|e| anyhow::anyhow!("Failed to create temporary file {:?}: {}", temp_file, e))?; + + // Get an exclusive lock on the file + println!("[SESSION] Acquiring exclusive lock..."); + file.try_lock_exclusive() + .map_err(|e| anyhow::anyhow!("Failed to lock file: {}", e))?; + + // Write to temporary file + { + println!( + "[SESSION] Writing metadata and {} messages to temporary file...", + messages.len() + ); + let mut writer = io::BufWriter::new(&file); + + // Write metadata as the first line + println!("[SESSION] Writing metadata as first line..."); + serde_json::to_writer(&mut writer, &metadata) + .map_err(|e| anyhow::anyhow!("Failed to serialize metadata: {}", e))?; + writeln!(writer)?; + + // Write all messages + println!("[SESSION] Writing {} messages...", messages.len()); + for (i, message) in messages.iter().enumerate() { + serde_json::to_writer(&mut writer, &message) + .map_err(|e| anyhow::anyhow!("Failed to serialize message {}: {}", i, e))?; + writeln!(writer)?; + + if (i + 1) % 50 == 0 { + println!("[SESSION] Written {} messages so far...", i + 1); + } + } + + // Ensure all data is written to disk + println!("[SESSION] Flushing writer buffer..."); + writer.flush()?; + } + + // Sync to ensure data is persisted + println!("[SESSION] Syncing data to disk..."); + file.sync_all()?; + + // Release the lock + println!("[SESSION] Releasing file lock..."); + fs2::FileExt::unlock(&file).map_err(|e| anyhow::anyhow!("Failed to unlock file: {}", e))?; + + // Atomically move the temporary file to the final location + println!("[SESSION] Atomically moving temp file to final location..."); + fs::rename(&temp_file, session_file).map_err(|e| { + // Clean up temp file on failure + println!("[SESSION] Failed to move temp file, cleaning up..."); + let _ = fs::remove_file(&temp_file); + anyhow::anyhow!("Failed to move temporary file to final location: {}", e) + })?; + + println!( + "[SESSION] Successfully saved session file: {:?}", + session_file + ); + tracing::debug!("Successfully saved session file: {:?}", session_file); Ok(()) } @@ -583,6 +1069,79 @@ mod tests { use crate::message::MessageContent; use tempfile::tempdir; + #[test] + fn test_corruption_recovery() -> Result<()> { + let test_cases = vec![ + // Case 1: Unclosed quotes + ( + r#"{"role":"user","content":[{"type":"text","text":"Hello there}]"#, + "Unclosed JSON with truncated content", + ), + // Case 2: Trailing comma + ( + r#"{"role":"user","content":[{"type":"text","text":"Test"},]}"#, + "JSON with trailing comma", + ), + // Case 3: Missing closing brace + ( + r#"{"role":"user","content":[{"type":"text","text":"Test""#, + "Incomplete JSON structure", + ), + // Case 4: Control characters in text + ( + r#"{"role":"user","content":[{"type":"text","text":"Test\u{0000}with\u{0001}control\u{0002}chars"}]}"#, + "JSON with control characters", + ), + // Case 5: Partial message with role and text + ( + r#"broken{"role": "assistant", "text": "This is recoverable content"more broken"#, + "Partial message with recoverable content", + ), + ]; + + println!("[TEST] Starting corruption recovery tests..."); + for (i, (corrupt_json, desc)) in test_cases.iter().enumerate() { + println!("\n[TEST] Case {}: {}", i + 1, desc); + println!( + "[TEST] Input: {}", + if corrupt_json.len() > 100 { + &corrupt_json[..100] + } else { + corrupt_json + } + ); + + // Try to parse the corrupted JSON + match attempt_corruption_recovery(corrupt_json, Some(50000)) { + Ok(message) => { + println!("[TEST] Successfully recovered message"); + // Verify we got some content + if let Some(MessageContent::Text(text_content)) = message.content.first() { + assert!( + !text_content.text.is_empty(), + "Recovered message should have content" + ); + println!( + "[TEST] Recovered content: {}", + if text_content.text.len() > 50 { + format!("{}...", &text_content.text[..50]) + } else { + text_content.text.clone() + } + ); + } + } + Err(e) => { + println!("[TEST] Failed to recover: {}", e); + panic!("Failed to recover from case {}: {}", i + 1, desc); + } + } + } + + println!("\n[TEST] All corruption recovery tests passed!"); + Ok(()) + } + #[tokio::test] async fn test_read_write_messages() -> Result<()> { let dir = tempdir()?;