bthread uses libcontext to switch between coroutines; the principle is the same kind of assembly magic used to hand-roll C++ coroutines. Let's look at an example adapted from a unit test (runnable online):
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <iostream>
#include <stdint.h>  // intptr_t
#include <utility>   // std::pair
typedef void *bthread_fcontext_t;
extern "C" bthread_fcontext_t bthread_make_fcontext(void *sp, size_t size,
void (*fn)(intptr_t));
__asm(
".text\n"
".globl bthread_make_fcontext\n"
".type bthread_make_fcontext,@function\n"
".align 16\n"
"bthread_make_fcontext:\n"
" movq %rdi, %rax\n"
" andq $-16, %rax\n"
" leaq -0x48(%rax), %rax\n"
" movq %rdx, 0x38(%rax)\n"
" stmxcsr (%rax)\n"
" fnstcw 0x4(%rax)\n"
" leaq finish(%rip), %rcx\n"
" movq %rcx, 0x40(%rax)\n"
" ret \n"
"finish:\n"
" xorq %rdi, %rdi\n"
" call _exit@PLT\n"
" hlt\n"
".size bthread_make_fcontext,.-bthread_make_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n");
extern "C" intptr_t bthread_jump_fcontext(bthread_fcontext_t *ofc,
bthread_fcontext_t nfc, intptr_t vp);
__asm(
".text\n"
".globl bthread_jump_fcontext\n"
".type bthread_jump_fcontext,@function\n"
".align 16\n"
"bthread_jump_fcontext:\n"
" pushq %rbp \n"
" pushq %rbx \n"
" pushq %r15 \n"
" pushq %r14 \n"
" pushq %r13 \n"
" pushq %r12 \n"
" leaq -0x8(%rsp), %rsp\n"
" movq %rsp, (%rdi)\n"
" movq %rsi, %rsp\n"
" leaq 0x8(%rsp), %rsp\n"
" popq %r12 \n"
" popq %r13 \n"
" popq %r14 \n"
" popq %r15 \n"
" popq %rbx \n"
" popq %rbp \n"
" popq %r8\n"
" movq %rdx, %rax\n"
" movq %rdx, %rdi\n"
" jmp *%r8\n"
".size bthread_jump_fcontext,.-bthread_jump_fcontext\n"
".section .note.GNU-stack,\"\",%progbits\n"
".previous\n");
bthread_fcontext_t fcm;
bthread_fcontext_t fc;
typedef std::pair<int, int> pair_t;
static void f(intptr_t param) {
pair_t *p = (pair_t *)param;
printf("In Routine: fcm %p fc %p\n", fcm, fc);
p = (pair_t *)bthread_jump_fcontext(&fc, fcm,
(intptr_t)(p->first + p->second));
printf("In Routine Again: fcm %p fc %p\n", fcm, fc);
bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first + p->second));
}
int main() {
fcm = NULL;
std::size_t size(8192);
void *sp = malloc(size);
pair_t p(std::make_pair(2, 7));
fc = bthread_make_fcontext((char *)sp + size, size, f);
printf("Start Routine: fcm %p fc %p\n", fcm, fc);
int res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
printf("Back to Main: %d + %d = %d\n", p.first, p.second, res);
p = std::make_pair(5, 6);
printf("Resume Routine: fcm %p fc %p\n", fcm, fc);
res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
printf("Back to Main Again: %d + %d = %d\n", p.first, p.second, res);
return 0;
}
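Assuming this is built on x86-64 Linux (the assembly follows the System V calling convention), the output should look roughly like the following, with the pointer values elided; it shows control bouncing between main and f twice:

Start Routine: fcm (nil) fc 0x...
In Routine: fcm 0x... fc 0x...
Back to Main: 2 + 7 = 9
Resume Routine: fcm 0x... fc 0x...
In Routine Again: fcm 0x... fc 0x...
Back to Main Again: 5 + 6 = 11

bthread_make_fcontext carves a context out of the top of the supplied stack, and each bthread_jump_fcontext call saves the caller's callee-saved registers and stack pointer into the location given by its first argument before restoring the target context, which is why both sides see updated fcm/fc values after every switch.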
bthread uses the TaskMeta struct to hold each coroutine's metadata:
// task_meta.h
struct TaskMeta {
// [Not Reset]
butil::atomic<ButexWaiter*> current_waiter;
uint64_t current_sleep;
// A builtin flag to mark if the thread is stopping.
bool stop;
// The thread is interrupted and should wake up from some blocking ops.
bool interrupted;
// Scheduling of the thread can be delayed.
bool about_to_quit;
// [Not Reset] guarantee visibility of version_butex.
pthread_spinlock_t version_lock;
// [Not Reset] only modified by one bthread at any time, no need to be atomic
uint32_t* version_butex;
// The identifier. It does not have to be here, however many code is
// simplified if they can get tid from TaskMeta.
bthread_t tid;
// User function and argument
void* (*fn)(void*);
void* arg;
// Stack of this task.
ContextualStack* stack;
// Attributes creating this task
bthread_attr_t attr;
// Statistics
int64_t cpuwide_start_ns;
TaskStatistics stat;
// bthread local storage, sync with tls_bls (defined in task_group.cpp)
// when the bthread is created or destroyed.
// DO NOT use this field directly, use tls_bls instead.
LocalStorage local_storage;
public:
// Only initialize [Not Reset] fields, other fields will be reset in
// bthread_start* functions
TaskMeta()
: current_waiter(NULL)
, current_sleep(0)
, stack(NULL) {
pthread_spin_init(&version_lock, 0);
version_butex = butex_create_checked<uint32_t>();
*version_butex = 1;
}
~TaskMeta() {
butex_destroy(version_butex);
version_butex = NULL;
pthread_spin_destroy(&version_lock);
}
void set_stack(ContextualStack* s) {
stack = s;
}
ContextualStack* release_stack() {
ContextualStack* tmp = stack;
stack = NULL;
return tmp;
}
StackType stack_type() const {
return static_cast<StackType>(attr.stack_type);
}
};
For now we only care about the coroutine stack field stack. Before a coroutine is scheduled for the first time, its call stack is allocated:
// task_group_inl.h
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
// stack_inl.h
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
switch (type) {
case STACK_TYPE_PTHREAD:
return NULL;
case STACK_TYPE_SMALL:
return StackFactory<SmallStackClass>::get_stack(entry);
case STACK_TYPE_NORMAL: // NORMAL is the default
return StackFactory<NormalStackClass>::get_stack(entry);
case STACK_TYPE_LARGE:
return StackFactory<LargeStackClass>::get_stack(entry);
case STACK_TYPE_MAIN:
return StackFactory<MainStackClass>::get_stack(entry);
}
return NULL;
}
template <typename StackClass> struct StackFactory {
struct Wrapper : public ContextualStack {
explicit Wrapper(void (*entry)(intptr_t)) {
// Allocate the stack memory, guarded by default; note StackClass::stack_size_flag here
if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
FLAGS_guard_page_size) != 0) {
storage.zeroize();
context = NULL;
return;
}
// Initialize the context
context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
stacktype = (StackType)StackClass::stacktype;
}
~Wrapper() {
if (context) {
context = NULL;
// Released on destruction
deallocate_stack_storage(&storage);
storage.zeroize();
}
}
};
static ContextualStack* get_stack(void (*entry)(intptr_t)) {
return butil::get_object<Wrapper>(entry);
}
static void return_stack(ContextualStack* sc) {
// Returning to the object pool does not run the destructor, so the allocated stack can be reused
butil::return_object(static_cast<Wrapper*>(sc));
}
};
// stack.h
struct StackStorage {
int stacksize;
int guardsize;
void* bottom;
unsigned valgrind_stack_id;
// Clears all members.
void zeroize() {
stacksize = 0;
guardsize = 0;
bottom = NULL;
valgrind_stack_id = 0;
}
};
// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);
enum StackType {
STACK_TYPE_MAIN = 0,
STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};
struct ContextualStack {
bthread_fcontext_t context;
StackType stacktype;
StackStorage storage;
};
// stack.cpp
int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
const static int PAGESIZE = getpagesize();
const int PAGESIZE_M1 = PAGESIZE - 1;
const int MIN_STACKSIZE = PAGESIZE * 2;
const int MIN_GUARDSIZE = PAGESIZE;
// Align stacksize
const int stacksize =
(std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
~PAGESIZE_M1; // align the stack size to pages; PAGESIZE is usually 4K
if (guardsize_in <= 0) {
...
} else {
// Align guardsize
const int guardsize =
(std::max(guardsize_in, MIN_GUARDSIZE) + PAGESIZE_M1) &
~PAGESIZE_M1;
const int memsize = stacksize + guardsize;
void* const mem = mmap(NULL, memsize, (PROT_READ | PROT_WRITE),
(MAP_PRIVATE | MAP_ANONYMOUS), -1, 0);
if (MAP_FAILED == mem) {
PLOG_EVERY_SECOND(ERROR)
<< "Fail to mmap size=" << memsize << " stack_count="
<< s_stack_count.load(butil::memory_order_relaxed)
<< ", possibly limited by /proc/sys/vm/max_map_count";
// may fail due to limit of max_map_count (65536 in default)
return -1;
}
void* aligned_mem = (void*)(((intptr_t)mem + PAGESIZE_M1) & ~PAGESIZE_M1);
if (aligned_mem != mem) {
LOG_ONCE(ERROR) << "addr=" << mem << " returned by mmap is not "
"aligned by pagesize=" << PAGESIZE;
}
const int offset = (char*)aligned_mem - (char*)mem;
// Use mprotect so that a stack overflow faults immediately instead of corrupting memory
if (guardsize <= offset ||
mprotect(aligned_mem, guardsize - offset, PROT_NONE) != 0) {
munmap(mem, memsize);
PLOG_EVERY_SECOND(ERROR)
<< "Fail to mprotect " << (void*)aligned_mem << " length="
<< guardsize - offset;
return -1;
}
s_stack_count.fetch_add(1, butil::memory_order_relaxed);
s->bottom = (char*)mem + memsize;
s->stacksize = stacksize;
s->guardsize = guardsize;
if (RunningOnValgrind()) {
s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
s->bottom, (char*)s->bottom - stacksize);
} else {
s->valgrind_stack_id = 0;
}
return 0;
}
}
int* SmallStackClass::stack_size_flag = &FLAGS_stack_size_small;
int* NormalStackClass::stack_size_flag = &FLAGS_stack_size_normal;
int* LargeStackClass::stack_size_flag = &FLAGS_stack_size_large;
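The guard-page idea is easy to reproduce standalone. Below is a minimal sketch of the same technique used by allocate_stack_storage, assuming a Linux/x86-64 host; make_guarded_stack is a made-up helper name for illustration, not part of bRPC:

#include <sys/mman.h>
#include <unistd.h>
#include <cstdio>

// Hypothetical helper (not bRPC code): reserve stacksize plus one guard page,
// protect the lowest page with PROT_NONE, and hand out the highest address as
// the stack bottom (stacks grow downwards on x86-64).
static void* make_guarded_stack(size_t stacksize, size_t* memsize_out) {
    const size_t pagesize = (size_t)getpagesize();
    const size_t memsize = stacksize + pagesize;       // stack + guard page
    void* mem = mmap(NULL, memsize, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) {
        return NULL;
    }
    // Touching the guard page now raises SIGSEGV instead of silently
    // corrupting adjacent memory when the coroutine overflows its stack.
    if (mprotect(mem, pagesize, PROT_NONE) != 0) {
        munmap(mem, memsize);
        return NULL;
    }
    *memsize_out = memsize;
    return (char*)mem + memsize;   // the "bottom" passed to bthread_make_fcontext
}

int main() {
    size_t memsize = 0;
    void* bottom = make_guarded_stack(256 * 1024, &memsize);
    if (bottom == NULL) {
        return 1;
    }
    printf("stack bottom=%p, mapped %zu bytes\n", bottom, memsize);
    munmap((char*)bottom - memsize, memsize);
    return 0;
}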
The core scheduling idea is work stealing: when the current thread has no pending tasks, it steals tasks from other threads' queues. Let's start with the lock-free queue used for work stealing, src/bthread/work_stealing_queue.h:
template <typename T>
class WorkStealingQueue {
public:
WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}
int init(size_t capacity) {
if (_capacity != 0) {
LOG(ERROR) << "Already initialized";
return -1;
}
if (capacity == 0) {
LOG(ERROR) << "Invalid capacity=" << capacity;
return -1;
}
if (capacity & (capacity - 1)) {
LOG(ERROR) << "Invalid capacity=" << capacity
<< " which must be power of 2";
return -1;
}
_buffer = new (std::nothrow) T[capacity];
if (NULL == _buffer) {
return -1;
}
_capacity = capacity;
return 0;
}
// Append at the bottom; not thread-safe by itself, but safe to run concurrently with steal
bool push(const T& x) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
const size_t t = _top.load(butil::memory_order_acquire);
if (b >= t + _capacity) { // Full queue.
return false;
}
_buffer[b & (_capacity - 1)] = x;
_bottom.store(b + 1, butil::memory_order_release);
return true;
}
// Pop from the bottom; not thread-safe by itself, but safe to run concurrently with steal
bool pop(T* val) {
const size_t b = _bottom.load(butil::memory_order_relaxed);
size_t t = _top.load(butil::memory_order_relaxed);
if (t >= b) {
// fast check since we call pop() in each sched.
// Stale _top which is smaller should not enter this branch.
return false;
}
const size_t newb = b - 1;
_bottom.store(newb, butil::memory_order_relaxed);
butil::atomic_thread_fence(butil::memory_order_seq_cst);
t = _top.load(butil::memory_order_relaxed);
if (t > newb) {
_bottom.store(b, butil::memory_order_relaxed);
return false;
}
*val = _buffer[newb & (_capacity - 1)];
if (t != newb) {
return true;
}
// Single last element, compete with steal()
// For the last element, a CAS keeps the race against steal() thread-safe
const bool popped = _top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
_bottom.store(b, butil::memory_order_relaxed);
return popped;
}
// Steal from the top; thread-safe
bool steal(T* val) {
size_t t = _top.load(butil::memory_order_acquire);
size_t b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
// Permit false negative for performance considerations.
return false;
}
do {
butil::atomic_thread_fence(butil::memory_order_seq_cst);
b = _bottom.load(butil::memory_order_acquire);
if (t >= b) {
return false;
}
*val = _buffer[t & (_capacity - 1)];
// the CAS guarantees thread safety
} while (!_top.compare_exchange_strong(
t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed));
return true;
}
private:
// Copying a concurrent structure makes no sense.
DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);
butil::atomic<size_t> _bottom;
size_t _capacity;
T* _buffer;
butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top; // keep _top and _bottom on different cache lines
};
push and pop operate only at the bottom of the queue and are not thread-safe among themselves (they are meant for the owner thread), while steal only takes from the top and relies on CAS to stay thread-safe; a usage sketch follows.
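A minimal usage sketch, assuming brpc's headers are on the include path (the header paths, the bthread namespace, and the fake tid value are my assumptions for illustration): the owner thread calls push/pop, any other thread may call steal concurrently.

#include "bthread/types.h"                 // bthread_t
#include "bthread/work_stealing_queue.h"   // bthread::WorkStealingQueue
#include <cstdio>

int main() {
    bthread::WorkStealingQueue<bthread_t> rq;
    rq.init(1024);                 // capacity must be a power of 2
    rq.push(123);                  // owner thread only: append at the bottom
    bthread_t stolen = 0;
    if (rq.steal(&stolen)) {       // any thread: steal from the top, thread-safe
        printf("stolen tid=%llu\n", (unsigned long long)stolen);
    }
    bthread_t popped = 0;
    if (rq.pop(&popped)) {         // owner thread only: pop from the bottom
        printf("popped tid=%llu\n", (unsigned long long)popped);
    }
    return 0;
}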
Next, let's walk through how a bthread gets started:
// test/bthread_unittest.cpp
TEST_F(BthreadTest, sanity) {
LOG(INFO) << "main thread " << pthread_self();
bthread_t th1;
ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, misc, (void*)1));
LOG(INFO) << "back to main thread " << th1 << " " << pthread_self();
ASSERT_EQ(0, bthread_join(th1, NULL));
}
// bthread.cpp
int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
}
// Called from a non-worker thread; initialization is needed
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
// Get the global TaskControl singleton
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
}
if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
// Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
// 1. NOSIGNAL is often for creating many bthreads in batch,
// inserting into the same TaskGroup maximizes the batch.
// 2. bthread_flush() needs to know which TaskGroup to flush.
TaskGroup* g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
return g->start_background<true>(tid, attr, fn, arg);
}
// Enqueue into a chosen group
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
}
inline TaskControl* get_or_new_task_control() {
butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
TaskControl* c = p->load(butil::memory_order_consume);
if (c != NULL) {
return c;
}
BAIDU_SCOPED_LOCK(g_task_control_mutex); // global lock
c = p->load(butil::memory_order_consume);
if (c != NULL) {
return c;
}
c = new (std::nothrow) TaskControl;
if (NULL == c) {
return NULL;
}
int concurrency = FLAGS_bthread_min_concurrency > 0 ?
FLAGS_bthread_min_concurrency :
FLAGS_bthread_concurrency;
// Initialize; concurrency is the number of worker threads
if (c->init(concurrency) != 0) {
LOG(ERROR) << "Fail to init g_task_control";
delete c;
return NULL;
}
p->store(c, butil::memory_order_release);
return c;
}
// task_control.cpp
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
return -1;
}
if (concurrency <= 0) {
LOG(ERROR) << "Invalid concurrency=" << concurrency;
return -1;
}
_concurrency = concurrency;
// Make sure TimerThread is ready.
if (get_or_create_global_timer_thread() == NULL) {
LOG(ERROR) << "Fail to get global_timer_thread";
return -1;
}
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
// Start a worker thread
const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
if (rc) {
LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
return -1;
}
}
_worker_usage_second.expose("bthread_worker_usage");
_switch_per_second.expose("bthread_switch_second");
_signal_per_second.expose("bthread_signal_second");
_status.expose("bthread_group_status");
// Wait for at least one group is added so that choose_one_group()
// never returns NULL.
// TODO: Handle the case that worker quits before add_group
while (_ngroup == 0) {
usleep(100); // TODO: Elaborate
}
return 0;
}
In the background, bthread spins up several worker_thread threads to run bthread tasks:
// task_control.cpp
void* TaskControl::worker_thread(void* arg) {
run_worker_startfn();
TaskControl* c = static_cast<TaskControl*>(arg);
TaskGroup* g = c->create_group(); // each worker thread owns one TaskGroup
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
}
BT_VLOG << "Created worker=" << pthread_self()
<< " bthread=" << g->main_tid();
tls_task_group = g; // store this thread's TaskGroup in TLS
c->_nworkers << 1;
g->run_main_task(); // the main task loop
stat = g->main_stat();
BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
<< g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
<< "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
return NULL;
}
// task_group.cpp
void TaskGroup::run_main_task() {
bvar::PassiveStatus<double> cumulated_cputime(
get_cumulated_cputime_from_this, this);
std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
TaskGroup* dummy = this;
bthread_t tid;
while (wait_task(&tid)) { // fetch a task
TaskGroup::sched_to(&dummy, tid); // schedule and run it
DCHECK_EQ(this, dummy);
DCHECK_EQ(_cur_meta->stack, _main_stack);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1/*skip remained*/);
}
}
}
bool TaskGroup::wait_task(bthread_t* tid) {
do {
if (_last_pl_state.stopped()) {
return false;
}
_pl->wait(_last_pl_state);
if (steal_task(tid)) { // steal a task
return true;
}
} while (true);
}
bool steal_task(bthread_t* tid) {
// Prefer tasks already in this group's own queue
if (_remote_rq.pop(tid)) {
return true;
}
// Otherwise steal tasks from other groups via TaskControl
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
// task_control.cpp
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) { // lock-free steal
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) { // steal guarded by a lock
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
// task_group_inl.h
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
TaskMeta* next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
// In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
// This basically means that if we can't allocate stack, run
// the task in pthread directly.
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
// Update now_ns only when wait_task did yield.
sched_to(pg, next_meta); // run it
}
// task_group.cpp
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
// Save errno so that errno is bthread-specific.
const int saved_errno = errno;
void* saved_unique_user_ptr = tls_unique_user_ptr;
TaskMeta* const cur_meta = g->_cur_meta;
const int64_t now = butil::cpuwide_time_ns();
const int64_t elp_ns = now - g->_last_run_ns;
g->_last_run_ns = now;
cur_meta->stat.cputime_ns += elp_ns;
if (cur_meta->tid != g->main_tid()) {
g->_cumulated_cputime_ns += elp_ns;
}
++cur_meta->stat.nswitch;
++ g->_nswitch;
// Switch to the task
if (__builtin_expect(next_meta != cur_meta, 1)) {
g->_cur_meta = next_meta;
// Switch tls_bls
cur_meta->local_storage = tls_bls;
tls_bls = next_meta->local_storage;
// Logging must be done after switching the local storage, since the logging lib
// use bthread local storage internally, or will cause memory leak.
if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
(next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
<< next_meta->tid;
}
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
jump_stack(cur_meta->stack, next_meta->stack); // coroutine switch
// probably went to another group, need to assign g again.
g = tls_task_group;
}
}
// else because of ending_sched(including pthread_task->pthread_task)
} else {
LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
}
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
// Restore errno
errno = saved_errno;
tls_unique_user_ptr = saved_unique_user_ptr;
*pg = g;
}
// stack_inl.h
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}
Now the flow of creating a new bthread from an outside thread via TaskControl:
// task_group.cpp
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
if (__builtin_expect(!fn, 0)) {
return EINVAL;
}
const int64_t start_ns = butil::cpuwide_time_ns();
const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
butil::ResourceId<TaskMeta> slot;
TaskMeta* m = butil::get_resource(&slot);
if (__builtin_expect(!m, 0)) {
return ENOMEM;
}
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->stop = false;
m->interrupted = false;
m->about_to_quit = false;
m->fn = fn;
m->arg = arg;
CHECK(m->stack == NULL);
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
}
_control->_nbthreads << 1;
if (REMOTE) {
// created from a non-worker thread
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
}
return 0;
}
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
// Push into the queue while holding the lock
_remote_rq._mutex.lock();
while (!_remote_rq.push_locked(tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock();
_control->signal_task(1 + additional_signal); // wake up worker threads to run the task
}
}
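As a usage-level illustration of this remote path (assuming brpc is installed and linked), starting a bthread from an ordinary pthread such as main goes through start_from_non_worker, choose_one_group and ready_to_run_remote as shown above:

#include "bthread/bthread.h"
#include <cstdio>

static void* work(void* arg) {
    printf("running inside a bthread, arg=%ld\n", (long)arg);
    return NULL;
}

int main() {
    bthread_t th;
    // main is not a worker thread, so the task is pushed into some group's
    // _remote_rq and a worker is signaled to pick it up.
    if (bthread_start_background(&th, NULL, work, (void*)42) != 0) {
        return 1;
    }
    bthread_join(th, NULL);
    return 0;
}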
Linux provides futex, a fast user-space locking facility, and bRPC wraps it with thin helpers:
// sys_futex.h
inline int futex_wait_private(void *addr1, int expected,
const timespec *timeout) {
// If *addr1 == expected, the calling thread is suspended and waits
return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG), expected,
timeout, NULL, 0);
}
inline int futex_wake_private(void *addr1, int nwake) {
// Wake up at most nwake waiting threads
return syscall(SYS_futex, addr1, (FUTEX_WAKE | FUTEX_PRIVATE_FLAG), nwake,
NULL, NULL, 0);
}
For macOS, bthread also ships a user-space implementation; see src/bthread/sys_futex.cpp for the details. With futex we can build a faster mutex:
// mutex.h
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
void lock();
void unlock();
bool try_lock();
private:
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
int lock_contended();
unsigned _futex;
};
// mutex.cpp
// Implement bthread_mutex_t related functions
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
butil::static_atomic<unsigned char> contended;
unsigned short padding;
};
const MutexInternal MUTEX_CONTENDED_RAW = {{1}, {1}, 0};
const MutexInternal MUTEX_LOCKED_RAW = {{1}, {0}, 0};
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED \
(*(const unsigned *)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned *)&bthread::MUTEX_LOCKED_RAW)
int FastPthreadMutex::lock_contended() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
// Atomically switch the state to locked + contended.
// If the lock happens to be released in the meantime, the loop exits immediately.
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
// Otherwise call futex to suspend the current thread, requiring the state to still be
// locked + contended (the lock may have been released in between)
if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
errno != EWOULDBLOCK) {
return errno;
}
// Unless futex_wait_private failed with something other than EWOULDBLOCK, loop and retry
}
return 0;
}
void FastPthreadMutex::lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
// Try to lock; if locked was previously 0, lock() succeeds right away
if (split->locked.exchange(1, butil::memory_order_acquire)) {
// Already locked, enter the contended path
(void)lock_contended();
}
}
bool FastPthreadMutex::try_lock() {
bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
return !split->locked.exchange(1, butil::memory_order_acquire);
}
void FastPthreadMutex::unlock() {
butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
const unsigned prev = whole->exchange(0, butil::memory_order_release);
// CAUTION: the mutex may be destroyed, check comments before butex_create
if (prev != BTHREAD_MUTEX_LOCKED) {
// If there may be suspended waiters, wake one up
futex_wake_private(whole, 1);
}
}
I wrote a simple benchmark: this FastPthreadMutex can be roughly twice as fast as std::mutex.
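I have not reproduced the original benchmark here, but a rough sketch of that kind of comparison looks like the following. It assumes FastPthreadMutex is reachable through bthread/mutex.h under the bthread::internal namespace (this may differ across brpc versions), and the absolute numbers depend heavily on contention and hardware:

#include "bthread/mutex.h"   // bthread::internal::FastPthreadMutex (assumed location)
#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Sketch: time `iters` lock/unlock pairs from `nthreads` threads on any mutex type.
template <typename Mutex>
static double bench(int nthreads, long iters) {
    Mutex mu;
    long counter = 0;
    auto start = std::chrono::steady_clock::now();
    std::vector<std::thread> threads;
    for (int i = 0; i < nthreads; ++i) {
        threads.emplace_back([&] {
            for (long j = 0; j < iters; ++j) {
                std::lock_guard<Mutex> guard(mu);
                ++counter;
            }
        });
    }
    for (auto& t : threads) t.join();
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    return elapsed.count();
}

int main() {
    const int nthreads = 4;
    const long iters = 1000000;
    printf("std::mutex       : %.3fs\n", bench<std::mutex>(nthreads, iters));
    printf("FastPthreadMutex : %.3fs\n",
           bench<bthread::internal::FastPthreadMutex>(nthreads, iters));
    return 0;
}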
For bthreads, grabbing a plain mutex would block the whole worker thread and, with it, every other bthread scheduled on that thread. A better approach is to suspend only the bthread itself, voluntarily yielding the CPU so other bthreads can keep running. bthread implements butex, a futex-like synchronization primitive, for exactly this purpose. Let's look at the implementation:
// butex.h
// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
// NOTE: all butexes are private(not inter-process).
void *butex_create();
// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void *butex);
// Atomically wait on |butex| if *butex equals |expected_value|, until the
// butex is woken up by butex_wake*, or CLOCK_REALTIME reached |abstime| if
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void *butex, int expected_value, const timespec *abstime);
// butex.cpp
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
// tids of pthreads are 0
bthread_t tid;
// Erasing node from middle of LinkedList is thread-unsafe, we need
// to hold its container's lock.
butil::atomic<Butex *> container;
};
struct ButexBthreadWaiter : public ButexWaiter {
TaskMeta *task_meta;
TimerThread::TaskId sleep_id;
WaiterState waiter_state;
int expected_value;
Butex *initial_butex;
TaskControl *control;
};
// doubly linked list
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
struct BAIDU_CACHELINE_ALIGNMENT Butex {
Butex() {}
~Butex() {}
butil::atomic<int> value;
ButexWaiterList waiters;
internal::FastPthreadMutex waiter_lock;
};
// Note that value sits at offset 0 of Butex, which makes the conversion below straightforward
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
void *butex_create() {
Butex *b = butil::get_object<Butex>();
if (b) {
return &b->value;
}
return NULL;
}
int butex_wait(void *arg, int expected_value, const timespec *abstime) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
// Sometimes we may take actions immediately after unmatched butex,
// this fence makes sure that we see changes before changing butex.
butil::atomic_thread_fence(butil::memory_order_acquire);
return -1;
}
TaskGroup *g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
return butex_wait_from_pthread(g, b, expected_value, abstime);
}
// Create the waiter object on the stack
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
bbw.tid = g->current_tid();
bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task();
bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value;
bbw.initial_butex = b;
bbw.control = g->control();
if (abstime != NULL) {
... // timeout handling omitted for now
}
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
g->set_remained(wait_for_butex, &bbw); // run wait_for_butex before the next bthread starts
TaskGroup::sched(&g); // yield voluntarily
// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
// running and using bbw. The chance is small, just spin until it's done.
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
30 /*nops before sched_yield*/);
// If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
// Spin until current_waiter != NULL.
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL,
30 /*nops before sched_yield*/);
bool is_interrupted = false;
if (bbw.task_meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
bbw.task_meta->interrupted = false;
is_interrupted = true;
}
// If timed out as well as value unmatched, return ETIMEDOUT.
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT;
return -1;
} else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK;
return -1;
} else if (is_interrupted) {
errno = EINTR;
return -1;
}
return 0;
}
static void wait_for_butex(void *arg) {
ButexBthreadWaiter *const bw = static_cast<ButexBthreadWaiter *>(arg);
Butex *const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
// and removed by TimerThread, in which case we should stop queueing.
//
// Visibility of waiter_state:
// [bthread] [TimerThread]
// waiter_state = TIMED
// tt_lock { add task }
// tt_lock { get task }
// waiter_lock { waiter_state=TIMEDOUT }
// waiter_lock { use waiter_state }
// tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
// sequenced by two locks, both threads are guaranteed to see the correct
// value.
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
// With the lock held, check whether value still equals expected_value
if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw); // append to the waiters list
bw->container.store(b, butil::memory_order_relaxed);
return;
}
}
// b->container is NULL which makes erase_from_butex_and_wakeup() and
// TaskGroup::interrupt() no-op, there's no race between following code and
// the two functions. The on-stack ButexBthreadWaiter is safe to use and
// bw->waiter_state will not change again.
// If the wait did not take effect, cancel the timer and switch back to the original bthread
unsleep_if_necessary(bw, get_global_timer_thread());
tls_task_group->ready_to_run(bw->tid);
// FIXME: jump back to original thread is buggy.
// // Value unmatched or waiter is already woken up by TimerThread, jump
// // back to original bthread.
// TaskGroup* g = tls_task_group;
// ReadyToRunArgs args = { g->current_tid(), false };
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
// // 2: Don't run remained because we're already in a remained function
// // otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}
int butex_wake(void *arg) {
Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
ButexWaiter *front = NULL;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->waiters.empty()) {
return 0;
}
front = b->waiters.head()->value();
front->RemoveFromList(); // take the waiter off the list
front->container.store(NULL, butil::memory_order_relaxed);
}
if (front->tid == 0) {
wakeup_pthread(static_cast<ButexPthreadWaiter *>(front));
return 1;
}
ButexBthreadWaiter *bbw = static_cast<ButexBthreadWaiter *>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup *g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid); // inside a worker: switch to the woken bthread directly
} else {
// otherwise push it to some group for rescheduling
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
}
return 1;
}
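To tie the pieces together, here is a minimal sketch of a one-shot event built on top of these primitives; it blocks only the calling bthread rather than the whole worker thread. ButexEvent is a made-up name for illustration, not something in bRPC, and it assumes the internal header bthread/butex.h is available:

#include "bthread/butex.h"     // bthread::butex_create/butex_wait/butex_wake (internal header)
#include "butil/atomicops.h"   // butil::atomic

// Hypothetical one-shot event: wait() suspends the current bthread (or pthread)
// until signal() is called once.
class ButexEvent {
public:
    ButexEvent() {
        _b = static_cast<butil::atomic<int>*>(bthread::butex_create());
        _b->store(0, butil::memory_order_relaxed);
    }
    ~ButexEvent() { bthread::butex_destroy(_b); }

    void wait() {
        // butex_wait only sleeps while the value still equals 0,
        // so a signal() that ran before wait() is never missed.
        while (_b->load(butil::memory_order_acquire) == 0) {
            bthread::butex_wait(_b, 0, NULL);
        }
    }
    void signal() {
        _b->store(1, butil::memory_order_release);
        bthread::butex_wake(_b);   // wake at most one waiter
    }
private:
    butil::atomic<int>* _b;
};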
Yes, the Tokio source-code series has been shelved. The second half of last year was so hectic with a project at work that I had no appetite for studying even on weekends. Fortunately the project shipped before the New Year and the review early this year went well, so there will be more time for self-improvement this year. I'm opening this new bRPC series because the project has plenty of good practices I can apply to my own work; Tokio will be picked up again after bRPC.