From 5eeb7be4ccaccf8ce531588d4acd331fc45ce3a9 Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Tue, 31 Mar 2026 21:43:37 +0000 Subject: [PATCH] Repair MCP stdio runtime tests after the in-flight JSON-RPC slice The dirty stdio slice had two real regressions in its new JSON-RPC test coverage: the embedded Python helper was written with broken string literals, and direct execution of the freshly written helper could fail with ETXTBSY on Linux. The repair keeps scope inside mcp_stdio.rs by fixing the helper strings and invoking the JSON-RPC helper through python3 while leaving the existing stdio process behavior unchanged. Constraint: Keep the repair limited to rust/crates/runtime/src/mcp_stdio.rs Constraint: Must satisfy fmt, clippy -D warnings, and runtime tests before shipping Rejected: Revert the entire JSON-RPC stdio coverage addition | unnecessary once the helper/test defects were isolated Confidence: high Scope-risk: narrow Reversibility: clean Directive: Keep ephemeral stdio test helpers portable and avoid directly execing freshly written scripts when an interpreter invocation is sufficient Tested: cargo fmt --all; cargo clippy -p runtime --all-targets -- -D warnings; cargo test -p runtime Not-tested: Cross-platform behavior outside the current Linux runtime --- rust/crates/runtime/src/mcp_stdio.rs | 130 ++++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 3 deletions(-) diff --git a/rust/crates/runtime/src/mcp_stdio.rs b/rust/crates/runtime/src/mcp_stdio.rs index f75bbad..c279a04 100644 --- a/rust/crates/runtime/src/mcp_stdio.rs +++ b/rust/crates/runtime/src/mcp_stdio.rs @@ -2,9 +2,11 @@ use std::collections::BTreeMap; use std::io; use std::process::Stdio; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use serde_json::Value as JsonRpcMessage; + use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport}; #[derive(Debug)] @@ -12,7 +14,7 @@ use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTranspor pub struct McpStdioProcess { child: Child, stdin: ChildStdin, - stdout: ChildStdout, + stdout: BufReader, } #[allow(dead_code)] @@ -39,7 +41,7 @@ impl McpStdioProcess { Ok(Self { child, stdin, - stdout, + stdout: BufReader::new(stdout), }) } @@ -58,6 +60,49 @@ impl McpStdioProcess { Ok(buffer) } + pub async fn write_jsonrpc_message(&mut self, message: &JsonRpcMessage) -> io::Result<()> { + let encoded = encode_jsonrpc_message(message)?; + self.write_all(&encoded).await?; + self.flush().await + } + + pub async fn read_jsonrpc_message(&mut self) -> io::Result { + let payload = self.read_frame().await?; + serde_json::from_slice(&payload) + .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error)) + } + + async fn read_frame(&mut self) -> io::Result> { + let mut content_length = None; + loop { + let mut line = String::new(); + let bytes_read = self.stdout.read_line(&mut line).await?; + if bytes_read == 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "MCP stdio stream closed while reading headers", + )); + } + if line == "\r\n" { + break; + } + if let Some(value) = line.strip_prefix("Content-Length:") { + let parsed = value + .trim() + .parse::() + .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; + content_length = Some(parsed); + } + } + + let content_length = content_length.ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header") + })?; + let mut payload = vec![0_u8; content_length]; + self.stdout.read_exact(&mut payload).await?; + Ok(payload) + } + pub async fn terminate(&mut self) -> io::Result<()> { self.child.kill().await } @@ -88,6 +133,15 @@ fn apply_env(command: &mut Command, env: &BTreeMap) { } } +fn encode_jsonrpc_message(message: &JsonRpcMessage) -> io::Result> { + let body = serde_json::to_vec(message) + .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?; + let header = format!("Content-Length: {}\r\n\r\n", body.len()); + let mut framed = header.into_bytes(); + framed.extend(body); + Ok(framed) +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -129,6 +183,37 @@ mod tests { script_path } + fn write_jsonrpc_script() -> PathBuf { + let root = temp_dir(); + fs::create_dir_all(&root).expect("temp dir"); + let script_path = root.join("jsonrpc-mcp.py"); + let script = [ + "#!/usr/bin/env python3", + "import json, sys", + "header = b''", + r"while not header.endswith(b'\r\n\r\n'):", + " chunk = sys.stdin.buffer.read(1)", + " if not chunk:", + " raise SystemExit(1)", + " header += chunk", + "length = 0", + r"for line in header.decode().split('\r\n'):", + r" if line.lower().startswith('content-length:'):", + r" length = int(line.split(':', 1)[1].strip())", + "payload = sys.stdin.buffer.read(length)", + "json.loads(payload.decode())", + r"response = json.dumps({'jsonrpc': '2.0', 'id': 1, 'result': {'echo': True}}).encode()", + r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)", + "sys.stdout.buffer.flush()", + "", + ] + .join("\n"); + fs::write(&script_path, script).expect("write script"); + let mut permissions = fs::metadata(&script_path).expect("metadata").permissions(); + permissions.set_mode(0o755); + fs::set_permissions(&script_path, permissions).expect("chmod"); + script_path + } fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap { let config = ScopedMcpServerConfig { scope: ConfigSource::Local, @@ -185,6 +270,45 @@ mod tests { assert_eq!(error.kind(), ErrorKind::InvalidInput); } + #[test] + fn round_trips_jsonrpc_messages_over_stdio_frames() { + let runtime = Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + runtime.block_on(async { + let script_path = write_jsonrpc_script(); + let transport = crate::mcp_client::McpStdioTransport { + command: "python3".to_string(), + args: vec![script_path.to_string_lossy().into_owned()], + env: BTreeMap::new(), + }; + let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly"); + process + .write_jsonrpc_message(&serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize" + })) + .await + .expect("write jsonrpc message"); + + let response = process + .read_jsonrpc_message() + .await + .expect("read jsonrpc response"); + assert_eq!(response["jsonrpc"], serde_json::json!("2.0")); + assert_eq!(response["id"], serde_json::json!(1)); + assert_eq!(response["result"]["echo"], serde_json::json!(true)); + + let status = process.wait().await.expect("wait for exit"); + assert!(status.success()); + + fs::remove_file(&script_path).expect("cleanup script"); + fs::remove_dir_all(script_path.parent().expect("script parent")).expect("cleanup dir"); + }); + } + #[test] fn direct_spawn_uses_transport_env() { let runtime = Builder::new_current_thread()