最近在使用 Folly 的协程做 RPC 框架,遇到一些问题:
co_await
操作时,会出现线程切换的情况这就导致了目前只能使用单线程的 IOThreadPoolExecutor 执行协程中的 IO 操作。即使修复了这个问题,每个线程一个 EventBase 也会限制对应连接的 IO 能力。好在目前的 RPC 框架中已经使用队列剥离了 IO 操作和请求处理,请求处理阶段依然可以使用 Folly 的协程框架,IO 部分可以替换为无协程的实现。为了尽可能地提升性能,笔者自己造了一个单 Epoll 多线程 IO 的轮子,供参考。
对于这个 IO 模型,笔者计划的目标是:
先看一个 Epoll 边缘触发的例子(在线执行):
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <atomic>
#include <thread>
#define CHECK(expr) \
do { \
int ret = (expr); \
if (ret != 0) { \
printf("%d error %d\n", __LINE__, ret); \
return ret; \
} \
} while (0);
constexpr uint16_t port = 8000;
constexpr uint32_t maxevents = 32;
std::atomic<bool> quit{false};
int server() {
struct sockaddr_in serv_addr;
socklen_t serv_len = sizeof(serv_addr);
int lfd = ::socket(AF_INET, SOCK_STREAM, 0);
CHECK(::fcntl(lfd, F_SETFL, ::fcntl(lfd, F_GETFL) | O_NONBLOCK));
int value = 1;
CHECK(::setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(int)));
CHECK(::setsockopt(lfd, SOL_SOCKET, SO_REUSEPORT, &value, sizeof(int)));
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(port);
CHECK(::bind(lfd, (struct sockaddr *)&serv_addr, serv_len));
CHECK(::listen(lfd, 64));
int epfd = ::epoll_create(maxevents);
struct epoll_event ev {
EPOLLIN | EPOLLET, { .fd = lfd }
};
CHECK(::epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev));
while (true) {
struct epoll_event events[maxevents];
int ret = ::epoll_wait(epfd, events, maxevents, -1);
CHECK(ret == -1 ? errno : 0);
if (quit.load()) {
break;
}
for (int i = 0; i < ret; ++i) {
int fd = events[i].data.fd;
if (fd == lfd) {
struct sockaddr_in client_addr;
socklen_t cli_len = sizeof(client_addr);
int cfd = ::accept(lfd, (struct sockaddr *)&client_addr, &cli_len);
CHECK(cfd == -1 ? errno : 0);
int flags = ::fcntl(cfd, F_GETFL, 0);
CHECK(flags == -1 ? errno : 0);
CHECK(::fcntl(cfd, F_SETFL, (flags | O_NONBLOCK)));
struct epoll_event evt {
(EPOLLIN | EPOLLOUT | EPOLLET), { .fd = cfd }
};
CHECK(::epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &evt));
continue;
}
int e = events[i].events;
bool in = e & EPOLLIN;
bool out = e & EPOLLOUT;
printf("fd: %d, events %d, epoll in %d, epoll out %d\n", fd, e, in, out);
}
}
::close(epfd);
::close(lfd);
return 0;
}
int client() {
struct sockaddr_in cli_addr;
socklen_t cli_len = sizeof(cli_addr);
memset(&cli_addr, 0, sizeof(cli_addr));
cli_addr.sin_family = AF_INET;
cli_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
cli_addr.sin_port = htons(port);
int fd = ::socket(AF_INET, SOCK_STREAM, 0);
CHECK(::connect(fd, (struct sockaddr *)&cli_addr, cli_len));
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(::write(fd, "write", 5) - 5);
std::this_thread::sleep_for(std::chrono::seconds(1));
CHECK(::write(fd, "write", 5) - 5);
std::this_thread::sleep_for(std::chrono::seconds(1));
quit = true;
CHECK(::close(fd));
return 0;
}
int main() {
std::thread svr(server);
std::thread cli(client);
cli.join();
svr.join();
return 0;
}
从上面的例子得到的结论是,当 fd 发生以下几种状态变化时,epoll_wait
会返回该 fd 所有关注的事件。
进而,当 fd 可读或可写时,Epoll 也可能会重复的报告该事件。
设计目标中希望 Epoll 收到可读事件后,打包成 IO 任务丢给线程池。但 Epoll 可能会重复的报告可读事件,此时可能有正在执行的读操作,造成竞争。这也就需要一种机制,保证可读事件出现时,有且仅有一个正在执行的读操作,完成所有可读事件的处理。
加锁非吾所愿。分析一下当前的需求:
该需求可以通过原子量完成,需要使用两个 Bit 位,分别命名为 Reading 和 NewEvent。Reading 为 1 时表示当前已经有读操作启动,NewEvent 为 1 时表示有新的可读事件出现。在可读事件出现时,同时打上 Reading 和 NewEvent 标记,如果此前没有 Reading 标记,则启动一个新的读任务;读任务启动时去除 NewEvent 标记,而后持续进行读操作直至返回 EAGAIN
,此时检查是否有 NewEvent 标记,如果有则去除该标记继续重试,否则尝试 CAS 去除 Reading 标记,如果成功则该读任务成功退出。这样可以保证总会有一个正在执行或待执行的读任务处理新出现的读事件。
// Epoll thread
if (events & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
auto flags = socket->flags.fetch_or(ReadingFlag | NewEventFlag);
if ((flags & ReadingFlag) == 0) {
socket->start_read_task();
}
}
// Read thread
void read_task(Socket *socket) {
socket->flags &= ~NewEventFlag;
while (true) {
int ret = ::read(socket->fd, buff, size);
if (ret > 0) {
// ...
} else if (ret == 0) {
// closed.
} else if (errno == EAGAIN) {
auto flags = socket->flags.load(std::memory_order_acquire);
while (true) {
if (flags & NewEventFlag) {
socket->flags &= ~NewEventFlag;
// 当前有新的可读事件出现,可以直接重试,或者重新打包一个读任务丢给线程池
// ...
}
auto newFlags = flags & ~ReadingFlag.
if (socket->flags.compare_exchange_strong(flags, newFlags)) {
// CAS 成功,去除了 ReadingFlag,可以成功退出
}
// 如果失败,则继续重试
}
} else {
// error.
}
}
}
与读事件的处理略微有些不同,写事件的处理要麻烦一些。当写入队列中有数据、且 fd 可写时,就需要有且仅有一个写任务处理所有的写入。也就说,当前没有写任务的前提下,会有两种可能的启动写任务的场景:
写入任务退出也有两种场景:
bRPC 是这样处理上述需求的:
bRPC 的处理是完备的,但可能需要频繁地操作 Epoll,这也是非吾所愿。笔者这里仍然依靠原子量实现写任务的启动与退出。这里需要使用四个 Bit 位,使用 Writable 和 NewEpollOut 表示当前 fd 是否可写、是否有新的可写入事件,使用 HasMsg 和 NewMsg 表示当前写入队列是否有数据、是否有新的数据插入队列。当且仅当 Writable 和 HasMsg 首次同时变为 1 时才启动写入任务。当写入队列写完时,尝试去除 HasMsg 和 NewMsg 标记;当 fd 不可写时,尝试去除 Writable 和 NewEpollOut 标记;当 Epoll 收到新的可写入事件时,仍然使用 fetch_or
打上可写入标记,并判断是否启动新的写入任务。事实上 Epoll 收到可读可写事件时,可以一次性地完成 fetch_or
打标记的任务。