#31 Agent 的实时性挑战:流式输出、低延迟推理与异步架构
从一次线上事故说起凌晨两点告警电话把我从床上拽起来。用户反馈我们的Agent在对话中“卡住了”——输入问题后等了整整8秒才看到第一个token输出。更诡异的是有些请求直接超时日志里全是asyncio.TimeoutError。我盯着监控面板CPU使用率只有40%内存也没爆。问题出在哪翻看调用链发现Agent内部在等一个RAG检索结果而检索服务因为并发请求堆积平均响应时间从50ms飙升到了2.3秒。更致命的是Agent的推理流程是串行阻塞的——检索没完成LLM推理就干等着用户界面一片空白。这不是个例。几乎所有生产环境下的Agent系统都会在某个时刻撞上“实时性”这堵墙。今天这篇笔记就聊聊我踩过的坑和填坑的姿势。流式输出别让用户盯着转圈圈第一个坑全量输出才返回早期版本我天真地让Agent等LLM生成完整回复再一次性返回。用户输入“帮我写一篇5000字的技术方案”然后看着转圈圈转了40秒。这种体验用户不骂娘才怪。正确的姿势用SSEServer-Sent Events或者WebSocket做流式传输。LLM每生成一个token立刻推送到前端。# 别这样写等全部生成完再返回asyncdefbad_agent(query):full_responseawaitllm.generate(query)# 阻塞等待全部tokenreturnfull_response# 用户要等几十秒# 这样写流式推送asyncdefstream_agent(query,websocket):asyncfortokeninllm.stream_generate(query):# 逐token流式awaitwebsocket.send_text(token)# 实时推送# 这里踩过坑记得加await否则协程不执行但流式输出不是简单地把LLM的stream接口透传就完事了。Agent内部可能有多个步骤先检索、再推理、再调用工具。每个步骤的中间结果都应该以流的形式推送给前端。我的做法定义一套流式事件协议。比如{type: thinking, content: 正在检索知识库...}{type: tool_call, name: search_web, args: {...}}{type: token, content: 根据}。前端根据事件类型渲染不同的UI组件。第二个坑流式中断与重连网络抖动是常态。用户在地铁上信号时断时续流式连接断了怎么办方案前端实现断线重连后端支持断点续传。LLM生成过程中后端把已生成的token缓存起来比如用Redis的List结构客户端重连时带上最后一个token的序号后端从断点处继续推送。# 断点续传的伪代码asyncdefstream_with_resume(query,client_id,last_token_id):cache_keyfstream_cache:{client_id}cached_tokensawaitredis.lrange(cache_key,0,-1)# 跳过已发送的tokenstart_idxlen(cached_tokens)iflast_token_idisNoneelselast_token_id1asyncforidx,tokeninenumerate(llm.stream_generate(query)):ifidxstart_idx:continueawaitredis.rpush(cache_key,token)# 缓存新tokenyieldtoken这个方案有个代价缓存会占用内存。我通常设置TTL为5分钟超时自动清理。低延迟推理从模型到硬件的全链路优化模型层面的取舍别迷信“大模型就是好”。在Agent场景下延迟和效果需要trade-off。我的经验法则简单意图识别、实体抽取用6B-7B的模型量化到INT4推理延迟控制在100ms以内复杂推理、代码生成用13B-14B的模型配合vLLM或TensorRT-LLM做推理加速只有核心决策环节才用70B的大模型而且要做好降级预案量化踩坑记录有一次我把一个13B模型用GPTQ量化到4bit推理速度是快了但输出质量明显下降——Agent开始频繁出现幻觉把“张三”说成“李四”。后来换成AWQ量化质量损失小很多。别为了速度牺牲太多精度用户不是傻子。推理引擎的选择vLLM是目前最成熟的方案支持PagedAttention、连续批处理、前缀缓存。但有个坑vLLM的异步接口在Python中需要小心使用。# 别这样写在异步循环中同步调用vLLMasyncdefbad_inference(prompts):forpromptinprompts:resultvllm.generate(prompt)# 阻塞会卡住事件循环process(result)# 这样写使用vLLM的异步APIfromvllmimportAsyncLLMEngine,SamplingParams engineAsyncLLMEngine.from_engine_args(engine_args)asyncdefgood_inference(prompts):tasks[]forpromptinprompts:sampling_paramsSamplingParams(temperature0.7,max_tokens512)taskengine.add_request(prompt,sampling_params)tasks.append(task)resultsawaitasyncio.gather(*tasks)# 并发执行returnresults硬件层面的骚操作如果你用的是NVIDIA GPU有几个参数值得调CUDA graphs减少kernel launch开销。vLLM默认开启但如果你自己写推理代码记得手动启用。MPSMulti-Process Service多进程共享GPU上下文减少显存占用。适合同时部署多个小模型。GPU Direct RDMA如果Agent需要频繁从向量数据库读取数据用RDMA绕过CPU直接访问GPU显存延迟能降30%以上。一个真实案例我们把Agent的embedding模型从CPU迁移到GPU上用TensorRT优化后向量化延迟从15ms降到了0.8ms。代价是显存多了2GB但值得。异步架构别让一个慢操作拖垮整个系统事件循环的陷阱Python的asyncio是协作式多任务一个协程如果阻塞了整个事件循环都会卡住。常见阻塞操作同步的HTTP请求requests.get同步的文件读写CPU密集型的计算比如正则匹配大量文本# 别这样写同步请求阻塞事件循环asyncdefagent_workflow(query):# 这里踩过坑requests.get是同步的会阻塞事件循环docsrequests.get(http://rag-service/search,params{q:query})resultawaitllm.generate(docs.text)returnresult# 这样写使用异步HTTP客户端importaiohttpasyncdefagent_workflow(query):asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(http://rag-service/search,params{q:query})asresp:docsawaitresp.text()resultawaitllm.generate(docs)returnresult异步工作流引擎Agent的流程往往包含多个步骤而且步骤之间有依赖关系。用简单的await串行执行效率太低。我的方案基于DAG有向无环图的异步工作流引擎。classAsyncWorkflow:def__init__(self):self.graph{}# node_id - (dependencies, coroutine)self.results{}defadd_node(self,node_id,deps,coro):self.graph[node_id](deps,coro)asyncdefexecute(self):# 拓扑排序并行执行无依赖的节点ready[nforn,(deps,_)inself.graph.items()ifnotdeps]whileready:tasks[self._run_node(n)forninready]awaitasyncio.gather(*tasks)# 更新ready队列readyself._get_ready_nodes()returnself.resultsasyncdef_run_node(self,node_id):deps,coroself.graph[node_id]# 收集依赖节点的结果dep_results{d:self.results[d]fordindeps}self.results[node_id]awaitcoro(dep_results)这个引擎的好处是检索、工具调用、LLM推理可以并行执行。比如用户问“今天北京天气怎么样顺便帮我查一下上海到北京的机票”检索天气和检索机票可以同时进行互不阻塞。超时与降级异步架构最怕“死等”。一个外部服务挂了整个Agent卡住。我的三板斧每个异步操作都设置超时asyncio.wait_for(coro, timeout5.0)熔断机制连续失败N次后直接返回缓存结果或降级回复优雅降级如果RAG检索超时直接让LLM基于自身知识回答虽然可能不准确但比卡死强asyncdefsafe_retrieve(query):try:resultawaitasyncio.wait_for(rag_service.search(query),timeout3.0)returnresultexceptasyncio.TimeoutError:# 降级返回空结果让LLM自己发挥logger.warning(fRAG检索超时query{query})return[]实战一个实时Agent的架构设计最后分享一个我在生产环境中验证过的架构。用户请求 ↓ API Gateway (Nginx 限流) ↓ WebSocket Manager (维护长连接) ↓ 异步工作流引擎 (DAG调度) ├── 意图识别 (小模型INT4量化100ms) ├── 上下文管理 (Redis缓存异步读写) ├── 工具调用 (并行执行每个工具独立超时) │ ├── RAG检索 (异步HTTP超时3s) │ ├── 数据库查询 (异步DB驱动超时2s) │ └── 外部API (异步HTTP超时5s) └── LLM推理 (vLLM异步API流式输出) ↓ 流式响应 (SSE/WebSocket) ↓ 前端渲染 (逐token展示支持中断)关键指标P50延迟从用户输入到第一个token输出控制在500ms以内P99延迟不超过3s流式输出速率每秒20-30个token对于7B模型INT4量化单卡A10系统吞吐单节点支持50并发连接CPU和GPU利用率均衡个人经验性建议先做可观测性再做优化。没有全链路追踪你根本不知道瓶颈在哪。我推荐OpenTelemetry Jaeger每个异步操作都打上span。别迷信“全异步”。Python的asyncio在CPU密集型任务上表现很差。如果Agent需要做大量文本处理比如正则匹配、JSON解析考虑用concurrent.futures.ProcessPoolExecutor丢到子进程执行。流式输出不是银弹。如果Agent的推理步骤很短比如简单的意图识别全量返回反而更简单。流式输出的价值在于长文本生成和中间状态展示。硬件预算要留余量。别把GPU显存用满留20%给峰值流量。我见过太多因为显存OOM导致推理服务崩溃的案例。最后一条也是最重要的实时性不是技术问题而是产品问题。和产品经理对齐预期哪些场景可以接受延迟哪些场景必须实时。别为了追求极致的延迟把系统搞得太复杂最后维护成本爆炸。以上是我在Agent实时性优化上的一些实战经验。代码片段都经过生产环境验证但具体参数需要根据你的业务场景调整。如果你有更好的方案欢迎在评论区交流——毕竟这行没有银弹只有不断踩坑和填坑。