bRPC 源码分析「四、协程调度」

2021.03.10
SF-Zhou

1. Parking Lot

bthread 内实现了 ParkingLot 用以管理工作线程的挂起和唤醒:

// parking_lot.h
// Park idle workers.
class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
public:
  class State {
  public:
    State() : val(0) {}
    bool stopped() const { return val & 1; }

  private:
    friend class ParkingLot;
    State(int val) : val(val) {}
    int val;
  };

  ParkingLot() : _pending_signal(0) {}

  // Wake up at most `num_task' workers.
  // Returns #workers woken up.
  // 唤醒至多 num_task 个等待的线程
  // 注意 _pending_signal 增加了 num_task * 2,因为最后一位用来表示是否 stop
  int signal(int num_task) {
    _pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
    return futex_wake_private(&_pending_signal, num_task);
  }

  // Get a state for later wait().
  State get_state() {
    return _pending_signal.load(butil::memory_order_acquire);
  }

  // Wait for tasks.
  // If the `expected_state' does not match, wait() may finish directly.
  // 工作线程尝试挂起等待,此时会检查 _pending_signal 的数值,若不一致则挂起,继续执行
  void wait(const State &expected_state) {
    futex_wait_private(&_pending_signal, expected_state.val, NULL);
  }

  // Wakeup suspended wait() and make them unwaitable ever.
  void stop() {
    _pending_signal.fetch_or(1);
    // 将 _pending_signal 设为 stop,唤醒所有等待的工作线程,触发结束
    futex_wake_private(&_pending_signal, 10000);
  }

private:
  // higher 31 bits for signalling, LSB for stopping.
  butil::atomic<int> _pending_signal;
};

TaskControl 中包含一组 ParkingLot 对象,TaskGroup 初始化时会哈希打散使用这些对象:

// task_control.h
class TaskControl {
  ...
  static const int PARKING_LOT_NUM = 4;
  ParkingLot _pl[PARKING_LOT_NUM];
};

// task_group.cpp
TaskGroup::TaskGroup(TaskControl *c)
    : _cur_meta(NULL), _control(c), _num_nosignal(0), _nsignaled(0),
      _last_run_ns(butil::cpuwide_time_ns()), _cumulated_cputime_ns(0),
      _nswitch(0), _last_context_remained(NULL),
      _last_context_remained_arg(NULL), _pl(NULL), _main_stack(NULL),
      _main_tid(0), _remote_num_nosignal(0), _remote_nsignaled(0) {
  _steal_seed = butil::fast_rand();
  _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
  // 根据线程号哈希到某个 ParkingLot 对象上
  _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) %
                TaskControl::PARKING_LOT_NUM];
  CHECK(c);
}

当前工作线程在窃取任务前会先取一次 ParkingLot 的状态,当状态发生改变时会直接跳过 wait

bool TaskGroup::wait_task(bthread_t *tid) {
  do {
    if (_last_pl_state.stopped()) {
      // 如果已经 stop
      return false;
    }
    // 根据当前的 _last_pl_state 状态判断是否挂起
    _pl->wait(_last_pl_state);
    if (steal_task(tid)) {
      return true;
    }
  } while (true);
}

bool TaskGroup::steal_task(bthread_t *tid) {
  if (_remote_rq.pop(tid)) {
    return true;
  }
  // _last_pl_state 的状态在这里进行更新
  // _last_pl_state 发生变化时 wait_task 中的 wait 会直接跳过
  // 这里容忍 false postive,增加的开销是多一次尝试 steal_task
  _last_pl_state = _pl->get_state();
  return _control->steal_task(tid, &_steal_seed, _steal_offset);
}

当有新任务加入时,则调用 TaskControl::signal_task 通过 ParkingLog 唤醒等待的工作线程:

