汇编魔法实现 C++ 协程

2019.09.28
SF-Zhou

协程,简而言之就是用户态线程。C++ 官方协程已经被提上日程,乐观估计 2020 年能发布,然后 2025 年能用上。但当下的性能问题亟待解决,没有官方实现也可以自己造轮子。除了切换到 Go、使用语言级支持的协程外,也可以在 C++ 内使用基于汇编实现的协程,比如微信开源的 libco。本文将从简单的汇编开始,逐步分析 x86-64 环境下协程的实现原理。

1. 栈帧 Stack Frames

在 C++ 的执行环境下,通常会在调用函数时为其分配栈帧使其有独立的内存空间放置临时变量。一般使用 RBP 寄存器(x86 则是 EBP 寄存器)存储栈帧的底部位置,然后使用 RBP 寄存器和偏移量定位临时变量。举个例子:

void func() {
  int a, b, c;
  ...
  return;
}

int main() {
  func();
}

对应的汇编可能是这样的:

_Z4funcv:
  push rbp     ; 存储 RBP 的值
  mov rbp, rsp ; 将 RPB 指向当前栈顶 RSP
  sub rsp, 12  ; 将栈顶上移 12 个字节,为临时变量 a/b/c 腾出空间
               ; a = [rbp - 4], b = [rbp - 8], c = [rbp - 12]
  mov rsp, rbp ; 恢复栈顶指针 RSP
  pop rbp      ; 恢复 RBP 的值,即为函数调用者的栈帧地址
  ret          ; 读取栈顶存储的返回地址,弹出,返回

main:
  push rbp
  mov rbp, rsp
  call _Z4funcv
  mov eax, 0
  pop rbp
  ret

其中 call 会将函数的返回地址压栈,并 jump 到函数的起始位置,相当于:

push rip + 2 ; 返回地址为当前地址加两条指令的偏移量
jmp _Z4funcv

对于有参数的函数,编译器会根据参数的数量和大小,使用寄存器或栈进行传递,例如:

int add(int x, int y) {
  return x + y;
}

struct Array {
  int values[8];
};

int sum(Array arr) {
  return arr.values[0] + arr.values[1];
}

对应的汇编:

_Z3addii:
  push rbp
  mov rbp, rsp
  mov DWORD PTR [rbp-4], edi ; 参数较少时使用 rdi 和 rsi 等寄存器传递,这里取其低 32 位
  mov DWORD PTR [rbp-8], esi ; 如果使用 -O2 优化,会消除这几句冗余的传值
  mov edx, DWORD PTR [rbp-4]
  mov eax, DWORD PTR [rbp-8]
  add eax, edx
  pop rbp
  ret

_Z3sum5Array:
  push rbp
  mov rbp, rsp
  mov edx, DWORD PTR [rbp+16] ; 参数较大/多时使用栈传递,此时 [rbp] 存储调用者 rbp 寄存器值
  mov eax, DWORD PTR [rbp+20] ; [rbp+8] 存储返回地址
  add eax, edx
  pop rbp
  ret

由于依靠 RBP 定位临时变量的位置,一般称其为 Frame Pointer,称 RSP 为 Stack Pointer。一般会要求 RSP 始终指向栈顶的位置,以保证可以在中断的情况下根据 RSP 申请新的栈帧处理中断任务。注意栈一般是朝着内存地址减小的方向增长的。栈帧示意图:

|low  addr|
|---------|
|   ...   | <- [rsp] 栈顶
|   ...   |
|   ...   | <- [rbp - x] 函数临时变量
|---------|
|   RBP'  | <- [rbp + 0] 调用者原 RBP 值
|---------|
| RetAddr | <- [rbp + 8] 返回地址
|---------|
|   ...   | <- [rbp + x] 函数参数
|---------|
|high addr|

了解了栈帧之后,就可以尝试使用汇编实现一些魔法了,比如:

#include <iostream>

extern "C" void run(void *) asm("run");
asm(R"(
run:
  jmp *%rdi
)");

void func() {
  std::cout << "Hello ASM" << std::endl;
}

int main() {
  run((void *)func);
}

上述代码需要 C++11 支持,可以点击此处在线编译执行。这里手写了一个汇编函数 run,其功能为跳转到第一个参数指向的地址处,即 func 函数。由于返回地址仍然是 main 函数的位置,故不需要其他额外的修改操作。

而这就是实现协程切换的核心魔法了。

2. 协程切换 Swap Coroutines

在协程的执行过程中,一般会在遇到 IO 等待事件时主动出让执行权;而当 IO 事件完成、获得执行权时,再恢复出让前的状态,继续执行。举个例子:

