【微服务即时通讯】入口网关子服务
目录一.connection类的封装二.实现入口网关子服务2.1.成员变量的设计2.2.构造函数的设计2.2.1.初始化websocket服务器2.2.2.初始化http服务器2.3.服务端向客户端发送通知入口网关子服务是系统与客户端交互的统一入口其主要职责如下请求转发与业务解耦网关服务器本身不处理具体业务逻辑而是接收客户端的所有请求并将其转发至对应的业务子服务节点进行处理。待业务子服务返回结果后网关再将响应回传至客户端从而实现接入层与业务层的解耦。客户端身份识别与鉴权网关负责对客户端进行身份验证确保请求来源合法并根据权限控制对后续请求进行访问管理。通知推送网关还承担与客户端之间的主动通信职责通过推送机制将关键事件及时通知到相关客户端。在会话管理方面客户端登录成功后网关会为其创建一个登录会话并将会话标识会话ID返回给客户端。此后客户端在发起请求时须携带该会话ID否则将被视为未登录状态。对于涉及其他客户端操作的业务场景如好友申请、好友删除、会话创建、新消息发送等网关需要将相应事件以通知形式推送给受影响的目标客户端确保多端间状态与信息的实时同步。基于上述两类核心功能——即请求的分发与响应处理以及客户端的事件通知——网关服务器在通信层面采用两种协议协同工作HTTP通信用于处理常规业务请求完成客户端请求的接收、分发与响应返回WebSocket通信用于维持与客户端的持久连接实现对客户端的实时事件推送。整体架构如下一.connection类的封装服务器是通过websocket协议来向客户端推送通知的。一个服务器需要应对多个客户端那么就需要与多个客户端进行websocket长连接保持这样子服务端才能准确的去推送我们的通知。但是一个websocket连接只能应对一个客户端客户端一多websocket连接也会多那么我们就有必要将这些连接管理起来那么我们就封装出了下面这个类之所以需要两个哈希表是为了支持双向高效查找根据 uid 快速找到对应的 connection_ptr推送给指定用户时需要。根据 connection_ptr 快速找到对应的 uid 和 ssid处理连接关闭、异常时需要例如在 on_close 回调中通过连接指针删除映射。如果只用一个哈希表无论选择哪种方向做键另一个方向的查找就必须遍历整个容器O(n)在多连接场景下性能不可接受。用两个哈希表可以实现两种查询都是 O(1) 平均复杂度。同时删除时两个表同步更新保证了数据一致性。#include logger.hpp #include websocketpp/config/asio_no_tls.hpp #include websocketpp/server.hpp //一个服务器需要应对多个客户端那么就需要与多个客户端进行websocket长连接保持这样子服务端才能准确的去推送我们的通知 //但是一个websocket连接只能应对一个客户端客户端一多websocket连接也会多那么我们就有必要将这些连接管理起来那么我们就封装出了下面这个类 //之所以需要两个哈希表是为了支持双向高效查找 //根据 uid 快速找到对应的 connection_ptr推送给指定用户时需要。 //根据 connection_ptr 快速找到对应的 uid 和 ssid处理连接关闭、异常时需要例如在 on_close 回调中通过连接指针删除映射。 //如果只用一个哈希表无论选择哪种方向做键另一个方向的查找就必须遍历整个容器O(n)在多连接场景下性能不可接受。 //用两个哈希表可以实现两种查询都是 O(1) 平均复杂度。同时删除时两个表同步更新保证了数据一致性。 namespace IMS { // 定义 WebSocket 服务器类型使用 ASIO 无 TLS 配置 typedef websocketpp::serverwebsocketpp::config::asio server_t; // 连接类型别名为 server_t::connection_ptr // Connection 类管理用户 ID 与 WebSocket 连接的映射关系用于长连接维护 class Connection { public: // Client 结构体存储客户端的唯一标识uid和会话标识ssid struct Client { Client(const std::string u, const std::string s) : uid(u), ssid(s) {} std::string uid; // 用户唯一标识 std::string ssid; // 会话标识session id,这个会话ID是用户登录后服务器会自动分配一个会话ID来的 }; using ptr std::shared_ptrConnection; // 智能指针类型别名方便管理对象生命周期 // 构造函数 Connection() {} // 析构函数 ~Connection() {} // 插入连接和对应的客户端信息 void insert(const server_t::connection_ptr conn, const std::string uid, const std::string ssid) { std::unique_lockstd::mutex lock(_mutex); // 加锁保证线程安全 // 建立 uid - 连接 的映射 _uid_connections.insert(std::make_pair(uid, conn)); // 建立 连接 - Client 的映射 _conn_clients.insert(std::make_pair(conn, Client(uid, ssid))); LOG_DEBUG(新增长连接用户信息{}-{}-{}, (size_t)conn.get(), uid, ssid); } // 根据 uid 获取对应的连接 server_t::connection_ptr connection(const std::string uid) { std::unique_lockstd::mutex lock(_mutex); auto it _uid_connections.find(uid); if (it _uid_connections.end()) { LOG_ERROR(未找到 {} 客户端的长连接, uid); return server_t::connection_ptr(); // 返回空指针 } LOG_DEBUG(找到 {} 客户端的长连接, uid); return it-second; } // 根据连接获取对应的 uid 和 ssid bool client(const server_t::connection_ptr conn, std::string uid, std::string ssid) { std::unique_lockstd::mutex lock(_mutex); auto it _conn_clients.find(conn); if (it _conn_clients.end()) { LOG_ERROR(获取-未找到长连接 {} 对应的客户端信息, (size_t)conn.get()); return false; } uid it-second.uid; ssid it-second.ssid; LOG_DEBUG(获取长连接客户端信息成功); return true; } // 移除连接及其对应的客户端信息 void remove(const server_t::connection_ptr conn) { std::unique_lockstd::mutex lock(_mutex); auto it _conn_clients.find(conn); if (it _conn_clients.end()) { LOG_ERROR(删除-未找到长连接 {} 对应的客户端信息, (size_t)conn.get()); return; } // 从 uid 映射中删除 _uid_connections.erase(it-second.uid); // 从连接映射中删除 _conn_clients.erase(it); LOG_DEBUG(删除长连接信息完毕); } private: std::mutex _mutex; // 互斥锁保护多线程下的并发访问 // 映射用户唯一标识 - 连接指针 std::unordered_mapstd::string, server_t::connection_ptr _uid_connections; // 映射连接指针 - 客户端信息uid, ssid std::unordered_mapserver_t::connection_ptr, Client _conn_clients; }; }二.实现入口网关子服务2.1.成员变量的设计我们先来回答一个问题我们入口网关子服务是怎么调用我们其他子服务的其实很简单其他子服务都是RPC服务那么我们入口网关子服务也自然就是使用brpc框架来调用这些RPC服务啦那么我们的成员变量里面势必有一个ServiceManager和Discovery此外我们还需要知道我们这个入口网关子服务是和客户端直接打交道的客户端通过HTTP协议来向入口网关子服务发送服务请求我们入口网关子服务通过websocket来向客户端推送一些通知那么我们也不难理解我们的入口网关子服务一定运行有一个websocket服务端还有一个http服务端在我们的入口网关子服务中我们采用websocketpp来搭建我们的websocket服务端我们采用httplib来搭建我们的http服务端那么我们的这个成员变量是不是就很明朗了// // 网关服务器类定义 // class GatewayServer { public: using ptr std::shared_ptrGatewayServer; // 构造函数初始化网关服务器同时启动 WebSocket 和 HTTP 服务 GatewayServer( int websocket_port, // WebSocket 监听端口 int http_port, // HTTP 监听端口 const std::shared_ptrsw::redis::Redis redis_client, // Redis 客户端用于会话和状态管理 const ServiceManager::ptr channels, // 服务管理器用于消息通道 const Discovery::ptr service_discoverer, // 服务发现组件 const std::string user_service_name, // 用户服务名称 const std::string file_service_name, // 文件服务名称 const std::string speech_service_name, // 语音服务名称 const std::string message_service_name, // 消息存储服务名称 const std::string transmite_service_name, // 消息传输服务名称 const std::string friend_service_name) // 好友服务名称 : _redis_session(std::make_sharedSession(redis_client)), // 初始化会话管理基于 Redis _redis_status(std::make_sharedStatus(redis_client)), // 初始化状态管理基于 Redis _mm_channels(channels), // 消息通道管理器 _service_discoverer(service_discoverer), // 服务发现实例 _user_service_name(user_service_name), // 用户服务名 _file_service_name(file_service_name), // 文件服务名 _speech_service_name(speech_service_name), // 语音服务名 _message_service_name(message_service_name), // 消息存储服务名 _transmite_service_name(transmite_service_name), // 消息传输服务名 _friend_service_name(friend_service_name), // 好友服务名 _connections(std::make_sharedConnection()) // 连接管理实例 { } private: // Redis 会话管理对象用于存储用户会话信息 Session::ptr _redis_session; // Redis 状态管理对象用于存储用户在线状态等 Status::ptr _redis_status; // 用户服务的名称用于服务发现 std::string _user_service_name; // 文件服务的名称用于服务发现 std::string _file_service_name; // 语音识别服务的名称用于服务发现 std::string _speech_service_name; // 消息存储服务的名称用于服务发现 std::string _message_service_name; // 消息传输服务的名称用于服务发现 std::string _transmite_service_name; // 好友服务的名称用于服务发现 std::string _friend_service_name; // 服务管理器管理各服务的消息通道 ServiceManager::ptr _mm_channels; // 服务发现组件用于动态获取服务实例地址 Discovery::ptr _service_discoverer; // 连接管理器管理所有 WebSocket 连接 Connection::ptr _connections; // WebSocket 服务器实例 server_t _ws_server; // HTTP 服务器实例 httplib::Server _http_server; // HTTP 服务器运行线程 std::thread _http_thread; };至于后面的那些啥服务名我们传递的其实就是下面这些DEFINE_string(file_service, /service/file_service, 文件存储子服务名称); DEFINE_string(friend_service, /service/friend_service, 好友管理子服务名称); DEFINE_string(message_service, /service/message_service, 消息存储子服务名称); DEFINE_string(user_service, /service/user_service, 用户管理子服务名称); DEFINE_string(speech_service, /service/speech_service, 语音识别子服务名称); DEFINE_string(transmite_service, /service/transmite_service, 转发管理子服务名称);2.2.构造函数的设计我们的成员变量都有了那么我们是不是应该考虑一下构造函数怎么写其实很简单构造函数就是初始化成员变量的。但是大家需要注意在我们的入口网关子服务中我们采用websocketpp来搭建我们的websocket服务端我们采用httplib来搭建我们的http服务端这2个都是需要我们去设置回调函数的2.2.1.初始化websocket服务器那么我们需要在我们的构造函数里面初始化我们的websocket服务端这个就需要我们去设定下面这3个回调函数连接建立回调函数连接关闭回调函数消息到来回调函数那么我们就告诉大家我们是怎么// -------------------- WebSocket 服务器初始化 -------------------- // 关闭 WebSocket 日志输出避免控制台冗余信息 _ws_server.set_access_channels(websocketpp::log::alevel::none); // 初始化 ASIO 网络库 _ws_server.init_asio(); // 设置连接建立时的回调函数 _ws_server.set_open_handler(std::bind(GatewayServer::onOpen, this, std::placeholders::_1)); // 设置连接关闭时的回调函数 _ws_server.set_close_handler(std::bind(GatewayServer::onClose, this, std::placeholders::_1)); // 设置收到消息时的回调函数 auto wscb std::bind(GatewayServer::onMessage, this, std::placeholders::_1, std::placeholders::_2); _ws_server.set_message_handler(wscb); // 允许地址复用避免端口被占用时无法重启 _ws_server.set_reuse_addr(true); // 开始监听 WebSocket 端口 _ws_server.listen(websocket_port); // 开始接受连接 _ws_server.start_accept();我们看看连接建立回调函数// 第一部分这些都是设置给websocketpp服务器的回调函数 void onOpen(websocketpp::connection_hdl hdl) { LOG_DEBUG(websocket长连接建立成功 {}, (size_t)_ws_server.get_con_from_hdl(hdl).get()); }再看看连接关闭回调函数// 长连接一断开就说明用户下线了 void onClose(websocketpp::connection_hdl hdl) { // 长连接断开时做的清理工作 // 0. 通过连接对象获取对应的用户ID与登录会话ID auto conn _ws_server.get_con_from_hdl(hdl); std::string uid, ssid; bool ret _connections-client(conn, uid, ssid); if (ret false) { LOG_WARN(长连接断开未找到长连接对应的客户端信息); return; } // 1. 移除登录会话信息 _redis_session-remove(ssid); // 2. 移除登录状态信息 _redis_status-remove(uid); // 3. 移除长连接管理数据 _connections-remove(conn); LOG_DEBUG({} {} {} 长连接断开清理缓存数据!, ssid, uid, (size_t)conn.get()); }我们再看看这个回调处理函数// 保持连接活跃的函数通过定期发送 Ping 帧来维持 WebSocket 连接 void keepAlive(server_t::connection_ptr conn) { // 检查连接指针是否有效并且连接状态是否为打开状态 if (!conn || conn-get_state() ! websocketpp::session::state::value::open) { // 如果连接无效或状态不是打开状态记录调试日志并直接返回不再继续保活 LOG_DEBUG(非正常连接状态结束连接保活); return; } // 向对端发送一个空的 Ping 帧用于探测连接是否仍然存活 conn-ping(); // 设置一个定时器60 秒后再次执行本函数形成周期性的保活检查 _ws_server.set_timer(60000, std::bind(GatewayServer::keepAlive, this, conn)); } // 消息到达处理回调函数 void onMessage(websocketpp::connection_hdl hdl, server_t::message_ptr msg) { /*客户端并不是只使用 HTTP 来给服务端发消息。 在这个网关实现中WebSocket 连接建立后客户端会主动发送第一条认证消息ClientAuthenticationReq这就是通过 WebSocket 发送的。 客户端除了这里使用了websocket来给服务端发送消息后面就再也没有使用websocket给服务端发消息了都是通过HTTP来发送消息*/ // 收到第一条消息后根据消息中的会话ID进行身份识别将客户端长连接添加管理 // 1. 取出长连接对应的连接对象 auto conn _ws_server.get_con_from_hdl(hdl); // 2. 针对消息内容进行反序列化 -- ClientAuthenticationReq -- 提取登录会话ID ClientAuthenticationReq request; bool ret request.ParseFromString(msg-get_payload()); if (ret false) { LOG_ERROR(长连接身份识别失败正文反序列化失败); _ws_server.close(hdl, websocketpp::close::status::unsupported_data, 正文反序列化失败!); // 关闭websocket长连接 return; } // 3. 在会话信息缓存中查找会话信息 std::string ssid request.session_id(); auto uid _redis_session-uid(ssid); // 4. 会话信息不存在则关闭连接 if (!uid) { LOG_ERROR(长连接身份识别失败未找到会话信息 {}, ssid); _ws_server.close(hdl, websocketpp::close::status::unsupported_data, 未找到会话信息!); // 关闭websocket长连接 return; } // 5. 会话信息存在则添加长连接管理 _connections-insert(conn, *uid, ssid); LOG_DEBUG(新增长连接管理{}-{}-{}, ssid, *uid, (size_t)conn.get()); keepAlive(conn); // 这个函数会保证我们这个连接不会断开形成长连接 }客户端并不是只使用 HTTP 来给服务端发消息。在这个网关实现中WebSocket 连接建立后客户端会主动发送第一条认证消息ClientAuthenticationReq这就是通过 WebSocket 发送的。虽然大部分业务接口如注册、登录、发送消息等是通过 HTTP 处理的但 WebSocket 是全双工通信客户端同样可以向服务端发送消息。代码中的 onMessage 回调正是用来处理客户端通过 WebSocket 发来的数据。不过当前实现中onMessage 只处理了第一条认证消息后续如果客户端通过 WebSocket 发送其他业务消息比如新消息也会触发该回调只是代码中没有进一步分发处理。此外有的人可能好奇ClientAuthenticationReq是啥这个其实是我们新定义的一个数据结构存放在gateway.proto里面syntax proto3; package IMS; option cc_generic_services true; message ClientAuthenticationReq { string request_id 1; string session_id 2; // 用于向服务器表明当前长连接客户端的身份 }2.2.2.初始化http服务器我们的http服务器是借助这个httplib来实现的。那么对于这个httplib来说我们是需要去设置回调函数的。那么我们主要是为我们各个子服务的调用与我们的URL进行绑定起来我们先将我们每个子服务的RPC服务所对应的URL给定义出来// 获取邮箱验证码 #define GET_EMAIL_VERIFY_CODE /service/user/get_email_verify_code // 用户名注册 #define USERNAME_REGISTER /service/user/username_register // 用户名登录 #define USERNAME_LOGIN /service/user/username_login // 邮箱注册 #define EMAIL_REGISTER /service/user/email_register // 邮箱登录 #define EMAIL_LOGIN /service/user/email_login // 获取用户信息 #define GET_USERINFO /service/user/get_user_info // 设置用户头像 #define SET_USER_AVATAR /service/user/set_avatar // 设置用户昵称 #define SET_USER_NICKNAME /service/user/set_nickname // 设置用户描述 #define SET_USER_DESC /service/user/set_description // 设置用户邮箱 #define SET_USER_EMAIL /service/user/set_email // 获取好友列表 #define FRIEND_GET_LIST /service/friend/get_friend_list // 申请添加好友 #define FRIEND_APPLY /service/friend/add_friend_apply // 处理好友申请 #define FRIEND_APPLY_PROCESS /service/friend/add_friend_process // 删除好友 #define FRIEND_REMOVE /service/friend/remove_friend // 搜索好友 #define FRIEND_SEARCH /service/friend/search_friend // 获取待处理的好友事件 #define FRIEND_GET_PENDING_EV /service/friend/get_pending_friend_events // 获取聊天会话列表 #define CSS_GET_LIST /service/friend/get_chat_session_list // 创建聊天会话 #define CSS_CREATE /service/friend/create_chat_session // 获取聊天会话成员 #define CSS_GET_MEMBER /service/friend/get_chat_session_member // 获取历史消息范围 #define MSG_GET_RANGE /service/message_storage/get_history // 获取最近消息 #define MSG_GET_RECENT /service/message_storage/get_recent // 搜索历史消息关键词 #define MSG_KEY_SEARCH /service/message_storage/search_history // 发送新消息 #define NEW_MESSAGE /service/message_transmit/new_message // 获取单个文件 #define FILE_GET_SINGLE /service/file/get_single_file // 获取多个文件 #define FILE_GET_MULTI /service/file/get_multi_file // 上传单个文件 #define FILE_PUT_SINGLE /service/file/put_single_file // 上传多个文件 #define FILE_PUT_MULTI /service/file/put_multi_file // 语音识别 #define SPEECH_RECOGNITION /service/speech/recognition然后我们再将这些URL与我们的回调函数给绑定起来// 用户相关接口 _http_server.Post(GET_EMAIL_VERIFY_CODE, (httplib::Server::Handler)std::bind(GatewayServer::GetEmailVerifyCode, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(USERNAME_REGISTER, (httplib::Server::Handler)std::bind(GatewayServer::UserRegister, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(USERNAME_LOGIN, (httplib::Server::Handler)std::bind(GatewayServer::UserLogin, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(EMAIL_REGISTER, (httplib::Server::Handler)std::bind(GatewayServer::EmailRegister, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(EMAIL_LOGIN, (httplib::Server::Handler)std::bind(GatewayServer::EmailLogin, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(GET_USERINFO, (httplib::Server::Handler)std::bind(GatewayServer::GetUserInfo, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(SET_USER_AVATAR, (httplib::Server::Handler)std::bind(GatewayServer::SetUserAvatar, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(SET_USER_NICKNAME, (httplib::Server::Handler)std::bind(GatewayServer::SetUserNickname, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(SET_USER_DESC, (httplib::Server::Handler)std::bind(GatewayServer::SetUserDescription, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(SET_USER_EMAIL, (httplib::Server::Handler)std::bind(GatewayServer::SetUserEmailNumber, this, std::placeholders::_1, std::placeholders::_2)); // 好友相关接口 _http_server.Post(FRIEND_GET_LIST, (httplib::Server::Handler)std::bind(GatewayServer::GetFriendList, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FRIEND_APPLY, (httplib::Server::Handler)std::bind(GatewayServer::FriendAdd, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FRIEND_APPLY_PROCESS, (httplib::Server::Handler)std::bind(GatewayServer::FriendAddProcess, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FRIEND_REMOVE, (httplib::Server::Handler)std::bind(GatewayServer::FriendRemove, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FRIEND_SEARCH, (httplib::Server::Handler)std::bind(GatewayServer::FriendSearch, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FRIEND_GET_PENDING_EV, (httplib::Server::Handler)std::bind(GatewayServer::GetPendingFriendEventList, this, std::placeholders::_1, std::placeholders::_2)); // 聊天会话相关接口 _http_server.Post(CSS_GET_LIST, (httplib::Server::Handler)std::bind(GatewayServer::GetChatSessionList, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(CSS_CREATE, (httplib::Server::Handler)std::bind(GatewayServer::ChatSessionCreate, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(CSS_GET_MEMBER, (httplib::Server::Handler)std::bind(GatewayServer::GetChatSessionMember, this, std::placeholders::_1, std::placeholders::_2)); // 消息存储相关接口 _http_server.Post(MSG_GET_RANGE, (httplib::Server::Handler)std::bind(GatewayServer::GetHistoryMsg, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(MSG_GET_RECENT, (httplib::Server::Handler)std::bind(GatewayServer::GetRecentMsg, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(MSG_KEY_SEARCH, (httplib::Server::Handler)std::bind(GatewayServer::MsgSearch, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(NEW_MESSAGE, (httplib::Server::Handler)std::bind(GatewayServer::NewMessage, this, std::placeholders::_1, std::placeholders::_2)); // 文件相关接口 _http_server.Post(FILE_GET_SINGLE, (httplib::Server::Handler)std::bind(GatewayServer::GetSingleFile, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FILE_GET_MULTI, (httplib::Server::Handler)std::bind(GatewayServer::GetMultiFile, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FILE_PUT_SINGLE, (httplib::Server::Handler)std::bind(GatewayServer::PutSingleFile, this, std::placeholders::_1, std::placeholders::_2)); _http_server.Post(FILE_PUT_MULTI, (httplib::Server::Handler)std::bind(GatewayServer::PutMultiFile, this, std::placeholders::_1, std::placeholders::_2)); // 语音识别接口 _http_server.Post(SPEECH_RECOGNITION, (httplib::Server::Handler)std::bind(GatewayServer::SpeechRecognition, this, std::placeholders::_1, std::placeholders::_2)); // -------------------- 启动 HTTP 服务器线程 -------------------- // 在新线程中启动 HTTP 服务器监听所有网络接口的指定端口 _http_thread std::thread([this, http_port]() { _http_server.listen(0.0.0.0, http_port); }); // 分离线程使其独立运行避免阻塞主线程 _http_thread.detach();这些回调函数本质就是在通过brpc来向该子服务的服务器发起调用。但是这些回调函数和我们之前写的有一点是不太一样的。首先这些回调函数的结构都是下面这样子的只有这样子的参数我们才能设置给我们的httplib服务器。void 函数名(const httplib::Request request, httplib::Response response) { }注册/登录操作的回调函数结构我们的请求内容都存放在request里面所以在每个回调处理函数里面必定有下面这一句req.ParseFromString(request.body);取出我们的请求内容存放到我们定义好的数据结构EmailVerifyCodeReq req; …… bool ret req.ParseFromString(request.body);接着我们就拿这个req里面的信息去发起RPC服务。那么在发起请求之前我们需要先去看看有没有可用的服务节点auto channel _mm_channels-choose(_user_service_name);有服务节点我们就发起RPC调用IMS::UserService_Stub stub(channel.get()); brpc::Controller cntl; stub.GetEmailVerifyCode(cntl, req, rsp, nullptr); if (cntl.Failed()) { LOG_ERROR({} 用户子服务调用失败, req.request_id()); return err_response(用户子服务调用失败); }我们这里是同步RPC调用等待RPC调用返回我们只需要将响应设置给回调函数的第二个参数httplib::Response response里面去即可。response.set_content(rsp.SerializeAsString(), application/x-protbuf);httplib库会自动帮我们将响应返回给我们的客户端。我们看看完整的例子void GetEmailVerifyCode(const httplib::Request request, httplib::Response response) { // 1. 取出http请求正文将正文进行反序列化 EmailVerifyCodeReq req; EmailVerifyCodeRsp rsp; auto err_response [req, rsp, response](const std::string errmsg) - void { rsp.set_success(false); rsp.set_errmsg(errmsg); response.set_content(rsp.SerializeAsString(), application/x-protbuf); }; bool ret req.ParseFromString(request.body);//我们请求的数据都存在于回调函数的第一个参数httplib::Request request里面 if (ret false) { LOG_ERROR(获取短信验证码请求正文反序列化失败); return err_response(获取短信验证码请求正文反序列化失败); } // 2. 将请求转发给用户子服务进行业务处理向子服务发起RPC调用 auto channel _mm_channels-choose(_user_service_name); if (!channel) { LOG_ERROR({} 未找到可提供业务处理的用户子服务节点, req.request_id()); return err_response(未找到可提供业务处理的用户子服务节点); } IMS::UserService_Stub stub(channel.get()); brpc::Controller cntl; stub.GetEmailVerifyCode(cntl, req, rsp, nullptr); if (cntl.Failed()) { LOG_ERROR({} 用户子服务调用失败, req.request_id()); return err_response(用户子服务调用失败); } //这里是同步RPC调用他会将结构存储到rsp里面。 //我们只需要将这个rsp里面的数据设置给我们这个回调函数的第二个参数httplib::Response response即可。 // 3. 得到用户子服务的响应后将响应内容进行序列化作为http响应正文 response.set_content(rsp.SerializeAsString(), application/x-protbuf); //application/x-protbuf是一个 HTTP 的 Content-Type 头字段值它告诉接收方HTTP 请求或响应的消息体body中的数据是使用 Protocol BuffersProtobuf序列化格式编码的。 //httplib服务器会自动帮我们将响应返回给我们的客户端 }后面所有的处理回调函数都是这样子的我不想多说大家自己体会。登录后的处理回调函数结构那么我们在调用登录的RPC服务之后服务端会生成一个session_id给每一个登录的用户并且将session_iduid存储进我们的redis数据库里面。那么我们的客户端给我们发送请求的时候它是不知道我们的每个用户的uid的因为服务器没有告诉它们uid只告诉了session_id那么我们的客户端在发起请求的时候是拿session_id给我们的网关子服务我们的网关子服务就需要去拿这个session_id去Redis数据库里面查询相关键值对给我们的请求里面填充这个uid然后再去调用RPC服务。那么我们大家在登录之后就会看到相比于上面 注册/登录操作的回调函数结构 就会多了下面这一段根据session_id去Redis数据库里面查询uid并填充进我们的请求里面的操作。// 2. 客户端身份识别与鉴权 // 用户登录之后系统会给用户分配一个session // 从这里开始后面的RPC服务都需要有这个session来进行操作 std::string ssid req.session_id(); auto uid _redis_session-uid(ssid); if (!uid) { LOG_ERROR({} 获取登录会话关联用户信息失败, ssid); return err_response(获取登录会话关联用户信息失败); } req.set_user_id(*uid);2.3.服务端向客户端发送通知我们的服务器在使用过程中是需要对这个客户端进行消息推送的。以下所有事件都需要服务端主动通过 WebSocket 推送给客户端好友添加申请通知当有人申请添加好友时推送给被申请人好友添加处理结果通知当申请人收到同意/拒绝结果时推送给申请人聊天会话创建通知当新会话被创建时推送给相关成员新消息通知当有新消息时推送给会话中的其他成员好友移除通知当好友被删除时推送给被删除方那么我们将这些定义在了notify.proto里面syntax proto3; package IMS; import base.proto; option cc_generic_services true; //这里记载了服务端应该去提醒客户端的一些事件服务端会采用websocket协议来向客户端推送消息 // 通知类型枚举定义了系统中支持的各种通知事件 enum NotifyType { FRIEND_ADD_APPLY_NOTIFY 0; // 好友添加申请通知 FRIEND_ADD_PROCESS_NOTIFY 1; // 好友添加处理结果通知 CHAT_SESSION_CREATE_NOTIFY 2; // 聊天会话创建通知 CHAT_MESSAGE_NOTIFY 3; // 新消息通知 FRIEND_REMOVE_NOTIFY 4; // 好友移除通知 } // 好友添加申请通知的消息体包含申请人的详细信息 message NotifyFriendAddApply { UserInfo user_info 1; // 申请人信息 } // 好友添加处理结果通知的消息体包含处理结果和处理人信息 message NotifyFriendAddProcess { bool agree 1; // 是否同意添加好友true: 同意, false: 拒绝 UserInfo user_info 2; // 处理人信息即同意或拒绝添加的好友的用户信息 } // 好友移除通知的消息体仅包含被删除的好友的用户 ID message NotifyFriendRemove { string user_id 1; // 被删除的好友的用户 ID } // 新聊天会话创建通知的消息体包含新建会话的详细信息 message NotifyNewChatSession { ChatSessionInfo chat_session_info 1; // 新建会话信息 } // 新消息通知的消息体包含消息的详细信息 message NotifyNewMessage { MessageInfo message_info 1; // 新消息内容 } // 统一的通知消息结构用于封装所有类型的通知 message NotifyMessage { optional string notify_event_id 1; // 通知事件操作 ID若存在则用于追踪无则忽略 NotifyType notify_type 2; // 通知事件类型决定使用哪个 oneof 分支 oneof notify_remarks { // 根据通知类型存放对应的具体消息体 NotifyFriendAddApply friend_add_apply 3; // 好友添加申请详情 NotifyFriendAddProcess friend_process_result 4; // 好友处理结果详情 NotifyFriendRemove friend_remove 7; // 好友移除详情 NotifyNewChatSession new_chat_session_info 5; // 新会话详情 NotifyNewMessage new_message_info 6; // 新消息详情 } }服务端在需要主动通知客户端时如好友申请、新消息等会构造 NotifyMessage 对象根据事件类型填充对应的 oneof 分支然后通过 WebSocket 连接发送二进制数据。对于这些RPC服务的http回调函数它们里面肯定有类似于下面这段我们以好友添加申请通知为例//——————————————————特别注意这里———————————————————— // 4. 若业务处理成功 --- 且获取被申请方长连接成功则向被申请放进行好友申请事件通知 auto conn _connections-connection(req.respondent_id());//先通过被申请人的uid去获取到有没有对应的websocket连接 if (rsp.success() conn) { LOG_DEBUG(找到被申请人 {} 长连接对其进行好友申请通知, req.respondent_id()); auto user_rsp _GetUserInfo(req.request_id(), *uid);//通过用户管理子服务去获取当前登录用户的信息 if (!user_rsp) { LOG_ERROR({} 获取当前客户端用户信息失败, req.request_id()); return err_response(获取当前客户端用户信息失败); } NotifyMessage notify; notify.set_notify_type(NotifyType::FRIEND_ADD_APPLY_NOTIFY);//设置通知类型是FRIEND_ADD_APPLY_NOTIFY notify.mutable_friend_add_apply()-mutable_user_info()-CopyFrom(user_rsp-user_info()); conn-send(notify.SerializeAsString(), websocketpp::frame::opcode::value::binary);//通过websocket连接给客户端发送消息 }其他的都是类似的。