From 32532b4febcdce861aae0ceb92faa4b374a5f873 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 17 Jul 2025 15:05:29 -0500 Subject: [PATCH] Add a native MCP server The SQLite command line has facilities to ingest things like csv, and other formats. But here we are, in 2025, and I asked Claude if Turso's CLI should, in the same vein, have a native MCP server. Claude told me: "You're absolutely right!" "That's a great insight!" "That's a fantastic idea!" and then proceeded to help me with the boilerplate for this beautiful server. Rust has a crate, mcp_server, that implements an mcp_server trait. However, that depends on Tokio, and I think that would bloat our binary too much. I have also considered implementing an MCP server that operates on a directory and allows to list many SQLite files, but figured that would be a good job for a more advanced and specialized server, not for the one that comes by default with the CLI. Let's go for simple. --- Cargo.lock | 1 + README.md | 46 ++++ cli/Cargo.toml | 1 + cli/app.rs | 19 ++ cli/input.rs | 2 + cli/main.rs | 14 + cli/mcp_server.rs | 638 ++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 721 insertions(+) create mode 100644 cli/mcp_server.rs diff --git a/Cargo.lock b/Cargo.lock index 7fa54ae6e..f9219cd89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3806,6 +3806,7 @@ dependencies = [ "rustyline", "schemars", "serde", + "serde_json", "shlex", "syntect", "toml", diff --git a/README.md b/README.md index 4b8cffca2..39e4bcecb 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,52 @@ You can also build and run the latest development version with: ```shell cargo run ``` + +### MCP Server Mode + +The Turso CLI includes a built-in [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) server that allows AI assistants to interact with your databases. Start the MCP server with: + +```shell +tursodb your_database.db --mcp +``` + +The MCP server provides seven tools for database interaction: + +#### Available Tools + +1. **`list_tables`** - List all tables in the database +2. **`describe_table`** - Describe the structure of a specific table +3. **`execute_query`** - Execute read-only SELECT queries +4. **`insert_data`** - Insert new data into tables +5. **`update_data`** - Update existing data in tables +6. **`delete_data`** - Delete data from tables +7. **`schema_change`** - Execute schema modification statements (CREATE TABLE, ALTER TABLE, DROP TABLE) + +#### Example Usage + +The MCP server runs as a single process that handles multiple JSON-RPC requests over stdin/stdout. Here's how to interact with it: + +#### Example with In-Memory Database + +```bash +cat << 'EOF' | tursodb --mcp +{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "client", "version": "1.0"}}} +{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "schema_change", "arguments": {"query": "CREATE TABLE users (id INTEGER, name TEXT, email TEXT)"}}} +{"jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": {"name": "list_tables", "arguments": {}}} +{"jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": {"name": "insert_data", "arguments": {"query": "INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')"}}} +{"jsonrpc": "2.0", "id": 5, "method": "tools/call", "params": {"name": "execute_query", "arguments": {"query": "SELECT * FROM users"}}} +EOF +``` + +#### Example with Existing Database + +```bash +# Working with an existing database file +cat << 'EOF' | tursodb mydb.db --mcp +{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "client", "version": "1.0"}}} +{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "list_tables", "arguments": {}}} +EOF +```
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 5519f7ae8..cf64436a1 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -45,6 +45,7 @@ schemars = {version = "0.8.22", features = ["preserve_order"]} serde = { workspace = true, features = ["derive"]} validator = {version = "0.20.0", features = ["derive"]} toml_edit = {version = "0.22.24", features = ["serde"]} +serde_json = "1.0" [features] default = ["io_uring"] diff --git a/cli/app.rs b/cli/app.rs index 3ce052711..498241605 100644 --- a/cli/app.rs +++ b/cli/app.rs @@ -64,6 +64,8 @@ pub struct Opts { pub experimental_indexes: bool, #[clap(short = 't', long, help = "specify output file for log traces")] pub tracing_output: Option, + #[clap(long, help = "Start MCP server instead of interactive shell")] + pub mcp: bool, } const PROMPT: &str = "turso> "; @@ -185,6 +187,11 @@ impl Limbo { } fn first_run(&mut self, sql: Option, quiet: bool) -> Result<(), LimboError> { + // Skip startup messages and SQL execution in MCP mode + if self.is_mcp_mode() { + return Ok(()); + } + if let Some(sql) = sql { self.handle_first_input(&sql)?; } @@ -350,6 +357,18 @@ impl Limbo { self.conn.close() } + pub fn get_connection(&self) -> Arc { + self.conn.clone() + } + + pub fn is_mcp_mode(&self) -> bool { + self.opts.mcp + } + + pub fn get_interrupt_count(&self) -> Arc { + self.interrupt_count.clone() + } + fn toggle_echo(&mut self, arg: EchoMode) { match arg { EchoMode::On => self.opts.echo = true, diff --git a/cli/input.rs b/cli/input.rs index 1f9be125a..447c741f0 100644 --- a/cli/input.rs +++ b/cli/input.rs @@ -84,6 +84,7 @@ pub struct Settings { pub tracing_output: Option, pub timer: bool, pub headers: bool, + pub mcp: bool, } impl From for Settings { @@ -109,6 +110,7 @@ impl From for Settings { tracing_output: opts.tracing_output, timer: false, headers: false, + mcp: opts.mcp, } } } diff --git a/cli/main.rs b/cli/main.rs index 13e50e7d3..f60aaaad5 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -4,9 +4,11 @@ mod commands; mod config; mod helper; mod input; +mod mcp_server; mod opcodes_dictionary; use config::CONFIG_DIR; +use mcp_server::TursoMcpServer; use rustyline::{error::ReadlineError, Config, Editor}; use std::{ path::PathBuf, @@ -25,9 +27,21 @@ pub static HOME_DIR: LazyLock = pub static HISTORY_FILE: LazyLock = LazyLock::new(|| HOME_DIR.join(".limbo_history")); +fn run_mcp_server(app: app::Limbo) -> anyhow::Result<()> { + let conn = app.get_connection(); + let interrupt_count = app.get_interrupt_count(); + let mcp_server = TursoMcpServer::new(conn, interrupt_count); + + mcp_server.run() +} + fn main() -> anyhow::Result<()> { let (mut app, _guard) = app::Limbo::new()?; + if app.is_mcp_mode() { + return run_mcp_server(app); + } + if std::io::IsTerminal::is_terminal(&std::io::stdin()) { let mut rl = Editor::with_config(rustyline_config())?; if HISTORY_FILE.exists() { diff --git a/cli/mcp_server.rs b/cli/mcp_server.rs new file mode 100644 index 000000000..5043a56d2 --- /dev/null +++ b/cli/mcp_server.rs @@ -0,0 +1,638 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::io::{self, BufRead, BufReader, Write}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use turso_core::{Connection, StepResult, Value as DbValue}; + +#[derive(Debug, Serialize, Deserialize)] +struct JsonRpcRequest { + jsonrpc: String, + id: Option, + method: String, + params: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JsonRpcResponse { + jsonrpc: String, + id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JsonRpcError { + code: i32, + message: String, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct InitializeRequest { + #[serde(rename = "protocolVersion")] + protocol_version: String, + capabilities: Value, + #[serde(rename = "clientInfo")] + client_info: Value, +} + +#[derive(Debug, Serialize, Deserialize)] +struct CallToolRequest { + name: String, + arguments: Option, +} + +pub struct TursoMcpServer { + conn: Arc, + interrupt_count: Arc, +} + +impl TursoMcpServer { + pub fn new(conn: Arc, interrupt_count: Arc) -> Self { + Self { + conn, + interrupt_count, + } + } + + pub fn run(&self) -> Result<()> { + let stdout = io::stdout(); + let mut stdout_lock = stdout.lock(); + + // Create a channel to receive lines from stdin + let (tx, rx) = mpsc::channel(); + + // Spawn a thread to read from stdin + thread::spawn(move || { + let stdin = io::stdin(); + let reader = BufReader::new(stdin); + + for line in reader.lines() { + match line { + Ok(line) => { + if tx.send(Ok(line)).is_err() { + break; // Main thread has dropped the receiver + } + } + Err(e) => { + let _ = tx.send(Err(e)); + break; + } + } + } + }); + + loop { + // Check if we've been interrupted + if self.interrupt_count.load(Ordering::SeqCst) > 0 { + eprintln!("MCP server interrupted, shutting down..."); + break; + } + + // Try to receive a line with a timeout so we can check for interruption + match rx.recv_timeout(Duration::from_millis(100)) { + Ok(Ok(line)) => { + if line.trim().is_empty() { + continue; + } + + let request: JsonRpcRequest = match serde_json::from_str(&line) { + Ok(req) => req, + Err(e) => { + eprintln!("Failed to parse JSON-RPC request: {}", e); + continue; + } + }; + + let response = self.handle_request(request); + let response_json = serde_json::to_string(&response)?; + writeln!(stdout_lock, "{}", response_json)?; + stdout_lock.flush()?; + } + Ok(Err(_)) => { + // Error reading from stdin + break; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // Timeout - continue loop to check for interruption + continue; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + // Stdin thread has finished (EOF) + break; + } + } + } + + Ok(()) + } + + fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse { + match request.method.as_str() { + "initialize" => self.handle_initialize(request), + "tools/list" => self.handle_list_tools(request), + "tools/call" => self.handle_call_tool(request), + _ => JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: -32601, + message: "Method not found".to_string(), + data: None, + }), + }, + } + } + + fn handle_initialize(&self, request: JsonRpcRequest) -> JsonRpcResponse { + JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(json!({ + "protocolVersion": "2024-11-05", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "turso-mcp", + "version": "1.0.0" + } + })), + error: None, + } + } + + fn handle_list_tools(&self, request: JsonRpcRequest) -> JsonRpcResponse { + JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(json!({ + "tools": [ + { + "name": "list_tables", + "description": "List all tables in the database", + "inputSchema": { + "type": "object", + "properties": {}, + "required": [] + } + }, + { + "name": "describe_table", + "description": "Describe the structure of a specific table", + "inputSchema": { + "type": "object", + "properties": { + "table_name": { + "type": "string", + "description": "Name of the table to describe" + } + }, + "required": ["table_name"] + } + }, + { + "name": "execute_query", + "description": "Execute a read-only SELECT query", + "inputSchema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The SELECT query to execute" + } + }, + "required": ["query"] + } + }, + { + "name": "insert_data", + "description": "Insert new data into a table", + "inputSchema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The INSERT statement to execute" + } + }, + "required": ["query"] + } + }, + { + "name": "update_data", + "description": "Update existing data in a table", + "inputSchema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The UPDATE statement to execute" + } + }, + "required": ["query"] + } + }, + { + "name": "delete_data", + "description": "Delete data from a table", + "inputSchema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The DELETE statement to execute" + } + }, + "required": ["query"] + } + }, + { + "name": "schema_change", + "description": "Execute schema modification statements (CREATE TABLE, ALTER TABLE, DROP TABLE)", + "inputSchema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The schema modification statement to execute" + } + }, + "required": ["query"] + } + } + ] + })), + error: None, + } + } + + fn handle_call_tool(&self, request: JsonRpcRequest) -> JsonRpcResponse { + let tool_request: CallToolRequest = match request.params.as_ref() { + Some(params) => match serde_json::from_value(params.clone()) { + Ok(req) => req, + Err(e) => { + return JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: -32602, + message: format!("Invalid params: {}", e), + data: None, + }), + }; + } + }, + None => { + return JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing params".to_string(), + data: None, + }), + }; + } + }; + + let result = match tool_request.name.as_str() { + "list_tables" => self.list_tables(), + "describe_table" => self.describe_table(&tool_request.arguments), + "execute_query" => self.execute_query(&tool_request.arguments), + "insert_data" => self.insert_data(&tool_request.arguments), + "update_data" => self.update_data(&tool_request.arguments), + "delete_data" => self.delete_data(&tool_request.arguments), + "schema_change" => self.schema_change(&tool_request.arguments), + _ => { + return JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: -32601, + message: format!("Unknown tool: {}", tool_request.name), + data: None, + }), + }; + } + }; + + JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: request.id, + result: Some(json!({ + "content": [{ + "type": "text", + "text": result + }] + })), + error: None, + } + } + + fn list_tables(&self) -> String { + let query = "SELECT name FROM sqlite_schema WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY 1"; + + match self.conn.query(query) { + Ok(Some(mut rows)) => { + let mut tables = Vec::new(); + + loop { + match rows.step() { + Ok(StepResult::Row) => { + let row = rows.row().unwrap(); + if let Ok(DbValue::Text(table)) = row.get::<&DbValue>(0) { + tables.push(table.to_string()); + } + } + Ok(StepResult::IO) => { + if rows.run_once().is_err() { + break; + } + } + Ok(StepResult::Done) => break, + Ok(StepResult::Interrupt) => break, + Ok(StepResult::Busy) => { + return "Database is busy".to_string(); + } + Err(e) => { + return format!("Error listing tables: {}", e); + } + } + } + + if tables.is_empty() { + "No tables found in the database".to_string() + } else { + tables.join(", ") + } + } + Ok(None) => "No results returned from the query".to_string(), + Err(e) => format!("Error querying database: {}", e), + } + } + + fn describe_table(&self, arguments: &Option) -> String { + let table_name = match arguments { + Some(args) => match args.get("table_name") { + Some(Value::String(name)) => name, + _ => return "Missing or invalid table_name parameter".to_string(), + }, + None => return "Missing table_name parameter".to_string(), + }; + + let query = format!("PRAGMA table_info({})", table_name); + + match self.conn.query(&query) { + Ok(Some(mut rows)) => { + let mut columns = Vec::new(); + + loop { + match rows.step() { + Ok(StepResult::Row) => { + let row = rows.row().unwrap(); + if let ( + Ok(col_name), + Ok(col_type), + Ok(not_null), + Ok(default_value), + Ok(pk), + ) = ( + row.get::<&DbValue>(1), + row.get::<&DbValue>(2), + row.get::<&DbValue>(3), + row.get::<&DbValue>(4), + row.get::<&DbValue>(5), + ) { + let default_str = if matches!(default_value, DbValue::Null) { + "".to_string() + } else { + format!("DEFAULT {}", default_value) + }; + + columns.push( + format!( + "{} {} {} {} {}", + col_name, + col_type, + if matches!(not_null, DbValue::Integer(1)) { + "NOT NULL" + } else { + "NULL" + }, + default_str, + if matches!(pk, DbValue::Integer(1)) { + "PRIMARY KEY" + } else { + "" + } + ) + .trim() + .to_string(), + ); + } + } + Ok(StepResult::IO) => { + if rows.run_once().is_err() { + break; + } + } + Ok(StepResult::Done) => break, + Ok(StepResult::Interrupt) => break, + Ok(StepResult::Busy) => { + return "Database is busy".to_string(); + } + Err(e) => { + return format!("Error describing table: {}", e); + } + } + } + + if columns.is_empty() { + format!("Table '{}' not found", table_name) + } else { + format!("Table '{}' columns:\n{}", table_name, columns.join("\n")) + } + } + Ok(None) => format!("Table '{}' not found", table_name), + Err(e) => format!("Error querying database: {}", e), + } + } + + fn execute_query(&self, arguments: &Option) -> String { + let query = match arguments { + Some(args) => match args.get("query") { + Some(Value::String(q)) => q, + _ => return "Missing or invalid query parameter".to_string(), + }, + None => return "Missing query parameter".to_string(), + }; + + // Basic validation to ensure it's a read-only query + let trimmed_query = query.trim().to_lowercase(); + if !trimmed_query.starts_with("select") { + return "Only SELECT queries are allowed".to_string(); + } + + match self.conn.query(query) { + Ok(Some(mut rows)) => { + let mut results = Vec::new(); + + // Get column names + let headers: Vec = (0..rows.num_columns()) + .map(|i| rows.get_column_name(i).to_string()) + .collect(); + + // Get the data + loop { + match rows.step() { + Ok(StepResult::Row) => { + let row = rows.row().unwrap(); + let mut row_data = Vec::new(); + + for value in row.get_values() { + row_data.push(value.to_string()); + } + + results.push(row_data); + } + Ok(StepResult::IO) => { + if rows.run_once().is_err() { + break; + } + } + Ok(StepResult::Done) => break, + Ok(StepResult::Interrupt) => break, + Ok(StepResult::Busy) => { + return "Database is busy".to_string(); + } + Err(e) => { + return format!("Error executing query: {}", e); + } + } + } + + // Format results as text table + let mut output = String::new(); + if !headers.is_empty() { + output.push_str(&headers.join(" | ")); + output.push('\n'); + output.push_str(&"-".repeat(headers.join(" | ").len())); + output.push('\n'); + } + + for row in results { + output.push_str(&row.join(" | ")); + output.push('\n'); + } + + if output.is_empty() { + "No results returned from the query".to_string() + } else { + output + } + } + Ok(None) => "No results returned from the query".to_string(), + Err(e) => format!("Error executing query: {}", e), + } + } + + fn insert_data(&self, arguments: &Option) -> String { + let query = match arguments { + Some(args) => match args.get("query") { + Some(Value::String(q)) => q, + _ => return "Missing or invalid query parameter".to_string(), + }, + None => return "Missing query parameter".to_string(), + }; + + // Basic validation to ensure it's an INSERT query + let trimmed_query = query.trim().to_lowercase(); + if !trimmed_query.starts_with("insert") { + return "Only INSERT statements are allowed".to_string(); + } + + match self.conn.execute(query) { + Ok(()) => "INSERT successful.".to_string(), + Err(e) => format!("Error executing INSERT: {}", e), + } + } + + fn update_data(&self, arguments: &Option) -> String { + let query = match arguments { + Some(args) => match args.get("query") { + Some(Value::String(q)) => q, + _ => return "Missing or invalid query parameter".to_string(), + }, + None => return "Missing query parameter".to_string(), + }; + + // Basic validation to ensure it's an UPDATE query + let trimmed_query = query.trim().to_lowercase(); + if !trimmed_query.starts_with("update") { + return "Only UPDATE statements are allowed".to_string(); + } + + match self.conn.execute(query) { + Ok(()) => "UPDATE successful.".to_string(), + Err(e) => format!("Error executing UPDATE: {}", e), + } + } + + fn delete_data(&self, arguments: &Option) -> String { + let query = match arguments { + Some(args) => match args.get("query") { + Some(Value::String(q)) => q, + _ => return "Missing or invalid query parameter".to_string(), + }, + None => return "Missing query parameter".to_string(), + }; + + // Basic validation to ensure it's a DELETE query + let trimmed_query = query.trim().to_lowercase(); + if !trimmed_query.starts_with("delete") { + return "Only DELETE statements are allowed".to_string(); + } + + match self.conn.execute(query) { + Ok(()) => "DELETE successful.".to_string(), + Err(e) => format!("Error executing DELETE: {}", e), + } + } + + fn schema_change(&self, arguments: &Option) -> String { + let query = match arguments { + Some(args) => match args.get("query") { + Some(Value::String(q)) => q, + _ => return "Missing or invalid query parameter".to_string(), + }, + None => return "Missing query parameter".to_string(), + }; + + // Basic validation to ensure it's a schema modification query + let trimmed_query = query.trim().to_lowercase(); + if !trimmed_query.starts_with("create") + && !trimmed_query.starts_with("alter") + && !trimmed_query.starts_with("drop") + { + return "Only CREATE, ALTER, and DROP statements are allowed".to_string(); + } + + match self.conn.execute(query) { + Ok(()) => "Schema change successful.".to_string(), + Err(e) => format!("Error executing schema change: {}", e), + } + } +}