MCP Stdio 传输详解:进程通信的实现原理与关键细节
标签JavaMCPStdio进程通信JSON-RPCj-langchain前置阅读MCP 协议通信详解从握手到工具调用的完整流程适合人群希望深入理解 MCP Stdio 传输机制、或需要排查本地 MCP 服务器问题的 Java 开发者一、Stdio 是什么适合什么场景MCP 支持三种传输方式Stdio、SSE、HTTP。Stdio标准输入输出是其中最简单直接的一种通过进程间通信实现客户端和服务器的双向通信┌─────────────┐ stdin → ┌─────────────┐ │ 客户端进程 │ │ 服务器进程 │ │ │ ← stdout │ │ └─────────────┘ ← stderr └─────────────┘ 日志客户端启动服务器进程后通过stdin发送 JSON-RPC 请求从stdout读取响应stderr用于接收服务器日志。这种方式天然适合本地工具npx启动的官方 MCP 服务器server-filesystem、server-memory等、本地 Python/Go 脚本封装的工具都走 Stdio 通信。不需要任何网络配置进程级别天然隔离。代价是不支持跨网络并发模型是同步单线程进程的启动和销毁需要自己管理。二、完整对接流程Stdio 模式的通信流程与协议层完全一致握手三步不变差异只在传输细节上客户端 服务器进程 │ │ │──── 1. ProcessBuilder.start() ──────── │启动进程 │──── 2. 获取 stdin/stdout/stderr ────── │建立 I/O 流 │──── 3. 启动 stderr 监听线程 ────────── │必须防止缓冲区死锁 │ │ │──── 4. 写入 initialize 到 stdin ────── │ │─── 5. 从 stdout 读取响应 ───────────── │ │──── 6. 写入 initialized 通知到 stdin ── │ ← 握手完成 │ │ │──── 7. 正常工具调用 ─────────────────── │ │─── 8. 读取工具结果 ──────────────────── │ │ │ │──── 9. 关闭 stdin → 等待退出 ────────── │优雅关闭三、逐步实现第一步启动进程ListStringcommandnewArrayList();// npx 启动官方 MCP 服务器command.add(npx);command.add(-y);command.add(modelcontextprotocol/server-memory);// 或者 uv 启动 Python MCP 服务器// command.add(uv);// command.add(--directory); command.add(/path/to/mcp);// command.add(run); command.add(mcp-server);ProcessBuilderpbnewProcessBuilder(command);// 注入环境变量按需pb.environment().put(NODE_ENV,production);pb.environment().put(DEBUG,false);ProcessserverProcesspb.start();命令列表中每个元素是独立的参数不要把多个参数合并成一个字符串传入否则会被当成单个带空格的命令名执行失败。第二步建立 I/O 流// 明确指定 UTF-8避免中文乱码BufferedWriterstdinnewBufferedWriter(newOutputStreamWriter(serverProcess.getOutputStream(),StandardCharsets.UTF_8));BufferedReaderstdoutnewBufferedReader(newInputStreamReader(serverProcess.getInputStream(),StandardCharsets.UTF_8));BufferedReaderstderrnewBufferedReader(newInputStreamReader(serverProcess.getErrorStream(),StandardCharsets.UTF_8));名称容易让人困惑getOutputStream()返回的是写入到服务器 stdin 的流getInputStream()返回的是从服务器 stdout 读取的流。第三步启动 stderr 监听线程必须这一步很容易被忽略但它是防止死锁的关键。ThreadstderrThreadnewThread(()-{try{Stringline;while((linestderr.readLine())!null){if(line.contains(ERROR)||line.contains(error)){log.error([{}] {},serverName,line);lastErrorline;}else{log.info([{}] {},serverName,line);}}}catch(IOExceptione){if(connected){log.error([{}] stderr 读取异常{},serverName,e.getMessage());}}});stderrThread.setDaemon(true);// 守护线程主线程退出时自动结束stderrThread.setName(mcp-stderr-serverName);stderrThread.start();为什么必须用独立线程操作系统为进程间通信分配了有限的缓冲区。如果不持续读取stderr缓冲区满后服务器进程会阻塞在写日志上无法继续处理请求最终导致整个通信死锁。第四步发送请求和接收响应Stdio 的请求响应是严格同步的写一条读一条。publicsynchronizedMcpResponsesendRequest(Stringmethod,Objectparams)throwsException{if(!connected){thrownewIllegalStateException(未建立连接);}// 构建 JSON-RPC 请求McpRequestrequestnewMcpRequest();request.idnextRequestId();request.methodmethod;request.paramsparams;StringrequestJsonmapper.writeValueAsString(request);log.debug([{}] → {},serverName,requestJson);// 写入 stdin每条消息以换行符结尾stdin.write(requestJson\n);stdin.flush();// 必须 flush否则数据留在缓冲区不会发送// 从 stdout 读取响应阻塞直到有数据StringresponseJsonstdout.readLine();if(responseJsonnull){thrownewIOException(服务器已关闭连接);}log.debug([{}] ← {},serverName,responseJson);McpResponseresponsemapper.readValue(responseJson,McpResponse.class);if(response.error!null){thrownewMcpException(response.error.code,response.error.message,response.error.data);}returnresponse;}synchronized是必要的原因是 Stdio 通信是严格的一问一答必须等上一个请求的响应读取完毕才能发下一个请求否则多线程并发写入会导致 JSON 数据交错readLine()读到的响应也会对不上请求。第五步发送通知通知没有id也不等待响应initialized握手通知就是这种protectedvoidsendNotification(Stringmethod,Objectparams)throwsException{// 用 Map 手动构建确保序列化结果中没有 id 字段MapString,ObjectnotificationnewHashMap();notification.put(jsonrpc,2.0);notification.put(method,method);notification.put(params,params);Stringjsonmapper.writeValueAsString(notification);stdin.write(json\n);stdin.flush();// 不读取响应}第六步优雅关闭关闭顺序很重要错误的顺序可能导致进程无法退出publicvoidclose(){connectedfalse;try{// 1. 先关闭 stdin服务器收到 EOF 后会知道客户端断开开始清理if(stdin!null)stdin.close();if(stdout!null)stdout.close();if(stderr!null)stderr.close();// 2. 等待进程正常退出给服务器时间做清理if(serverProcess!null){booleanexitedserverProcess.waitFor(5,TimeUnit.SECONDS);if(!exited){log.warn([{}] 进程未正常退出发送 SIGTERM,serverName);serverProcess.destroy();if(!serverProcess.waitFor(2,TimeUnit.SECONDS)){log.warn([{}] 进程仍未退出发送 SIGKILL,serverName);serverProcess.destroyForcibly();}}log.info([{}] 进程退出码{},serverName,serverProcess.exitValue());}}catch(Exceptione){log.error([{}] 关闭连接异常{},serverName,e.getMessage());}}destroy()发送 SIGTERM允许进程做清理destroyForcibly()发送 SIGKILL强制终止。先发 SIGTERM超时后再 SIGKILL是标准的优雅关闭流程。四、三个容易踩坑的地方坑一不读 stderr 导致死锁症状请求发出后程序永久阻塞无响应CPU 占用低。原因服务器输出了大量日志到 stderr缓冲区满后服务器阻塞无法处理请求stdout.readLine()永远等不到响应。解决stderr必须在单独线程中持续读取且要在发送任何请求之前启动该线程。坑二忘记 flush 导致请求不发送症状程序卡在stdout.readLine()服务器没有收到任何请求。原因BufferedWriter有内部缓冲区写入的数据未必立即发送到操作系统管道。不调用flush()的话数据可能在缓冲区里待很久才发出甚至等到缓冲区满了才发。解决每次stdin.write(...)之后必须紧跟stdin.flush()。坑三进程无法退出变成僵尸进程症状serverProcess.waitFor()永久阻塞进程列表中能看到该进程还在运行。原因服务器进程在等待 stdin 输入因为 stdin 未关闭或者服务器启动了子进程而父进程退出后子进程没有一起退出。解决关闭时先stdin.close()让服务器感知到 EOF再waitFor(超时)超时后destroyForcibly()。五、超时处理stdout.readLine()默认无限阻塞生产环境需要加超时// 方案一用 CompletableFuture 加超时publicMcpResponsesendRequestWithTimeout(Stringmethod,Objectparams,longtimeoutMs)throwsException{CompletableFutureMcpResponsefutureCompletableFuture.supplyAsync(()-{try{returnsendRequest(method,params);}catch(Exceptione){thrownewCompletionException(e);}});returnfuture.get(timeoutMs,TimeUnit.MILLISECONDS);}// 方案二轮询检查适合简单场景publicStringreadLineWithTimeout(longtimeoutMs)throwsIOException,TimeoutException{longdeadlineSystem.currentTimeMillis()timeoutMs;while(System.currentTimeMillis()deadline){if(stdout.ready()){returnstdout.readLine();}Thread.sleep(10);}thrownewTimeoutException(读取响应超时);}CompletableFuture方案更干净但要注意超时后线程仍可能阻塞在readLine()——此时需要关闭流来中断阻塞或者接受线程泄漏在连接关闭时才会释放。六、与 SSE、HTTP 的横向对比维度StdioSSEHTTP适用场景本地工具、npx 服务器远程服务、需要推送简单 HTTP 接口跨网络❌✅✅并发模型同步单线程异步多路复用同步连接池进程管理需要自己管理无需管理无需管理实现复杂度低线性读写高异步匹配低请求响应启动延迟高启动进程中建立连接低调试难度中高低核心代码复杂度的差异体现在请求/响应的匹配方式上// Stdio同步写完直接读一一对应stdin.write(request\n);stdin.flush();Stringresponsestdout.readLine();// SSE异步用 CompletableFuture 按 id 匹配pendingResponses.put(requestId,newCompletableFuture());sendHttpPost(request);returnpendingResponses.get(requestId).get(30,TimeUnit.SECONDS);// HTTP同步但响应可能是 SSE 格式需额外解析ResponsehttpResphttpClient.newCall(request).execute();StringjsonparseSseFormatIfNeeded(httpResp.body().string());Stdio 的同步模型反而是最好理解和调试的代价是无法并发。对于本地工具调用这个代价通常完全可以接受。七、总结Stdio 对接的七个关键点命令参数逐个拆分不要合并成带空格的字符串明确指定 UTF-8 编码避免中文乱码必须在发请求前启动 stderr 监听线程防止缓冲区死锁每次写入后立即 flush确保数据发送用 synchronized 串行化请求保证请求响应一一对应关闭顺序先关 stdin → waitFor → destroy → destroyForcibly生产环境加超时避免 readLine() 无限阻塞在 j-langchain 中这些细节全部封装在McpServerConnection内部。通过McpConnectionFactory.createConnection(name, config)创建连接connect()完成启动和握手后续直接调用listTools()和callTool()即可不需要手动处理任何管道细节。 相关资源MCP 官方规范https://modelcontextprotocol.ioJSON-RPC 2.0 规范https://www.jsonrpc.org/specificationj-langchain GitHubhttps://github.com/flower-trees/j-langchainj-langchain Gitee 镜像https://gitee.com/flower-trees-z/j-langchain