距上一次初探 C++20 协程近一年了,对 C++20 的无栈协程方案有了一些新的认识,写在这里。
C++20 的协程方案中有三个核心的对象:
promise object
,由用户实现,用于在协程内部提交结果或者抛出异常;coroutine handle
,无所有权的协程内部状态指针,用于在协程外部恢复协程运行或者执行销毁;coroutine state
,协程内部状态,存储 promise object
、参数、局部变量以及协程恢复上下文。再看一下官方文档中的样例(在线执行):
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
// 用于 co_await,需要实现 await_ready/await_suspend/await_resume 的 traits
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; } // 判断是否需要挂起
void await_suspend(std::coroutine_handle<> h) { // 挂起时调用,传入 handle
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); }); // 在新的线程恢复协程
// 假设新线程立即执行,恢复协程,则该 awaiter 会立即被销毁,故 p_out 不可用
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() {}
};
// 返回 awaitable 对象
return awaitable{&out};
}
struct task {
// 用户实现的 promise 类
struct promise_type {
task get_return_object() { return {}; } // 协程首次挂起时返回给调用方的对象
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {} // 协程内部 co_return void 时调用
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
// 构造 coroutine state 对象,并初始化内部的 promise object,由编译器生成代码
// 调用 co_await promise.initial_suspend(),不挂起,继续执行
std::cout << "Coroutine started on thread: " << std::this_thread::get_id()
<< '\n';
// 1. 调用 switch_to_new_thread 构造 awaitable 对象
// 2. co_await awaitable,调用 awaitable.await_ready(),返回 false 执行挂起
// 3. 挂起时调用 awaitable.await_suspend(),并且给调用方返回 task 对象
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id()
<< '\n';
// 调用 co_await promise.final_suspend(),不挂起,继续执行
// 销毁 coroutine state 对象。若 final_suspend 返回 std::suspend_always 则需要用户自行调用 handle.destroy() 进行销毁
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}
C++20 协程使用了无栈协程的方案,那么实际执行时协程究竟运行在何处呢?对于这个问题,可以先回忆下栈在程序执行过程中的作用。
当调用某个函数时,调用方会先将当前执行的位置以及函数参数压栈,然后跳转到被调函数的起点,被调函数会根据自身局部变量的使用情况申请栈空间并执行对应的初始化,被调函数执行完成后自行回收栈空间,并跳转回原先调用的位置,栈也恢复了调用前的状态。栈中存储了恢复位置和局部变量。根据先进后出、单调增长的特性,每个函数可以根据栈顶置针和偏移量确定自身局部变量在栈上的位置。
对有栈协程来说,可以直接申请内存空间并修改栈顶指针实现自定义的栈,可以通过保存所有 called saved 寄存器的方式存储当前的执行状态,包括栈顶置针和恢复位置。当需要切换协程时,恢复寄存器也就恢复了挂起前的执行状态,局部变量依然可以根据栈顶置针和偏移量来确定位置。其优势是完全兼容原先的函数调用约定,劣势是需要根据最坏情况申请足够的栈空间,并且切换协程时会强行切换栈继而破坏 return stack buffer 的跳转预测,在空间时间效率上比不过无栈协程。
对无栈协程来说,协程切换时不会切换当前的栈,而是像直接调用函数一样恢复协程,类似下方的样例:
#include <cassert>
#include <iostream>
#include <memory>
// void coroutine() {
// std::uint64_t a = 0;
// std::uint64_t b = 0;
// a = 1;
// printf("set a = %lu and yield 1\n", a);
// yield();
//
// b = 2;
// printf("set b = %lu and yield 2\n", b);
// yield();
//
// printf("a + b = %lu and yield 3\n", a + b);
// yield();
// }
struct coroutine {
std::uint32_t state = 0;
std::uint64_t a = 0;
std::uint64_t b = 0;
void resume() {
switch (state) {
case 0:
a = 1;
printf("set a = %lu and yield 1\n", a);
state = 1;
return;
case 1:
b = 2;
printf("set b = %lu and yield 2\n", b);
state = 2;
return;
case 2:
printf("a + b = %lu and yield 3\n", a + b);
state = 3;
return;
case 3:
puts("done");
}
}
};
int main() {
auto co = std::make_unique<coroutine>();
co->resume();
puts("after yield 1");
co->resume();
puts("after yield 2");
co->resume();
puts("after yield 3");
co->resume();
}
无栈协程中的局部变量存储不再与栈相关,整个函数相当于被可挂起点切分为多个小函数,恢复时只需要在当前栈上调用下一个小函数。这意味着无栈协程需要一段独立的堆内存空间存储自身的局部变量以及运行状态,并且需要一套新的局部变量定位方式,而这些都是由编译器生成的代码来完成的。无栈协程的优势是可以按需申请栈空间,协程切换时仍然是传统函数调用、返回流程,CPU 的跳转预测优化仍然有效。
仔细观察协程函数调用的过程,你会发现每一次协程函数的调用都会触发一次堆内存分配(极少数场景下会被编译器优化掉),这对某些高性能低延迟的系统来说是需要进一步优化的。C++20 的协程方案也提供了自定义内存分配释放的接口,可以在 promise_type
中重载 operator new
和 operator delete
,类似下方的例子(在线执行):
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << '\n';
std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
}
void await_resume() {}
};
return awaitable{&out};
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
void* operator new(size_t size) {
auto ptr = new char[size];
printf("alloc %lu %p\n", size, ptr);
return ptr;
}
void operator delete(void* ptr, std::size_t size) noexcept {
printf("delete %lu %p\n", size, ptr);
delete[] static_cast<char*>(ptr);
}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id()
<< '\n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id()
<< '\n';
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}
/*
alloc 64 0x7fa45b505880
Coroutine started on thread: 0x10b219e00
New thread ID: 0x7000034a9000
Coroutine resumed on thread: 0x7000034a9000
delete 64 0x7fa45b505880
*/
同一个协程函数每次调用时需要申请的堆内存大小是一致的,可以根据这一点设计出高效的内存池。
理想中的协程应用应该是类似这样的:
Promise<uint64_t> A() { ... }
Promise<uint32_t> B() { ... }
Promise<uint64_t> C() {
auto a = co_await A();
auto b = co_await B();
co_return a + b;
}
int main() {
auto result = block_on(C());
}
换句话说,协程函数返回的 Promise 对象也需要是 Awaitable 的,在 Promise 类中实现 co_await
需要的 traits 即可,类似下面的例子(在线执行):
#include <coroutine>
#include <future>
#include <iostream>
#include <queue>
#include <thread>
// 简单的定时器
class Timer {
public:
void Add(uint32_t timeout, std::coroutine_handle<> h) {
tasks.emplace(Task{.expire = MsTick() + timeout, .coroutine = h});
}
void Loop() {
while (!tasks.empty()) {
if (MsTick() >= tasks.top().expire) {
tasks.top().coroutine.resume();
tasks.pop();
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
static uint64_t MsTick() {
auto now = std::chrono::system_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
}
private:
struct Task {
uint64_t expire;
std::coroutine_handle<> coroutine;
bool operator<(const Task& other) const { return expire > other.expire; }
};
std::priority_queue<Task> tasks;
} timer;
class Sleeper {
public:
Sleeper(uint32_t timeout) : timeout_(timeout) {}
bool await_ready() { return false; }
void await_resume() {}
void await_suspend(std::coroutine_handle<> h) { timer.Add(timeout_, h); }
private:
uint32_t timeout_;
};
class Promise {
public:
struct promise_type {
Promise get_return_object() { return Promise(this); }
auto initial_suspend() noexcept { return std::suspend_never{}; }
auto final_suspend() noexcept { return std::suspend_never{}; }
void unhandled_exception() { std::terminate(); }
void return_void() {
// 子协程结束时唤醒父协程继续执行
if (parent) {
parent.resume();
}
}
std::coroutine_handle<> parent;
};
Promise(promise_type* promise) : promise_(promise) {}
// 当前协程结束才能返回 ready
bool await_ready() {
return std::coroutine_handle<promise_type>::from_promise(*promise_).done();
}
void await_resume() {}
// 父协程需要挂起时在子协程中注册一个结束的回调函数用以唤醒
void await_suspend(std::coroutine_handle<> h) { promise_->parent = h; }
private:
promise_type* promise_;
};
Promise Sleep(std::uint32_t msec) { co_await Sleeper(500); }
Promise CoroutineFunc() {
for (uint32_t i = 0; i < 10; ++i) {
co_await Sleep(500);
co_await Sleep(500);
puts("await 500ms * 2");
}
}
int main() {
CoroutineFunc();
timer.Loop();
}
显然,协程的使用具有传染性,如果在子函数中使用了协程,那么整个应用的调用链路上基本都需要启用协程。Golang 原生全局支持协程,而 Rust 中类似 Tokio 的框架也推荐直接在 main
函数中启用协程环境:
#[tokio::main]
async fn main() {
println!("hello");
}
// transform into
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}