Merge 'Add a native MCP server' from Glauber Costa

The SQLite command line has facilities to ingest things like csv, and
other formats. But here we are, in 2025, and I asked Claude if Turso's
CLI should, in the same vein, have a native MCP server.
Claude told me: "You're absolutely right!" "That's a great insight!"
"That's a fantastic idea!" and then proceeded to help me with the
boilerplate for this beautiful server.
Rust has a crate, mcp_server, that implements an mcp_server trait.
However, that depends on Tokio, and I think that would bloat our binary
too much.
I have also considered implementing an MCP server that operates on a
directory and allows to list many SQLite files, but figured that would
be a good job for a more advanced and specialized server, not for the
one that comes by default with the CLI. Let's go for simple.

Closes #2148
This commit is contained in:
Pekka Enberg
2025-07-18 16:05:57 +03:00
7 changed files with 721 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -3806,6 +3806,7 @@ dependencies = [
"rustyline",
"schemars",
"serde",
"serde_json",
"shlex",
"syntect",
"toml",

View File

@@ -77,6 +77,52 @@ You can also build and run the latest development version with:
```shell
cargo run
```
### MCP Server Mode
The Turso CLI includes a built-in [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) server that allows AI assistants to interact with your databases. Start the MCP server with:
```shell
tursodb your_database.db --mcp
```
The MCP server provides seven tools for database interaction:
#### Available Tools
1. **`list_tables`** - List all tables in the database
2. **`describe_table`** - Describe the structure of a specific table
3. **`execute_query`** - Execute read-only SELECT queries
4. **`insert_data`** - Insert new data into tables
5. **`update_data`** - Update existing data in tables
6. **`delete_data`** - Delete data from tables
7. **`schema_change`** - Execute schema modification statements (CREATE TABLE, ALTER TABLE, DROP TABLE)
#### Example Usage
The MCP server runs as a single process that handles multiple JSON-RPC requests over stdin/stdout. Here's how to interact with it:
#### Example with In-Memory Database
```bash
cat << 'EOF' | tursodb --mcp
{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "client", "version": "1.0"}}}
{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "schema_change", "arguments": {"query": "CREATE TABLE users (id INTEGER, name TEXT, email TEXT)"}}}
{"jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": {"name": "list_tables", "arguments": {}}}
{"jsonrpc": "2.0", "id": 4, "method": "tools/call", "params": {"name": "insert_data", "arguments": {"query": "INSERT INTO users VALUES (1, 'Alice', 'alice@example.com')"}}}
{"jsonrpc": "2.0", "id": 5, "method": "tools/call", "params": {"name": "execute_query", "arguments": {"query": "SELECT * FROM users"}}}
EOF
```
#### Example with Existing Database
```bash
# Working with an existing database file
cat << 'EOF' | tursodb mydb.db --mcp
{"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "client", "version": "1.0"}}}
{"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "list_tables", "arguments": {}}}
EOF
```
</details>
<details>

View File

@@ -45,6 +45,7 @@ schemars = {version = "0.8.22", features = ["preserve_order"]}
serde = { workspace = true, features = ["derive"]}
validator = {version = "0.20.0", features = ["derive"]}
toml_edit = {version = "0.22.24", features = ["serde"]}
serde_json = "1.0"
[features]
default = ["io_uring"]

View File

@@ -64,6 +64,8 @@ pub struct Opts {
pub experimental_indexes: bool,
#[clap(short = 't', long, help = "specify output file for log traces")]
pub tracing_output: Option<String>,
#[clap(long, help = "Start MCP server instead of interactive shell")]
pub mcp: bool,
}
const PROMPT: &str = "turso> ";
@@ -185,6 +187,11 @@ impl Limbo {
}
fn first_run(&mut self, sql: Option<String>, quiet: bool) -> Result<(), LimboError> {
// Skip startup messages and SQL execution in MCP mode
if self.is_mcp_mode() {
return Ok(());
}
if let Some(sql) = sql {
self.handle_first_input(&sql)?;
}
@@ -350,6 +357,18 @@ impl Limbo {
self.conn.close()
}
pub fn get_connection(&self) -> Arc<turso_core::Connection> {
self.conn.clone()
}
pub fn is_mcp_mode(&self) -> bool {
self.opts.mcp
}
pub fn get_interrupt_count(&self) -> Arc<AtomicUsize> {
self.interrupt_count.clone()
}
fn toggle_echo(&mut self, arg: EchoMode) {
match arg {
EchoMode::On => self.opts.echo = true,

View File

@@ -84,6 +84,7 @@ pub struct Settings {
pub tracing_output: Option<String>,
pub timer: bool,
pub headers: bool,
pub mcp: bool,
}
impl From<Opts> for Settings {
@@ -109,6 +110,7 @@ impl From<Opts> for Settings {
tracing_output: opts.tracing_output,
timer: false,
headers: false,
mcp: opts.mcp,
}
}
}

View File

@@ -4,9 +4,11 @@ mod commands;
mod config;
mod helper;
mod input;
mod mcp_server;
mod opcodes_dictionary;
use config::CONFIG_DIR;
use mcp_server::TursoMcpServer;
use rustyline::{error::ReadlineError, Config, Editor};
use std::{
path::PathBuf,
@@ -25,9 +27,21 @@ pub static HOME_DIR: LazyLock<PathBuf> =
pub static HISTORY_FILE: LazyLock<PathBuf> = LazyLock::new(|| HOME_DIR.join(".limbo_history"));
fn run_mcp_server(app: app::Limbo) -> anyhow::Result<()> {
let conn = app.get_connection();
let interrupt_count = app.get_interrupt_count();
let mcp_server = TursoMcpServer::new(conn, interrupt_count);
mcp_server.run()
}
fn main() -> anyhow::Result<()> {
let (mut app, _guard) = app::Limbo::new()?;
if app.is_mcp_mode() {
return run_mcp_server(app);
}
if std::io::IsTerminal::is_terminal(&std::io::stdin()) {
let mut rl = Editor::with_config(rustyline_config())?;
if HISTORY_FILE.exists() {

638
cli/mcp_server.rs Normal file
View File

@@ -0,0 +1,638 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::io::{self, BufRead, BufReader, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use turso_core::{Connection, StepResult, Value as DbValue};
#[derive(Debug, Serialize, Deserialize)]
struct JsonRpcRequest {
jsonrpc: String,
id: Option<Value>,
method: String,
params: Option<Value>,
}
#[derive(Debug, Serialize, Deserialize)]
struct JsonRpcResponse {
jsonrpc: String,
id: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<JsonRpcError>,
}
#[derive(Debug, Serialize, Deserialize)]
struct JsonRpcError {
code: i32,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<Value>,
}
#[derive(Debug, Serialize, Deserialize)]
struct InitializeRequest {
#[serde(rename = "protocolVersion")]
protocol_version: String,
capabilities: Value,
#[serde(rename = "clientInfo")]
client_info: Value,
}
#[derive(Debug, Serialize, Deserialize)]
struct CallToolRequest {
name: String,
arguments: Option<Value>,
}
pub struct TursoMcpServer {
conn: Arc<Connection>,
interrupt_count: Arc<AtomicUsize>,
}
impl TursoMcpServer {
pub fn new(conn: Arc<Connection>, interrupt_count: Arc<AtomicUsize>) -> Self {
Self {
conn,
interrupt_count,
}
}
pub fn run(&self) -> Result<()> {
let stdout = io::stdout();
let mut stdout_lock = stdout.lock();
// Create a channel to receive lines from stdin
let (tx, rx) = mpsc::channel();
// Spawn a thread to read from stdin
thread::spawn(move || {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
for line in reader.lines() {
match line {
Ok(line) => {
if tx.send(Ok(line)).is_err() {
break; // Main thread has dropped the receiver
}
}
Err(e) => {
let _ = tx.send(Err(e));
break;
}
}
}
});
loop {
// Check if we've been interrupted
if self.interrupt_count.load(Ordering::SeqCst) > 0 {
eprintln!("MCP server interrupted, shutting down...");
break;
}
// Try to receive a line with a timeout so we can check for interruption
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(Ok(line)) => {
if line.trim().is_empty() {
continue;
}
let request: JsonRpcRequest = match serde_json::from_str(&line) {
Ok(req) => req,
Err(e) => {
eprintln!("Failed to parse JSON-RPC request: {}", e);
continue;
}
};
let response = self.handle_request(request);
let response_json = serde_json::to_string(&response)?;
writeln!(stdout_lock, "{}", response_json)?;
stdout_lock.flush()?;
}
Ok(Err(_)) => {
// Error reading from stdin
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Timeout - continue loop to check for interruption
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
// Stdin thread has finished (EOF)
break;
}
}
}
Ok(())
}
fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
match request.method.as_str() {
"initialize" => self.handle_initialize(request),
"tools/list" => self.handle_list_tools(request),
"tools/call" => self.handle_call_tool(request),
_ => JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32601,
message: "Method not found".to_string(),
data: None,
}),
},
}
}
fn handle_initialize(&self, request: JsonRpcRequest) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(json!({
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "turso-mcp",
"version": "1.0.0"
}
})),
error: None,
}
}
fn handle_list_tools(&self, request: JsonRpcRequest) -> JsonRpcResponse {
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(json!({
"tools": [
{
"name": "list_tables",
"description": "List all tables in the database",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "describe_table",
"description": "Describe the structure of a specific table",
"inputSchema": {
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe"
}
},
"required": ["table_name"]
}
},
{
"name": "execute_query",
"description": "Execute a read-only SELECT query",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SELECT query to execute"
}
},
"required": ["query"]
}
},
{
"name": "insert_data",
"description": "Insert new data into a table",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The INSERT statement to execute"
}
},
"required": ["query"]
}
},
{
"name": "update_data",
"description": "Update existing data in a table",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The UPDATE statement to execute"
}
},
"required": ["query"]
}
},
{
"name": "delete_data",
"description": "Delete data from a table",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The DELETE statement to execute"
}
},
"required": ["query"]
}
},
{
"name": "schema_change",
"description": "Execute schema modification statements (CREATE TABLE, ALTER TABLE, DROP TABLE)",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The schema modification statement to execute"
}
},
"required": ["query"]
}
}
]
})),
error: None,
}
}
fn handle_call_tool(&self, request: JsonRpcRequest) -> JsonRpcResponse {
let tool_request: CallToolRequest = match request.params.as_ref() {
Some(params) => match serde_json::from_value(params.clone()) {
Ok(req) => req,
Err(e) => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32602,
message: format!("Invalid params: {}", e),
data: None,
}),
};
}
},
None => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing params".to_string(),
data: None,
}),
};
}
};
let result = match tool_request.name.as_str() {
"list_tables" => self.list_tables(),
"describe_table" => self.describe_table(&tool_request.arguments),
"execute_query" => self.execute_query(&tool_request.arguments),
"insert_data" => self.insert_data(&tool_request.arguments),
"update_data" => self.update_data(&tool_request.arguments),
"delete_data" => self.delete_data(&tool_request.arguments),
"schema_change" => self.schema_change(&tool_request.arguments),
_ => {
return JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32601,
message: format!("Unknown tool: {}", tool_request.name),
data: None,
}),
};
}
};
JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(json!({
"content": [{
"type": "text",
"text": result
}]
})),
error: None,
}
}
fn list_tables(&self) -> String {
let query = "SELECT name FROM sqlite_schema WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY 1";
match self.conn.query(query) {
Ok(Some(mut rows)) => {
let mut tables = Vec::new();
loop {
match rows.step() {
Ok(StepResult::Row) => {
let row = rows.row().unwrap();
if let Ok(DbValue::Text(table)) = row.get::<&DbValue>(0) {
tables.push(table.to_string());
}
}
Ok(StepResult::IO) => {
if rows.run_once().is_err() {
break;
}
}
Ok(StepResult::Done) => break,
Ok(StepResult::Interrupt) => break,
Ok(StepResult::Busy) => {
return "Database is busy".to_string();
}
Err(e) => {
return format!("Error listing tables: {}", e);
}
}
}
if tables.is_empty() {
"No tables found in the database".to_string()
} else {
tables.join(", ")
}
}
Ok(None) => "No results returned from the query".to_string(),
Err(e) => format!("Error querying database: {}", e),
}
}
fn describe_table(&self, arguments: &Option<Value>) -> String {
let table_name = match arguments {
Some(args) => match args.get("table_name") {
Some(Value::String(name)) => name,
_ => return "Missing or invalid table_name parameter".to_string(),
},
None => return "Missing table_name parameter".to_string(),
};
let query = format!("PRAGMA table_info({})", table_name);
match self.conn.query(&query) {
Ok(Some(mut rows)) => {
let mut columns = Vec::new();
loop {
match rows.step() {
Ok(StepResult::Row) => {
let row = rows.row().unwrap();
if let (
Ok(col_name),
Ok(col_type),
Ok(not_null),
Ok(default_value),
Ok(pk),
) = (
row.get::<&DbValue>(1),
row.get::<&DbValue>(2),
row.get::<&DbValue>(3),
row.get::<&DbValue>(4),
row.get::<&DbValue>(5),
) {
let default_str = if matches!(default_value, DbValue::Null) {
"".to_string()
} else {
format!("DEFAULT {}", default_value)
};
columns.push(
format!(
"{} {} {} {} {}",
col_name,
col_type,
if matches!(not_null, DbValue::Integer(1)) {
"NOT NULL"
} else {
"NULL"
},
default_str,
if matches!(pk, DbValue::Integer(1)) {
"PRIMARY KEY"
} else {
""
}
)
.trim()
.to_string(),
);
}
}
Ok(StepResult::IO) => {
if rows.run_once().is_err() {
break;
}
}
Ok(StepResult::Done) => break,
Ok(StepResult::Interrupt) => break,
Ok(StepResult::Busy) => {
return "Database is busy".to_string();
}
Err(e) => {
return format!("Error describing table: {}", e);
}
}
}
if columns.is_empty() {
format!("Table '{}' not found", table_name)
} else {
format!("Table '{}' columns:\n{}", table_name, columns.join("\n"))
}
}
Ok(None) => format!("Table '{}' not found", table_name),
Err(e) => format!("Error querying database: {}", e),
}
}
fn execute_query(&self, arguments: &Option<Value>) -> String {
let query = match arguments {
Some(args) => match args.get("query") {
Some(Value::String(q)) => q,
_ => return "Missing or invalid query parameter".to_string(),
},
None => return "Missing query parameter".to_string(),
};
// Basic validation to ensure it's a read-only query
let trimmed_query = query.trim().to_lowercase();
if !trimmed_query.starts_with("select") {
return "Only SELECT queries are allowed".to_string();
}
match self.conn.query(query) {
Ok(Some(mut rows)) => {
let mut results = Vec::new();
// Get column names
let headers: Vec<String> = (0..rows.num_columns())
.map(|i| rows.get_column_name(i).to_string())
.collect();
// Get the data
loop {
match rows.step() {
Ok(StepResult::Row) => {
let row = rows.row().unwrap();
let mut row_data = Vec::new();
for value in row.get_values() {
row_data.push(value.to_string());
}
results.push(row_data);
}
Ok(StepResult::IO) => {
if rows.run_once().is_err() {
break;
}
}
Ok(StepResult::Done) => break,
Ok(StepResult::Interrupt) => break,
Ok(StepResult::Busy) => {
return "Database is busy".to_string();
}
Err(e) => {
return format!("Error executing query: {}", e);
}
}
}
// Format results as text table
let mut output = String::new();
if !headers.is_empty() {
output.push_str(&headers.join(" | "));
output.push('\n');
output.push_str(&"-".repeat(headers.join(" | ").len()));
output.push('\n');
}
for row in results {
output.push_str(&row.join(" | "));
output.push('\n');
}
if output.is_empty() {
"No results returned from the query".to_string()
} else {
output
}
}
Ok(None) => "No results returned from the query".to_string(),
Err(e) => format!("Error executing query: {}", e),
}
}
fn insert_data(&self, arguments: &Option<Value>) -> String {
let query = match arguments {
Some(args) => match args.get("query") {
Some(Value::String(q)) => q,
_ => return "Missing or invalid query parameter".to_string(),
},
None => return "Missing query parameter".to_string(),
};
// Basic validation to ensure it's an INSERT query
let trimmed_query = query.trim().to_lowercase();
if !trimmed_query.starts_with("insert") {
return "Only INSERT statements are allowed".to_string();
}
match self.conn.execute(query) {
Ok(()) => "INSERT successful.".to_string(),
Err(e) => format!("Error executing INSERT: {}", e),
}
}
fn update_data(&self, arguments: &Option<Value>) -> String {
let query = match arguments {
Some(args) => match args.get("query") {
Some(Value::String(q)) => q,
_ => return "Missing or invalid query parameter".to_string(),
},
None => return "Missing query parameter".to_string(),
};
// Basic validation to ensure it's an UPDATE query
let trimmed_query = query.trim().to_lowercase();
if !trimmed_query.starts_with("update") {
return "Only UPDATE statements are allowed".to_string();
}
match self.conn.execute(query) {
Ok(()) => "UPDATE successful.".to_string(),
Err(e) => format!("Error executing UPDATE: {}", e),
}
}
fn delete_data(&self, arguments: &Option<Value>) -> String {
let query = match arguments {
Some(args) => match args.get("query") {
Some(Value::String(q)) => q,
_ => return "Missing or invalid query parameter".to_string(),
},
None => return "Missing query parameter".to_string(),
};
// Basic validation to ensure it's a DELETE query
let trimmed_query = query.trim().to_lowercase();
if !trimmed_query.starts_with("delete") {
return "Only DELETE statements are allowed".to_string();
}
match self.conn.execute(query) {
Ok(()) => "DELETE successful.".to_string(),
Err(e) => format!("Error executing DELETE: {}", e),
}
}
fn schema_change(&self, arguments: &Option<Value>) -> String {
let query = match arguments {
Some(args) => match args.get("query") {
Some(Value::String(q)) => q,
_ => return "Missing or invalid query parameter".to_string(),
},
None => return "Missing query parameter".to_string(),
};
// Basic validation to ensure it's a schema modification query
let trimmed_query = query.trim().to_lowercase();
if !trimmed_query.starts_with("create")
&& !trimmed_query.starts_with("alter")
&& !trimmed_query.starts_with("drop")
{
return "Only CREATE, ALTER, and DROP statements are allowed".to_string();
}
match self.conn.execute(query) {
Ok(()) => "Schema change successful.".to_string(),
Err(e) => format!("Error executing schema change: {}", e),
}
}
}