void func() {
  std::cout << "Wait!" << std::endl;
  yield();  // 出让执行权
  std::cout << "Finish!" << std::endl;
}

在出让执行权时需要保存当前执行的状态,包括堆栈和寄存器等;而当再次获得执行权时,恢复先前保存的状态即可。实际上堆栈信息也保存在寄存器中(RBPRSP),所以关注寄存器就够了,这一点可以参考 libaco 中关于寄存器状态的说明。使用汇编实现寄存器的保存和恢复后,就可以实现上述函数了:

#include <iostream>
#include <stdint.h>
#include <vector>

extern "C" void swap_context(void *, void *) asm("swap_context");
asm(R"(
swap_context:
  mov 0x00(%rsp), %rdx
  lea 0x08(%rsp), %rcx
  mov %r12, 0x00(%rdi)
  mov %r13, 0x08(%rdi)
  mov %r14, 0x10(%rdi)
  mov %r15, 0x18(%rdi)
  mov %rdx, 0x20(%rdi)
  mov %rcx, 0x28(%rdi)
  mov %rbx, 0x30(%rdi)
  mov %rbp, 0x38(%rdi)
  mov 0x00(%rsi), %r12
  mov 0x08(%rsi), %r13
  mov 0x10(%rsi), %r14
  mov 0x18(%rsi), %r15
  mov 0x20(%rsi), %rax
  mov 0x28(%rsi), %rcx
  mov 0x30(%rsi), %rbx
  mov 0x38(%rsi), %rbp
  mov %rcx, %rsp
  jmpq *%rax
)");

struct Registers {
  void *reg[8];
} ma, co;

void func() {
  std::cout << "Wait!" << std::endl;
  swap_context(&co, &ma);  // yield
  std::cout << "Finish!" << std::endl;
  swap_context(&co, &ma);  // exit
}

int main() {
  std::vector<char> mem(4096);
  void *stack = (char *)((uintptr_t)(&mem.back()) & ~15ull) - sizeof(void *);

  co.reg[4] = (void *)func;
  co.reg[5] = stack;

  swap_context(&ma, &co);  // start coroutine
  std::cout << "Resume" << std::endl;
  swap_context(&ma, &co);  // resume coroutine
  return 0;
}

可以点击此处在线编译执行。定义的 Registers 结构体内会存储 8 个寄存器,定义两个 maco 状态用来保存主协程和子协程的寄存器值。main 函数中申请了一段空间作为子协程的栈,设定子协程存储的寄存器中的 RAXfunc 函数地址,RCX 为栈顶,而后执行 swap_context 保存当前主协程状态并切换到子协程。

swap_context:
  mov 0x00(%rsp), %rdx  ; 将返回地址保存到 RDX 寄存器
  lea 0x08(%rsp), %rcx  ; 将原 RSP 地址存储到 RCX 寄存器
  mov %r12, 0x00(%rdi)  ; 将当前协程的 R12 寄存器保存到 RDI 指向的 Registers 结构体中,下同
  mov %r13, 0x08(%rdi)
  mov %r14, 0x10(%rdi)
  mov %r15, 0x18(%rdi)
  mov %rdx, 0x20(%rdi)  ; 这里保存了当前协程的返回地址
  mov %rcx, 0x28(%rdi)  ; 这里保存了当前协程的栈顶地址
  mov %rbx, 0x30(%rdi)
  mov %rbp, 0x38(%rdi)  ; 这里保存了当前协程的栈帧地址
  mov 0x00(%rsi), %r12  ; 从 RSI 指向的 Registers 结构体中恢复 R12 寄存器的值,下同
  mov 0x08(%rsi), %r13
  mov 0x10(%rsi), %r14
  mov 0x18(%rsi), %r15
  mov 0x20(%rsi), %rax  ; 恢复返回地址,暂存到 RAX 寄存器
  mov 0x28(%rsi), %rcx  ; 恢复栈顶地址,暂存到 RCX 寄存器
  mov 0x30(%rsi), %rbx
  mov 0x38(%rsi), %rbp  ; 恢复栈帧地址
  mov %rcx, %rsp        ; 恢复栈顶地址
  jmpq *%rax            ; 跳回 RAX 保存的返回地址,完成切换

swap_context 函数有两个参数,无返回值,两个参数使用 RDIRSI 传递。先执行保存,再执行恢复,最后利用返回地址跳到应该前往的位置,完成协程的启动和切换,奇技淫巧矣。有兴趣可以自己用 GDB/LLDB 调试运行看看。

3. 事件循环 Event Loop