void TaskControl::signal_task(int num_task) {
  if (num_task <= 0) {
    return;
  }
  // TODO(gejun): Current algorithm does not guarantee enough threads will
  // be created to match caller's requests. But in another side, there's also
  // many useless signalings according to current impl. Capping the concurrency
  // is a good balance between performance and timeliness of scheduling.
  // 上方的官方注释也写明了,限制并发可以更好地平衡性能和调度及时性,这里直接将唤醒的上限设为 2
  if (num_task > 2) {
    num_task = 2;
  }
  int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
  num_task -= _pl[start_index].signal(1);  // 通过 ParkingLog 唤醒
  if (num_task > 0) {
    for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
      if (++start_index >= PARKING_LOT_NUM) {
        start_index = 0;
      }
      
      num_task -= _pl[start_index].signal(1);
    }
  }
  if (num_task > 0 &&
      FLAGS_bthread_min_concurrency >
          0 && // test min_concurrency for performance
      _concurrency.load(butil::memory_order_relaxed) <
          FLAGS_bthread_concurrency) {
    // TODO: Reduce this lock
    // 当仍然有需要执行的任务时,尝试增加工作线程
    BAIDU_SCOPED_LOCK(g_task_control_mutex);
    if (_concurrency.load(butil::memory_order_acquire) <
        FLAGS_bthread_concurrency) {
      add_workers(1);
    }
  }
}

void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
  push_rq(tid);
  if (nosignal) {
    ++_num_nosignal;  // 对于设定了不触发信号的任务,仅增加计数
  } else {
    const int additional_signal = _num_nosignal;
    _num_nosignal = 0;
    _nsignaled += 1 + additional_signal;
    // 调用 signal_task 唤醒
    _control->signal_task(1 + additional_signal);
  }
}

// 将还没有触发信号的任务统一触发信号(工作线程内调用)
void TaskGroup::flush_nosignal_tasks() {
  const int val = _num_nosignal;
  if (val) {
    _num_nosignal = 0;
    _nsignaled += val;
    _control->signal_task(val);
  }
}

// 从工作线程外提交任务
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
  _remote_rq._mutex.lock();  // 加锁
  while (!_remote_rq.push_locked(tid)) {
    flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
    LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
                            << _remote_rq.capacity();
    ::usleep(1000);
    _remote_rq._mutex.lock();
  }
  if (nosignal) {
    ++_remote_num_nosignal;
    _remote_rq._mutex.unlock();
  } else {
    const int additional_signal = _remote_num_nosignal;
    _remote_num_nosignal = 0;
    _remote_nsignaled += 1 + additional_signal;
    _remote_rq._mutex.unlock();  // 锁的生命周期内也保护了计数相关变量
    _control->signal_task(1 + additional_signal);
  }
}

void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
  const int val = _remote_num_nosignal;
  if (!val) {
    locked_mutex.unlock();
    return;
  }
  _remote_num_nosignal = 0;
  _remote_nsignaled += val;
  locked_mutex.unlock();
  _control->signal_task(val);
}

2. Task Runner

bthread 工作线程在执行过程中会有以下几种状态:

  1. 执行主协程任务,负责获取任务或者挂起等待,此时:
    1. _cur_meta->tid == _main_tid
    2. _cur_meta->stack == _main_stack
  2. 执行 pthread 任务,直接在主协程中调用 TaskGroup::task_runner(1) 执行,此时:
    1. _cur_meta->tid != _main_tid
    2. _cur_meta->stack == _main_stack
  3. 执行 bthread 任务,通过在 TaskGroup::sched_to 中调用 jump_stack 切换到协程栈,此时:
    1. _cur_meta->tid != _main_tid
    2. _cur_meta->stack != _main_stack

上述三种状态可以相互切换。假设当前任务队列中包含一个 bthread 任务和一个 pthread 任务,那么执行的流程为:

void TaskGroup::run_main_task() {
  TaskGroup* dummy = this;
  task_t tid;
  while (wait_task(&tid)) {
    // 1. 获取到 bthread 任务,调用 TaskGroup::sched_to 触发执行
    TaskGroup::sched_to(&dummy, tid);
    if (_cur_meta->tid != _main_tid) {
      TaskGroup::task_runner(1);
    }
  }
}

