SSE流式响应:从Reactor Flux到生产级AI聊天的工程实践——5分钟超时、线程隔离、背压处理全解析
大家好我是程序员小策。首先给大家去一个例子凌晨两点P0 告警炸了。AI 聊天接口全部超时用户消息发出去转圈转了 120 秒然后报错。你打开监控一看Tomcat 线程池满了200 个工作线程全部卡在等待 AI 回复这个状态上。新的请求进来直接 reject。原因很简单——OpenAI 那边的 API 延迟突然飙到了 90 秒。而你的后端用的是同步阻塞调用httpClient.post(url, body)线程就一直干等着。一个用户等 90 秒十个用户就是 900 秒·线程两百个线程全部耗尽只用了不到三分钟。这就是同步调用大模型在生产上最经典的翻车场景。今天这篇文章我就带你看一个生产级的 AI 聊天系统是怎么解决这个问题的——用 Reactor Flux SSE 把同步阻塞变成异步流式用独立的线程池把 AI 调用和 Web 请求线程隔离开用 CountDownLatch 守住最后一道超时防线。问题定义AI 聊天的核心矛盾LLM 响应时间极度不确定5 秒到 180 秒而 HTTP 请求线程是稀缺资源Tomcat 默认 200 个。朴素方案是同步阻塞调用请求进来 → 调 LLM → 等返回 → 返回给前端。在 LLM 响应稳定的情况下比如 5 秒以内这个方案没问题。但 LLM 的响应时间取决于模型负载、输入长度、输出长度波动巨大。一旦延迟飙上去线程池就被打满。那必然的改进方向是异步 流式。但这不是简单的把同步改成异步就完事了。你还要面对流式数据如何一帧一帧推给前端用什么协议AI 调用在哪个线程里执行会不会影响主线程如果 AI 调用一直不返回谁来兜底超时流式过程中用户刷新页面断开连接后端怎么感知核心概念SSEServer-Sent Events基于 HTTP 协议的单向流式传输服务端可以持续向客户端推送数据客户端通过EventSourceAPI 接收。相比 WebSocketSSE 更轻量不需要协议升级网关/CDN 兼容性更好。相比短轮询SSE 实时性更高不需要客户端反复请求。FluxReactor 框架中的响应式流发布者代表一个包含 0 到 N 个元素的异步序列。FluxString可以逐个推送字符串给订阅者。FluxSinkFlux 的编程式发射器——你不需要预先准备好所有数据而是在运行时按需调用sink.next(data)把数据推出去。用交通类比理解这个架构想象一条高速公路收费站。同步阻塞就是一辆车到了收费窗口等它交完钱、找完零、拿完票据、开走下一辆车才能上来。高峰期必堵。异步流式就是ETC 不停车收费——车开过去交易在后台异步完成。多车道并行处理一辆车堵了不影响其他车道。在这个系统里收费站 HTTP 请求线程稀缺资源快速释放ETC 后台 独立线程池ThreadPoolTaskExecutor专门执行 AI 调用车道 FluxSink每个请求一条车道互不干扰超时栏杆 CountDownLatch.await(5, TimeUnit.MINUTES)5 分钟没通过就拦截实现看代码。第一步Controller 层——SSE 的入口RestControllerRequestMapping(/api/xunzhi/v1/ai)RequiredArgsConstructorpublicclassAiMessageController{privatefinalAiMessageServiceaiMessageService;PostMapping(value/sessions/{sessionId}/chat,producesMediaType.TEXT_EVENT_STREAM_VALUE)// ← 告诉浏览器这是SSE流publicFluxStringchat(PathVariableStringsessionId,RequestBodyAiMessageReqDTOrequestParam,CurrentUserStringusername,HttpServletResponseresponse){response.setHeader(Cache-Control,no-cache);// ← 禁止浏览器缓存SSE数据response.setHeader(Connection,keep-alive);// ← 保持连接response.setHeader(Access-Control-Allow-Origin,*);response.setHeader(Access-Control-Allow-Headers,Cache-Control);requestParam.setSessionId(sessionId);returnaiMessageService.aiChatFlux(requestParam,username);// ← 返回FluxSpring自动转SSE}}注意produces MediaType.TEXT_EVENT_STREAM_VALUE——这一句告诉 Spring“这个接口的返回值不要一次性序列化成 JSON 返回而是以text/event-stream格式逐条推送。”第二步核心——Service 层的流式编排。这是整个系统最重要的一段代码ServiceRequiredArgsConstructorpublicclassAiMessageServiceImplimplementsAiMessageService{privatestaticfinalStringDEFAULT_ERROR_CONTENTSorry, an error occurred while processing your request.;privatestaticfinalStringUNSUPPORTED_AI_TYPECurrent AI type is not supported;privatefinalAiPropertiesServiceaiPropertiesService;privatefinalAiConversationServiceaiConversationService;privatefinalAiChatHandlerFactoryaiChatHandlerFactory;privatefinalConversationMessageHistoryServiceconversationMessageHistoryService;privatefinalConversationMessagePersistenceServiceconversationMessagePersistenceService;privatefinalConversationStreamingSupportconversationStreamingSupport;privatefinalThreadPoolTaskExecutorthreadPoolTaskExecutor;// ← 独立线程池OverridepublicFluxStringaiChatFlux(AiMessageReqDTOrequestParam,Stringusername){// 参数校验 权限校验if(requestParamnull){returnFlux.error(newClientException(request body cannot be empty));}if(StrUtil.isBlank(requestParam.getSessionId())){returnFlux.error(newClientException(sessionId cannot be empty));}aiConversationService.requireOwnedConversation(requestParam.getSessionId(),username);requestParam.setUserName(username);returnaiChatFluxInternal(requestParam);}privateFluxStringaiChatFluxInternal(AiMessageReqDTOrequestParam){returnFlux.create(sink-{// ← Flux.create()编程式创建FluxStringuserMessageStrUtil.blankToDefault(requestParam.getInputMessage(),No input);LongaiIdrequestParam.getAiId();AIContentAccumulatoraccumulatornewAIContentAccumulator();// ← 内容累积器// 核心AI调用提交到独立线程池不占用Tomcat线程threadPoolTaskExecutor.submit(()-processChat(sessionId,aiId,userMessage,sink,accumulator));// 监听前端断开连接事件sink.onCancel(()-log.warn(AI chat flux cancelled, sessionId{},sessionId));sink.onDispose(()-log.info(AI chat flux disposed, sessionId{},sessionId));});}}这段代码的设计精髓在三个点第一个Flux.create()而不是Flux.just()。Flux.just(a, b, c)是你事先知道所有数据一次性列出来。但 AI 聊天是流式的——你不可能事先知道 AI 会说什么。Flux.create()给你一个FluxSink你可以在任何时候调用sink.next(data)推数据非常灵活。第二个threadPoolTaskExecutor.submit()把 AI 调用隔离到独立线程池。这一步极其关键。Tomcat 的工作线程拿到请求后校验参数、校验权限然后把 AI 调用丢给独立线程池自己立刻返回去处理下一个请求。Tomcat 线程不会被阻塞 90 秒。第三个sink.onCancel()和sink.onDispose()监听的是前端断开连接。用户刷新页面或关闭标签页时SSE 连接会断开后端需要知道并及时释放资源。第三步Handler 层的流式发射和超时兜底。这是UniversalAiChatHandler.streamToSink()的核心OverridepublicvoidstreamToSink(AiPropertiesDOaiProperties,StringuserMessage,ListAiMessageHistoryRespDTOhistoryMessages,FluxSinkStringsink,AIContentAccumulatoraccumulator)throwsException{ChatClientchatClientcreateChatClient(aiProperties);ListMessagemessagesbuildMessages(aiProperties,userMessage,historyMessages);CountDownLatchlatchnewCountDownLatch(1);// ← 超时防线finalThrowable[]streamErrornewThrowable[1];// ← 异常收集器chatClient.prompt().messages(messages).stream().chatResponse().subscribe(chatResponse-{// 正常收到一个token → 推送给前端GenerationgenerationchatResponse.getResult();if(generation!null){Stringcontentgeneration.getOutput().getText();if(StrUtil.isNotEmpty(content)){AiChatStreamRespDTOrespAiChatStreamRespDTO.builder().type(content).content(content).build();sink.next(JSON.toJSONString(resp));// ← 推给SSEaccumulator.appendSimpleContent(content);// ← 累积完整回复}// DeepSeek R1 的 reasoning_content 处理 ← 关键Stringreasoningnull;try{MethodgetReasoningContentgeneration.getOutput().getClass().getMethod(getReasoningContent);ObjectreasoningValgetReasoningContent.invoke(generation.getOutput());if(reasoningVal!null)reasoningreasoningVal.toString();}catch(Exceptionignore){}if(reasoningnull){ObjectreasoningObjgeneration.getOutput().getMetadata().get(reasoningContent);if(reasoningObj!null)reasoningreasoningObj.toString();}if(StrUtil.isNotEmpty(reasoning)){AiChatStreamRespDTOrespAiChatStreamRespDTO.builder().type(reasoning_content).content(reasoning).build();sink.next(JSON.toJSONString(resp));// ← 深度思考推给前端accumulator.appendReasoningChunk(reasoning.getBytes());}}},error-{log.error(流式响应发生错误,error);streamError[0]error;sink.error(error);latch.countDown();},latch::countDown// ← 正常完成时countDown);// 核心最多等5分钟 ← 超时兜底if(!latch.await(5,TimeUnit.MINUTES)){thrownewRuntimeException(AI 响应超时);}if(streamError[0]!null){thrownewRuntimeException(streamError[0]);}}这段代码里有三个生产级的实践细节第一CountDownLatch作为同步屏障。Spring AI 的stream().subscribe()是异步的——你发起流式调用后代码会立刻继续往下走。如果不加等待方法直接返回了FluxSink 还没收到任何数据。latch.await(5, MINUTES)的意思是“我等你最多 5 分钟5 分钟后还没完成就抛超时异常”。第二streamError[]数组收集异常。为什么不是直接throw因为subscribe()的 error 回调是在另一个线程里执行的直接 throw 不会传播到当前线程。用数组做中转当前线程在latch.await()返回后检查。第三DeepSeek R1 的reasoningContent处理用了反射。因为 Spring AI 的通用AssistantMessage类没有直接暴露reasoningContent字段但 DeepSeek 的返回里有。代码先用反射去取getReasoningContent()方法取不到再 fallback 到 metadata 字段——兼容的优雅写法。边界情况与陷阱陷阱一用户刷新页面前端断开连接FluxSink继续写数据会怎样会抛异常。但如果你的代码在收到onCancel信号后还在向 AI 发请求这个请求就成了孤儿请求——白白消耗 Token结果没人接收。这个系统的防御是在sink.onCancel()里打日志并在上层processChat里的sink.isCancelled()检查中跳过推送。陷阱二CountDownLatch设 5 分钟够不够这取决于你的业务场景。对于 GPT-4 生成一篇长文章5 分钟可能不够。但注意——CountDownLatch的超时是整个流式过程的总时长不是单个 token 的等待时间。只要 AI 一直在输出 token即使很慢latch就不会超时因为latch.countDown()只有在流式完成后才会调用。所以 5 分钟超时真正罩住的是流式调用发起了但一直没有返回任何数据或者中间网络断了导致无限等待的场景。陷阱三线程池的拒绝策略。ThreadPoolTaskExecutor如果配置不当核心线程数太小、队列太大高峰期任务会被堆积。默认的AbortPolicy会直接抛异常。这个系统用的是ThreadPoolTaskExecutor需要去看实际配置有没有设置合理的拒绝策略——比如CallerRunsPolicy让调用线程自己执行虽然慢但不会丢任务。高级考量SSE vs WebSocket vs gRPC Stream当你的 AI 聊天不只是文本还要支持图片、语音、文件传输时SSE 还够不够用SSE 只支持服务端到客户端的单向推送。如果未来你需要客户端上传语音流、服务端实时转文字并流式返回那 WebSocket 的双向通信就更合适。但 WebSocket 有它的代价——需要协议升级HTTP → WS某些网关/CDN 不兼容调试也比 SSE 复杂。gRPC Stream 是另一个选择——双向流性能比 WebSocket 好但需要客户端支持 gRPC通常是后端服务之间通信用浏览器端不太方便。本项目选 SSE 的原因AI 聊天就是单向的用户发一条消息 → AI 流式回复SSE 正好够用不引入额外复杂度。对比表格方案实现方式线程模型超时控制前端断开感知适用场景同步阻塞httpClient.post()等返回Tomcat线程直接等待HTTP超时粗糙HTTP断开抛异常响应3秒的低延迟APIDeferredResultSpring MVC 异步业务线程池 Servlet3.0异步Future.get(timeout)onTimeout回调传统Spring MVC项目改异步WebFlux SSE本项目Flux.create()FluxSink独立线程池Tomcat线程立刻释放CountDownLatch.await(5min)sink.onCancel()AI流式聊天、实时推送WebSocketWebSocketHandler长连接线程池心跳检测onClose事件双向实时通信gRPC StreamStreamObservergRPC线程池deadlineonCompleted/onError微服务间流式通信面试追问面试追问 1为什么用Flux.create()而不是Flux.push()或Flux.generate()→ 回答方向Flux.create()最适合外部异步源推数据的场景——你拿到一个FluxSink可以在任何地方包括其他线程调用sink.next()。Flux.generate()是同步的、一次一个地生成数据不适合这里。Flux.push()是create()的单线程版本但 AI 回调可能在多个线程里触发。所以Flux.create()是正确选择。面试追问 2CountDownLatch等待 5 分钟如果主线程是 Tomcat 线程不还是阻塞了吗→ 回答方向问得好这就是threadPoolTaskExecutor.submit()的关键作用。streamToSink()不是在 Tomcat 线程里执行的——它被提交到了独立线程池。Tomcat 线程在Flux.create()返回后就已经释放了。CountDownLatch.await()阻塞的是独立线程池里的工作线程不影响 Tomcat 接收新请求。面试追问 3AiChatStreamRespDTO有两种 type——content和reasoning_content前端怎么区分和渲染→ 回答方向前端收到 SSE 事件后按type字段分流。content类型的渲染到聊天气泡的主区域reasoning_content类型的渲染到一个可折叠的思考过程区域类似 DeepSeek 官网那种灰色小字。两种类型交替推送——可能在思考过程中间夹杂正式回复前端要做好顺序拼接。总结流式输出的关键是两个隔离线程隔离AI 调用不占用 Web 线程和时间隔离超时不依赖 HTTP 超时。读完这篇你应该能用Flux.create()FluxSink实现一个 SSE 流式推送接口用CountDownLatch给异步流式调用加超时兜底理解为什么需要独立线程池来执行 AI 调用在面试时说出FluxSink 编程式发射 线程池隔离 CountDownLatch 超时而不只是用了 WebFlux