第08章 FastAPI 与 SSE 流式 RAG 后端到目前为止知识库、检索工具、MCP 客户端都已经就绪但仍缺少一个面向最终用户的入口。本章用 FastAPI 把整条 RAG 链路串起来接收前端发来的自然语言问题调用 MCP 工具检索相关工单构造检索增强提示词喂给 Ollama再用 SSE 把模型的流式回答推送到前端。完成本章后读者将拥有一个可被任意前端调用的 /llm/rag 接口并理解 SSE 协议为何特别适合本地 LLM 这种“边算边出”的场景。8.1 后端在 RAG 链路中的职责后端在 RAG 链路中不只是简单的转发它需要承担四个职责检索调度、提示工程、流式生成、状态广播。完整链路与后端在其中的位置“如图8-1”所示。读者从图中可以看到后端是唯一同时与三方通讯的角色向上对前端推送状态与回答向下调用 MCP 工具获取数据横向调用 Ollama 完成生成。把这四件事用一个 FastAPI 应用统一编排可以让职责清晰、错误集中处理。8.1.1 SSE 协议的适配性Server-Sent Events 是一种基于 HTTP 的单向流式协议服务器持续向客户端推送事件客户端通过 EventSource API 监听。SSE 与 WebSocket、HTTP 流式响应的差异“如表8-1”所示。表 8-1 SSE 与其他流式协议的差异协议通讯方向浏览器原生 API自动重连适合场景SSE服务器到客户端单向EventSource内置模型流式输出、状态推送WebSocket双向WebSocket需手动实现双向实时通讯如聊天室HTTP 流式服务器到客户端单向fetch 加流式读取无需要自定义协议时本书选用 SSE是因为它对前端友好EventSource 自动断线重连、对服务端轻量标准 HTTP 流、对 LLM 输出形态匹配一段段文本增量推送。8.1.2 FastAPI 与 sse-starlette 的搭配FastAPI 本身不直接提供 SSE 响应常见做法是引入 sse-starlette 提供 EventSourceResponse 与 ServerSentEvent 两个类。前者把异步生成器转为 SSE 响应后者承载单个事件。注意从 FastAPI 路由函数返回 EventSourceResponse 时函数体应是 async 生成器含 yield而非直接返回值否则前端只会收到完整响应而非流式分段。8.2 服务骨架与 CORS 配置后端代码集中在 backend/main.py本节先看应用初始化与跨域处理这是任何前后端分离项目都绕不开的两件事。8.2.1 应用初始化与中间件FastAPI 应用的基础初始化与 Ollama 配置加载放在文件顶部。importosfromfastapiimportFastAPIfromfastapi.middleware.corsimportCORSMiddlewarefromfastapi.responsesimportEventSourceResponsefromfastapi.sseimportServerSentEventfromdotenvimportload_dotenv load_dotenv()OLLAMA_BASE_URLos.getenv(OLLAMA_BASE_URL,http://localhost:11434)OLLAMA_MODELos.getenv(OLLAMA_MODEL,llama3.2:latest)appFastAPI()把 Ollama 的地址与模型名通过环境变量注入是为了不在代码中写死部署信息。本地开发用 .env 文件、生产环境用容器或编排平台的环境变量注入调用方式一致。8.2.2 跨域中间件的开放范围前端 Next.js 默认运行在 3000 端口后端在 8000 端口跨域是必经一关。app.add_middleware(CORSMiddleware,allow_origins[http://localhost:3000,http://127.0.0.1:3000],allow_credentialsTrue,allow_methods[*],allow_headers[*],)allow_origins 明确列出允许来源比使用通配符 [“*”] 更安全。生产环境中应改为具体的前端域名并配合 HTTPS。注意浏览器对 EventSource 的跨域请求只支持 GET 方法且不允许自定义请求头前端无法像普通 fetch 那样附加 Authorization Header如果需要认证应通过 URL 参数或 Cookie 传递。8.3 流式 LLM 调用的实现整条 RAG 链路里 LLM 调用占据最长时间必须以流式方式输出。本节把第 2 章的 httpx 流式调用进一步打磨为可在生产中使用的版本。8.3.1 异步流式生成器stream_ollama_llm 接收 prompt按 NDJSON 协议解析 Ollama 响应每读到一段 content 就 yield 出去。asyncdefstream_ollama_llm(prompt:str):importhttpx payload{model:OLLAMA_MODEL,system:你是一个专业的电商客服数据分析助手。请根据提供的工单数据进行分析并给出简洁的总结和建议。,messages:[{role:user,content:prompt}],stream:True,options:{temperature:0.7,num_predict:2000},}try:asyncwithhttpx.AsyncClient(timeout120.0)asclient:asyncwithclient.stream(POST,f{OLLAMA_BASE_URL}/api/chat,jsonpayload,)asresponse:response.raise_for_status()asyncforlineinresponse.aiter_lines():lineline.strip()ifnotline:continuetry:chunkjson.loads(line)ifchunk.get(done,False):breakmessagechunk.get(message,{})contentmessage.get(content,)ifcontent:yieldcontentexceptjson.JSONDecodeError:continueexcepthttpx.HTTPErrorase:yieldfHTTP 错误:{str(e)}代码中三处防御值得读者留意跳过空行避免触发 JSONDecodeError、捕获 JSONDecodeError 后 continue 而非 break、把 HTTP 错误也作为 yield 输出而非抛异常。前两条让单行损坏不影响整段生成第三条让前端能在体验上感知错误而不是莫名其妙地停在某处。8.3.2 system 提示与生成参数payload 中的 system 字段约束模型扮演的角色options 中的 temperature 与 num_predict 控制生成行为。本书常用值与含义“如表8-2”所示。表 8-2 Ollama 生成参数的常用设置参数含义本书取值temperature采样随机性0.7 兼顾确定性与表达力num_predict最大生成 token 数2000 满足典型工单分析长度top_p核采样阈值默认 0.9不显式设置stop停止符暂未使用笔者建议把生成参数集中到一个 dict 中按场景命名管理如 ANALYSIS_OPTIONS、SUMMARY_OPTIONS避免在多个调用点散落配置。8.4 RAG 端点的完整编排主路由 /llm/rag 把检索、提示构造、模型调用、状态推送串成一条异步生成器。本节按阶段拆解。8.4.1 阶段一参数校验与状态广播接收查询参数后立即给前端一个状态事件让用户感知请求已被接收。app.get(/llm/rag,response_classEventSourceResponse)asyncdefrag_stream(query:str,n_results:int5):ifnotquery:yieldServerSentEvent(eventerror,data{message:查询内容不能为空})returnyieldServerSentEvent(eventstatus,data{message:f正在分析问题:{query[:50]}...},)awaitasyncio.sleep(0.3)status 事件用于推送进度提示前端可据此渲染加载指示。短暂 sleep 是为了避免阶段切换过快造成“看不到状态”的体验。8.4.2 阶段二调用 MCP 检索工单通过上一章实现的 call_search_tickets_semantic 拿到相关工单。yieldServerSentEvent(eventstatus,data{message:正在进行语义检索...})search_resultawaitcall_search_tickets_semantic(query,n_results)search_datajson.loads(search_result)total_resultssearch_data.get(total_results,0)iftotal_results0:yieldServerSentEvent(eventstatus,data{message:未找到相关工单将基于通用知识回答})relevant_tickets[]else:yieldServerSentEvent(eventstatus,data{message:f检索到{total_results}条相关工单},)relevant_ticketssearch_data.get(results,[])yieldServerSentEvent(eventretrieved_data,data{total:total_results,tickets:relevant_tickets},)retrieved_data 事件把检索到的工单提前推给前端前端可在 LLM 还在生成时先把“参考来源”区域渲染出来让用户感知信息来源。8.4.3 阶段三构造提示词提示词是 RAG 的核心工程上需要“约束格式、限定能力边界、给出回答结构”。本书的提示模板分两套检索到工单时用 RAG 模板检索为空时用兜底模板。RAG 模板的关键片段如下tickets_text\n\n.join([f【工单{i1}】(相关度:{t.get(similarity_score,0):.1%})\nf工单号:{t[ticket_no]}\nf客户:{t[customer_name]}\nf类型:{t[issue_type]}\nf优先级:{t[priority]}\nf状态:{t[status]}\nf主题:{t[subject]}\nf描述:{t.get(description,无)}fori,tinenumerate(relevant_tickets)])promptf你是电商客服数据分析专家。请根据以下检索到的相关工单回答用户的问题。 用户问题{query}检索到的相关工单共{total_results}条{tickets_text}请按以下格式回答 1. 直接回答 2. 相关工单分析 3. 建议措施 提示词的设计要点把用户问题与检索到的工单都标记清楚、要求模型按固定结构输出、对数据不足的情况主动表态。结构化输出便于前端做样式化渲染也便于后续接入更精细的解析逻辑。注意提示词长度直接影响生成速度与上下文占用本地模型上下文有限工单条数过多时应在检索阶段截断而非塞进提示词后让模型自行忽略。8.4.4 阶段四流式生成与 SSE 推送把模型输出的每个文本片段封装为 llm_chunk 事件推送给前端。yieldServerSentEvent(eventstatus,data{message:正在生成回答...})asyncforchunkinstream_ollama_llm(prompt):yieldServerSentEvent(eventllm_chunk,data{content:chunk})yieldServerSentEvent(eventstatus,data{message:回答完成})事件类型按职责分工llm_chunk 承载生成文本、status 承载阶段提示、retrieved_data 承载检索结果、error 承载错误。前端按事件类型分别处理结构清晰。8.5 错误处理与超时后端在调度模型与外部服务时最容易因外部依赖故障而长时间无响应必须显式处理三类异常MCP 调用失败、Ollama 调用失败、客户端断连。8.5.1 把异常转为 SSE error 事件外层用 try/except 把任意异常转换为 error 事件避免连接被静默断开。try:# 检索、构造提示词、流式生成...exceptExceptionase:yieldServerSentEvent(eventerror,data{message:fRAG 处理失败:{str(e)}},)读者可以根据需要细化异常类型分类比如 ConnectionError 单独提示用户检查 Ollama 是否启动、TimeoutError 单独提示重试。8.5.2 任务取消的处理前端用户关闭页面时浏览器会断开 SSE 连接FastAPI 会在生成器中抛 asyncio.CancelledError。这一异常无需手动捕获框架会自动结束生成器httpx 流也会一并关闭最终 Ollama 端的生成也会被取消。注意避免在 except Exception 中吞掉 CancelledError否则取消机制失效会出现“用户已断开但服务器还在生成”的资源泄露。8.6 本章小结本章把 FastAPI 后端从空白搭建到完整可用跨域配置、httpx 流式调用、MCP 客户端集成、SSE 事件分类、错误兜底逐一到位。读者现在掌握的是一种通用的 RAG 后端模板把工单数据换成产品知识库、客户档案、技术文档整套链路结构几乎不变。接下来的工作是让前端用户能用上这套能力。下一章笔者将基于 Next.js 实现一个流式聊天界面把 SSE 事件按类型渲染为状态提示、参考来源与回答内容。作者光谷老亢配套源码https://github.com/kang-airtc/agent-ollama-book