diff --git a/Cargo.lock b/Cargo.lock
index 9d6149a..3bcb32c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1569,6 +1569,7 @@ dependencies = [
  "llama-cpp-2",
  "lsp-server",
  "lsp-types",
+ "md5",
  "minijinja",
  "once_cell",
  "parking_lot",
diff --git a/crates/lsp-ai/Cargo.toml b/crates/lsp-ai/Cargo.toml
index bdb6223..76e882b 100644
--- a/crates/lsp-ai/Cargo.toml
+++ b/crates/lsp-ai/Cargo.toml
@@ -36,6 +36,7 @@ tree-sitter = "0.22"
 utils-tree-sitter = { workspace = true, features = ["all"] }
 splitter-tree-sitter = { workspace = true }
 text-splitter = { version = "0.13.3" }
+md5 = "0.7.0"
 
 [build-dependencies]
 cc="*"
diff --git a/crates/lsp-ai/src/crawl.rs b/crates/lsp-ai/src/crawl.rs
index edbd56c..2dc1721 100644
--- a/crates/lsp-ai/src/crawl.rs
+++ b/crates/lsp-ai/src/crawl.rs
@@ -57,7 +57,6 @@ impl Crawl {
         for result in WalkBuilder::new(&root_uri[7..]).build() {
             let result = result?;
             let path = result.path();
-            eprintln!("CRAWLING: {}", path.display());
             if !path.is_dir() {
                 if let Some(path_str) = path.to_str() {
                     if self.crawl_config.all_files {
diff --git a/crates/lsp-ai/src/memory_backends/postgresml/mod.rs b/crates/lsp-ai/src/memory_backends/postgresml/mod.rs
index a849aa8..c2ea435 100644
--- a/crates/lsp-ai/src/memory_backends/postgresml/mod.rs
+++ b/crates/lsp-ai/src/memory_backends/postgresml/mod.rs
@@ -2,6 +2,7 @@ use anyhow::Context;
 use lsp_types::TextDocumentPositionParams;
 use parking_lot::Mutex;
 use pgml::{Collection, Pipeline};
+use rand::{distributions::Alphanumeric, Rng};
 use serde_json::{json, Value};
 use std::{
     io::Read,
@@ -26,6 +27,8 @@ use super::{
     ContextAndCodePrompt, FIMPrompt, MemoryBackend, MemoryRunParams, Prompt, PromptType,
 };
 
+const RESYNC_MAX_FILE_SIZE: u64 = 10_000_000;
+
 fn chunk_to_document(uri: &str, chunk: Chunk) -> Value {
     json!({
         "id": chunk_to_id(uri, &chunk),
@@ -94,11 +97,21 @@ impl PostgresML {
         let database_url = if let Some(database_url) = postgresml_config.database_url {
             database_url
         } else {
-            std::env::var("PGML_DATABASE_URL")?
+            std::env::var("PGML_DATABASE_URL").context("please provide either the `database_url` in the `postgresml` config, or set the `PGML_DATABASE_URL` environment variable")?
         };
-        // TODO: Think through Collections and Pipelines
-        let mut collection = Collection::new("test-lsp-ai-5", Some(database_url))?;
+        let collection_name = match configuration.client_params.root_uri.clone() {
+            Some(root_uri) => format!("{:x}", md5::compute(root_uri.as_bytes())),
+            None => {
+                warn!("no root_uri provided in server configuration - generating random string for collection name");
+                rand::thread_rng()
+                    .sample_iter(&Alphanumeric)
+                    .take(21)
+                    .map(char::from)
+                    .collect()
+            }
+        };
+        let mut collection = Collection::new(&collection_name, Some(database_url))?;
         let mut pipeline = Pipeline::new(
             "v1",
             Some(
@@ -145,7 +158,6 @@ impl PostgresML {
             if file_uris.is_empty() {
                 continue;
             }
-            // Build the chunks for our changed files
             let chunks: Vec<Vec<Chunk>> = match file_uris
                 .iter()
                 .map(|uri| {
@@ -160,10 +172,9 @@ impl PostgresML {
             {
                 Ok(chunks) => chunks,
                 Err(e) => {
-                    error!("{e}");
+                    error!("{e:?}");
                     continue;
                 }
             };
-            // Delete old chunks that no longer exist after the latest file changes
             let delete_or_statements: Vec<Value> = file_uris
                 .iter()
@@ -196,10 +207,10 @@ impl PostgresML {
                     .into(),
                 )
                 .await
+                .context("PGML - error deleting documents")
             {
-                error!("PGML - Error deleting file: {e:?}");
+                error!("{e:?}");
             }
-            // Prepare and upsert our new chunks
             let documents: Vec<pgml::types::Json> = chunks
                 .into_iter()
@@ -218,7 +229,7 @@ impl PostgresML {
                 .await
                 .context("PGML - Error upserting changed files")
             {
-                error!("{e}");
+                error!("{e:?}");
                 continue;
             }
@@ -237,12 +248,105 @@ impl PostgresML {
             splitter,
         };
 
+        // Resync our Collection
+        let task_s = s.clone();
+        TOKIO_RUNTIME.spawn(async move {
+            if let Err(e) = task_s.resync().await {
+                error!("{e:?}")
+            }
+        });
+
         if let Err(e) = s.maybe_do_crawl(None) {
-            error!("{e}")
+            error!("{e:?}")
         }
         Ok(s)
     }
 
+    async fn resync(&self) -> anyhow::Result<()> {
+        let mut collection = self.collection.clone();
+
+        let documents = collection
+            .get_documents(Some(
+                json!({
+                    "limit": 100_000_000,
+                    "keys": ["uri"]
+                })
+                .into(),
+            ))
+            .await?;
+
+        let try_get_file_contents = |path: &std::path::Path| {
+            // Open the file and check that it is small enough to read
+            let mut f = std::fs::File::open(path)?;
+            let metadata = f.metadata()?;
+            if metadata.len() > RESYNC_MAX_FILE_SIZE {
+                anyhow::bail!("file size is greater than: {RESYNC_MAX_FILE_SIZE}")
+            }
+            // Read the file contents
+            let mut contents = vec![];
+            f.read_to_end(&mut contents)?;
+            anyhow::Ok(String::from_utf8(contents)?)
+        };
+
+        let mut documents_to_delete = vec![];
+        let mut chunks_to_upsert = vec![];
+        let mut current_chunks_bytes = 0;
+        for document in documents.into_iter() {
+            let uri = match document["document"]["uri"].as_str() {
+                Some(uri) => uri,
+                None => continue, // This should never happen, but if it does we are left with a document we have essentially no way to delete
+            };
+
+            let path = uri.replace("file://", "");
+            let path = std::path::Path::new(&path);
+            if !path.exists() {
+                documents_to_delete.push(uri.to_string());
+            } else {
+                // Try to read the file. If we fail, delete it
+                let contents = match try_get_file_contents(path) {
+                    Ok(contents) => contents,
+                    Err(e) => {
+                        error!("{e:?}");
+                        documents_to_delete.push(uri.to_string());
+                        continue;
+                    }
+                };
+                // Split the file into chunks
+                current_chunks_bytes += contents.len();
+                let chunks: Vec<pgml::types::Json> = self
+                    .splitter
+                    .split_file_contents(&uri, &contents)
+                    .into_iter()
+                    .map(|chunk| chunk_to_document(&uri, chunk).into())
+                    .collect();
+                chunks_to_upsert.extend(chunks);
+                // If we have over 10 megabytes of chunks, do the upsert and reset the buffer
+                if current_chunks_bytes > 10_000_000 {
+                    collection
+                        .upsert_documents(chunks_to_upsert, None)
+                        .await
+                        .context("PGML - error upserting documents during resync")?;
+                    chunks_to_upsert = vec![];
+                    current_chunks_bytes = 0;
+                }
+            }
+        }
+        // Upsert any remaining chunks
+        if !chunks_to_upsert.is_empty() {
+            collection
+                .upsert_documents(chunks_to_upsert, None)
+                .await
+                .context("PGML - error upserting documents during resync")?;
+        }
+        // Delete documents whose files no longer exist
+        if !documents_to_delete.is_empty() {
+            collection
+                .delete_documents(
+                    json!({
+                        "uri": {
+                            "$in": documents_to_delete
+                        }
+                    })
+                    .into(),
+                )
+                .await
+                .context("PGML - error deleting documents during resync")?;
+        }
+        Ok(())
+    }
+
     fn maybe_do_crawl(&self, triggered_file: Option<String>) -> anyhow::Result<()> {
         if let Some(crawl) = &self.crawl {
             let mut documents = vec![];
@@ -281,8 +385,8 @@ impl PostgresML {
                 .map(|chunk| chunk_to_document(&uri, chunk).into())
                 .collect();
             documents.extend(chunks);
-            // If we have over 100 mega bytes of data do the upsert
-            if current_bytes >= 100_000_000 || total_bytes as u64 >= config.max_crawl_memory
+            // If we have over 10 megabytes of data do the upsert
+            if current_bytes >= 10_000_000 || total_bytes as u64 >= config.max_crawl_memory
             {
                 // Upsert the documents
                 let mut collection = self.collection.clone();
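
For reference, a minimal standalone sketch of the collection-naming scheme this patch introduces, assuming the same `md5 = "0.7.0"` dependency the diff adds plus the `rand` crate (0.8) it imports. The `collection_name` helper and the example URI are illustrative only, not part of the patch:

```rust
use rand::{distributions::Alphanumeric, Rng};

/// Derive a collection name the way `PostgresML::new` does above:
/// a stable md5 hex digest of the workspace root URI, or a random
/// 21-character alphanumeric string when no root_uri is available.
fn collection_name(root_uri: Option<&str>) -> String {
    match root_uri {
        Some(root_uri) => format!("{:x}", md5::compute(root_uri.as_bytes())),
        None => rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(21)
            .map(char::from)
            .collect(),
    }
}

fn main() {
    // The same workspace root always hashes to the same name, so the
    // Collection (and its indexed chunks) survives editor restarts.
    assert_eq!(
        collection_name(Some("file:///home/user/project")),
        collection_name(Some("file:///home/user/project"))
    );
    // Without a root_uri the name is random, so the collection is
    // effectively scoped to a single server run.
    println!("{}", collection_name(None));
}
```

Hashing the root URI is what lets the new `resync` pass find the documents indexed for the same workspace on a previous run; the random fallback merely keeps unrelated rootless sessions from colliding in one collection.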