From 1e5002b52188fdb72a0acdf952c39accf44a68bd Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Tue, 31 Mar 2026 23:31:37 +0000 Subject: [PATCH] Add MCP server orchestration so configured stdio tools can be discovered and called The runtime crate already had typed MCP config parsing, bootstrap metadata, and stdio JSON-RPC transport primitives, but it lacked the stateful layer that owns configured subprocesses and routes discovered tools back to the right server. This change adds a thin lazy McpServerManager in mcp_stdio, keeps unsupported transports explicit, and locks the behavior with subprocess-backed discovery, routing, reuse, shutdown, and error tests. Constraint: Keep the change narrow to the runtime crate and stdio transport only Constraint: Reuse existing MCP config/bootstrap/process helpers instead of adding new dependencies Rejected: Eagerly spawn all configured servers at construction | unnecessary startup cost and failure coupling Rejected: Spawn a fresh process per request | defeats lifecycle management and tool routing cache Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep higher-level runtime/session integration separate until a caller needs this manager surface Tested: cargo fmt --all; cargo clippy -p runtime --all-targets -- -D warnings; cargo test -p runtime Not-tested: Integration into conversation/runtime flows outside direct manager APIs --- rust/crates/runtime/src/lib.rs | 9 +- rust/crates/runtime/src/mcp_stdio.rs | 765 ++++++++++++++++++++++++++- 2 files changed, 767 insertions(+), 7 deletions(-) diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 1d7af28..581b0dc 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -46,10 +46,11 @@ pub use mcp_client::{ }; pub use mcp_stdio::{ spawn_mcp_stdio_process, JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse, - McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo, - McpListResourcesParams, McpListResourcesResult, McpListToolsParams, McpListToolsResult, - McpReadResourceParams, McpReadResourceResult, McpResource, McpResourceContents, - McpStdioProcess, McpTool, McpToolCallContent, McpToolCallParams, McpToolCallResult, + ManagedMcpTool, McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, + McpInitializeServerInfo, McpListResourcesParams, McpListResourcesResult, McpListToolsParams, + McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpResource, + McpResourceContents, McpServerManager, McpServerManagerError, McpStdioProcess, McpTool, + McpToolCallContent, McpToolCallParams, McpToolCallResult, UnsupportedMcpServer, }; pub use oauth::{ code_challenge_s256, generate_pkce_pair, generate_state, loopback_redirect_uri, diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index 02927bc..7e67d5d 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -8,6 +8,8 @@ use serde_json::Value as JsonValue; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig}; +use crate::mcp::mcp_tool_name; use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -200,6 +202,374 @@ pub struct McpReadResourceResult { pub contents: Vec, } +#[derive(Debug, Clone, PartialEq)] +pub struct ManagedMcpTool { + pub server_name: String, + pub qualified_name: String, + pub raw_name: String, + pub tool: McpTool, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UnsupportedMcpServer { + pub server_name: String, + pub transport: McpTransport, + pub reason: String, +} + +#[derive(Debug)] +pub enum McpServerManagerError { + Io(io::Error), + JsonRpc { + server_name: String, + method: &'static str, + error: JsonRpcError, + }, + InvalidResponse { + server_name: String, + method: &'static str, + details: String, + }, + UnknownTool { + qualified_name: String, + }, + UnknownServer { + server_name: String, + }, +} + +impl std::fmt::Display for McpServerManagerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Io(error) => write!(f, "{error}"), + Self::JsonRpc { + server_name, + method, + error, + } => write!( + f, + "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})", + error.message, error.code + ), + Self::InvalidResponse { + server_name, + method, + details, + } => write!( + f, + "MCP server `{server_name}` returned invalid response for {method}: {details}" + ), + Self::UnknownTool { qualified_name } => { + write!(f, "unknown MCP tool `{qualified_name}`") + } + Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"), + } + } +} + +impl std::error::Error for McpServerManagerError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Io(error) => Some(error), + Self::JsonRpc { .. } + | Self::InvalidResponse { .. } + | Self::UnknownTool { .. } + | Self::UnknownServer { .. } => None, + } + } +} + +impl From for McpServerManagerError { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ToolRoute { + server_name: String, + raw_name: String, +} + +#[derive(Debug)] +struct ManagedMcpServer { + bootstrap: McpClientBootstrap, + process: Option, + initialized: bool, +} + +impl ManagedMcpServer { + fn new(bootstrap: McpClientBootstrap) -> Self { + Self { + bootstrap, + process: None, + initialized: false, + } + } +} + +#[derive(Debug)] +pub struct McpServerManager { + servers: BTreeMap, + unsupported_servers: Vec, + tool_index: BTreeMap, + next_request_id: u64, +} + +impl McpServerManager { + #[must_use] + pub fn from_runtime_config(config: &RuntimeConfig) -> Self { + Self::from_servers(config.mcp().servers()) + } + + #[must_use] + pub fn from_servers(servers: &BTreeMap) -> Self { + let mut managed_servers = BTreeMap::new(); + let mut unsupported_servers = Vec::new(); + + for (server_name, server_config) in servers { + if server_config.transport() == McpTransport::Stdio { + let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config); + managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap)); + } else { + unsupported_servers.push(UnsupportedMcpServer { + server_name: server_name.clone(), + transport: server_config.transport(), + reason: format!( + "transport {:?} is not supported by McpServerManager", + server_config.transport() + ), + }); + } + } + + Self { + servers: managed_servers, + unsupported_servers, + tool_index: BTreeMap::new(), + next_request_id: 1, + } + } + + #[must_use] + pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] { + &self.unsupported_servers + } + + pub async fn discover_tools(&mut self) -> Result, McpServerManagerError> { + let server_names = self.servers.keys().cloned().collect::>(); + let mut discovered_tools = Vec::new(); + + for server_name in server_names { + self.ensure_server_ready(&server_name).await?; + self.clear_routes_for_server(&server_name); + + let mut cursor = None; + loop { + let request_id = self.take_request_id(); + let response = { + let server = self.server_mut(&server_name)?; + let process = server.process.as_mut().ok_or_else(|| { + McpServerManagerError::InvalidResponse { + server_name: server_name.clone(), + method: "tools/list", + details: "server process missing after initialization".to_string(), + } + })?; + process + .list_tools( + request_id, + Some(McpListToolsParams { + cursor: cursor.clone(), + }), + ) + .await? + }; + + if let Some(error) = response.error { + return Err(McpServerManagerError::JsonRpc { + server_name: server_name.clone(), + method: "tools/list", + error, + }); + } + + let result = + response + .result + .ok_or_else(|| McpServerManagerError::InvalidResponse { + server_name: server_name.clone(), + method: "tools/list", + details: "missing result payload".to_string(), + })?; + + for tool in result.tools { + let qualified_name = mcp_tool_name(&server_name, &tool.name); + self.tool_index.insert( + qualified_name.clone(), + ToolRoute { + server_name: server_name.clone(), + raw_name: tool.name.clone(), + }, + ); + discovered_tools.push(ManagedMcpTool { + server_name: server_name.clone(), + qualified_name, + raw_name: tool.name.clone(), + tool, + }); + } + + match result.next_cursor { + Some(next_cursor) => cursor = Some(next_cursor), + None => break, + } + } + } + + Ok(discovered_tools) + } + + pub async fn call_tool( + &mut self, + qualified_tool_name: &str, + arguments: Option, + ) -> Result, McpServerManagerError> { + let route = self + .tool_index + .get(qualified_tool_name) + .cloned() + .ok_or_else(|| McpServerManagerError::UnknownTool { + qualified_name: qualified_tool_name.to_string(), + })?; + + self.ensure_server_ready(&route.server_name).await?; + let request_id = self.take_request_id(); + let response = + { + let server = self.server_mut(&route.server_name)?; + let process = server.process.as_mut().ok_or_else(|| { + McpServerManagerError::InvalidResponse { + server_name: route.server_name.clone(), + method: "tools/call", + details: "server process missing after initialization".to_string(), + } + })?; + process + .call_tool( + request_id, + McpToolCallParams { + name: route.raw_name, + arguments, + meta: None, + }, + ) + .await? + }; + Ok(response) + } + + pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> { + let server_names = self.servers.keys().cloned().collect::>(); + for server_name in server_names { + let server = self.server_mut(&server_name)?; + if let Some(process) = server.process.as_mut() { + process.shutdown().await?; + } + server.process = None; + server.initialized = false; + } + Ok(()) + } + + fn clear_routes_for_server(&mut self, server_name: &str) { + self.tool_index + .retain(|_, route| route.server_name != server_name); + } + + fn server_mut( + &mut self, + server_name: &str, + ) -> Result<&mut ManagedMcpServer, McpServerManagerError> { + self.servers + .get_mut(server_name) + .ok_or_else(|| McpServerManagerError::UnknownServer { + server_name: server_name.to_string(), + }) + } + + fn take_request_id(&mut self) -> JsonRpcId { + let id = self.next_request_id; + self.next_request_id = self.next_request_id.saturating_add(1); + JsonRpcId::Number(id) + } + + async fn ensure_server_ready( + &mut self, + server_name: &str, + ) -> Result<(), McpServerManagerError> { + let needs_spawn = self + .servers + .get(server_name) + .map(|server| server.process.is_none()) + .ok_or_else(|| McpServerManagerError::UnknownServer { + server_name: server_name.to_string(), + })?; + + if needs_spawn { + let server = self.server_mut(server_name)?; + server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?); + server.initialized = false; + } + + let needs_initialize = self + .servers + .get(server_name) + .map(|server| !server.initialized) + .ok_or_else(|| McpServerManagerError::UnknownServer { + server_name: server_name.to_string(), + })?; + + if needs_initialize { + let request_id = self.take_request_id(); + let response = { + let server = self.server_mut(server_name)?; + let process = server.process.as_mut().ok_or_else(|| { + McpServerManagerError::InvalidResponse { + server_name: server_name.to_string(), + method: "initialize", + details: "server process missing before initialize".to_string(), + } + })?; + process + .initialize(request_id, default_initialize_params()) + .await? + }; + + if let Some(error) = response.error { + return Err(McpServerManagerError::JsonRpc { + server_name: server_name.to_string(), + method: "initialize", + error, + }); + } + + if response.result.is_none() { + return Err(McpServerManagerError::InvalidResponse { + server_name: server_name.to_string(), + method: "initialize", + details: "missing result payload".to_string(), + }); + } + + let server = self.server_mut(server_name)?; + server.initialized = true; + } + + Ok(()) + } +} + #[derive(Debug)] pub struct McpStdioProcess { child: Child, @@ -385,6 +755,14 @@ impl McpStdioProcess { pub async fn wait(&mut self) -> io::Result { self.child.wait().await } + + async fn shutdown(&mut self) -> io::Result<()> { + if self.child.try_wait()?.is_none() { + self.child.kill().await?; + } + let _ = self.child.wait().await?; + Ok(()) + } } pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result { @@ -413,6 +791,17 @@ fn encode_frame(payload: &[u8]) -> Vec { framed } +fn default_initialize_params() -> McpInitializeParams { + McpInitializeParams { + protocol_version: "2025-03-26".to_string(), + capabilities: JsonValue::Object(serde_json::Map::new()), + client_info: McpInitializeClientInfo { + name: "runtime".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -426,15 +815,17 @@ mod tests { use tokio::runtime::Builder; use crate::config::{ - ConfigSource, McpServerConfig, McpStdioServerConfig, ScopedMcpServerConfig, + ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig, + McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig, }; + use crate::mcp::mcp_tool_name; use crate::mcp_client::McpClientBootstrap; use super::{ spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse, McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo, - McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpStdioProcess, McpTool, - McpToolCallParams, + McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager, + McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams, }; fn temp_dir() -> PathBuf { @@ -628,6 +1019,110 @@ mod tests { script_path } + #[allow(clippy::too_many_lines)] + fn write_manager_mcp_server_script() -> PathBuf { + let root = temp_dir(); + fs::create_dir_all(&root).expect("temp dir"); + let script_path = root.join("manager-mcp-server.py"); + let script = [ + "#!/usr/bin/env python3", + "import json, os, sys", + "", + "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')", + "LOG_PATH = os.environ.get('MCP_LOG_PATH')", + "initialize_count = 0", + "", + "def log(method):", + " if LOG_PATH:", + " with open(LOG_PATH, 'a', encoding='utf-8') as handle:", + " handle.write(f'{method}\\n')", + "", + "def read_message():", + " header = b''", + r" while not header.endswith(b'\r\n\r\n'):", + " chunk = sys.stdin.buffer.read(1)", + " if not chunk:", + " return None", + " header += chunk", + " length = 0", + r" for line in header.decode().split('\r\n'):", + r" if line.lower().startswith('content-length:'):", + r" length = int(line.split(':', 1)[1].strip())", + " payload = sys.stdin.buffer.read(length)", + " return json.loads(payload.decode())", + "", + "def send_message(message):", + " payload = json.dumps(message).encode()", + r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)", + " sys.stdout.buffer.flush()", + "", + "while True:", + " request = read_message()", + " if request is None:", + " break", + " method = request['method']", + " log(method)", + " if method == 'initialize':", + " initialize_count += 1", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'protocolVersion': request['params']['protocolVersion'],", + " 'capabilities': {'tools': {}},", + " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}", + " }", + " })", + " elif method == 'tools/list':", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'tools': [", + " {", + " 'name': 'echo',", + " 'description': f'Echo tool for {LABEL}',", + " 'inputSchema': {", + " 'type': 'object',", + " 'properties': {'text': {'type': 'string'}},", + " 'required': ['text']", + " }", + " }", + " ]", + " }", + " })", + " elif method == 'tools/call':", + " args = request['params'].get('arguments') or {}", + " text = args.get('text', '')", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'result': {", + " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],", + " 'structuredContent': {", + " 'server': LABEL,", + " 'echoed': text,", + " 'initializeCount': initialize_count", + " },", + " 'isError': False", + " }", + " })", + " else:", + " send_message({", + " 'jsonrpc': '2.0',", + " 'id': request['id'],", + " 'error': {'code': -32601, 'message': f'unknown method: {method}'},", + " })", + "", + ] + .join("\n"); + fs::write(&script_path, script).expect("write script"); + let mut permissions = fs::metadata(&script_path).expect("metadata").permissions(); + permissions.set_mode(0o755); + fs::set_permissions(&script_path, permissions).expect("chmod"); + script_path + } + fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap { let config = ScopedMcpServerConfig { scope: ConfigSource::Local, @@ -653,6 +1148,27 @@ mod tests { fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); } + fn manager_server_config( + script_path: &Path, + label: &str, + log_path: &Path, + ) -> ScopedMcpServerConfig { + ScopedMcpServerConfig { + scope: ConfigSource::Local, + config: McpServerConfig::Stdio(McpStdioServerConfig { + command: "python3".to_string(), + args: vec![script_path.to_string_lossy().into_owned()], + env: BTreeMap::from([ + ("MCP_SERVER_LABEL".to_string(), label.to_string()), + ( + "MCP_LOG_PATH".to_string(), + log_path.to_string_lossy().into_owned(), + ), + ]), + }), + } + } + #[test] fn spawns_stdio_process_and_round_trips_io() { let runtime = Builder::new_current_thread() @@ -935,4 +1451,247 @@ mod tests { cleanup_script(&script_path); }); } + + #[test] + fn manager_discovers_tools_from_stdio_config() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let log_path = root.join("alpha.log"); + let servers = BTreeMap::from([( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &log_path), + )]); + let mut manager = McpServerManager::from_servers(&servers); + + let tools = manager.discover_tools().await.expect("discover tools"); + + assert_eq!(tools.len(), 1); + assert_eq!(tools[0].server_name, "alpha"); + assert_eq!(tools[0].raw_name, "echo"); + assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo")); + assert_eq!(tools[0].tool.name, "echo"); + assert!(manager.unsupported_servers().is_empty()); + + manager.shutdown().await.expect("shutdown"); + cleanup_script(&script_path); + }); + } + + #[test] + fn manager_routes_tool_calls_to_correct_server() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let alpha_log = root.join("alpha.log"); + let beta_log = root.join("beta.log"); + let servers = BTreeMap::from([ + ( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &alpha_log), + ), + ( + "beta".to_string(), + manager_server_config(&script_path, "beta", &beta_log), + ), + ]); + let mut manager = McpServerManager::from_servers(&servers); + + let tools = manager.discover_tools().await.expect("discover tools"); + assert_eq!(tools.len(), 2); + + let alpha = manager + .call_tool( + &mcp_tool_name("alpha", "echo"), + Some(json!({"text": "hello"})), + ) + .await + .expect("call alpha tool"); + let beta = manager + .call_tool( + &mcp_tool_name("beta", "echo"), + Some(json!({"text": "world"})), + ) + .await + .expect("call beta tool"); + + assert_eq!( + alpha + .result + .as_ref() + .and_then(|result| result.structured_content.as_ref()) + .and_then(|value| value.get("server")), + Some(&json!("alpha")) + ); + assert_eq!( + beta.result + .as_ref() + .and_then(|result| result.structured_content.as_ref()) + .and_then(|value| value.get("server")), + Some(&json!("beta")) + ); + + manager.shutdown().await.expect("shutdown"); + cleanup_script(&script_path); + }); + } + + #[test] + fn manager_records_unsupported_non_stdio_servers_without_panicking() { + let servers = BTreeMap::from([ + ( + "http".to_string(), + ScopedMcpServerConfig { + scope: ConfigSource::Local, + config: McpServerConfig::Http(McpRemoteServerConfig { + url: "https://example.test/mcp".to_string(), + headers: BTreeMap::new(), + headers_helper: None, + oauth: None, + }), + }, + ), + ( + "sdk".to_string(), + ScopedMcpServerConfig { + scope: ConfigSource::Local, + config: McpServerConfig::Sdk(McpSdkServerConfig { + name: "sdk-server".to_string(), + }), + }, + ), + ( + "ws".to_string(), + ScopedMcpServerConfig { + scope: ConfigSource::Local, + config: McpServerConfig::Ws(McpWebSocketServerConfig { + url: "wss://example.test/mcp".to_string(), + headers: BTreeMap::new(), + headers_helper: None, + }), + }, + ), + ]); + + let manager = McpServerManager::from_servers(&servers); + let unsupported = manager.unsupported_servers(); + + assert_eq!(unsupported.len(), 3); + assert_eq!(unsupported[0].server_name, "http"); + assert_eq!(unsupported[1].server_name, "sdk"); + assert_eq!(unsupported[2].server_name, "ws"); + } + + #[test] + fn manager_shutdown_terminates_spawned_children_and_is_idempotent() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let log_path = root.join("alpha.log"); + let servers = BTreeMap::from([( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &log_path), + )]); + let mut manager = McpServerManager::from_servers(&servers); + + manager.discover_tools().await.expect("discover tools"); + manager.shutdown().await.expect("first shutdown"); + manager.shutdown().await.expect("second shutdown"); + + cleanup_script(&script_path); + }); + } + + #[test] + fn manager_reuses_spawned_server_between_discovery_and_call() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let log_path = root.join("alpha.log"); + let servers = BTreeMap::from([( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &log_path), + )]); + let mut manager = McpServerManager::from_servers(&servers); + + manager.discover_tools().await.expect("discover tools"); + let response = manager + .call_tool( + &mcp_tool_name("alpha", "echo"), + Some(json!({"text": "reuse"})), + ) + .await + .expect("call tool"); + + assert_eq!( + response + .result + .as_ref() + .and_then(|result| result.structured_content.as_ref()) + .and_then(|value| value.get("initializeCount")), + Some(&json!(1)) + ); + + let log = fs::read_to_string(&log_path).expect("read log"); + assert_eq!(log.lines().filter(|line| *line == "initialize").count(), 1); + assert_eq!( + log.lines().collect::>(), + vec!["initialize", "tools/list", "tools/call"] + ); + + manager.shutdown().await.expect("shutdown"); + cleanup_script(&script_path); + }); + } + + #[test] + fn manager_reports_unknown_qualified_tool_name() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_manager_mcp_server_script(); + let root = script_path.parent().expect("script parent"); + let log_path = root.join("alpha.log"); + let servers = BTreeMap::from([( + "alpha".to_string(), + manager_server_config(&script_path, "alpha", &log_path), + )]); + let mut manager = McpServerManager::from_servers(&servers); + + let error = manager + .call_tool( + &mcp_tool_name("alpha", "missing"), + Some(json!({"text": "nope"})), + ) + .await + .expect_err("unknown qualified tool should fail"); + + match error { + McpServerManagerError::UnknownTool { qualified_name } => { + assert_eq!(qualified_name, mcp_tool_name("alpha", "missing")); + } + other => panic!("expected unknown tool error, got {other:?}"), + } + + cleanup_script(&script_path); + }); + } }