用 C++ 实现一个简单的 Reactor 模式

何谓 Reactor 模式?(请看 Schmidt, D.C. (1995). Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events)

Reactor 模式的目的:处理多个客户端的并发服务请求 (以下是论文 Reactor 的原话)

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. —— Reactor

Reactor 模式的主要结构:

  • Handles:标识由 OS 管理的一系列资源。这些资源通常包括:网络连接,打开文件,定时器,同步对象等等。Handle 可以看成是对这些资源的句柄,例如 fd (file description), timerfd。
  • Synchronous Event Demultiplexer:这在 Linux 中其实就是 select、poll、epoll。它利用 IO multiplexing 机制,通知用户哪些 Handles 已经 IO ready (即 调用 IO 函数不会导致 阻塞)。在我的实现中,epoll 接口被封装到了 Poller 类中。
  • Initiation Dispatcher:用于注册、移除以及分发事件处理器 (Event Handler) 的接口。在我的实现中就是 Channel 类,它实现了事件分发机制。
  • Event Handler:这是一个接口,它实现了根据指定的 IO Event 来进行指定的操作。
  • Concrete Event Handler:这是 事件处理器 的具体实现。

reactor 模式

让我来解释一下整个流程:

  1. Concrete Event Handler 不断地监视是否有 IO event 发生
  2. 有 IO event 发生,Concrete Event Handler 立即通知 Handle,并让 Dispatcher 把 IO event 分发给对应的 event handler
  3. event handler 处理 event
  4. 在处理 event 的过程中,可能又会注册需要 monitor 的 IO event 到 Concrete Event Handler 中去,并把对应的 event handler 注册到 Disapatcher 中去
  5. 回到步骤 1

​ 上述整个流程其实是管理所有文件描述符的所有事件(包括可读可写,等等),并在适当时机执行指定的回调函数(event handler)。所以,这实际上是一个事件循环 (event loop)。因此我用 EventLoop 类来描述这一整个流程。


一个简单的 Reactor 就仅仅需要以下三个类:

  • EventLoop: 实现事件循环
  • Channel: 实现事件分发机制
  • Poller: 实现监视 IO event 并通知

EventLoop

其核心是实现一个 事件循环 loop

1
2
3
4
5
6
7
8
9
10
11
12
13
void EventLoop::loop() {
···
std::vector<Handle> activeHandles;
while (1) {
activeHandles.clear();
// 执行流程 1:一旦有 IO 事件发生就让 Poller 通知我
// 并且把 对应 IO 的 Handle 告诉我
Poller.poll(activeHandles);
// 执行流程 2, 3
for (auto handle : activeHandles)
Dispatcher.handleEvent(handle); // 在具体 event handler 执行的过程中回去执行流程 4, 5
}
}

​ 这里仅仅是伪代码,还有很多细节是需要补全的,例如 Dispatcher 会根据 handle 以及发生的 IO event 去挑选具体的 handler,这些细节可以全部封装到 Channel 中!所有实际的实现就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
void EventLoop::loop() {
···
std::vector<Channels*> activeChannels;
while (1) {
activeChannels.clear();
// 执行流程 1:一旦有 IO 事件发生就让 Poller 通知我
// 并且把 对应 IO 的 Handle 告诉我
Poller.poll(activeChannels);
// 执行流程 2, 3
for (auto channel : activeChannels)
channel->handleEvent(); // 在具体 event handler 执行的过程中回去执行流程 4, 5
}
}

Poller

它就是对 整个 Epoll 机制的封装,即 epoll_create、epoll_wait、epoll_ctl

它无非就是通过 epoll_create 创建一个 epoll instance,然后 通过 epoll_ctl 注册用户需要让 OS monitor 的 IO event。所以代码整体骨架如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Poller::Poller()
: _epfd(epoll_create(1))
{}

Poller::~Poller() {
::close(_epfd);
}

int Poller::poll(std::vector<channel*> activeChannel, int timeout) {
struct epoll_events events[kMaxEvents];
int n = epoll_wait(_epfd, events, kMaxEvents, timeout); // 等待 IO event 发生
for (int i = 0;i < n;++i) {
Channel* pChannel = reinterpret_cast<Channel*>(events[i].data.ptr);
pChannel->setOccurEvents(events[i].events); // 把 epoll 监测到的 IO 事件(可读、可写)记录到对应 Handle 的 channel 中去
activeChannel.push_back(pChannel);
}
return n;
}

void Poller::update(Channel* pChannel) {
// 该函数用于注册 IO event 到 epoll instance 中去
struct epoll_events ev;
ev.events = pChannel.getWaitEvents(); // 获取用户想要 OS monitor 的 IO event
ev.data.ptr = pChannel;
epoll_ctl(); // 根据具体情况选择 EPOLL_CTL_ADD、EPOLL_CTL_MOD 或 EPOLL_CTL_DEL
}

Channel

​ 这个类实现了 IO 事件分发 机制,也是 Reactor 模式中重要的一环。它记录了用户想要 epoll instance 帮忙 monitor 的 fd 的具体 IO event(例如可读事件),也记录了当 IO event 发生时应该进行的操作(即 event handler)。有时候我们要让 epoll instance 监视多种 IO event,例如 可读、可写,因此一旦 IO 可读,epoll instance 就会通知我们,我们此时需要记录下 具体是哪种 IO event 导致的通知以便做出正确的响应操作,因此也需要记录 已发生的 IO event

​ 当前我的设计是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <functional>
class EventLoop;

class Channel {
public:
using EventCallBack = std::function<void()>;

// 每个 Channel 独属于 一个 EventLoop
explicit Channel(EventLoop*, int);

// 将 io multi-plexing 检测到的已发生的事件 记录在 _occurEvents 中
void setOccurEvents(uint32_t occurEvents) { _occurEvents = occurEvents; }

uint32_t getWaitEvents() const { return _waitEvents; }

// 设置事件回调
void setReadEventCallBack(EventCallBack cb) { _readEventCallBack = std::move(cb); }
void setWriteEventCallBack(EventCallBack cb) { _writeEventCallBack = std::move(cb);}

// 把事件分发给具体的事件处理函数,它就是一个 Event Handler
void handleEvent();

// 让 epoll instance 关注 指定 事件
void monitorReading() { _waitEvents |= kReadEvent; update(); }
void monitorWriting() { _waitEvents |= kWriteEvent; update(); }

static const uint32_t kNoneEvent; // 无任何 IO event
static const uint32_t kReadEvent; // 可读 事件
static const uint32_t kWriteEvent;// 可写 事件
private:
// 将事件注册到 epoll instance 中去
void update();
private:
EventLoop *_pLoop;
int _fd;

uint32_t _waitEvents; // 等待发生的事件
uint32_t _occurEvents; // 已发生的事件

// Concrete Event Handler
EventCallBack _readEventCallBack;
EventCallBack _writeEventCallBack;
};

这就是一个简单易用的 Reactor 模式的 C++ 实现。

参考

  1. Schmidt, D.C. (1995). Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events
  2. Reactor 模式
  3. 使用C++实现简单的Reactor模式