zmq源码分析之poller和signaler如何建立联动实现用户层通知
文章目录核心实现1. Signaler 实现2. Socket Poller 与 Signaler3. 信号与 Poll 的配合详细流程1. 信号发送流程2. 信号接收流程技术要点1. 跨平台实现2. 线程安全3. 高效处理代码示例总结先看一段用户层代码// 创建线程安全的 socketvoid*socketzmq_socket(ctx,ZMQ_PUB);// PUB 是线程安全的zmq_bind(socket,tcp://*:5555);// 创建 pollerzmq_pollitem_t items[]{{socket,0,ZMQ_POLLOUT,0}// 监听可写事件};// 轮询while(1){intrczmq_poll(items,1,1000);if(rc-1)break;if(items[0].reventsZMQ_POLLOUT){// 可以发送消息zmq_send(socket,Hello,5,0);}}此处的socket是封装后的socket并非我们linux编程时的套接字它怎么能被zmq_poll所管理的。我们印象中poll管理的必须是文件描述符自然的socket内部借助了文件描述符的数据可读来实现通知用户消息可读而这其中linux的管道最适合这个通知机制被zmq封装成了signaler。核心实现signaler用户通知事件发生为何能让其被poll管理unix平台上使用管道。ZeroMQ 的信号signaler通过文件描述符与 poll 系统调用配合工作具体实现如下1. Signaler 实现signaler_t是一个跨平台的信号机制Unix使用 pipe 实现Windows使用 event object 实现关键方法get_fd()获取信号的文件描述符send()发送信号recv()接收信号2. Socket Poller 与 Signalersocket_poller_t在处理线程安全的 socket 时使用 signalerintzmq::socket_poller_t::add(socket_base_t*socket_,void*user_data_,shortevents_){// 处理线程安全 socketif(is_thread_safe(*socket_)){if(_signalerNULL){_signalernew(std::nothrow)signaler_t();// ...}socket_-add_signaler(_signaler);}// ...}3. 信号与 Poll 的配合poll 处理信号的流程添加信号文件描述符if(_use_signaler){item_nbr1;_pollfds[0].fd_signaler-get_fd();_pollfds[0].eventsPOLLIN;}等待信号// 调用 poll()constintrcpoll(_pollfds,_pollset_size,timeout);处理信号// 检测 signaler 事件if(_use_signaler_pollfds[0].reventsPOLLIN)_signaler-recv();// 接收信号检查 socket 事件// 检查所有 socket 的事件constintfoundcheck_events(events_,n_events_);详细流程1. 信号发送流程┌─────────────────────────────────────────────────────────────────────┐ │ 信号发送流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. 消息到达 socket │ │ └─ socket::read_activated() │ │ └─ 消息进入接收队列 │ │ └─ signaler-send() 发送信号 │ │ │ │ 2. Signaler 处理 │ │ └─ signaler::send() │ │ └─ Unix: write(pipe[1], ...) │ │ └─ Windows: SetEvent(...) │ │ │ └─────────────────────────────────────────────────────────────────────┘2. 信号接收流程┌─────────────────────────────────────────────────────────────────────┐ │ 信号接收流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ 1. zmq_poll 等待 │ │ └─ socket_poller_t::wait() │ │ └─ poll(_pollfds, ...) │ │ └─ 检测到 signaler 的文件描述符可读 │ │ │ │ 2. 处理信号 │ │ └─ signaler-recv() │ │ └─ Unix: read(pipe[0], ...) │ │ └─ Windows: WaitForSingleObject(...) │ │ │ │ 3. 检查事件 │ │ └─ check_events() │ │ └─ 对每个 socket 调用 getsockopt(ZMQ_EVENTS) │ │ └─ 检查是否有可读事件 │ │ │ └─────────────────────────────────────────────────────────────────────┘技术要点1. 跨平台实现Signaler 在不同平台的实现平台实现方式发送操作接收操作Unixpipewrite(pipe[1], ...)read(pipe[0], ...)WindowseventSetEvent(...)WaitForSingleObject(...)2. 线程安全信号机制确保线程安全线程安全的 socket 使用 signaler 进行通知非线程安全的 socket 直接使用文件描述符信号机制避免了线程竞争3. 高效处理信号处理的高效性信号只作为通知不传递数据避免了频繁的轮询减少了系统调用的开销代码示例使用 signaler 的典型场景// 创建线程安全的 socketvoid*socketzmq_socket(ctx,ZMQ_PUB);// PUB 是线程安全的zmq_bind(socket,tcp://*:5555);// 创建 pollerzmq_pollitem_t items[]{{socket,0,ZMQ_POLLOUT,0}// 监听可写事件};// 轮询while(1){intrczmq_poll(items,1,1000);if(rc-1)break;if(items[0].reventsZMQ_POLLOUT){// 可以发送消息zmq_send(socket,Hello,5,0);}}总结ZeroMQ 信号与 Poll 的配合机制信号发送当 socket 状态变化时如收到消息通过 signaler 发送信号信号检测zmq_poll 将 signaler 的文件描述符加入 poll 集合使用系统的 poll/select 等调用等待事件信号处理检测到 signaler 事件后调用 signaler-recv() 清除信号然后检查所有 socket 的事件状态事件通知将有事件的 socket 通知给用户这种设计使得 ZeroMQ 能够高效、线程安全地处理多个 socket 的事件是其高性能的重要基础。