bRPC Source Code Analysis, Part 1: Coroutine Design

2021.03.02
SF-Zhou

1. Context Switching

bthread uses libcontext to switch between coroutines, based on the same assembly tricks commonly used to implement C++ coroutines by hand. Here is an example adapted from a unit test (run it online):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <iostream>

typedef void *bthread_fcontext_t;

extern "C" bthread_fcontext_t bthread_make_fcontext(void *sp, size_t size,
                                                    void (*fn)(intptr_t));
__asm(
    ".text\n"
    ".globl bthread_make_fcontext\n"
    ".type bthread_make_fcontext,@function\n"
    ".align 16\n"
    "bthread_make_fcontext:\n"
    // %rdi = stack top, %rsi = stack size (unused), %rdx = entry function
    "    movq  %rdi, %rax\n"         // rax = stack top
    "    andq  $-16, %rax\n"         // align down to 16 bytes
    "    leaq  -0x48(%rax), %rax\n"  // carve out a fake register frame
    "    movq  %rdx, 0x38(%rax)\n"   // entry fn, later popped into %r8
    "    stmxcsr  (%rax)\n"          // save current MXCSR
    "    fnstcw   0x4(%rax)\n"       // save current x87 control word
    "    leaq  finish(%rip), %rcx\n"
    "    movq  %rcx, 0x40(%rax)\n"   // return address if the entry fn returns
    "    ret \n"                     // return the new context pointer in %rax
    "finish:\n"
    "    xorq  %rdi, %rdi\n"
    "    call  _exit@PLT\n"          // exit(0) once the entry fn falls through
    "    hlt\n"
    ".size bthread_make_fcontext,.-bthread_make_fcontext\n"
    ".section .note.GNU-stack,\"\",%progbits\n"
    ".previous\n");

extern "C" intptr_t bthread_jump_fcontext(bthread_fcontext_t *ofc,
                                          bthread_fcontext_t nfc, intptr_t vp);
__asm(
    ".text\n"
    ".globl bthread_jump_fcontext\n"
    ".type bthread_jump_fcontext,@function\n"
    ".align 16\n"
    "bthread_jump_fcontext:\n"
    // %rdi = ofc (where to save the current context), %rsi = nfc, %rdx = vp
    "    pushq  %rbp  \n"            // save callee-saved registers
    "    pushq  %rbx  \n"
    "    pushq  %r15  \n"
    "    pushq  %r14  \n"
    "    pushq  %r13  \n"
    "    pushq  %r12  \n"
    "    leaq  -0x8(%rsp), %rsp\n"   // slot reserved for MXCSR/x87 state
    "    movq  %rsp, (%rdi)\n"       // *ofc = current stack pointer
    "    movq  %rsi, %rsp\n"         // switch to the target stack
    "    leaq  0x8(%rsp), %rsp\n"    // skip the MXCSR/x87 slot
    "    popq  %r12  \n"             // restore callee-saved registers
    "    popq  %r13  \n"
    "    popq  %r14  \n"
    "    popq  %r15  \n"
    "    popq  %rbx  \n"
    "    popq  %rbp  \n"
    "    popq  %r8\n"                // resume address (or the entry fn)
    "    movq  %rdx, %rax\n"         // vp is returned by the jump...
    "    movq  %rdx, %rdi\n"         // ...and passed as the fn's argument
    "    jmp  *%r8\n"
    ".size bthread_jump_fcontext,.-bthread_jump_fcontext\n"
    ".section .note.GNU-stack,\"\",%progbits\n"
    ".previous\n");

bthread_fcontext_t fcm;
bthread_fcontext_t fc;

typedef std::pair<int, int> pair_t;
static void f(intptr_t param) {
  pair_t *p = (pair_t *)param;
  printf("In Routine: fcm %p fc %p\n", fcm, fc);

  p = (pair_t *)bthread_jump_fcontext(&fc, fcm,
                                      (intptr_t)(p->first + p->second));

  printf("In Routine Again: fcm %p fc %p\n", fcm, fc);
  bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first + p->second));
}

int main() {
  fcm = NULL;
  std::size_t size(8192);
  void *sp = malloc(size);

  pair_t p(std::make_pair(2, 7));
  fc = bthread_make_fcontext((char *)sp + size, size, f);

  printf("Start Routine: fcm %p fc %p\n", fcm, fc);
  int res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
  printf("Back to Main: %d + %d = %d\n", p.first, p.second, res);

  p = std::make_pair(5, 6);
  printf("Resume Routine: fcm %p fc %p\n", fcm, fc);
  res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
  printf("Back to Main Again: %d + %d = %d\n", p.first, p.second, res);

  return 0;
}
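bthread_jump_fcontext saves the callee-saved registers plus a resume address on the current stack, stores the stack pointer into *ofc, then switches rsp to nfc and restores the target's registers; vp travels across the switch both as the jump's return value and as the entry function's argument. bthread_make_fcontext fabricates such a frame at the top of a fresh stack, so the first jump "resumes" right into f. Running the program therefore prints something like the following (the exact addresses will vary from run to run):

Start Routine: fcm (nil) fc 0x55d2c7a0afb8
In Routine: fcm 0x7ffc9538b0a8 fc 0x55d2c7a0afb8
Back to Main: 2 + 7 = 9
Resume Routine: fcm 0x7ffc9538b0a8 fc 0x55d2c7a0af28
In Routine Again: fcm 0x7ffc9538b0a8 fc 0x55d2c7a0af28
Back to Main Again: 5 + 6 = 11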

bthread wraps each coroutine's metadata in a TaskMeta struct:

// task_meta.h
struct TaskMeta {
  // [Not Reset]
  butil::atomic<ButexWaiter*> current_waiter;
  uint64_t current_sleep;

  // A builtin flag to mark if the thread is stopping.
  bool stop;

  // The thread is interrupted and should wake up from some blocking ops.
  bool interrupted;

  // Scheduling of the thread can be delayed.
  bool about_to_quit;

  // [Not Reset] guarantee visibility of version_butex.
  pthread_spinlock_t version_lock;

  // [Not Reset] only modified by one bthread at any time, no need to be atomic
  uint32_t* version_butex;

  // The identifier. It does not have to be here, however many code is
  // simplified if they can get tid from TaskMeta.
  bthread_t tid;

  // User function and argument
  void* (*fn)(void*);
  void* arg;

  // Stack of this task.
  ContextualStack* stack;

  // Attributes creating this task
  bthread_attr_t attr;

  // Statistics
  int64_t cpuwide_start_ns;
  TaskStatistics stat;

  // bthread local storage, sync with tls_bls (defined in task_group.cpp)
  // when the bthread is created or destroyed.
  // DO NOT use this field directly, use tls_bls instead.
  LocalStorage local_storage;

 public:
  // Only initialize [Not Reset] fields, other fields will be reset in
  // bthread_start* functions
  TaskMeta()
    : current_waiter(NULL)
      , current_sleep(0)
      , stack(NULL) {
      pthread_spin_init(&version_lock, 0);
      version_butex = butex_create_checked<uint32_t>();
      *version_butex = 1;
    }

  ~TaskMeta() {
    butex_destroy(version_butex);
    version_butex = NULL;
    pthread_spin_destroy(&version_lock);
  }

  void set_stack(ContextualStack* s) {
    stack = s;
  }

  ContextualStack* release_stack() {
    ContextualStack* tmp = stack;
    stack = NULL;
    return tmp;
  }

  StackType stack_type() const {
    return static_cast<StackType>(attr.stack_type);
  }
};

For now, focus only on the coroutine stack field. The corresponding stack is allocated before a coroutine is scheduled for the first time:

// task_group_inl.h
ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);


// stack_inl.h
inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
  switch (type) {
    case STACK_TYPE_PTHREAD:
      return NULL;
    case STACK_TYPE_SMALL:
      return StackFactory<SmallStackClass>::get_stack(entry);
    case STACK_TYPE_NORMAL:  // Normal is the default
      return StackFactory<NormalStackClass>::get_stack(entry);
    case STACK_TYPE_LARGE:
      return StackFactory<LargeStackClass>::get_stack(entry);
    case STACK_TYPE_MAIN:
      return StackFactory<MainStackClass>::get_stack(entry);
  }
  return NULL;
}

template <typename StackClass> struct StackFactory {
  struct Wrapper : public ContextualStack {
    explicit Wrapper(void (*entry)(intptr_t)) {
      // Allocate the stack memory, guarded by default; note StackClass::stack_size_flag here
      if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                 FLAGS_guard_page_size) != 0) {
        storage.zeroize();
        context = NULL;
        return;
      }
      // Initialize the context
      context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
      stacktype = (StackType)StackClass::stacktype;
    }
    ~Wrapper() {
      if (context) {
        context = NULL;
        // Release the stack on destruction
        deallocate_stack_storage(&storage);
        storage.zeroize();
      }
    }
  };

  static ContextualStack* get_stack(void (*entry)(intptr_t)) {
    return butil::get_object<Wrapper>(entry);
  }

  static void return_stack(ContextualStack* sc) {
    // Returning to the object pool does not run the destructor, so the allocated stack space gets reused
    butil::return_object(static_cast<Wrapper*>(sc));
  }
};


// stack.h
struct StackStorage {
  int stacksize;
  int guardsize;
  void* bottom;
  unsigned valgrind_stack_id;

  // Clears all members.
  void zeroize() {
    stacksize = 0;
    guardsize = 0;
    bottom = NULL;
    valgrind_stack_id = 0;
  }
};

// Allocate a piece of stack.
int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);
// Deallocate a piece of stack. Parameters MUST be returned or set by the
// corresponding allocate_stack_storage() otherwise behavior is undefined.
void deallocate_stack_storage(StackStorage* s);

enum StackType {
  STACK_TYPE_MAIN = 0,
  STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
  STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
  STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
  STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
};

struct ContextualStack {
  bthread_fcontext_t context;
  StackType stacktype;
  StackStorage storage;
};


// stack.cpp
int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
  const static int PAGESIZE = getpagesize();
  const int PAGESIZE_M1 = PAGESIZE - 1;
  const int MIN_STACKSIZE = PAGESIZE * 2;
  const int MIN_GUARDSIZE = PAGESIZE;

  // Align stacksize
  const int stacksize =
    (std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
    ~PAGESIZE_M1;  // Round up to page size; PAGESIZE is typically 4 KiB

  if (guardsize_in <= 0) {
    ...
  } else {
    // Align guardsize
    const int guardsize =
      (std::max(guardsize_in, MIN_GUARDSIZE) + PAGESIZE_M1) &
      ~PAGESIZE_M1;

    const int memsize = stacksize + guardsize;
    void* const mem = mmap(NULL, memsize, (PROT_READ | PROT_WRITE),
                           (MAP_PRIVATE | MAP_ANONYMOUS), -1, 0);

    if (MAP_FAILED == mem) {
      PLOG_EVERY_SECOND(ERROR) 
        << "Fail to mmap size=" << memsize << " stack_count="
        << s_stack_count.load(butil::memory_order_relaxed)
        << ", possibly limited by /proc/sys/vm/max_map_count";
      // may fail due to limit of max_map_count (65536 in default)
      return -1;
    }

    void* aligned_mem = (void*)(((intptr_t)mem + PAGESIZE_M1) & ~PAGESIZE_M1);
    if (aligned_mem != mem) {
      LOG_ONCE(ERROR) << "addr=" << mem << " returned by mmap is not "
        "aligned by pagesize=" << PAGESIZE;
    }
    const int offset = (char*)aligned_mem - (char*)mem;
    // mprotect the guard pages so a stack overflow faults immediately
    if (guardsize <= offset ||
        mprotect(aligned_mem, guardsize - offset, PROT_NONE) != 0) {
      munmap(mem, memsize);
      PLOG_EVERY_SECOND(ERROR) 
        << "Fail to mprotect " << (void*)aligned_mem << " length="
        << guardsize - offset; 
      return -1;
    }

    s_stack_count.fetch_add(1, butil::memory_order_relaxed);
    s->bottom = (char*)mem + memsize;
    s->stacksize = stacksize;
    s->guardsize = guardsize;
    if (RunningOnValgrind()) {
      s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
        s->bottom, (char*)s->bottom - stacksize);
    } else {
      s->valgrind_stack_id = 0;
    }
    return 0;
  }
}

int* SmallStackClass::stack_size_flag = &FLAGS_stack_size_small;
int* NormalStackClass::stack_size_flag = &FLAGS_stack_size_normal;
int* LargeStackClass::stack_size_flag = &FLAGS_stack_size_large;

2. Work Stealing

The core idea: when a worker thread runs out of pending tasks of its own, it steals tasks from other threads' queues. First, look at the lock-free queue used for work stealing, src/bthread/work_stealing_queue.h:

template <typename T>
class WorkStealingQueue {
 public:
  WorkStealingQueue() : _bottom(1), _capacity(0), _buffer(NULL), _top(1) {}

  int init(size_t capacity) {
    if (_capacity != 0) {
      LOG(ERROR) << "Already initialized";
      return -1;
    }
    if (capacity == 0) {
      LOG(ERROR) << "Invalid capacity=" << capacity;
      return -1;
    }
    if (capacity & (capacity - 1)) {
      LOG(ERROR) << "Invalid capacity=" << capacity
                 << " which must be power of 2";
      return -1;
    }
    _buffer = new (std::nothrow) T[capacity];
    if (NULL == _buffer) {
      return -1;
    }
    _capacity = capacity;
    return 0;
  }

  // Append at the bottom. Not thread-safe by itself, but safe against a concurrent steal()
  bool push(const T& x) {
    const size_t b = _bottom.load(butil::memory_order_relaxed);
    const size_t t = _top.load(butil::memory_order_acquire);
    if (b >= t + _capacity) {  // Full queue.
      return false;
    }
    _buffer[b & (_capacity - 1)] = x;
    _bottom.store(b + 1, butil::memory_order_release);
    return true;
  }

  // Pop from the bottom. Not thread-safe by itself, but safe against a concurrent steal()
  bool pop(T* val) {
    const size_t b = _bottom.load(butil::memory_order_relaxed);
    size_t t = _top.load(butil::memory_order_relaxed);
    if (t >= b) {
      // fast check since we call pop() in each sched.
      // Stale _top which is smaller should not enter this branch.
      return false;
    }
    const size_t newb = b - 1;
    _bottom.store(newb, butil::memory_order_relaxed);
    butil::atomic_thread_fence(butil::memory_order_seq_cst);
    t = _top.load(butil::memory_order_relaxed);
    if (t > newb) {
      _bottom.store(b, butil::memory_order_relaxed);
      return false;
    }
    *val = _buffer[newb & (_capacity - 1)];
    if (t != newb) {
      return true;
    }
    // Single last element, compete with steal()
    // For the last element, a CAS on _top resolves the race with steal()
    const bool popped = _top.compare_exchange_strong(
        t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed);
    _bottom.store(b, butil::memory_order_relaxed);
    return popped;
  }

  // Steal from the top. Thread-safe
  bool steal(T* val) {
    size_t t = _top.load(butil::memory_order_acquire);
    size_t b = _bottom.load(butil::memory_order_acquire);
    if (t >= b) {
      // Permit false negative for performance considerations.
      return false;
    }
    do {
      butil::atomic_thread_fence(butil::memory_order_seq_cst);
      b = _bottom.load(butil::memory_order_acquire);
      if (t >= b) {
        return false;
      }
      *val = _buffer[t & (_capacity - 1)];
      // The CAS guarantees thread safety
    } while (!_top.compare_exchange_strong(
        t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed));
    return true;
  }

 private:
  // Copying a concurrent structure makes no sense.
  DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue);

  butil::atomic<size_t> _bottom;
  size_t _capacity;
  T* _buffer;
  butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top;  // keep _top and _bottom on different cache lines
};

push and pop operate only at the bottom and may be called only by the owning thread, so they are not thread-safe with respect to each other; steal takes from the top and stays thread-safe via CAS.
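To make the contract concrete, here is a minimal usage sketch (assumptions: the bthread headers are available; in bthread the owner is the worker thread driving the queue's TaskGroup, and the thieves are other workers):

#include "bthread/work_stealing_queue.h"

bthread::WorkStealingQueue<int> q;
q.init(64);  // capacity must be a power of 2

// Owner thread only; never concurrently with another push/pop:
q.push(1);
q.push(2);
int v;
q.pop(&v);  // v == 2: pop takes the newest item (LIFO)

// Any other thread, possibly concurrent with push/pop:
int stolen;
q.steal(&stolen);  // stolen == 1: steal takes the oldest item (FIFO)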

Next, the bthread startup flow:

// test/bthread_unittest.cpp
TEST_F(BthreadTest, sanity) {
  LOG(INFO) << "main thread " << pthread_self();
  bthread_t th1;
  ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, misc, (void*)1));
  LOG(INFO) << "back to main thread " << th1 << " " << pthread_self();
  ASSERT_EQ(0, bthread_join(th1, NULL));
}


// bthread.cpp
int bthread_start_urgent(bthread_t* __restrict tid,
                         const bthread_attr_t* __restrict attr,
                         void * (*fn)(void*),
                         void* __restrict arg) {
  bthread::TaskGroup* g = bthread::tls_task_group;
  if (g) {
    // start from worker
    return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
  }
  // First call: initialization required
  return bthread::start_from_non_worker(tid, attr, fn, arg);
}

BUTIL_FORCE_INLINE int
  start_from_non_worker(bthread_t* __restrict tid,
                        const bthread_attr_t* __restrict attr,
                        void * (*fn)(void*),
                        void* __restrict arg) {
  // Fetch the global TaskControl singleton
  TaskControl* c = get_or_new_task_control();
  if (NULL == c) {
    return ENOMEM;
  }
  if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
    // Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
    // 1. NOSIGNAL is often for creating many bthreads in batch,
    //    inserting into the same TaskGroup maximizes the batch.
    // 2. bthread_flush() needs to know which TaskGroup to flush.
    TaskGroup* g = tls_task_group_nosignal;
    if (NULL == g) {
      g = c->choose_one_group();
      tls_task_group_nosignal = g;
    }
    return g->start_background<true>(tid, attr, fn, arg);
  }
  // Enqueue into some group's queue
  return c->choose_one_group()->start_background<true>(
    tid, attr, fn, arg);
}

inline TaskControl* get_or_new_task_control() {
  butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control;
  TaskControl* c = p->load(butil::memory_order_consume);
  if (c != NULL) {
    return c;
  }
  BAIDU_SCOPED_LOCK(g_task_control_mutex);  // global lock
  c = p->load(butil::memory_order_consume);
  if (c != NULL) {
    return c;
  }
  c = new (std::nothrow) TaskControl;
  if (NULL == c) {
    return NULL;
  }
  int concurrency = FLAGS_bthread_min_concurrency > 0 ?
    FLAGS_bthread_min_concurrency : FLAGS_bthread_concurrency;
  // Initialize; concurrency is the number of worker threads
  if (c->init(concurrency) != 0) {
    LOG(ERROR) << "Fail to init g_task_control";
    delete c;
    return NULL;
  }
  p->store(c, butil::memory_order_release);
  return c;
}


// task_control.cpp
int TaskControl::init(int concurrency) {
  if (_concurrency != 0) {
    LOG(ERROR) << "Already initialized";
    return -1;
  }
  if (concurrency <= 0) {
    LOG(ERROR) << "Invalid concurrency=" << concurrency;
    return -1;
  }
  _concurrency = concurrency;

  // Make sure TimerThread is ready.
  if (get_or_create_global_timer_thread() == NULL) {
    LOG(ERROR) << "Fail to get global_timer_thread";
    return -1;
  }

  _workers.resize(_concurrency);   
  for (int i = 0; i < _concurrency; ++i) {
    // Start a worker thread
    const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
    if (rc) {
      LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
      return -1;
    }
  }
  _worker_usage_second.expose("bthread_worker_usage");
  _switch_per_second.expose("bthread_switch_second");
  _signal_per_second.expose("bthread_signal_second");
  _status.expose("bthread_group_status");

  // Wait for at least one group is added so that choose_one_group()
  // never returns NULL.
  // TODO: Handle the case that worker quits before add_group
  while (_ngroup == 0) {
    usleep(100);  // TODO: Elaborate
  }
  return 0;
}

bthread starts multiple worker_thread threads in the background to execute bthread tasks:

// task_control.cpp
void* TaskControl::worker_thread(void* arg) {
  run_worker_startfn();

  TaskControl* c = static_cast<TaskControl*>(arg);
  TaskGroup* g = c->create_group();  // each worker thread owns one TaskGroup
  TaskStatistics stat;
  if (NULL == g) {
    LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
    return NULL;
  }
  BT_VLOG << "Created worker=" << pthread_self()
    << " bthread=" << g->main_tid();

  tls_task_group = g;  // store the thread's TaskGroup in TLS
  c->_nworkers << 1;
  g->run_main_task();  // the main task loop

  stat = g->main_stat();
  BT_VLOG << "Destroying worker=" << pthread_self() << " bthread="
    << g->main_tid() << " idle=" << stat.cputime_ns / 1000000.0
    << "ms uptime=" << g->current_uptime_ns() / 1000000.0 << "ms";
  tls_task_group = NULL;
  g->destroy_self();
  c->_nworkers << -1;
  return NULL;
}


// task_group.cpp
void TaskGroup::run_main_task() {
  bvar::PassiveStatus<double> cumulated_cputime(
    get_cumulated_cputime_from_this, this);
  std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;

  TaskGroup* dummy = this;
  bthread_t tid;
  while (wait_task(&tid)) {  // wait for a task
    TaskGroup::sched_to(&dummy, tid);  // schedule and run it
    DCHECK_EQ(this, dummy);
    DCHECK_EQ(_cur_meta->stack, _main_stack);
    if (_cur_meta->tid != _main_tid) {
      TaskGroup::task_runner(1/*skip remained*/);
    }
  }
}

bool TaskGroup::wait_task(bthread_t* tid) {
  do {
    if (_last_pl_state.stopped()) {
      return false;
    }
    _pl->wait(_last_pl_state);
    if (steal_task(tid)) {  // steal a task
      return true;
    }
  } while (true);
}

bool TaskGroup::steal_task(bthread_t* tid) {
  // Tasks queued to this group take priority
  if (_remote_rq.pop(tid)) {
    return true;
  }
  // Otherwise steal tasks globally via TaskControl
  return _control->steal_task(tid, &_steal_seed, _steal_offset);
}


// task_control.cpp
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
  // 1: Acquiring fence is paired with releasing fence in _add_group to
  // avoid accessing uninitialized slot of _groups.
  const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
  if (0 == ngroup) {
    return false;
  }

  // NOTE: Don't return inside `for' iteration since we need to update |seed|
  bool stolen = false;
  size_t s = *seed;
  for (size_t i = 0; i < ngroup; ++i, s += offset) {
    TaskGroup* g = _groups[s % ngroup];
    // g is possibly NULL because of concurrent _destroy_group
    if (g) {
      if (g->_rq.steal(tid)) {  // lock-free steal
        stolen = true;
        break;
      }
      if (g->_remote_rq.pop(tid)) {  // steal under a lock
        stolen = true;
        break;
      }
    }
  }
  *seed = s;
  return stolen;
}


// task_group_inl.h
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
  TaskMeta* next_meta = address_meta(next_tid);
  if (next_meta->stack == NULL) {
    ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
    if (stk) {
      next_meta->set_stack(stk);
    } else {
      // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
      // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
      // This basically means that if we can't allocate stack, run
      // the task in pthread directly.
      next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
      next_meta->set_stack((*pg)->_main_stack);
    }
  }
  // Update now_ns only when wait_task did yield.
  sched_to(pg, next_meta);  // run it
}


// task_group.cpp
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
  TaskGroup* g = *pg;
  // Save errno so that errno is bthread-specific.
  const int saved_errno = errno;
  void* saved_unique_user_ptr = tls_unique_user_ptr;

  TaskMeta* const cur_meta = g->_cur_meta;
  const int64_t now = butil::cpuwide_time_ns();
  const int64_t elp_ns = now - g->_last_run_ns;
  g->_last_run_ns = now;
  cur_meta->stat.cputime_ns += elp_ns;
  if (cur_meta->tid != g->main_tid()) {
    g->_cumulated_cputime_ns += elp_ns;
  }
  ++cur_meta->stat.nswitch;
  ++ g->_nswitch;
  // Switch to the task
  if (__builtin_expect(next_meta != cur_meta, 1)) {
    g->_cur_meta = next_meta;
    // Switch tls_bls
    cur_meta->local_storage = tls_bls;
    tls_bls = next_meta->local_storage;

    // Logging must be done after switching the local storage, since the logging lib
    // use bthread local storage internally, or will cause memory leak.
    if ((cur_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH) ||
        (next_meta->attr.flags & BTHREAD_LOG_CONTEXT_SWITCH)) {
      LOG(INFO) << "Switch bthread: " << cur_meta->tid << " -> "
        << next_meta->tid;
    }

    if (cur_meta->stack != NULL) {
      if (next_meta->stack != cur_meta->stack) {
        jump_stack(cur_meta->stack, next_meta->stack);  // coroutine context switch
        // probably went to another group, need to assign g again.
        g = tls_task_group;
      }
    }
    // else because of ending_sched(including pthread_task->pthread_task)
  } else {
    LOG(FATAL) << "bthread=" << g->current_tid() << " sched_to itself!";
  }

  while (g->_last_context_remained) {
    RemainedFn fn = g->_last_context_remained;
    g->_last_context_remained = NULL;
    fn(g->_last_context_remained_arg);
    g = tls_task_group;
  }

  // Restore errno
  errno = saved_errno;
  tls_unique_user_ptr = saved_unique_user_ptr;

  *pg = g;
}

// stack_inl.h
inline void jump_stack(ContextualStack* from, ContextualStack* to) {
  bthread_jump_fcontext(&from->context, to->context, 0/*not skip remained*/);
}

The flow of creating a bthread from an outside (non-worker) thread via TaskControl:

// task_group.cpp
template <bool REMOTE>
int TaskGroup::start_background(bthread_t* __restrict th,
                                const bthread_attr_t* __restrict attr,
                                void * (*fn)(void*),
                                void* __restrict arg) {
    if (__builtin_expect(!fn, 0)) {
        return EINVAL;
    }
    const int64_t start_ns = butil::cpuwide_time_ns();
    const bthread_attr_t using_attr = (attr ? *attr : BTHREAD_ATTR_NORMAL);
    butil::ResourceId<TaskMeta> slot;
    TaskMeta* m = butil::get_resource(&slot);
    if (__builtin_expect(!m, 0)) {
        return ENOMEM;
    }
    CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
    m->stop = false;
    m->interrupted = false;
    m->about_to_quit = false;
    m->fn = fn;
    m->arg = arg;
    CHECK(m->stack == NULL);
    m->attr = using_attr;
    m->local_storage = LOCAL_STORAGE_INIT;
    m->cpuwide_start_ns = start_ns;
    m->stat = EMPTY_STAT;
    m->tid = make_tid(*m->version_butex, slot);
    *th = m->tid;
    if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
        LOG(INFO) << "Started bthread " << m->tid;
    }
    _control->_nbthreads << 1;
    if (REMOTE) {
        // called from an outside thread
        ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    } else {
        ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
    }
    return 0;
}

void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
    // Enqueue while holding the lock
    _remote_rq._mutex.lock();
    while (!_remote_rq.push_locked(tid)) {
        flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
        LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                                << _remote_rq.capacity();
        ::usleep(1000);
        _remote_rq._mutex.lock();
    }
    if (nosignal) {
        ++_remote_num_nosignal;
        _remote_rq._mutex.unlock();
    } else {
        const int additional_signal = _remote_num_nosignal;
        _remote_num_nosignal = 0;
        _remote_nsignaled += 1 + additional_signal;
        _remote_rq._mutex.unlock();
        _control->signal_task(1 + additional_signal);  // wake up workers to run the task
    }
}

3. Futex & Butex

Linux provides futex, a fast userspace locking primitive, around which bRPC adds a thin wrapper:

// sys_futex.h
inline int futex_wait_private(void *addr1, int expected,
                              const timespec *timeout) {
  // If *addr1 == expected, suspend the calling thread and wait
  return syscall(SYS_futex, addr1, (FUTEX_WAIT | FUTEX_PRIVATE_FLAG), expected,
                 timeout, NULL, 0);
}

inline int futex_wake_private(void *addr1, int nwake) {
  // Wake up at most nwake waiting threads
  return syscall(SYS_futex, addr1, (FUTEX_WAKE | FUTEX_PRIVATE_FLAG), nwake,
                 NULL, NULL, 0);
}
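A minimal demonstration of the wait/wake pattern (a Linux-only sketch; the local futex_wait/futex_wake helpers here simply mirror the wrappers above):

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <atomic>
#include <cstdio>
#include <thread>

static std::atomic<int> flag{0};

static long futex_wait(void *addr, int expected) {
  return syscall(SYS_futex, addr, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, expected,
                 NULL, NULL, 0);
}

static long futex_wake(void *addr, int nwake) {
  return syscall(SYS_futex, addr, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, nwake,
                 NULL, NULL, 0);
}

int main() {
  std::thread waiter([] {
    // Suspend only while flag is still 0; spurious wakeups re-check the loop.
    while (flag.load(std::memory_order_acquire) == 0) {
      futex_wait(&flag, 0);
    }
    printf("woken\n");
  });
  usleep(1000);
  flag.store(1, std::memory_order_release);
  futex_wake(&flag, 1);  // wake one waiter
  waiter.join();
  return 0;
}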

For macOS, bthread ships a userspace implementation as well; see src/bthread/sys_futex.cpp. On top of futex one can build a mutex faster than pthread's:

// mutex.h
class FastPthreadMutex {
public:
  FastPthreadMutex() : _futex(0) {}
  ~FastPthreadMutex() {}
  void lock();
  void unlock();
  bool try_lock();

private:
  DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
  int lock_contended();
  unsigned _futex;
};


// mutex.cpp
// Implement bthread_mutex_t related functions
struct MutexInternal {
  butil::static_atomic<unsigned char> locked;
  butil::static_atomic<unsigned char> contended;
  unsigned short padding;
};

const MutexInternal MUTEX_CONTENDED_RAW = {{1}, {1}, 0};
const MutexInternal MUTEX_LOCKED_RAW = {{1}, {0}, 0};
// Define as macros rather than constants which can't be put in read-only
// section and affected by initialization-order fiasco.
#define BTHREAD_MUTEX_CONTENDED                                                \
  (*(const unsigned *)&bthread::MUTEX_CONTENDED_RAW)
#define BTHREAD_MUTEX_LOCKED (*(const unsigned *)&bthread::MUTEX_LOCKED_RAW)

int FastPthreadMutex::lock_contended() {
  butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
  // Atomically set the state to locked + contended.
  // If the lock was released in the meantime, the loop exits immediately.
  while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
    // Otherwise try to suspend the current thread via futex, but only while the state is still locked + contended (the lock may have been released in between)
    if (futex_wait_private(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
        errno != EWOULDBLOCK) {
      return errno;
    }
    // When futex_wait_private fails with EWOULDBLOCK, keep looping and retry
  }
  return 0;
}

void FastPthreadMutex::lock() {
  bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
  // Take the lock; if locked was previously 0, lock() returns right away
  if (split->locked.exchange(1, butil::memory_order_acquire)) {
    // Already locked: enter the contended path
    (void)lock_contended();
  }
}

bool FastPthreadMutex::try_lock() {
  bthread::MutexInternal *split = (bthread::MutexInternal *)&_futex;
  return !split->locked.exchange(1, butil::memory_order_acquire);
}

void FastPthreadMutex::unlock() {
  butil::atomic<unsigned> *whole = (butil::atomic<unsigned> *)&_futex;
  const unsigned prev = whole->exchange(0, butil::memory_order_release);
  // CAUTION: the mutex may be destroyed, check comments before butex_create
  if (prev != BTHREAD_MUTEX_LOCKED) {
    // There are suspended waiters: wake one up
    futex_wake_private(whole, 1);
  }
}

In a simple test I wrote, this FastPthreadMutex runs about twice as fast as std::mutex.
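The test had roughly the following shape (a hypothetical sketch rather than the original benchmark; the thread and iteration counts are arbitrary, and in current bRPC sources FastPthreadMutex lives in the bthread::internal namespace):

#include <chrono>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

#include "bthread/mutex.h"

// Time nthread threads contending on one mutex for iters lock/unlock pairs.
template <typename Mutex>
double bench(int nthread, int iters) {
  Mutex mutex;
  long counter = 0;
  auto start = std::chrono::steady_clock::now();
  std::vector<std::thread> threads;
  for (int i = 0; i < nthread; ++i) {
    threads.emplace_back([&] {
      for (int j = 0; j < iters; ++j) {
        mutex.lock();  // FastPthreadMutex exposes the same lock/unlock interface
        ++counter;
        mutex.unlock();
      }
    });
  }
  for (auto &t : threads) {
    t.join();
  }
  std::chrono::duration<double> diff = std::chrono::steady_clock::now() - start;
  return diff.count();
}

int main() {
  printf("std::mutex       %.3fs\n", bench<std::mutex>(4, 1000000));
  printf("FastPthreadMutex %.3fs\n",
         bench<bthread::internal::FastPthreadMutex>(4, 1000000));
  return 0;
}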

For bthreads, blocking on such a mutex directly would block the whole worker thread, stalling every other bthread on it. A better approach is to suspend only the bthread itself, yielding the CPU so other bthreads keep running. For this, bthread offers butex, a futex-like synchronization primitive. Here is the implementation:

// butex.h
// Create a butex which is a futex-like 32-bit primitive for synchronizing
// bthreads/pthreads.
// Returns a pointer to 32-bit data, NULL on failure.
// NOTE: all butexes are private(not inter-process).
void *butex_create();

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void *butex);

// Atomically wait on |butex| if *butex equals |expected_value|, until the
// butex is woken up by butex_wake*, or CLOCK_REALTIME reached |abstime| if
// abstime is not NULL.
// About |abstime|:
//   Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void *butex, int expected_value, const timespec *abstime);


// butex.cpp
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
  // tids of pthreads are 0
  bthread_t tid;

  // Erasing node from middle of LinkedList is thread-unsafe, we need
  // to hold its container's lock.
  butil::atomic<Butex *> container;
};

struct ButexBthreadWaiter : public ButexWaiter {
  TaskMeta *task_meta;
  TimerThread::TaskId sleep_id;
  WaiterState waiter_state;
  int expected_value;
  Butex *initial_butex;
  TaskControl *control;
};

// Doubly linked list
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

struct BAIDU_CACHELINE_ALIGNMENT Butex {
  Butex() {}
  ~Butex() {}

  butil::atomic<int> value;
  ButexWaiterList waiters;
  internal::FastPthreadMutex waiter_lock;
};

// Note: value's address equals the Butex's address, which enables the casts below
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);

void *butex_create() {
  Butex *b = butil::get_object<Butex>();
  if (b) {
    return &b->value;
  }
  return NULL;
}

int butex_wait(void *arg, int expected_value, const timespec *abstime) {
  Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
  if (b->value.load(butil::memory_order_relaxed) != expected_value) {
    errno = EWOULDBLOCK;
    // Sometimes we may take actions immediately after unmatched butex,
    // this fence makes sure that we see changes before changing butex.
    butil::atomic_thread_fence(butil::memory_order_acquire);
    return -1;
  }
  TaskGroup *g = tls_task_group;
  if (NULL == g || g->is_current_pthread_task()) {
    return butex_wait_from_pthread(g, b, expected_value, abstime);
  }
  // Create the waiter object on the current stack
  ButexBthreadWaiter bbw;
  // tid is 0 iff the thread is non-bthread
  bbw.tid = g->current_tid();
  bbw.container.store(NULL, butil::memory_order_relaxed);
  bbw.task_meta = g->current_task();
  bbw.sleep_id = 0;
  bbw.waiter_state = WAITER_STATE_READY;
  bbw.expected_value = expected_value;
  bbw.initial_butex = b;
  bbw.control = g->control();

  if (abstime != NULL) {
    ...  // timeout handling omitted for now
  }

  // release fence matches with acquire fence in interrupt_and_consume_waiters
  // in task_group.cpp to guarantee visibility of `interrupted'.
  bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
  g->set_remained(wait_for_butex, &bbw);  // run wait_for_butex before the next bthread starts
  TaskGroup::sched(&g);  // yield voluntarily

  // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
  // running and using bbw. The chance is small, just spin until it's done.
  BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
               30 /*nops before sched_yield*/);

  // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
  // Spin until current_waiter != NULL.
  BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                   NULL, butil::memory_order_acquire) == NULL,
               30 /*nops before sched_yield*/);

  bool is_interrupted = false;
  if (bbw.task_meta->interrupted) {
    // Race with set and may consume multiple interruptions, which are OK.
    bbw.task_meta->interrupted = false;
    is_interrupted = true;
  }
  // If timed out as well as value unmatched, return ETIMEDOUT.
  if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
    errno = ETIMEDOUT;
    return -1;
  } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
    errno = EWOULDBLOCK;
    return -1;
  } else if (is_interrupted) {
    errno = EINTR;
    return -1;
  }
  return 0;
}

static void wait_for_butex(void *arg) {
  ButexBthreadWaiter *const bw = static_cast<ButexBthreadWaiter *>(arg);
  Butex *const b = bw->initial_butex;
  // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
  //    before they're queued, otherwise the waiter is already timedout
  //    and removed by TimerThread, in which case we should stop queueing.
  //
  // Visibility of waiter_state:
  //    [bthread]                         [TimerThread]
  //    waiter_state = TIMED
  //    tt_lock { add task }
  //                                      tt_lock { get task }
  //                                      waiter_lock { waiter_state=TIMEDOUT }
  //    waiter_lock { use waiter_state }
  // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
  // sequenced by two locks, both threads are guaranteed to see the correct
  // value.
  {
    BAIDU_SCOPED_LOCK(b->waiter_lock);
    // With the lock held, check whether value still equals expected_value
    if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
      bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
    } else if (bw->waiter_state == WAITER_STATE_READY /*1*/ &&
               !bw->task_meta->interrupted) {
      b->waiters.Append(bw);  // join the waiters list
      bw->container.store(b, butil::memory_order_relaxed);
      return;
    }
  }

  // b->container is NULL which makes erase_from_butex_and_wakeup() and
  // TaskGroup::interrupt() no-op, there's no race between following code and
  // the two functions. The on-stack ButexBthreadWaiter is safe to use and
  // bw->waiter_state will not change again.
  // The wait did not take effect: cancel the timer and make the bthread runnable again
  unsleep_if_necessary(bw, get_global_timer_thread());
  tls_task_group->ready_to_run(bw->tid);
  // FIXME: jump back to original thread is buggy.

  // // Value unmatched or waiter is already woken up by TimerThread, jump
  // // back to original bthread.
  // TaskGroup* g = tls_task_group;
  // ReadyToRunArgs args = { g->current_tid(), false };
  // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
  // // 2: Don't run remained because we're already in a remained function
  // //    otherwise stack may overflow.
  // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

int butex_wake(void *arg) {
  Butex *b = container_of(static_cast<butil::atomic<int> *>(arg), Butex, value);
  ButexWaiter *front = NULL;
  {
    BAIDU_SCOPED_LOCK(b->waiter_lock);
    if (b->waiters.empty()) {
      return 0;
    }
    front = b->waiters.head()->value();
    front->RemoveFromList();  // detach the first waiter from the list
    front->container.store(NULL, butil::memory_order_relaxed);
  }
  if (front->tid == 0) {
    wakeup_pthread(static_cast<ButexPthreadWaiter *>(front));
    return 1;
  }
  ButexBthreadWaiter *bbw = static_cast<ButexBthreadWaiter *>(front);
  unsleep_if_necessary(bbw, get_global_timer_thread());
  TaskGroup *g = tls_task_group;
  if (g) {
    TaskGroup::exchange(&g, bbw->tid);  // on a worker: switch to the woken bthread directly
  } else {
    // otherwise enqueue it to some group for rescheduling
    bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
  }
  return 1;
}
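Putting the pieces together, the typical usage pattern mirrors futex (a sketch based on the butex.h declarations above; the butex_* functions are internal APIs in the bthread namespace):

// Block a bthread until another thread publishes a change.
butil::atomic<int> *v =
    static_cast<butil::atomic<int> *>(bthread::butex_create());
v->store(0, butil::memory_order_relaxed);

// Waiter (inside a bthread): suspend while the value is still 0.
// If *v changed already, butex_wait fails fast with EWOULDBLOCK.
while (v->load(butil::memory_order_acquire) == 0) {
  bthread::butex_wait(v, /*expected_value=*/0, /*abstime=*/NULL);
}

// Waker: publish the new value first, then wake up one waiter.
v->store(1, butil::memory_order_release);
bthread::butex_wake(v);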

PS: A Few Off-Topic Words

Yes, the Tokio source-code analysis series has been shelved. The second half of last year was so busy with a team project that I didn't feel like studying even on weekends. Fortunately the project launched before the new year and this year's early review went smoothly, so there is more time for self-improvement now. I'm starting this new bRPC series because the project contains plenty of good practices applicable to my own work; Tokio will be picked up again after bRPC.