Python WebSocket 实时通信实战:构建实时Web应用
Python WebSocket 实时通信实战构建实时Web应用引言在现代Web开发中实时通信是构建交互式应用的关键技术。作为一名从Rust转向Python的后端开发者我深刻体会到WebSocket在构建实时功能方面的重要性。WebSocket提供了双向通信能力使得服务器和客户端可以实时交换数据。WebSocket 核心概念什么是WebSocketWebSocket是一种在单个TCP连接上进行全双工通信的协议具有以下特点双向通信服务器和客户端可以互相发送消息低延迟建立连接后保持持久连接避免HTTP请求开销实时性支持实时推送数据跨域支持支持跨域通信架构设计┌─────────────────────────────────────────────────────────────┐ │ WebSocket 客户端 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ JavaScript WebSocket API │ │ │ │ 连接 → 发送消息 → 接收消息 → 断开连接 │ │ │ └─────────────────────────┬─────────────────────────┘ │ └─────────────────────────────┼─────────────────────────────┘ │ WebSocket协议 ▼ ┌─────────────────────────────────────────────────────────────┐ │ WebSocket 服务端 │ │ ┌─────────────────────────────────────────────────────┐ │ │ │ Python WebSocket Server │ │ │ │ 接收连接 → 处理消息 → 广播消息 → 管理连接 │ │ │ └─────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘环境搭建与基础配置安装依赖pip install websockets fastapi uvicorn基本WebSocket服务器import asyncio import websockets connected_clients set() async def handle_client(websocket, path): connected_clients.add(websocket) print(fClient connected. Total clients: {len(connected_clients)}) try: async for message in websocket: print(fReceived: {message}) # 广播消息给所有客户端 for client in connected_clients: if client ! websocket: await client.send(message) finally: connected_clients.remove(websocket) print(fClient disconnected. Total clients: {len(connected_clients)}) start_server websockets.serve(handle_client, localhost, 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()客户端实现const ws new WebSocket(ws://localhost:8765); ws.onopen function() { console.log(Connected to server); ws.send(Hello, Server!); }; ws.onmessage function(event) { console.log(Received:, event.data); }; ws.onclose function() { console.log(Disconnected from server); };FastAPI WebSocket集成创建WebSocket端点from fastapi import FastAPI, WebSocket, WebSocketDisconnect from typing import List app FastAPI() class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager ConnectionManager() app.websocket(/ws) async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) try: while True: data await websocket.receive_text() await manager.broadcast(fClient says: {data}) except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(A client has disconnected)运行服务器uvicorn main:app --reload高级特性实战消息认证from fastapi import WebSocket, Depends async def get_token(websocket: WebSocket): token websocket.query_params.get(token) if token ! secret: await websocket.close(code1008, reasonInvalid token) raise WebSocketDisconnect(code1008) return token app.websocket(/ws/auth) async def websocket_auth(websocket: WebSocket, token: str Depends(get_token)): await websocket.accept() await websocket.send_text(fAuthenticated with token: {token}) while True: data await websocket.receive_text() await websocket.send_text(fReceived: {data})分组广播class RoomManager: def __init__(self): self.rooms: dict[str, List[WebSocket]] {} async def join_room(self, room_id: str, websocket: WebSocket): if room_id not in self.rooms: self.rooms[room_id] [] self.rooms[room_id].append(websocket) await websocket.accept() def leave_room(self, room_id: str, websocket: WebSocket): if room_id in self.rooms: self.rooms[room_id].remove(websocket) async def broadcast_to_room(self, room_id: str, message: str): if room_id in self.rooms: for connection in self.rooms[room_id]: await connection.send_text(message) room_manager RoomManager() app.websocket(/ws/room/{room_id}) async def websocket_room(websocket: WebSocket, room_id: str): await room_manager.join_room(room_id, websocket) try: while True: data await websocket.receive_text() await room_manager.broadcast_to_room(room_id, fRoom {room_id}: {data}) except WebSocketDisconnect: room_manager.leave_room(room_id, websocket)二进制数据传输app.websocket(/ws/binary) async def websocket_binary(websocket: WebSocket): await websocket.accept() while True: data await websocket.receive_bytes() print(fReceived binary data: {len(data)} bytes) # 处理二进制数据 response process_binary(data) await websocket.send_bytes(response)实际业务场景场景一实时聊天应用from datetime import datetime app.websocket(/ws/chat/{username}) async def chat_endpoint(websocket: WebSocket, username: str): await manager.connect(websocket) await manager.broadcast(f{username} joined the chat) try: while True: message await websocket.receive_text() timestamp datetime.now().strftime(%H:%M:%S) await manager.broadcast(f[{timestamp}] {username}: {message}) except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f{username} left the chat)场景二实时数据推送import asyncio from fastapi import BackgroundTasks async def send_realtime_data(websocket: WebSocket): while True: data generate_realtime_data() await websocket.send_json(data) await asyncio.sleep(1) app.websocket(/ws/realtime) async def realtime_endpoint(websocket: WebSocket): await websocket.accept() task asyncio.create_task(send_realtime_data(websocket)) try: while True: await websocket.receive_text() except WebSocketDisconnect: task.cancel()场景三协作编辑document_store {} app.websocket(/ws/collab/{doc_id}) async def collab_edit(websocket: WebSocket, doc_id: str): await websocket.accept() if doc_id not in document_store: document_store[doc_id] {content: , users: []} document_store[doc_id][users].append(websocket) try: while True: update await websocket.receive_json() document_store[doc_id][content] update[content] for user in document_store[doc_id][users]: if user ! websocket: await user.send_json(update) except WebSocketDisconnect: document_store[doc_id][users].remove(websocket)性能优化异步处理async def process_message(message: str) - str: # 异步处理消息 await asyncio.sleep(0.1) return fProcessed: {message} app.websocket(/ws/async) async def async_websocket(websocket: WebSocket): await websocket.accept() while True: message await websocket.receive_text() processed await process_message(message) await websocket.send_text(processed)消息压缩import gzip app.websocket(/ws/compressed) async def compressed_websocket(websocket: WebSocket): await websocket.accept() while True: compressed_data await websocket.receive_bytes() data gzip.decompress(compressed_data).decode(utf-8) response fReceived: {data} compressed_response gzip.compress(response.encode(utf-8)) await websocket.send_bytes(compressed_response)总结WebSocket为Python后端开发者提供了构建实时Web应用的强大工具。通过双向通信和低延迟特性WebSocket在实时聊天、数据推送和协作编辑等场景中表现出色。从Rust开发者的角度来看Python的WebSocket实现虽然在性能上不如Rust但在开发效率和生态成熟度方面具有优势。在实际项目中建议合理使用分组广播和消息认证来构建安全、高效的实时应用。