告别轮询!用FastAPI的SSE轻松搞定服务器主动推送(附完整前后端代码)
告别轮询用FastAPI的SSE轻松搞定服务器主动推送附完整前后端代码实时数据推送一直是现代Web开发中的核心需求之一。无论是股票行情更新、社交媒体通知还是在线协作工具的状态同步传统的轮询方式早已显得力不从心。我曾经在一个电商促销监控系统中亲眼目睹了每秒数千次的无效轮询请求如何拖垮了整个后端服务。直到发现了Server-Sent EventsSSE这个被低估的技术瑰宝才真正解决了这个痛点。与WebSocket的双向通信不同SSE专注于服务器向客户端的单向数据推送这种专注让它变得异常轻量且易于实现。FastAPI作为Python生态中最现代的Web框架其StreamingResponse特性与SSE简直是天作之合。本文将带你从零开始用不到100行代码构建一个完整的实时推送系统。1. 为什么SSE是轮询的完美替代方案在传统的轮询方案中客户端需要不断向服务器发送有新数据吗的询问。这种模式存在三个致命缺陷无效请求泛滥即使没有数据更新客户端仍然会持续发起请求高延迟数据更新后必须等待下一次轮询才能到达客户端资源浪费每次请求都包含完整的HTTP头信息SSE的工作原理则截然不同。它建立一条持久化的HTTP连接允许服务器在任何需要的时候主动推送数据。这种机制带来了几个显著优势真正的实时性数据产生后立即推送无等待间隔极低的协议开销连接建立后只有数据本身会被传输自动重连内置的连接恢复机制网络波动时自动重新连接浏览器原生支持现代浏览器都内置了EventSource接口# 传统轮询 vs SSE的请求对比 轮询请求周期: 客户端: GET /updates?since123456 → 服务端: 204 No Content 客户端: GET /updates?since123456 → 服务端: 204 No Content 客户端: GET /updates?since123456 → 服务端: 200 OK {new_data} SSE流程: 客户端: GET /updates (Connection: keep-alive) 服务端: 200 OK (Content-Type: text/event-stream) 服务端主动推送: data: {new_data}\n\n提示SSE特别适合需要从服务器接收高频更新但客户端不需要频繁发送数据的场景如实时通知、股票行情、日志流等。2. FastAPI中实现SSE的核心架构FastAPI的异步特性使其成为实现SSE的理想选择。下面是我们将要构建的系统架构连接管理器跟踪所有活跃的客户端连接消息广播器将消息分发到所有连接的客户端事件生成器生成符合SSE规范的数据流后台任务模拟实时数据更新from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse import asyncio from typing import List app FastAPI() # 全局连接池 active_connections: List[asyncio.Queue] [] async def event_stream(request: Request): # 为每个客户端创建独立的消息队列 message_queue asyncio.Queue() active_connections.append(message_queue) try: while True: message await message_queue.get() yield fdata: {message}\n\n except asyncio.CancelledError: active_connections.remove(message_queue)这个核心架构的美妙之处在于它的简洁性。asyncio.Queue为每个客户端提供了独立的消息通道而全局的active_connections列表让我们可以轻松实现广播功能。3. 完整实现从后端到前端的SSE系统3.1 后端实现让我们完善FastAPI后端的各个组件。首先是广播功能它需要遍历所有活跃连接并发送消息async def broadcast(message: str): for connection in active_connections: await connection.put(message)接下来添加一个定时任务模拟实时数据更新app.on_event(startup) async def startup_event(): async def generate_messages(): counter 0 while True: await asyncio.sleep(1) # 每秒推送一次 await broadcast(fUpdate #{counter}: {datetime.now()}) counter 1 asyncio.create_task(generate_messages())最后暴露SSE端点app.get(/stream) async def stream_updates(request: Request): return StreamingResponse( event_stream(request), media_typetext/event-stream )3.2 前端实现前端使用浏览器原生的EventSourceAPI代码简洁得令人难以置信!DOCTYPE html html head titleSSE实时数据看板/title style #updates { font-family: monospace; white-space: pre; margin: 20px; padding: 15px; border: 1px solid #eee; border-radius: 5px; max-height: 500px; overflow-y: auto; } /style /head body h1实时数据流/h1 div idupdates/div script const updateContainer document.getElementById(updates); const eventSource new EventSource(/stream); eventSource.onmessage (event) { const newLine document.createElement(div); newLine.textContent event.data; updateContainer.prepend(newLine); // 保持显示最新的20条消息 if (updateContainer.children.length 20) { updateContainer.removeChild(updateContainer.lastChild); } }; eventSource.onerror () { console.log(连接中断尝试重新连接...); }; /script /body /html4. 高级应用场景与性能优化4.1 多频道广播系统实际项目中我们往往需要向不同的用户组广播不同的消息。下面实现一个多频道广播系统from typing import Dict channels: Dict[str, List[asyncio.Queue]] {} async def subscribe(channel: str): queue asyncio.Queue() if channel not in channels: channels[channel] [] channels[channel].append(queue) try: while True: message await queue.get() yield fdata: {message}\n\n finally: channels[channel].remove(queue) async def publish(channel: str, message: str): if channel in channels: for queue in channels[channel]: await queue.put(message) app.get(/stream/{channel}) async def channel_stream(request: Request, channel: str): return StreamingResponse( subscribe(channel), media_typetext/event-stream )4.2 性能优化技巧连接心跳防止代理服务器断开空闲连接async def event_stream(request: Request): while True: yield :heartbeat\n\n # 注释行作为心跳 await asyncio.sleep(15) # 每15秒发送一次消息缓冲防止慢客户端拖慢整个系统message_queue asyncio.Queue(maxsize10) # 最多缓冲10条消息连接数监控app.get(/stats) async def connection_stats(): return { total_connections: len(active_connections), channels: {k: len(v) for k, v in channels.items()} }4.3 生产环境注意事项代理配置确保Nginx等代理服务器支持长连接proxy_buffering off; proxy_read_timeout 24h;跨域支持from fastapi.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins[*], allow_methods[GET], )认证集成app.get(/private-stream) async def private_stream(request: Request, token: str Depends(oauth2_scheme)): user authenticate(token) return StreamingResponse( generate_user_specific_events(user), media_typetext/event-stream )5. 实战案例实时日志监控系统让我们把这些知识应用到一个真实场景中——构建一个实时日志监控系统。这个系统可以实时显示服务器日志并支持按日志级别过滤。import logging from logging.handlers import QueueHandler log_queue asyncio.Queue() def setup_logger(): logger logging.getLogger(sse_logger) logger.setLevel(logging.INFO) queue_handler QueueHandler(log_queue) logger.addHandler(queue_handler) return logger app.on_event(startup) async def log_monitor(): logger setup_logger() while True: log_record await log_queue.get() await broadcast( f[{log_record.levelname}] {log_record.getMessage()} ) app.get(/log-stream) async def log_stream(request: Request, level: str INFO): async def filtered_logs(): while True: log_record await log_queue.get() if log_record.levelno logging.getLevelName(level): yield fdata: {log_record.getMessage()}\n\n return StreamingResponse( filtered_logs(), media_typetext/event-stream )前端可以添加级别过滤器function setupLogStream(level) { if (window.logEventSource) { window.logEventSource.close(); } window.logEventSource new EventSource(/log-stream?level${level}); // ...消息处理逻辑 }