void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
  TaskGroup* g = *pg;
  const int saved_errno = errno;

  TaskMeta* const cur_meta = g->_cur_meta;
  if (PAXOS_LIKELY(next_meta != cur_meta)) {
    // 2. cur_meta 此时为主线程 meta,next_meta 为 bthread 任务
    // 将 g->_cur_meta 设定为即将执行的 bthread 任务
    g->_cur_meta = next_meta;

    if (cur_meta->stack != nullptr) {
      if (next_meta->stack != cur_meta->stack) {
        // 3. 从主协程的 main_stack 切换到 bthread 任务的栈
        jump_stack(cur_meta->stack, next_meta->stack);
        // 4. 主协程在此处挂起,进入 bthread 任务入口 task_runner
  ...
}
      
void TaskGroup::task_runner(intptr_t skip_remained) {
  TaskGroup *g = tls_task_group;

  if (!skip_remained) {
    // 5. 用户函数执行前,先执行可能存在的 remained 函数
    while (g->_last_context_remained) {
      RemainedFn fn = g->_last_context_remained;
      g->_last_context_remained = NULL;
      fn(g->_last_context_remained_arg);
      g = tls_task_group;
    }
  }

  do {
    // Meta and identifier of the task is persistent in this run.
    // 6. 通过 TLS 获取当前线程待执行的任务 TaskMeta
    TaskMeta *const m = g->_cur_meta;

    void *thread_return;
    try {
      // 7. 执行用户执行的 bthread 任务,假设中途协程调用 yield 主动挂起
      thread_return = m->fn(m->arg);
    }
  ...
}

void TaskGroup::yield(TaskGroup **pg) {
  // 8. yield 时将当前的 bthread 任务打包为 remained 函数
  TaskGroup *g = *pg;
  ReadyToRunArgs args = {g->current_tid(), false};
  g->set_remained(ready_to_run_in_worker, &args);
  sched(pg);
}

void TaskGroup::sched(TaskGroup **pg) {
  TaskGroup *g = *pg;
  bthread_t next_tid = 0;
  const bool popped = g->_rq.pop(&next_tid);
  if (!popped && !g->steal_task(&next_tid)) {
    next_tid = g->_main_tid;
  }
  // 9. 从任务队列中成功拿到下一个 pthread 任务,触发调度
  sched_to(pg, next_tid);
}

inline void TaskGroup::sched_to(TaskGroup **pg, bthread_t next_tid) {
  TaskMeta *next_meta = address_meta(next_tid);
  if (next_meta->stack == NULL) {
    ContextualStack *stk = get_stack(next_meta->stack_type(), task_runner);
    if (stk) {
      next_meta->set_stack(stk);
    } else {
      // 10. pthread 获取到的栈为空指针,这里会赋值为 _main_stack
      next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
      next_meta->set_stack((*pg)->_main_stack);
    }
  }
  sched_to(pg, next_meta);
}
  
void TaskGroup::sched_to(TaskGroup **pg, TaskMeta *next_meta) {
  TaskGroup *g = *pg;
  const int saved_errno = errno;

  TaskMeta *const cur_meta = g->_cur_meta;
  if (__builtin_expect(next_meta != cur_meta, 1)) {
    // 11. 将 g->_cur_meta 设定为下一个 pthread 任务
    g->_cur_meta = next_meta;

    if (cur_meta->stack != NULL) {
      if (next_meta->stack != cur_meta->stack) {
        // 12. 从 bthread 任务的栈切换到 _main_stack
        jump_stack(cur_meta->stack, next_meta->stack);
        // 13. bthread 从此处挂起,恢复主协程的运行状态,也就是这里第 4 步的位置
  ...
}

void TaskGroup::sched_to(TaskGroup **pg, TaskMeta *next_meta) {
        ...
        jump_stack(cur_meta->stack, next_meta->stack);
        // 14. 恢复到主协程此处的位置,继续执行,重新获取一次 tls_task_group
        g = tls_task_group;
      }
    }
  }

  // 15. 执行可能存在的 remained 函数,也就是第 8 步中打包的 bthread 恢复任务
  // 会执行 ready_to_run_in_worker(bthread_tid) 将之前挂起的 bthread 重新加入任务队列
  while (g->_last_context_remained) {
    RemainedFn fn = g->_last_context_remained;
    g->_last_context_remained = NULL;
    fn(g->_last_context_remained_arg);
    g = tls_task_group;
  }

  errno = saved_errno;
  *pg = g;
}

void TaskGroup::run_main_task() {
  TaskGroup* dummy = this;
  task_t tid;
  while (wait_task(&tid)) {
    TaskGroup::sched_to(&dummy, tid);
    if (_cur_meta->tid != _main_tid) {
      // 16. 主协程继续走到这里,直接在 main_stack 调用 task_runner 执行 pthread 任务
      TaskGroup::task_runner(1);
    }
  }
}

void TaskGroup::task_runner(intptr_t skip_remained) {
  ...
    void *thread_return;
    try {
      // 17. 执行 pthread 任务
      thread_return = m->fn(m->arg);
    } catch (ExitException &e) {
      thread_return = e.value();
    }
  ...
    // 18. 任务执行完成后,进行结束时期的调度
    ending_sched(&g);
}

void TaskGroup::ending_sched(TaskGroup **pg) {
  TaskGroup *g = *pg;
  bthread_t next_tid = 0;
  // 19. 从任务队列中取出任务,假设取出了之前挂起的 bthread 任务
  const bool popped = g->_rq.pop(&next_tid);
  ...

  // 20. 执行调度,切换到 bthread 挂起的地方
  sched_to(pg, next_meta);
}

3. Timer Thread

bRPC 中有频繁的定时器的需求,主要是:

  1. 在发起 RPC 过程中增加一个定时器,时间为 RPC 的超时时间
  2. 若 RPC 没有超时正常回复(大部分情况),则需要删除对应的定时器

bRPC 的官方文档中详细介绍了这样的定时器设计的难点,以及 TimerThread 是如何解决这些难点的。这里直接看对应的代码实现:

struct TimerThreadOptions {
  // Scheduling requests are hashed into different bucket to improve
  // scalability. However bigger num_buckets may NOT result in more scalable
  // schedule() because bigger values also make each buckets more sparse
  // and more likely to lock the global mutex. You better not change
  // this value, just leave it to us.
  // Default: 13
  size_t num_buckets;

  // If this field is not empty, some bvar for reporting stats of TimerThread
  // will be exposed with this prefix.
  // Default: ""
  std::string bvar_prefix;

  // Constructed with default options.
  TimerThreadOptions();
};

// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
public:
  struct Task;
  class Bucket;

  typedef uint64_t TaskId;
  const static TaskId INVALID_TASK_ID;

  TimerThread();
  ~TimerThread();

  // Start the timer thread.
  // This method should only be called once.
  // return 0 if success, errno otherwise.
  // 启动 TimerThread 线程
  int start(const TimerThreadOptions *options);

  // Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
  // 结束 TimerThread 线程
  void stop_and_join();

  // Schedule |fn(arg)| to run at realtime |abstime| approximately.
  // Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
  // 增加一个定时器任务
  TaskId schedule(void (*fn)(void *), void *arg, const timespec &abstime);

  // Prevent the task denoted by `task_id' from running. `task_id' must be
  // returned by schedule() ever.
  // Returns:
  //   0   -  Removed the task which does not run yet
  //  -1   -  The task does not exist.
  //   1   -  The task is just running.
  // 取消一个定时器任务
  int unschedule(TaskId task_id);

  // Get identifier of internal pthread.
  // Returns (pthread_t)0 if start() is not called yet.
  pthread_t thread_id() const { return _thread; }

private:
  // the timer thread will run this method.
  void run();  // TimerThread 线程函数
  static void *run_this(void *arg);

  bool _started; // whether the timer thread was started successfully.
  butil::atomic<bool> _stop;

  TimerThreadOptions _options;
  Bucket *_buckets;                  // list of tasks to be run
  internal::FastPthreadMutex _mutex; // protect _nearest_run_time
  int64_t _nearest_run_time;
  // the futex for wake up timer thread. can't use _nearest_run_time because
  // it's 64-bit.
  int _nsignals;
  pthread_t _thread; // all scheduled task will be run on this thread
};

void TimerThread::run() {
  int64_t last_sleep_time = butil::gettimeofday_us();

  // min heap of tasks (ordered by run_time)
  // TimerThread 线程内通过最小堆维护定时任务序列
  std::vector<Task *> tasks;
  tasks.reserve(4096);

  while (!_stop.load(butil::memory_order_relaxed)) {
    {
      BAIDU_SCOPED_LOCK(_mutex);
      _nearest_run_time = std::numeric_limits<int64_t>::max();
    }

    // 从所有的 Bucket 的获取任务
    for (size_t i = 0; i < _options.num_buckets; ++i) {
      Bucket &bucket = _buckets[i];
      for (Task *p = bucket.consume_tasks(); p != nullptr; ++nscheduled) {
        Task *next_task = p->next;

        // 对于已经取消的任务,不会加入到堆里,直接删除。这也是高性能的关键
        if (!p->try_delete()) {
          // 循环加入到堆里
          tasks.push_back(p);
          std::push_heap(tasks.begin(), tasks.end(), task_greater);
        }
        p = next_task;
      }
    }

    bool pull_again = false;
    while (!tasks.empty()) {
      // 从最小堆中取出任务
      Task *task1 = tasks[0];
      if (task1->try_delete()) { // already unscheduled
        std::pop_heap(tasks.begin(), tasks.end(), task_greater);
        tasks.pop_back();
        continue;
      }
      // 判断是否到时间了,没有则退出
      if (butil::gettimeofday_us() < task1->run_time) { // not ready yet.
        break;
      }

      {
        BAIDU_SCOPED_LOCK(_mutex);
        if (task1->run_time > _nearest_run_time) {
          // 检查当前的 _nearest_run_time,确认是否有更新的任务
          // 有的话则再次从 Bucket 中拉取
          pull_again = true;
          break;
        }
      }
      std::pop_heap(tasks.begin(), tasks.end(), task_greater);
      tasks.pop_back();
      // 执行定时任务并删除
      if (task1->run_and_delete()) {
        ++ntriggered;
      }
    }
    if (pull_again) {
      BT_VLOG << "pull again, tasks=" << tasks.size();
      continue;
    }

    // The realtime to wait for.
    int64_t next_run_time = std::numeric_limits<int64_t>::max();
    if (tasks.empty()) {
      next_run_time = std::numeric_limits<int64_t>::max();
    } else {
      next_run_time = tasks[0]->run_time;
    }
    // Similarly with the situation before running tasks, we check
    // _nearest_run_time to prevent us from waiting on a non-earliest
    // task. We also use the _nsignal to make sure that if new task
    // is earlier that the realtime that we wait for, we'll wake up.
    int expected_nsignals = 0;
    {
      BAIDU_SCOPED_LOCK(_mutex);
      if (next_run_time > _nearest_run_time) {
        // a task is earlier that what we would wait for.
        // We need to check buckets.
        continue;
      } else {
        _nearest_run_time = next_run_time;
        expected_nsignals = _nsignals;
      }
    }
    timespec *ptimeout = NULL;
    timespec next_timeout = {0, 0};
    const int64_t now = butil::gettimeofday_us();
    if (next_run_time != std::numeric_limits<int64_t>::max()) {
      next_timeout = butil::microseconds_to_timespec(next_run_time - now);
      ptimeout = &next_timeout;
    }
    busy_seconds += (now - last_sleep_time) / 1000000.0;
    // 计算需要等待的时间,通过 futex 等待
    futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
    last_sleep_time = butil::gettimeofday_us();
  }
  BT_VLOG << "Ended TimerThread=" << pthread_self();
}

TimerThread::Task *TimerThread::Bucket::consume_tasks() {
  Task *head = NULL;
  if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
    // by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
    // We can avoid touching the mutex and related cacheline when the
    // bucket is actually empty.
    BAIDU_SCOPED_LOCK(_mutex);
    if (_task_head) {
      // 获取任务时直接将链表指针传回
      head = _task_head;
      _task_head = NULL;
      _nearest_run_time = std::numeric_limits<int64_t>::max();
    }
  }
  return head;
}

bool TimerThread::Task::run_and_delete() {
  const uint32_t id_version = version_of_task_id(task_id);
  uint32_t expected_version = id_version;
  // This CAS is rarely contended, should be fast.
  // 通过 CAS 判定当前任务是否还需要做
  if (version.compare_exchange_strong(expected_version, id_version + 1,
                                      butil::memory_order_relaxed)) {
    fn(arg);  // 执行定时任务
    // The release fence is paired with acquire fence in
    // TimerThread::unschedule to make changes of fn(arg) visible.
    version.store(id_version + 2, butil::memory_order_release);
    butil::return_resource(slot_of_task_id(task_id));
    return true;
  } else if (expected_version == id_version + 2) {
    // 对于取消的任务直接归还资源
    butil::return_resource(slot_of_task_id(task_id));
    return false;
  } else {
    // Impossible.
    LOG(ERROR) << "Invalid version=" << expected_version << ", expecting "
               << id_version + 2;
    return false;
  }
}

TimerThread::TaskId TimerThread::schedule(void (*fn)(void *), void *arg,
                                          const timespec &abstime) {
  if (_stop.load(butil::memory_order_relaxed) || !_started) {
    // Not add tasks when TimerThread is about to stop.
    return INVALID_TASK_ID;
  }
  // Hashing by pthread id is better for cache locality.
  // 新增任务时分片到不同的 Bucket 中
  const Bucket::ScheduleResult result =
      _buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
          .schedule(fn, arg, abstime);
  // 如果有更早的唤醒时间
  if (result.earlier) {
    bool earlier = false;
    const int64_t run_time = butil::timespec_to_microseconds(abstime);
    {
      BAIDU_SCOPED_LOCK(_mutex);
      if (run_time < _nearest_run_time) {
        _nearest_run_time = run_time;
        ++_nsignals;
        earlier = true;
      }
    }
    if (earlier) {
      // 使用 futex 唤醒
      futex_wake_private(&_nsignals, 1);
    }
  }
  return result.task_id;
}

TimerThread::Bucket::schedule(void (*fn)(void *), void *arg,
                              const timespec &abstime) {
  butil::ResourceId<Task> slot_id;
  Task *task = butil::get_resource<Task>(&slot_id);
  if (task == NULL) {
    ScheduleResult result = {INVALID_TASK_ID, false};
    return result;
  }
  task->next = NULL;
  task->fn = fn;
  task->arg = arg;
  task->run_time = butil::timespec_to_microseconds(abstime);
  uint32_t version = task->version.load(butil::memory_order_relaxed);
  if (version == 0) { // skip 0.
    // 分配的版本总是跳过 INVALID_TASK_ID
    task->version.fetch_add(2, butil::memory_order_relaxed);
    version = 2;
  }
  const TaskId id = make_task_id(slot_id, version);
  task->task_id = id;
  bool earlier = false;
  {
    BAIDU_SCOPED_LOCK(_mutex);
    // 加锁的临界区很短
    task->next = _task_head;
    _task_head = task;
    if (task->run_time < _nearest_run_time) {
      _nearest_run_time = task->run_time;
      earlier = true;
    }
  }
  ScheduleResult result = {id, earlier};
  return result;
}

TimerThread 有效的原因有以下几点:

  1. Bucket 锁内链表增加任务的操作是 O(1)\mathcal{O}(1) 的,临界区短
  2. RPC 场景下超时时间一般不变,大部分插入的时间是递增的,早于 nearest_run_time 而唤醒线程的次数很少
  3. 通过 ResourcePool 和版本进行删除操作,不参与全局竞争
  4. TimerThread 自行维护小顶堆,不参与全局竞争
  5. TimerThread 醒来的频率大约是 RPC 超时的倒数,比如超时时间 100ms,TimerThread 一秒内大约醒 10 次,已经最优

bthread 依赖 TimerThread 实现高效的 usleep

// Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds);

int bthread_usleep(uint64_t microseconds) {
  bthread::TaskGroup *g = bthread::tls_task_group;
  if (NULL != g && !g->is_current_pthread_task()) {
    return bthread::TaskGroup::usleep(&g, microseconds);
  }
  return ::usleep(microseconds);
}

// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup **pg, uint64_t timeout_us) {
  if (0 == timeout_us) {
    yield(pg);
    return 0;
  }
  TaskGroup *g = *pg;
  // We have to schedule timer after we switched to next bthread otherwise
  // the timer may wake up(jump to) current still-running context.
  // 将定时任务打包为 remained 函数
  SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
  g->set_remained(_add_sleep_event, &e);
  sched(pg);  // 当前协程出让
  g = *pg;
  e.meta->current_sleep = 0;
  if (e.meta->interrupted) {
    // Race with set and may consume multiple interruptions, which are OK.
    e.meta->interrupted = false;
    // NOTE: setting errno to ESTOP is not necessary from bthread's
    // pespective, however many RPC code expects bthread_usleep to set
    // errno to ESTOP when the thread is stopping, and print FATAL
    // otherwise. To make smooth transitions, ESTOP is still set instead
    // of EINTR when the thread is stopping.
    errno = (e.meta->stop ? ESTOP : EINTR);
    return -1;
  }
  return 0;
}

void TaskGroup::_add_sleep_event(void *void_args) {
  // Must copy SleepArgs. After calling TimerThread::schedule(), previous
  // thread may be stolen by a worker immediately and the on-stack SleepArgs
  // will be gone.
  SleepArgs e = *static_cast<SleepArgs *>(void_args);
  TaskGroup *g = e.group;

  // 增加定时任务
  TimerThread::TaskId sleep_id;
  sleep_id = get_global_timer_thread()->schedule(
      ready_to_run_from_timer_thread, void_args,
      butil::microseconds_from_now(e.timeout_us));
  ...
}

static void ready_to_run_from_timer_thread(void *arg) {
  CHECK(tls_task_group == NULL);
  const SleepArgs *e = static_cast<const SleepArgs *>(arg);
  // 到时间后,根据 tid 将任务重新加入队列
  e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}

References

  1. "bRPC Timer Keeping", incubator-brpc