mirror of
https://github.com/lWolvesl/claw-code.git
synced 2026-04-03 00:51:52 +08:00
The runtime crate already had typed MCP config parsing, bootstrap metadata, and stdio JSON-RPC transport primitives, but it lacked the stateful layer that owns configured subprocesses and routes discovered tools back to the right server. This change adds a thin lazy McpServerManager in mcp_stdio, keeps unsupported transports explicit, and locks the behavior with subprocess-backed discovery, routing, reuse, shutdown, and error tests. Constraint: Keep the change narrow to the runtime crate and stdio transport only Constraint: Reuse existing MCP config/bootstrap/process helpers instead of adding new dependencies Rejected: Eagerly spawn all configured servers at construction | unnecessary startup cost and failure coupling Rejected: Spawn a fresh process per request | defeats lifecycle management and tool routing cache Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep higher-level runtime/session integration separate until a caller needs this manager surface Tested: cargo fmt --all; cargo clippy -p runtime --all-targets -- -D warnings; cargo test -p runtime Not-tested: Integration into conversation/runtime flows outside direct manager APIs
1698 lines
60 KiB
Rust
1698 lines
60 KiB
Rust
use std::collections::BTreeMap;
|
|
use std::io;
|
|
use std::process::Stdio;
|
|
|
|
use serde::de::DeserializeOwned;
|
|
use serde::{Deserialize, Serialize};
|
|
use serde_json::Value as JsonValue;
|
|
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
|
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
|
|
|
|
use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
|
|
use crate::mcp::mcp_tool_name;
|
|
use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(untagged)]
|
|
pub enum JsonRpcId {
|
|
Number(u64),
|
|
String(String),
|
|
Null,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct JsonRpcRequest<T = JsonValue> {
|
|
pub jsonrpc: String,
|
|
pub id: JsonRpcId,
|
|
pub method: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub params: Option<T>,
|
|
}
|
|
|
|
impl<T> JsonRpcRequest<T> {
|
|
#[must_use]
|
|
pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
|
|
Self {
|
|
jsonrpc: "2.0".to_string(),
|
|
id,
|
|
method: method.into(),
|
|
params,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct JsonRpcError {
|
|
pub code: i64,
|
|
pub message: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub data: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct JsonRpcResponse<T = JsonValue> {
|
|
pub jsonrpc: String,
|
|
pub id: JsonRpcId,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub result: Option<T>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub error: Option<JsonRpcError>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpInitializeParams {
|
|
pub protocol_version: String,
|
|
pub capabilities: JsonValue,
|
|
pub client_info: McpInitializeClientInfo,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpInitializeClientInfo {
|
|
pub name: String,
|
|
pub version: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpInitializeResult {
|
|
pub protocol_version: String,
|
|
pub capabilities: JsonValue,
|
|
pub server_info: McpInitializeServerInfo,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpInitializeServerInfo {
|
|
pub name: String,
|
|
pub version: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpListToolsParams {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub cursor: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct McpTool {
|
|
pub name: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub description: Option<String>,
|
|
#[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
|
|
pub input_schema: Option<JsonValue>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub annotations: Option<JsonValue>,
|
|
#[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
|
|
pub meta: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpListToolsResult {
|
|
pub tools: Vec<McpTool>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub next_cursor: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpToolCallParams {
|
|
pub name: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub arguments: Option<JsonValue>,
|
|
#[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
|
|
pub meta: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct McpToolCallContent {
|
|
#[serde(rename = "type")]
|
|
pub kind: String,
|
|
#[serde(flatten)]
|
|
pub data: BTreeMap<String, JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpToolCallResult {
|
|
#[serde(default)]
|
|
pub content: Vec<McpToolCallContent>,
|
|
#[serde(default)]
|
|
pub structured_content: Option<JsonValue>,
|
|
#[serde(default)]
|
|
pub is_error: Option<bool>,
|
|
#[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
|
|
pub meta: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpListResourcesParams {
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub cursor: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct McpResource {
|
|
pub uri: String,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub name: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub description: Option<String>,
|
|
#[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
|
|
pub mime_type: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub annotations: Option<JsonValue>,
|
|
#[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
|
|
pub meta: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpListResourcesResult {
|
|
pub resources: Vec<McpResource>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub next_cursor: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct McpReadResourceParams {
|
|
pub uri: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct McpResourceContents {
|
|
pub uri: String,
|
|
#[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
|
|
pub mime_type: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub text: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub blob: Option<String>,
|
|
#[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
|
|
pub meta: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
pub struct McpReadResourceResult {
|
|
pub contents: Vec<McpResourceContents>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
pub struct ManagedMcpTool {
|
|
pub server_name: String,
|
|
pub qualified_name: String,
|
|
pub raw_name: String,
|
|
pub tool: McpTool,
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct UnsupportedMcpServer {
|
|
pub server_name: String,
|
|
pub transport: McpTransport,
|
|
pub reason: String,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum McpServerManagerError {
|
|
Io(io::Error),
|
|
JsonRpc {
|
|
server_name: String,
|
|
method: &'static str,
|
|
error: JsonRpcError,
|
|
},
|
|
InvalidResponse {
|
|
server_name: String,
|
|
method: &'static str,
|
|
details: String,
|
|
},
|
|
UnknownTool {
|
|
qualified_name: String,
|
|
},
|
|
UnknownServer {
|
|
server_name: String,
|
|
},
|
|
}
|
|
|
|
impl std::fmt::Display for McpServerManagerError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
Self::Io(error) => write!(f, "{error}"),
|
|
Self::JsonRpc {
|
|
server_name,
|
|
method,
|
|
error,
|
|
} => write!(
|
|
f,
|
|
"MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
|
|
error.message, error.code
|
|
),
|
|
Self::InvalidResponse {
|
|
server_name,
|
|
method,
|
|
details,
|
|
} => write!(
|
|
f,
|
|
"MCP server `{server_name}` returned invalid response for {method}: {details}"
|
|
),
|
|
Self::UnknownTool { qualified_name } => {
|
|
write!(f, "unknown MCP tool `{qualified_name}`")
|
|
}
|
|
Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for McpServerManagerError {
|
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
match self {
|
|
Self::Io(error) => Some(error),
|
|
Self::JsonRpc { .. }
|
|
| Self::InvalidResponse { .. }
|
|
| Self::UnknownTool { .. }
|
|
| Self::UnknownServer { .. } => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<io::Error> for McpServerManagerError {
|
|
fn from(value: io::Error) -> Self {
|
|
Self::Io(value)
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
struct ToolRoute {
|
|
server_name: String,
|
|
raw_name: String,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct ManagedMcpServer {
|
|
bootstrap: McpClientBootstrap,
|
|
process: Option<McpStdioProcess>,
|
|
initialized: bool,
|
|
}
|
|
|
|
impl ManagedMcpServer {
|
|
fn new(bootstrap: McpClientBootstrap) -> Self {
|
|
Self {
|
|
bootstrap,
|
|
process: None,
|
|
initialized: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct McpServerManager {
|
|
servers: BTreeMap<String, ManagedMcpServer>,
|
|
unsupported_servers: Vec<UnsupportedMcpServer>,
|
|
tool_index: BTreeMap<String, ToolRoute>,
|
|
next_request_id: u64,
|
|
}
|
|
|
|
impl McpServerManager {
|
|
#[must_use]
|
|
pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
|
|
Self::from_servers(config.mcp().servers())
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
|
|
let mut managed_servers = BTreeMap::new();
|
|
let mut unsupported_servers = Vec::new();
|
|
|
|
for (server_name, server_config) in servers {
|
|
if server_config.transport() == McpTransport::Stdio {
|
|
let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
|
|
managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
|
|
} else {
|
|
unsupported_servers.push(UnsupportedMcpServer {
|
|
server_name: server_name.clone(),
|
|
transport: server_config.transport(),
|
|
reason: format!(
|
|
"transport {:?} is not supported by McpServerManager",
|
|
server_config.transport()
|
|
),
|
|
});
|
|
}
|
|
}
|
|
|
|
Self {
|
|
servers: managed_servers,
|
|
unsupported_servers,
|
|
tool_index: BTreeMap::new(),
|
|
next_request_id: 1,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
|
|
&self.unsupported_servers
|
|
}
|
|
|
|
pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
|
|
let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
|
|
let mut discovered_tools = Vec::new();
|
|
|
|
for server_name in server_names {
|
|
self.ensure_server_ready(&server_name).await?;
|
|
self.clear_routes_for_server(&server_name);
|
|
|
|
let mut cursor = None;
|
|
loop {
|
|
let request_id = self.take_request_id();
|
|
let response = {
|
|
let server = self.server_mut(&server_name)?;
|
|
let process = server.process.as_mut().ok_or_else(|| {
|
|
McpServerManagerError::InvalidResponse {
|
|
server_name: server_name.clone(),
|
|
method: "tools/list",
|
|
details: "server process missing after initialization".to_string(),
|
|
}
|
|
})?;
|
|
process
|
|
.list_tools(
|
|
request_id,
|
|
Some(McpListToolsParams {
|
|
cursor: cursor.clone(),
|
|
}),
|
|
)
|
|
.await?
|
|
};
|
|
|
|
if let Some(error) = response.error {
|
|
return Err(McpServerManagerError::JsonRpc {
|
|
server_name: server_name.clone(),
|
|
method: "tools/list",
|
|
error,
|
|
});
|
|
}
|
|
|
|
let result =
|
|
response
|
|
.result
|
|
.ok_or_else(|| McpServerManagerError::InvalidResponse {
|
|
server_name: server_name.clone(),
|
|
method: "tools/list",
|
|
details: "missing result payload".to_string(),
|
|
})?;
|
|
|
|
for tool in result.tools {
|
|
let qualified_name = mcp_tool_name(&server_name, &tool.name);
|
|
self.tool_index.insert(
|
|
qualified_name.clone(),
|
|
ToolRoute {
|
|
server_name: server_name.clone(),
|
|
raw_name: tool.name.clone(),
|
|
},
|
|
);
|
|
discovered_tools.push(ManagedMcpTool {
|
|
server_name: server_name.clone(),
|
|
qualified_name,
|
|
raw_name: tool.name.clone(),
|
|
tool,
|
|
});
|
|
}
|
|
|
|
match result.next_cursor {
|
|
Some(next_cursor) => cursor = Some(next_cursor),
|
|
None => break,
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(discovered_tools)
|
|
}
|
|
|
|
pub async fn call_tool(
|
|
&mut self,
|
|
qualified_tool_name: &str,
|
|
arguments: Option<JsonValue>,
|
|
) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
|
|
let route = self
|
|
.tool_index
|
|
.get(qualified_tool_name)
|
|
.cloned()
|
|
.ok_or_else(|| McpServerManagerError::UnknownTool {
|
|
qualified_name: qualified_tool_name.to_string(),
|
|
})?;
|
|
|
|
self.ensure_server_ready(&route.server_name).await?;
|
|
let request_id = self.take_request_id();
|
|
let response =
|
|
{
|
|
let server = self.server_mut(&route.server_name)?;
|
|
let process = server.process.as_mut().ok_or_else(|| {
|
|
McpServerManagerError::InvalidResponse {
|
|
server_name: route.server_name.clone(),
|
|
method: "tools/call",
|
|
details: "server process missing after initialization".to_string(),
|
|
}
|
|
})?;
|
|
process
|
|
.call_tool(
|
|
request_id,
|
|
McpToolCallParams {
|
|
name: route.raw_name,
|
|
arguments,
|
|
meta: None,
|
|
},
|
|
)
|
|
.await?
|
|
};
|
|
Ok(response)
|
|
}
|
|
|
|
pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
|
|
let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
|
|
for server_name in server_names {
|
|
let server = self.server_mut(&server_name)?;
|
|
if let Some(process) = server.process.as_mut() {
|
|
process.shutdown().await?;
|
|
}
|
|
server.process = None;
|
|
server.initialized = false;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn clear_routes_for_server(&mut self, server_name: &str) {
|
|
self.tool_index
|
|
.retain(|_, route| route.server_name != server_name);
|
|
}
|
|
|
|
fn server_mut(
|
|
&mut self,
|
|
server_name: &str,
|
|
) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
|
|
self.servers
|
|
.get_mut(server_name)
|
|
.ok_or_else(|| McpServerManagerError::UnknownServer {
|
|
server_name: server_name.to_string(),
|
|
})
|
|
}
|
|
|
|
fn take_request_id(&mut self) -> JsonRpcId {
|
|
let id = self.next_request_id;
|
|
self.next_request_id = self.next_request_id.saturating_add(1);
|
|
JsonRpcId::Number(id)
|
|
}
|
|
|
|
async fn ensure_server_ready(
|
|
&mut self,
|
|
server_name: &str,
|
|
) -> Result<(), McpServerManagerError> {
|
|
let needs_spawn = self
|
|
.servers
|
|
.get(server_name)
|
|
.map(|server| server.process.is_none())
|
|
.ok_or_else(|| McpServerManagerError::UnknownServer {
|
|
server_name: server_name.to_string(),
|
|
})?;
|
|
|
|
if needs_spawn {
|
|
let server = self.server_mut(server_name)?;
|
|
server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
|
|
server.initialized = false;
|
|
}
|
|
|
|
let needs_initialize = self
|
|
.servers
|
|
.get(server_name)
|
|
.map(|server| !server.initialized)
|
|
.ok_or_else(|| McpServerManagerError::UnknownServer {
|
|
server_name: server_name.to_string(),
|
|
})?;
|
|
|
|
if needs_initialize {
|
|
let request_id = self.take_request_id();
|
|
let response = {
|
|
let server = self.server_mut(server_name)?;
|
|
let process = server.process.as_mut().ok_or_else(|| {
|
|
McpServerManagerError::InvalidResponse {
|
|
server_name: server_name.to_string(),
|
|
method: "initialize",
|
|
details: "server process missing before initialize".to_string(),
|
|
}
|
|
})?;
|
|
process
|
|
.initialize(request_id, default_initialize_params())
|
|
.await?
|
|
};
|
|
|
|
if let Some(error) = response.error {
|
|
return Err(McpServerManagerError::JsonRpc {
|
|
server_name: server_name.to_string(),
|
|
method: "initialize",
|
|
error,
|
|
});
|
|
}
|
|
|
|
if response.result.is_none() {
|
|
return Err(McpServerManagerError::InvalidResponse {
|
|
server_name: server_name.to_string(),
|
|
method: "initialize",
|
|
details: "missing result payload".to_string(),
|
|
});
|
|
}
|
|
|
|
let server = self.server_mut(server_name)?;
|
|
server.initialized = true;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct McpStdioProcess {
|
|
child: Child,
|
|
stdin: ChildStdin,
|
|
stdout: BufReader<ChildStdout>,
|
|
}
|
|
|
|
impl McpStdioProcess {
|
|
pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
|
|
let mut command = Command::new(&transport.command);
|
|
command
|
|
.args(&transport.args)
|
|
.stdin(Stdio::piped())
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::inherit());
|
|
apply_env(&mut command, &transport.env);
|
|
|
|
let mut child = command.spawn()?;
|
|
let stdin = child
|
|
.stdin
|
|
.take()
|
|
.ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
|
|
let stdout = child
|
|
.stdout
|
|
.take()
|
|
.ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
|
|
|
|
Ok(Self {
|
|
child,
|
|
stdin,
|
|
stdout: BufReader::new(stdout),
|
|
})
|
|
}
|
|
|
|
pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
|
|
self.stdin.write_all(bytes).await
|
|
}
|
|
|
|
pub async fn flush(&mut self) -> io::Result<()> {
|
|
self.stdin.flush().await
|
|
}
|
|
|
|
pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
|
|
self.write_all(line.as_bytes()).await?;
|
|
self.write_all(b"\n").await?;
|
|
self.flush().await
|
|
}
|
|
|
|
pub async fn read_line(&mut self) -> io::Result<String> {
|
|
let mut line = String::new();
|
|
let bytes_read = self.stdout.read_line(&mut line).await?;
|
|
if bytes_read == 0 {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::UnexpectedEof,
|
|
"MCP stdio stream closed while reading line",
|
|
));
|
|
}
|
|
Ok(line)
|
|
}
|
|
|
|
pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
|
|
let mut buffer = vec![0_u8; 4096];
|
|
let read = self.stdout.read(&mut buffer).await?;
|
|
buffer.truncate(read);
|
|
Ok(buffer)
|
|
}
|
|
|
|
pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
|
|
let encoded = encode_frame(payload);
|
|
self.write_all(&encoded).await?;
|
|
self.flush().await
|
|
}
|
|
|
|
pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
|
|
let mut content_length = None;
|
|
loop {
|
|
let mut line = String::new();
|
|
let bytes_read = self.stdout.read_line(&mut line).await?;
|
|
if bytes_read == 0 {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::UnexpectedEof,
|
|
"MCP stdio stream closed while reading headers",
|
|
));
|
|
}
|
|
if line == "\r\n" {
|
|
break;
|
|
}
|
|
if let Some(value) = line.strip_prefix("Content-Length:") {
|
|
let parsed = value
|
|
.trim()
|
|
.parse::<usize>()
|
|
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
|
|
content_length = Some(parsed);
|
|
}
|
|
}
|
|
|
|
let content_length = content_length.ok_or_else(|| {
|
|
io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
|
|
})?;
|
|
let mut payload = vec![0_u8; content_length];
|
|
self.stdout.read_exact(&mut payload).await?;
|
|
Ok(payload)
|
|
}
|
|
|
|
pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
|
|
let body = serde_json::to_vec(message)
|
|
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
|
|
self.write_frame(&body).await
|
|
}
|
|
|
|
pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
|
|
let payload = self.read_frame().await?;
|
|
serde_json::from_slice(&payload)
|
|
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
|
|
}
|
|
|
|
pub async fn send_request<T: Serialize>(
|
|
&mut self,
|
|
request: &JsonRpcRequest<T>,
|
|
) -> io::Result<()> {
|
|
self.write_jsonrpc_message(request).await
|
|
}
|
|
|
|
pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
|
|
self.read_jsonrpc_message().await
|
|
}
|
|
|
|
pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
method: impl Into<String>,
|
|
params: Option<TParams>,
|
|
) -> io::Result<JsonRpcResponse<TResult>> {
|
|
let request = JsonRpcRequest::new(id, method, params);
|
|
self.send_request(&request).await?;
|
|
self.read_response().await
|
|
}
|
|
|
|
pub async fn initialize(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
params: McpInitializeParams,
|
|
) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
|
|
self.request(id, "initialize", Some(params)).await
|
|
}
|
|
|
|
pub async fn list_tools(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
params: Option<McpListToolsParams>,
|
|
) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
|
|
self.request(id, "tools/list", params).await
|
|
}
|
|
|
|
pub async fn call_tool(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
params: McpToolCallParams,
|
|
) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
|
|
self.request(id, "tools/call", Some(params)).await
|
|
}
|
|
|
|
pub async fn list_resources(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
params: Option<McpListResourcesParams>,
|
|
) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
|
|
self.request(id, "resources/list", params).await
|
|
}
|
|
|
|
pub async fn read_resource(
|
|
&mut self,
|
|
id: JsonRpcId,
|
|
params: McpReadResourceParams,
|
|
) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
|
|
self.request(id, "resources/read", Some(params)).await
|
|
}
|
|
|
|
pub async fn terminate(&mut self) -> io::Result<()> {
|
|
self.child.kill().await
|
|
}
|
|
|
|
pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
|
|
self.child.wait().await
|
|
}
|
|
|
|
async fn shutdown(&mut self) -> io::Result<()> {
|
|
if self.child.try_wait()?.is_none() {
|
|
self.child.kill().await?;
|
|
}
|
|
let _ = self.child.wait().await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
|
|
match &bootstrap.transport {
|
|
McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
|
|
other => Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
format!(
|
|
"MCP bootstrap transport for {} is not stdio: {other:?}",
|
|
bootstrap.server_name
|
|
),
|
|
)),
|
|
}
|
|
}
|
|
|
|
fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
|
|
for (key, value) in env {
|
|
command.env(key, value);
|
|
}
|
|
}
|
|
|
|
fn encode_frame(payload: &[u8]) -> Vec<u8> {
|
|
let header = format!("Content-Length: {}\r\n\r\n", payload.len());
|
|
let mut framed = header.into_bytes();
|
|
framed.extend_from_slice(payload);
|
|
framed
|
|
}
|
|
|
|
fn default_initialize_params() -> McpInitializeParams {
|
|
McpInitializeParams {
|
|
protocol_version: "2025-03-26".to_string(),
|
|
capabilities: JsonValue::Object(serde_json::Map::new()),
|
|
client_info: McpInitializeClientInfo {
|
|
name: "runtime".to_string(),
|
|
version: env!("CARGO_PKG_VERSION").to_string(),
|
|
},
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::collections::BTreeMap;
|
|
use std::fs;
|
|
use std::io::ErrorKind;
|
|
use std::os::unix::fs::PermissionsExt;
|
|
use std::path::{Path, PathBuf};
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use serde_json::json;
|
|
use tokio::runtime::Builder;
|
|
|
|
use crate::config::{
|
|
ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
|
|
McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
|
|
};
|
|
use crate::mcp::mcp_tool_name;
|
|
use crate::mcp_client::McpClientBootstrap;
|
|
|
|
use super::{
|
|
spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
|
|
McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
|
|
McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
|
|
McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
|
|
};
|
|
|
|
fn temp_dir() -> PathBuf {
|
|
let nanos = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.expect("time should be after epoch")
|
|
.as_nanos();
|
|
std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}"))
|
|
}
|
|
|
|
fn write_echo_script() -> PathBuf {
|
|
let root = temp_dir();
|
|
fs::create_dir_all(&root).expect("temp dir");
|
|
let script_path = root.join("echo-mcp.sh");
|
|
fs::write(
|
|
&script_path,
|
|
"#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
|
|
)
|
|
.expect("write script");
|
|
let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
|
|
permissions.set_mode(0o755);
|
|
fs::set_permissions(&script_path, permissions).expect("chmod");
|
|
script_path
|
|
}
|
|
|
|
fn write_jsonrpc_script() -> PathBuf {
|
|
let root = temp_dir();
|
|
fs::create_dir_all(&root).expect("temp dir");
|
|
let script_path = root.join("jsonrpc-mcp.py");
|
|
let script = [
|
|
"#!/usr/bin/env python3",
|
|
"import json, sys",
|
|
"header = b''",
|
|
r"while not header.endswith(b'\r\n\r\n'):",
|
|
" chunk = sys.stdin.buffer.read(1)",
|
|
" if not chunk:",
|
|
" raise SystemExit(1)",
|
|
" header += chunk",
|
|
"length = 0",
|
|
r"for line in header.decode().split('\r\n'):",
|
|
r" if line.lower().startswith('content-length:'):",
|
|
r" length = int(line.split(':', 1)[1].strip())",
|
|
"payload = sys.stdin.buffer.read(length)",
|
|
"request = json.loads(payload.decode())",
|
|
r"assert request['jsonrpc'] == '2.0'",
|
|
r"assert request['method'] == 'initialize'",
|
|
r"response = json.dumps({",
|
|
r" 'jsonrpc': '2.0',",
|
|
r" 'id': request['id'],",
|
|
r" 'result': {",
|
|
r" 'protocolVersion': request['params']['protocolVersion'],",
|
|
r" 'capabilities': {'tools': {}},",
|
|
r" 'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
|
|
r" }",
|
|
r"}).encode()",
|
|
r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
|
|
"sys.stdout.buffer.flush()",
|
|
"",
|
|
]
|
|
.join("\n");
|
|
fs::write(&script_path, script).expect("write script");
|
|
let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
|
|
permissions.set_mode(0o755);
|
|
fs::set_permissions(&script_path, permissions).expect("chmod");
|
|
script_path
|
|
}
|
|
|
|
#[allow(clippy::too_many_lines)]
|
|
fn write_mcp_server_script() -> PathBuf {
|
|
let root = temp_dir();
|
|
fs::create_dir_all(&root).expect("temp dir");
|
|
let script_path = root.join("fake-mcp-server.py");
|
|
let script = [
|
|
"#!/usr/bin/env python3",
|
|
"import json, sys",
|
|
"",
|
|
"def read_message():",
|
|
" header = b''",
|
|
r" while not header.endswith(b'\r\n\r\n'):",
|
|
" chunk = sys.stdin.buffer.read(1)",
|
|
" if not chunk:",
|
|
" return None",
|
|
" header += chunk",
|
|
" length = 0",
|
|
r" for line in header.decode().split('\r\n'):",
|
|
r" if line.lower().startswith('content-length:'):",
|
|
r" length = int(line.split(':', 1)[1].strip())",
|
|
" payload = sys.stdin.buffer.read(length)",
|
|
" return json.loads(payload.decode())",
|
|
"",
|
|
"def send_message(message):",
|
|
" payload = json.dumps(message).encode()",
|
|
r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
|
|
" sys.stdout.buffer.flush()",
|
|
"",
|
|
"while True:",
|
|
" request = read_message()",
|
|
" if request is None:",
|
|
" break",
|
|
" method = request['method']",
|
|
" if method == 'initialize':",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'protocolVersion': request['params']['protocolVersion'],",
|
|
" 'capabilities': {'tools': {}, 'resources': {}},",
|
|
" 'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
|
|
" }",
|
|
" })",
|
|
" elif method == 'tools/list':",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'tools': [",
|
|
" {",
|
|
" 'name': 'echo',",
|
|
" 'description': 'Echoes text',",
|
|
" 'inputSchema': {",
|
|
" 'type': 'object',",
|
|
" 'properties': {'text': {'type': 'string'}},",
|
|
" 'required': ['text']",
|
|
" }",
|
|
" }",
|
|
" ]",
|
|
" }",
|
|
" })",
|
|
" elif method == 'tools/call':",
|
|
" args = request['params'].get('arguments') or {}",
|
|
" if request['params']['name'] == 'fail':",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'error': {'code': -32001, 'message': 'tool failed'},",
|
|
" })",
|
|
" else:",
|
|
" text = args.get('text', '')",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'content': [{'type': 'text', 'text': f'echo:{text}'}],",
|
|
" 'structuredContent': {'echoed': text},",
|
|
" 'isError': False",
|
|
" }",
|
|
" })",
|
|
" elif method == 'resources/list':",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'resources': [",
|
|
" {",
|
|
" 'uri': 'file://guide.txt',",
|
|
" 'name': 'guide',",
|
|
" 'description': 'Guide text',",
|
|
" 'mimeType': 'text/plain'",
|
|
" }",
|
|
" ]",
|
|
" }",
|
|
" })",
|
|
" elif method == 'resources/read':",
|
|
" uri = request['params']['uri']",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'contents': [",
|
|
" {",
|
|
" 'uri': uri,",
|
|
" 'mimeType': 'text/plain',",
|
|
" 'text': f'contents for {uri}'",
|
|
" }",
|
|
" ]",
|
|
" }",
|
|
" })",
|
|
" else:",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
|
|
" })",
|
|
"",
|
|
]
|
|
.join("\n");
|
|
fs::write(&script_path, script).expect("write script");
|
|
let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
|
|
permissions.set_mode(0o755);
|
|
fs::set_permissions(&script_path, permissions).expect("chmod");
|
|
script_path
|
|
}
|
|
|
|
#[allow(clippy::too_many_lines)]
|
|
fn write_manager_mcp_server_script() -> PathBuf {
|
|
let root = temp_dir();
|
|
fs::create_dir_all(&root).expect("temp dir");
|
|
let script_path = root.join("manager-mcp-server.py");
|
|
let script = [
|
|
"#!/usr/bin/env python3",
|
|
"import json, os, sys",
|
|
"",
|
|
"LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
|
|
"LOG_PATH = os.environ.get('MCP_LOG_PATH')",
|
|
"initialize_count = 0",
|
|
"",
|
|
"def log(method):",
|
|
" if LOG_PATH:",
|
|
" with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
|
|
" handle.write(f'{method}\\n')",
|
|
"",
|
|
"def read_message():",
|
|
" header = b''",
|
|
r" while not header.endswith(b'\r\n\r\n'):",
|
|
" chunk = sys.stdin.buffer.read(1)",
|
|
" if not chunk:",
|
|
" return None",
|
|
" header += chunk",
|
|
" length = 0",
|
|
r" for line in header.decode().split('\r\n'):",
|
|
r" if line.lower().startswith('content-length:'):",
|
|
r" length = int(line.split(':', 1)[1].strip())",
|
|
" payload = sys.stdin.buffer.read(length)",
|
|
" return json.loads(payload.decode())",
|
|
"",
|
|
"def send_message(message):",
|
|
" payload = json.dumps(message).encode()",
|
|
r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
|
|
" sys.stdout.buffer.flush()",
|
|
"",
|
|
"while True:",
|
|
" request = read_message()",
|
|
" if request is None:",
|
|
" break",
|
|
" method = request['method']",
|
|
" log(method)",
|
|
" if method == 'initialize':",
|
|
" initialize_count += 1",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'protocolVersion': request['params']['protocolVersion'],",
|
|
" 'capabilities': {'tools': {}},",
|
|
" 'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
|
|
" }",
|
|
" })",
|
|
" elif method == 'tools/list':",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'tools': [",
|
|
" {",
|
|
" 'name': 'echo',",
|
|
" 'description': f'Echo tool for {LABEL}',",
|
|
" 'inputSchema': {",
|
|
" 'type': 'object',",
|
|
" 'properties': {'text': {'type': 'string'}},",
|
|
" 'required': ['text']",
|
|
" }",
|
|
" }",
|
|
" ]",
|
|
" }",
|
|
" })",
|
|
" elif method == 'tools/call':",
|
|
" args = request['params'].get('arguments') or {}",
|
|
" text = args.get('text', '')",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'result': {",
|
|
" 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
|
|
" 'structuredContent': {",
|
|
" 'server': LABEL,",
|
|
" 'echoed': text,",
|
|
" 'initializeCount': initialize_count",
|
|
" },",
|
|
" 'isError': False",
|
|
" }",
|
|
" })",
|
|
" else:",
|
|
" send_message({",
|
|
" 'jsonrpc': '2.0',",
|
|
" 'id': request['id'],",
|
|
" 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
|
|
" })",
|
|
"",
|
|
]
|
|
.join("\n");
|
|
fs::write(&script_path, script).expect("write script");
|
|
let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
|
|
permissions.set_mode(0o755);
|
|
fs::set_permissions(&script_path, permissions).expect("chmod");
|
|
script_path
|
|
}
|
|
|
|
fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
|
|
let config = ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Stdio(McpStdioServerConfig {
|
|
command: "/bin/sh".to_string(),
|
|
args: vec![script_path.to_string_lossy().into_owned()],
|
|
env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
|
|
}),
|
|
};
|
|
McpClientBootstrap::from_scoped_config("stdio server", &config)
|
|
}
|
|
|
|
fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
|
|
crate::mcp_client::McpStdioTransport {
|
|
command: "python3".to_string(),
|
|
args: vec![script_path.to_string_lossy().into_owned()],
|
|
env: BTreeMap::new(),
|
|
}
|
|
}
|
|
|
|
fn cleanup_script(script_path: &Path) {
|
|
fs::remove_file(script_path).expect("cleanup script");
|
|
fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir");
|
|
}
|
|
|
|
fn manager_server_config(
|
|
script_path: &Path,
|
|
label: &str,
|
|
log_path: &Path,
|
|
) -> ScopedMcpServerConfig {
|
|
ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Stdio(McpStdioServerConfig {
|
|
command: "python3".to_string(),
|
|
args: vec![script_path.to_string_lossy().into_owned()],
|
|
env: BTreeMap::from([
|
|
("MCP_SERVER_LABEL".to_string(), label.to_string()),
|
|
(
|
|
"MCP_LOG_PATH".to_string(),
|
|
log_path.to_string_lossy().into_owned(),
|
|
),
|
|
]),
|
|
}),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn spawns_stdio_process_and_round_trips_io() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_echo_script();
|
|
let bootstrap = sample_bootstrap(&script_path);
|
|
let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");
|
|
|
|
let ready = process.read_line().await.expect("read ready");
|
|
assert_eq!(ready, "READY:secret-value\n");
|
|
|
|
process
|
|
.write_line("ping from client")
|
|
.await
|
|
.expect("write line");
|
|
|
|
let echoed = process.read_line().await.expect("read echo");
|
|
assert_eq!(echoed, "ECHO:ping from client\n");
|
|
|
|
let status = process.wait().await.expect("wait for exit");
|
|
assert!(status.success());
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn rejects_non_stdio_bootstrap() {
|
|
let config = ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig {
|
|
name: "sdk-server".to_string(),
|
|
}),
|
|
};
|
|
let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config);
|
|
let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");
|
|
assert_eq!(error.kind(), ErrorKind::InvalidInput);
|
|
}
|
|
|
|
#[test]
|
|
fn round_trips_initialize_request_and_response_over_stdio_frames() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_jsonrpc_script();
|
|
let transport = script_transport(&script_path);
|
|
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
|
|
|
|
let response = process
|
|
.initialize(
|
|
JsonRpcId::Number(1),
|
|
McpInitializeParams {
|
|
protocol_version: "2025-03-26".to_string(),
|
|
capabilities: json!({"roots": {}}),
|
|
client_info: McpInitializeClientInfo {
|
|
name: "runtime-tests".to_string(),
|
|
version: "0.1.0".to_string(),
|
|
},
|
|
},
|
|
)
|
|
.await
|
|
.expect("initialize roundtrip");
|
|
|
|
assert_eq!(response.id, JsonRpcId::Number(1));
|
|
assert_eq!(response.error, None);
|
|
assert_eq!(
|
|
response.result,
|
|
Some(McpInitializeResult {
|
|
protocol_version: "2025-03-26".to_string(),
|
|
capabilities: json!({"tools": {}}),
|
|
server_info: McpInitializeServerInfo {
|
|
name: "fake-mcp".to_string(),
|
|
version: "0.1.0".to_string(),
|
|
},
|
|
})
|
|
);
|
|
|
|
let status = process.wait().await.expect("wait for exit");
|
|
assert!(status.success());
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_jsonrpc_request_emits_content_length_frame() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_jsonrpc_script();
|
|
let transport = script_transport(&script_path);
|
|
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
|
|
let request = JsonRpcRequest::new(
|
|
JsonRpcId::Number(7),
|
|
"initialize",
|
|
Some(json!({
|
|
"protocolVersion": "2025-03-26",
|
|
"capabilities": {},
|
|
"clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
|
|
})),
|
|
);
|
|
|
|
process.send_request(&request).await.expect("send request");
|
|
let response: JsonRpcResponse<serde_json::Value> =
|
|
process.read_response().await.expect("read response");
|
|
|
|
assert_eq!(response.id, JsonRpcId::Number(7));
|
|
assert_eq!(response.jsonrpc, "2.0");
|
|
|
|
let status = process.wait().await.expect("wait for exit");
|
|
assert!(status.success());
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn direct_spawn_uses_transport_env() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_echo_script();
|
|
let transport = crate::mcp_client::McpStdioTransport {
|
|
command: "/bin/sh".to_string(),
|
|
args: vec![script_path.to_string_lossy().into_owned()],
|
|
env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]),
|
|
};
|
|
let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
|
|
let ready = process.read_available().await.expect("read ready");
|
|
assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n");
|
|
process.terminate().await.expect("terminate child");
|
|
let _ = process.wait().await.expect("wait after kill");
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_mcp_server_script();
|
|
let transport = script_transport(&script_path);
|
|
let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
|
|
|
|
let tools = process
|
|
.list_tools(JsonRpcId::Number(2), None)
|
|
.await
|
|
.expect("list tools");
|
|
assert_eq!(tools.error, None);
|
|
assert_eq!(tools.id, JsonRpcId::Number(2));
|
|
assert_eq!(
|
|
tools.result,
|
|
Some(McpListToolsResult {
|
|
tools: vec![McpTool {
|
|
name: "echo".to_string(),
|
|
description: Some("Echoes text".to_string()),
|
|
input_schema: Some(json!({
|
|
"type": "object",
|
|
"properties": {"text": {"type": "string"}},
|
|
"required": ["text"]
|
|
})),
|
|
annotations: None,
|
|
meta: None,
|
|
}],
|
|
next_cursor: None,
|
|
})
|
|
);
|
|
|
|
let call = process
|
|
.call_tool(
|
|
JsonRpcId::String("call-1".to_string()),
|
|
McpToolCallParams {
|
|
name: "echo".to_string(),
|
|
arguments: Some(json!({"text": "hello"})),
|
|
meta: None,
|
|
},
|
|
)
|
|
.await
|
|
.expect("call tool");
|
|
assert_eq!(call.error, None);
|
|
let call_result = call.result.expect("tool result");
|
|
assert_eq!(call_result.is_error, Some(false));
|
|
assert_eq!(
|
|
call_result.structured_content,
|
|
Some(json!({"echoed": "hello"}))
|
|
);
|
|
assert_eq!(call_result.content.len(), 1);
|
|
assert_eq!(call_result.content[0].kind, "text");
|
|
assert_eq!(
|
|
call_result.content[0].data.get("text"),
|
|
Some(&json!("echo:hello"))
|
|
);
|
|
|
|
let resources = process
|
|
.list_resources(JsonRpcId::Number(3), None)
|
|
.await
|
|
.expect("list resources");
|
|
let resources_result = resources.result.expect("resources result");
|
|
assert_eq!(resources_result.resources.len(), 1);
|
|
assert_eq!(resources_result.resources[0].uri, "file://guide.txt");
|
|
assert_eq!(
|
|
resources_result.resources[0].mime_type.as_deref(),
|
|
Some("text/plain")
|
|
);
|
|
|
|
let read = process
|
|
.read_resource(
|
|
JsonRpcId::Number(4),
|
|
McpReadResourceParams {
|
|
uri: "file://guide.txt".to_string(),
|
|
},
|
|
)
|
|
.await
|
|
.expect("read resource");
|
|
assert_eq!(
|
|
read.result,
|
|
Some(McpReadResourceResult {
|
|
contents: vec![super::McpResourceContents {
|
|
uri: "file://guide.txt".to_string(),
|
|
mime_type: Some("text/plain".to_string()),
|
|
text: Some("contents for file://guide.txt".to_string()),
|
|
blob: None,
|
|
meta: None,
|
|
}],
|
|
})
|
|
);
|
|
|
|
process.terminate().await.expect("terminate child");
|
|
let _ = process.wait().await.expect("wait after kill");
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn surfaces_jsonrpc_errors_from_tool_calls() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_mcp_server_script();
|
|
let transport = script_transport(&script_path);
|
|
let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
|
|
|
|
let response = process
|
|
.call_tool(
|
|
JsonRpcId::Number(9),
|
|
McpToolCallParams {
|
|
name: "fail".to_string(),
|
|
arguments: None,
|
|
meta: None,
|
|
},
|
|
)
|
|
.await
|
|
.expect("call tool with error response");
|
|
|
|
assert_eq!(response.id, JsonRpcId::Number(9));
|
|
assert!(response.result.is_none());
|
|
assert_eq!(response.error.as_ref().map(|e| e.code), Some(-32001));
|
|
assert_eq!(
|
|
response.error.as_ref().map(|e| e.message.as_str()),
|
|
Some("tool failed")
|
|
);
|
|
|
|
process.terminate().await.expect("terminate child");
|
|
let _ = process.wait().await.expect("wait after kill");
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn manager_discovers_tools_from_stdio_config() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_manager_mcp_server_script();
|
|
let root = script_path.parent().expect("script parent");
|
|
let log_path = root.join("alpha.log");
|
|
let servers = BTreeMap::from([(
|
|
"alpha".to_string(),
|
|
manager_server_config(&script_path, "alpha", &log_path),
|
|
)]);
|
|
let mut manager = McpServerManager::from_servers(&servers);
|
|
|
|
let tools = manager.discover_tools().await.expect("discover tools");
|
|
|
|
assert_eq!(tools.len(), 1);
|
|
assert_eq!(tools[0].server_name, "alpha");
|
|
assert_eq!(tools[0].raw_name, "echo");
|
|
assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
|
|
assert_eq!(tools[0].tool.name, "echo");
|
|
assert!(manager.unsupported_servers().is_empty());
|
|
|
|
manager.shutdown().await.expect("shutdown");
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn manager_routes_tool_calls_to_correct_server() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_manager_mcp_server_script();
|
|
let root = script_path.parent().expect("script parent");
|
|
let alpha_log = root.join("alpha.log");
|
|
let beta_log = root.join("beta.log");
|
|
let servers = BTreeMap::from([
|
|
(
|
|
"alpha".to_string(),
|
|
manager_server_config(&script_path, "alpha", &alpha_log),
|
|
),
|
|
(
|
|
"beta".to_string(),
|
|
manager_server_config(&script_path, "beta", &beta_log),
|
|
),
|
|
]);
|
|
let mut manager = McpServerManager::from_servers(&servers);
|
|
|
|
let tools = manager.discover_tools().await.expect("discover tools");
|
|
assert_eq!(tools.len(), 2);
|
|
|
|
let alpha = manager
|
|
.call_tool(
|
|
&mcp_tool_name("alpha", "echo"),
|
|
Some(json!({"text": "hello"})),
|
|
)
|
|
.await
|
|
.expect("call alpha tool");
|
|
let beta = manager
|
|
.call_tool(
|
|
&mcp_tool_name("beta", "echo"),
|
|
Some(json!({"text": "world"})),
|
|
)
|
|
.await
|
|
.expect("call beta tool");
|
|
|
|
assert_eq!(
|
|
alpha
|
|
.result
|
|
.as_ref()
|
|
.and_then(|result| result.structured_content.as_ref())
|
|
.and_then(|value| value.get("server")),
|
|
Some(&json!("alpha"))
|
|
);
|
|
assert_eq!(
|
|
beta.result
|
|
.as_ref()
|
|
.and_then(|result| result.structured_content.as_ref())
|
|
.and_then(|value| value.get("server")),
|
|
Some(&json!("beta"))
|
|
);
|
|
|
|
manager.shutdown().await.expect("shutdown");
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn manager_records_unsupported_non_stdio_servers_without_panicking() {
|
|
let servers = BTreeMap::from([
|
|
(
|
|
"http".to_string(),
|
|
ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Http(McpRemoteServerConfig {
|
|
url: "https://example.test/mcp".to_string(),
|
|
headers: BTreeMap::new(),
|
|
headers_helper: None,
|
|
oauth: None,
|
|
}),
|
|
},
|
|
),
|
|
(
|
|
"sdk".to_string(),
|
|
ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Sdk(McpSdkServerConfig {
|
|
name: "sdk-server".to_string(),
|
|
}),
|
|
},
|
|
),
|
|
(
|
|
"ws".to_string(),
|
|
ScopedMcpServerConfig {
|
|
scope: ConfigSource::Local,
|
|
config: McpServerConfig::Ws(McpWebSocketServerConfig {
|
|
url: "wss://example.test/mcp".to_string(),
|
|
headers: BTreeMap::new(),
|
|
headers_helper: None,
|
|
}),
|
|
},
|
|
),
|
|
]);
|
|
|
|
let manager = McpServerManager::from_servers(&servers);
|
|
let unsupported = manager.unsupported_servers();
|
|
|
|
assert_eq!(unsupported.len(), 3);
|
|
assert_eq!(unsupported[0].server_name, "http");
|
|
assert_eq!(unsupported[1].server_name, "sdk");
|
|
assert_eq!(unsupported[2].server_name, "ws");
|
|
}
|
|
|
|
#[test]
|
|
fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_manager_mcp_server_script();
|
|
let root = script_path.parent().expect("script parent");
|
|
let log_path = root.join("alpha.log");
|
|
let servers = BTreeMap::from([(
|
|
"alpha".to_string(),
|
|
manager_server_config(&script_path, "alpha", &log_path),
|
|
)]);
|
|
let mut manager = McpServerManager::from_servers(&servers);
|
|
|
|
manager.discover_tools().await.expect("discover tools");
|
|
manager.shutdown().await.expect("first shutdown");
|
|
manager.shutdown().await.expect("second shutdown");
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn manager_reuses_spawned_server_between_discovery_and_call() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_manager_mcp_server_script();
|
|
let root = script_path.parent().expect("script parent");
|
|
let log_path = root.join("alpha.log");
|
|
let servers = BTreeMap::from([(
|
|
"alpha".to_string(),
|
|
manager_server_config(&script_path, "alpha", &log_path),
|
|
)]);
|
|
let mut manager = McpServerManager::from_servers(&servers);
|
|
|
|
manager.discover_tools().await.expect("discover tools");
|
|
let response = manager
|
|
.call_tool(
|
|
&mcp_tool_name("alpha", "echo"),
|
|
Some(json!({"text": "reuse"})),
|
|
)
|
|
.await
|
|
.expect("call tool");
|
|
|
|
assert_eq!(
|
|
response
|
|
.result
|
|
.as_ref()
|
|
.and_then(|result| result.structured_content.as_ref())
|
|
.and_then(|value| value.get("initializeCount")),
|
|
Some(&json!(1))
|
|
);
|
|
|
|
let log = fs::read_to_string(&log_path).expect("read log");
|
|
assert_eq!(log.lines().filter(|line| *line == "initialize").count(), 1);
|
|
assert_eq!(
|
|
log.lines().collect::<Vec<_>>(),
|
|
vec!["initialize", "tools/list", "tools/call"]
|
|
);
|
|
|
|
manager.shutdown().await.expect("shutdown");
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn manager_reports_unknown_qualified_tool_name() {
|
|
let runtime = Builder::new_current_thread()
|
|
.enable_all()
|
|
.build()
|
|
.expect("runtime");
|
|
runtime.block_on(async {
|
|
let script_path = write_manager_mcp_server_script();
|
|
let root = script_path.parent().expect("script parent");
|
|
let log_path = root.join("alpha.log");
|
|
let servers = BTreeMap::from([(
|
|
"alpha".to_string(),
|
|
manager_server_config(&script_path, "alpha", &log_path),
|
|
)]);
|
|
let mut manager = McpServerManager::from_servers(&servers);
|
|
|
|
let error = manager
|
|
.call_tool(
|
|
&mcp_tool_name("alpha", "missing"),
|
|
Some(json!({"text": "nope"})),
|
|
)
|
|
.await
|
|
.expect_err("unknown qualified tool should fail");
|
|
|
|
match error {
|
|
McpServerManagerError::UnknownTool { qualified_name } => {
|
|
assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
|
|
}
|
|
other => panic!("expected unknown tool error, got {other:?}"),
|
|
}
|
|
|
|
cleanup_script(&script_path);
|
|
});
|
|
}
|
|
}
|