一、为什么需要多 Reactor在第一篇中我们已经有了Reactor单线程 ↓ Acceptoraccept ↓ Connection处理读写结构已经正确了但还有一个关键问题问题只能用一个 CPU 核心单线程 Reactor↓一个线程处理所有连接结果CPU 利用率低 ❌多核机器浪费 ❌handler 一旦稍重就拖慢整体 ❌所以必须升级从“单 Reactor” → “多 Reactor 多线程”二、多 Reactor 模型是什么核心思想主线程只负责 accept ↓ 把连接分发给多个 Reactor 线程 ↓ 每个线程独立 epoll 处理连接架构图必须记住Main Reactor主线程 ↓ accept ┌────────┬────────┬────────┐ ↓ ↓ ↓ ↓ Sub Reactor1 Sub Reactor2 Sub Reactor3 ... (线程1) (线程2) (线程3) 每个 Sub Reactor epoll Connection三、这一模型解决什么问题1️⃣ 多核利用每个 Reactor 一个线程CPU 核心1 → Reactor1 CPU 核心2 → Reactor2 CPU 核心3 → Reactor32️⃣ 避免锁竞争 一个连接只属于一个 Reactorfd → 固定 Reactor所以不需要跨线程锁 ✔不需要共享 Connection ✔3️⃣ 高并发核心模型工业级 Nginx / Netty 都是类似思路accept 分发 多事件循环四、这一篇要做什么我们要把第一篇的结构升级成Reactor主线程 ↓ Acceptor ↓ 分发连接 ↓ Sub Reactor多个线程 ↓ Connection五、工程结构升级版├── EventHandler.h ├── Reactor.h / Reactor.cpp ├── Connection.h / Connection.cpp ├── Acceptor.h / Acceptor.cpp ├── ReactorThread.h / ReactorThread.cpp ├── ReactorThreadPool.h / ReactorThreadPool.cpp ├── main.cpp六、新增组件讲解1️⃣ ReactorThread一个线程 一个 ReactorReactorThread.h#ifndef REACTOR_THREAD_H #define REACTOR_THREAD_H #include Reactor.h #include thread #include memory class ReactorThread { public: ReactorThread(); ~ReactorThread(); Reactor* getReactor(); void start(); private: std::unique_ptrReactor reactor_; std::thread thread_; }; #endifReactorThread.cpp#include ReactorThread.h ReactorThread::ReactorThread() { reactor_ std::make_uniqueReactor(); } ReactorThread::~ReactorThread() { if (thread_.joinable()) { thread_.join(); } } Reactor* ReactorThread::getReactor() { return reactor_.get(); } void ReactorThread::start() { thread_ std::thread([this]() { reactor_-loop(); }); }这一层干嘛 封装线程 Reactor 每个线程干一件事运行一个 epoll 事件循环2️⃣ ReactorThreadPool线程池ReactorThreadPool.h#ifndef REACTOR_THREAD_POOL_H #define REACTOR_THREAD_POOL_H #include ReactorThread.h #include vector #include memory class ReactorThreadPool { public: explicit ReactorThreadPool(int size); void start(); Reactor* getNextReactor(); private: std::vectorstd::unique_ptrReactorThread threads_; int index_; }; #endifReactorThreadPool.cpp#include ReactorThreadPool.h ReactorThreadPool::ReactorThreadPool(int size) : index_(0) { for (int i 0; i size; i) { threads_.emplace_back(std::make_uniqueReactorThread()); } } void ReactorThreadPool::start() { for (auto t : threads_) { t-start(); } } Reactor* ReactorThreadPool::getNextReactor() { Reactor* reactor threads_[index_]-getReactor(); index_ (index_ 1) % threads_.size(); return reactor; }这一层干嘛 做一件核心事情负载均衡分发连接分发策略当前轮询Round Robin 每次 acceptclient_fd → 下一个 Reactor3️⃣ 修改 Acceptor关键新版Acceptor.h#ifndef ACCEPTOR_H #define ACCEPTOR_H #include EventHandler.h class Reactor; class ReactorThreadPool; class Acceptor : public EventHandler { public: Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool); ~Acceptor() override; int fd() const override; void onRead() override; void onWrite() override; void onClose() override; private: int listenFd_; Reactor* mainReactor_; ReactorThreadPool* threadPool_; }; #endif新版Acceptor.cpp#include Acceptor.h #include Reactor.h #include Connection.h #include ReactorThreadPool.h #include unistd.h #include iostream #include fcntl.h #include errno.h namespace { int setNonBlocking(int fd) { int flags fcntl(fd, F_GETFL, 0); return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } } Acceptor::Acceptor(int listenFd, Reactor* mainReactor, ReactorThreadPool* pool) : listenFd_(listenFd), mainReactor_(mainReactor), threadPool_(pool) {} Acceptor::~Acceptor() { if (listenFd_ 0) { close(listenFd_); } } int Acceptor::fd() const { return listenFd_; } void Acceptor::onRead() { while (true) { int clientFd accept(listenFd_, nullptr, nullptr); if (clientFd -1) { if (errno EAGAIN) break; break; } setNonBlocking(clientFd); // 核心选择一个子 Reactor Reactor* subReactor threadPool_-getNextReactor(); Connection* conn new Connection(clientFd, subReactor); subReactor-addHandler(conn, EPOLLIN); std::cout [Acceptor] new connection fd clientFd std::endl; } } void Acceptor::onWrite() {} void Acceptor::onClose() {}七、main.cpp最终结构#include Reactor.h #include Acceptor.h #include ReactorThreadPool.h #include arpa/inet.h #include fcntl.h #include sys/socket.h #include unistd.h #include iostream int setNonBlocking(int fd) { int flags fcntl(fd, F_GETFL, 0); return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } int main() { int serverFd socket(AF_INET, SOCK_STREAM, 0); int opt 1; setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt)); setNonBlocking(serverFd); sockaddr_in addr{}; addr.sin_family AF_INET; addr.sin_port htons(8080); addr.sin_addr.s_addr INADDR_ANY; bind(serverFd, (sockaddr*)addr, sizeof(addr)); listen(serverFd, 128); // 主 Reactor只 accept Reactor mainReactor; // 子 Reactor 线程池 ReactorThreadPool pool(4); pool.start(); // Acceptor 注册到主 Reactor Acceptor* acceptor new Acceptor(serverFd, mainReactor, pool); mainReactor.addHandler(acceptor, EPOLLIN); std::cout server running on :8080 std::endl; mainReactor.loop(); return 0; }八、这一版最关键变化从一个 Reactor到Main Reactoraccept↓Sub Reactor处理连接关键一行必须记住Reactor* subReactor threadPool_-getNextReactor(); 这就是连接分发九、最终理解核心epoll Reactor单线程一个线程处理所有 fd多 Reactor 模型主线程接连接子线程处理连接十、这一篇本质提升第一篇✔ 建立 Connection✔ 建立服务骨架第二篇✔ 多核并发✔ 真正高并发架构总结单 Reactor 是模型多 Reactor 才是架构下一篇第三篇 《C 服务端进阶三—— Reactor 协程现代异步模型》你会进入回调 → 协程epoll → coroutine写出类似 Go / Netty 的模型