文章目录核心流程详细实现1. 网络数据接收与解析2. 消息传递到 Socket3. 用户层通知机制4. zmq_poll 实现技术要点1. 消息完整性检测2. 通知机制3. 线程安全完整流程图示总结核心流程ZeroMQ 检测完整消息可读并通知用户的完整流程网络数据接收stream_engine从网络读取数据消息解析decoder解析数据检测完整消息消息传递通过pipe传递给session消息存储socket将消息存储在内部队列通知用户通过文件描述符的可读事件通知用户详细实现1. 网络数据接收与解析stream_engine 接收数据boolzmq::stream_engine_base_t::in_event_internal(){// 读取网络数据constintrcread(_inpos,bufsize);// 解码数据intrc_decoder-decode(_inpos,_insize);// 消息就绪时if(_decoder-message_ready()){msg_t*msg_decoder-msg();// 写入 pipe_pipe-write(msg);_pipe-flush();}}2. 消息传递到 Socketsession 处理voidzmq::session_base_t::read_activated(zmq::pipe_t*pipe_){// 从 pipe 读取消息msg_t msg;while(pull_msg(msg)0){// 处理消息// 写入 socket 的 pipe}}socket 接收voidzmq::socket_base_t::read_activated(zmq::pipe_t*pipe_){// 从 pipe 读取消息msg_t msg;while(pipe_-read(msg)){// 存储到内部队列_fq.push(msg);// 通知用户关键步骤if(_fq.size()1){// 第一次有消息时通知用户signaler-send();}}}3. 用户层通知机制文件描述符通知ZeroMQ 为每个 socket 创建一个通知文件描述符当有消息可读时通过signaler向该文件描述符写入数据用户可以通过ZMQ_FD获取该文件描述符使用poll/select等系统调用监听该文件描述符用户 API// 获取通知文件描述符zmq_fd_t fd;size_t fd_sizesizeof(fd);zmq_getsockopt(socket,ZMQ_FD,fd,fd_size);// 监听文件描述符structpollfdpfd;pfd.fdfd;pfd.eventsPOLLIN;poll(pfd,1,-1);// 检查是否有消息uint32_tevents;size_t events_sizesizeof(events);zmq_getsockopt(socket,ZMQ_EVENTS,events,events_size);if(eventsZMQ_POLLIN){// 有消息可读zmq_recv(socket,buffer,sizeof(buffer),0);}4. zmq_poll 实现socket_poller 检测intzmq::socket_poller_t::check_events(zmq::socket_poller_t::event_t*events_,intn_events_){intfound0;for(items_t::iterator it_items.begin(),end_items.end();it!endfoundn_events_;it){if(it-socket){// 检查 socket 事件uint32_tevents;it-socket-getsockopt(ZMQ_EVENTS,events,events_size);if(it-eventsevents){// 有事件发生events_[found].socketit-socket;events_[found].eventsit-eventsevents;found;}}}returnfound;}技术要点1. 消息完整性检测decoder负责解析网络数据检测消息边界message_ready当解析出完整消息时返回 true帧边界ZeroMQ 消息由多个帧组成decoder 负责检测帧边界2. 通知机制signaler跨平台的信号机制用于通知用户文件描述符每个 socket 对应一个通知文件描述符事件状态通过ZMQ_EVENTS获取 socket 的事件状态3. 线程安全线程安全 socket使用 signaler 机制非线程安全 socket直接使用文件描述符poller支持同时监听多个 socket完整流程图示┌─────────────────────────────────────────────────────────────────────┐ │ 消息可读通知流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 网络数据到达 │ │ └─ stream_engine::in_event() │ │ └─ 读取网络数据 │ │ └─ decoder::decode() 解析数据 │ │ └─ 检测消息完整性 │ │ └─ 完整消息就绪 │ │ │ │ 2. 消息传递 │ │ └─ _pipe-write(msg) 写入 pipe │ │ └─ session::read_activated() 处理 │ │ └─ socket::read_activated() 接收 │ │ └─ _fq.push(msg) 存储到队列 │ │ └─ signaler-send() 发送通知 │ │ │ │ 3. 用户检测 │ │ └─ poll() 检测文件描述符可读 │ │ └─ zmq_getsockopt(ZMQ_EVENTS) 检查事件 │ │ └─ zmq_recv() 读取消息 │ │ │ └─────────────────────────────────────────────────────────────────────┘总结ZeroMQ 检测完整消息可读并通知用户的机制消息解析decoder解析网络数据检测完整消息消息存储消息存储在 socket 的内部队列通知触发通过signaler触发文件描述符的可读事件用户检测用户通过poll或zmq_poll检测事件消息读取调用zmq_recv读取消息这种设计确保了用户能够及时得知有消息可读同时避免了忙等带来的性能损耗是 ZeroMQ 高效通信的重要基础。