正常工作的协程是不会无缘无故出让自己的执行权的,只有在等待 IO、无法继续执行下去时才会出让,例如 usleep(1000)。这时候闲着也是闲着,把执行权让出去、等 IO 完成再继续执行,系统才能获得最大的执行效率。由于协程无法做到主动抢占式执行,就需要有一个事件循环在协程等待的事件完成时唤醒协程,恰好主协程就特别适合做这件事情。

usleep 为例,当子协程执行 usleep 时,可以将其加入定时器中,并执行 yield 出让执行权;事件循环则不断地重复,检查定时器中有没有等待结束的协程,如有则出让自己的执行权将其唤醒、继续子协程的任务,如此循环往复。

#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <iostream>
#include <queue>
#include <vector>

extern "C" void swap_context(void *, void *) asm("swap_context");
asm(R"(
swap_context:
  mov 0x00(%rsp), %rdx
  lea 0x08(%rsp), %rcx
  mov %r12, 0x00(%rdi)
  mov %r13, 0x08(%rdi)
  mov %r14, 0x10(%rdi)
  mov %r15, 0x18(%rdi)
  mov %rdx, 0x20(%rdi)
  mov %rcx, 0x28(%rdi)
  mov %rbx, 0x30(%rdi)
  mov %rbp, 0x38(%rdi)
  mov 0x00(%rsi), %r12
  mov 0x08(%rsi), %r13
  mov 0x10(%rsi), %r14
  mov 0x18(%rsi), %r15
  mov 0x20(%rsi), %rax
  mov 0x28(%rsi), %rcx
  mov 0x30(%rsi), %rbx
  mov 0x38(%rsi), %rbp
  mov %rcx, %rsp
  jmpq *%rax
)");

struct Context {
  void *reg[8];
  std::vector<char> mem;
  Context(void (*func)() = nullptr) : mem(4096) {
    reg[4] = (void *)func;
    reg[5] = (char *)((uintptr_t)(&mem.back()) & ~15ull) - sizeof(void *);
  }
} ma;
Context *current = nullptr;

void resume_coroutine(Context *coroutine) {
  current = coroutine;
  swap_context(&ma, current);
}

uint64_t GetMs() {
  timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);
  return ts.tv_nsec / 1000000 + ts.tv_sec * 1000ull;
}

struct Task {
  uint64_t expire;
  Context *coroutine;
  bool operator<(const Task &other) const { return expire > other.expire; }
};
std::priority_queue<Task> tasks;

void coroutine_sleep(int ms) {
  uint64_t expire = GetMs() + ms;
  tasks.push(Task{.expire = expire, .coroutine = current});
  swap_context(current, &ma);
}

void event_loop(int timeout_in_seconds) {
  uint64_t start = GetMs();
  while (true) {
    usleep(1000);

    while (!tasks.empty()) {
      if (GetMs() > tasks.top().expire) {
        Task task = tasks.top();
        tasks.pop();
        resume_coroutine(task.coroutine);
      } else {
        break;
      }
    }

    if ((GetMs() - start) > timeout_in_seconds * 1000) {
      break;
    }
  }
}

void func1() {
  while (true) {
    std::cout << "Coroutine 1 print per 500ms" << std::endl;
    coroutine_sleep(500);
  }
}

void func2() {
  while (true) {
    std::cout << "Coroutine 2 print per 1000ms" << std::endl;
    coroutine_sleep(1000);
  }
}

int main() {
  Context co1(func1);
  resume_coroutine(&co1);

  Context co2(func2);
  resume_coroutine(&co2);

  event_loop(5);
  return 0;
}

可以点击此处在线编译执行。这里使用优先队列记录定时的任务,在事件循环中每 1ms 检查一次有没有超时的任务,有则将其唤醒执行。一共创建了两个协程,通过不断地切换执行权,在单线程的环境中实现“并行”。

工业级协程库的原理与上文所述的类似,不过其一般会 hook 大量系统 IO 函数,如 read / write / send / recv,使其在 IO 等待过程中主动出让执行权,在事件循环中使用类似 EpollKqueue 管理等待的事件。工业级协程库也会提供更丰富的基础组件,这方面可以参考 libcolibgo 的实现。

References

  1. "Coroutines (C++20)", C++ References
  2. "libco", GitHub/Tencent
  3. "libaco", GitHub/hnes
  4. "libgo", GitHub/yyzybb537
  5. "libco 分析(上):协程的实现", Kaiyuan Blog
  6. "Functions and Stack Frames", x86 Disassembly
  7. "FPO", Larry Osterman's WebLog
  8. Compiler Explorer