bthread 内实现了 ParkingLot
用以管理工作线程的挂起和唤醒:
// parking_lot.h
// Park idle workers.
class BAIDU_CACHELINE_ALIGNMENT ParkingLot {
public:
class State {
public:
State() : val(0) {}
bool stopped() const { return val & 1; }
private:
friend class ParkingLot;
State(int val) : val(val) {}
int val;
};
ParkingLot() : _pending_signal(0) {}
// Wake up at most `num_task' workers.
// Returns #workers woken up.
// 唤醒至多 num_task 个等待的线程
// 注意 _pending_signal 增加了 num_task * 2,因为最后一位用来表示是否 stop
int signal(int num_task) {
_pending_signal.fetch_add((num_task << 1), butil::memory_order_release);
return futex_wake_private(&_pending_signal, num_task);
}
// Get a state for later wait().
State get_state() {
return _pending_signal.load(butil::memory_order_acquire);
}
// Wait for tasks.
// If the `expected_state' does not match, wait() may finish directly.
// 工作线程尝试挂起等待,此时会检查 _pending_signal 的数值,若不一致则挂起,继续执行
void wait(const State &expected_state) {
futex_wait_private(&_pending_signal, expected_state.val, NULL);
}
// Wakeup suspended wait() and make them unwaitable ever.
void stop() {
_pending_signal.fetch_or(1);
// 将 _pending_signal 设为 stop,唤醒所有等待的工作线程,触发结束
futex_wake_private(&_pending_signal, 10000);
}
private:
// higher 31 bits for signalling, LSB for stopping.
butil::atomic<int> _pending_signal;
};
TaskControl
中包含一组 ParkingLot
对象,TaskGroup
初始化时会哈希打散使用这些对象:
// task_control.h
class TaskControl {
...
static const int PARKING_LOT_NUM = 4;
ParkingLot _pl[PARKING_LOT_NUM];
};
// task_group.cpp
TaskGroup::TaskGroup(TaskControl *c)
: _cur_meta(NULL), _control(c), _num_nosignal(0), _nsignaled(0),
_last_run_ns(butil::cpuwide_time_ns()), _cumulated_cputime_ns(0),
_nswitch(0), _last_context_remained(NULL),
_last_context_remained_arg(NULL), _pl(NULL), _main_stack(NULL),
_main_tid(0), _remote_num_nosignal(0), _remote_nsignaled(0) {
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
// 根据线程号哈希到某个 ParkingLot 对象上
_pl = &c->_pl[butil::fmix64(pthread_numeric_id()) %
TaskControl::PARKING_LOT_NUM];
CHECK(c);
}
当前工作线程在窃取任务前会先取一次 ParkingLot
的状态,当状态发生改变时会直接跳过 wait
:
bool TaskGroup::wait_task(bthread_t *tid) {
do {
if (_last_pl_state.stopped()) {
// 如果已经 stop
return false;
}
// 根据当前的 _last_pl_state 状态判断是否挂起
_pl->wait(_last_pl_state);
if (steal_task(tid)) {
return true;
}
} while (true);
}
bool TaskGroup::steal_task(bthread_t *tid) {
if (_remote_rq.pop(tid)) {
return true;
}
// _last_pl_state 的状态在这里进行更新
// _last_pl_state 发生变化时 wait_task 中的 wait 会直接跳过
// 这里容忍 false postive,增加的开销是多一次尝试 steal_task
_last_pl_state = _pl->get_state();
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
当有新任务加入时,则调用 TaskControl::signal_task
通过 ParkingLog
唤醒等待的工作线程:
void TaskControl::signal_task(int num_task) {
if (num_task <= 0) {
return;
}
// TODO(gejun): Current algorithm does not guarantee enough threads will
// be created to match caller's requests. But in another side, there's also
// many useless signalings according to current impl. Capping the concurrency
// is a good balance between performance and timeliness of scheduling.
// 上方的官方注释也写明了,限制并发可以更好地平衡性能和调度及时性,这里直接将唤醒的上限设为 2
if (num_task > 2) {
num_task = 2;
}
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
num_task -= _pl[start_index].signal(1); // 通过 ParkingLog 唤醒
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
num_task -= _pl[start_index].signal(1);
}
}
if (num_task > 0 &&
FLAGS_bthread_min_concurrency >
0 && // test min_concurrency for performance
_concurrency.load(butil::memory_order_relaxed) <
FLAGS_bthread_concurrency) {
// TODO: Reduce this lock
// 当仍然有需要执行的任务时,尝试增加工作线程
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) <
FLAGS_bthread_concurrency) {
add_workers(1);
}
}
}
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) {
++_num_nosignal; // 对于设定了不触发信号的任务,仅增加计数
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
// 调用 signal_task 唤醒
_control->signal_task(1 + additional_signal);
}
}
// 将还没有触发信号的任务统一触发信号(工作线程内调用)
void TaskGroup::flush_nosignal_tasks() {
const int val = _num_nosignal;
if (val) {
_num_nosignal = 0;
_nsignaled += val;
_control->signal_task(val);
}
}
// 从工作线程外提交任务
void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_rq._mutex.lock(); // 加锁
while (!_remote_rq.push_locked(tid)) {
flush_nosignal_tasks_remote_locked(_remote_rq._mutex);
LOG_EVERY_SECOND(ERROR) << "_remote_rq is full, capacity="
<< _remote_rq.capacity();
::usleep(1000);
_remote_rq._mutex.lock();
}
if (nosignal) {
++_remote_num_nosignal;
_remote_rq._mutex.unlock();
} else {
const int additional_signal = _remote_num_nosignal;
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock(); // 锁的生命周期内也保护了计数相关变量
_control->signal_task(1 + additional_signal);
}
}
void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
const int val = _remote_num_nosignal;
if (!val) {
locked_mutex.unlock();
return;
}
_remote_num_nosignal = 0;
_remote_nsignaled += val;
locked_mutex.unlock();
_control->signal_task(val);
}
bthread 工作线程在执行过程中会有以下几种状态:
_cur_meta->tid == _main_tid
_cur_meta->stack == _main_stack
TaskGroup::task_runner(1)
执行,此时:
_cur_meta->tid != _main_tid
_cur_meta->stack == _main_stack
TaskGroup::sched_to
中调用 jump_stack
切换到协程栈,此时:
_cur_meta->tid != _main_tid
_cur_meta->stack != _main_stack
上述三种状态可以相互切换。假设当前任务队列中包含一个 bthread 任务和一个 pthread 任务,那么执行的流程为:
void TaskGroup::run_main_task() {
TaskGroup* dummy = this;
task_t tid;
while (wait_task(&tid)) {
// 1. 获取到 bthread 任务,调用 TaskGroup::sched_to 触发执行
TaskGroup::sched_to(&dummy, tid);
if (_cur_meta->tid != _main_tid) {
TaskGroup::task_runner(1);
}
}
}
void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
const int saved_errno = errno;
TaskMeta* const cur_meta = g->_cur_meta;
if (PAXOS_LIKELY(next_meta != cur_meta)) {
// 2. cur_meta 此时为主线程 meta,next_meta 为 bthread 任务
// 将 g->_cur_meta 设定为即将执行的 bthread 任务
g->_cur_meta = next_meta;
if (cur_meta->stack != nullptr) {
if (next_meta->stack != cur_meta->stack) {
// 3. 从主协程的 main_stack 切换到 bthread 任务的栈
jump_stack(cur_meta->stack, next_meta->stack);
// 4. 主协程在此处挂起,进入 bthread 任务入口 task_runner
...
}
void TaskGroup::task_runner(intptr_t skip_remained) {
TaskGroup *g = tls_task_group;
if (!skip_remained) {
// 5. 用户函数执行前,先执行可能存在的 remained 函数
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
}
do {
// Meta and identifier of the task is persistent in this run.
// 6. 通过 TLS 获取当前线程待执行的任务 TaskMeta
TaskMeta *const m = g->_cur_meta;
void *thread_return;
try {
// 7. 执行用户执行的 bthread 任务,假设中途协程调用 yield 主动挂起
thread_return = m->fn(m->arg);
}
...
}
void TaskGroup::yield(TaskGroup **pg) {
// 8. yield 时将当前的 bthread 任务打包为 remained 函数
TaskGroup *g = *pg;
ReadyToRunArgs args = {g->current_tid(), false};
g->set_remained(ready_to_run_in_worker, &args);
sched(pg);
}
void TaskGroup::sched(TaskGroup **pg) {
TaskGroup *g = *pg;
bthread_t next_tid = 0;
const bool popped = g->_rq.pop(&next_tid);
if (!popped && !g->steal_task(&next_tid)) {
next_tid = g->_main_tid;
}
// 9. 从任务队列中成功拿到下一个 pthread 任务,触发调度
sched_to(pg, next_tid);
}
inline void TaskGroup::sched_to(TaskGroup **pg, bthread_t next_tid) {
TaskMeta *next_meta = address_meta(next_tid);
if (next_meta->stack == NULL) {
ContextualStack *stk = get_stack(next_meta->stack_type(), task_runner);
if (stk) {
next_meta->set_stack(stk);
} else {
// 10. pthread 获取到的栈为空指针,这里会赋值为 _main_stack
next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
next_meta->set_stack((*pg)->_main_stack);
}
}
sched_to(pg, next_meta);
}
void TaskGroup::sched_to(TaskGroup **pg, TaskMeta *next_meta) {
TaskGroup *g = *pg;
const int saved_errno = errno;
TaskMeta *const cur_meta = g->_cur_meta;
if (__builtin_expect(next_meta != cur_meta, 1)) {
// 11. 将 g->_cur_meta 设定为下一个 pthread 任务
g->_cur_meta = next_meta;
if (cur_meta->stack != NULL) {
if (next_meta->stack != cur_meta->stack) {
// 12. 从 bthread 任务的栈切换到 _main_stack
jump_stack(cur_meta->stack, next_meta->stack);
// 13. bthread 从此处挂起,恢复主协程的运行状态,也就是这里第 4 步的位置
...
}
void TaskGroup::sched_to(TaskGroup **pg, TaskMeta *next_meta) {
...
jump_stack(cur_meta->stack, next_meta->stack);
// 14. 恢复到主协程此处的位置,继续执行,重新获取一次 tls_task_group
g = tls_task_group;
}
}
}
// 15. 执行可能存在的 remained 函数,也就是第 8 步中打包的 bthread 恢复任务
// 会执行 ready_to_run_in_worker(bthread_tid) 将之前挂起的 bthread 重新加入任务队列
while (g->_last_context_remained) {
RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg);
g = tls_task_group;
}
errno = saved_errno;
*pg = g;
}
void TaskGroup::run_main_task() {
TaskGroup* dummy = this;
task_t tid;
while (wait_task(&tid)) {
TaskGroup::sched_to(&dummy, tid);
if (_cur_meta->tid != _main_tid) {
// 16. 主协程继续走到这里,直接在 main_stack 调用 task_runner 执行 pthread 任务
TaskGroup::task_runner(1);
}
}
}
void TaskGroup::task_runner(intptr_t skip_remained) {
...
void *thread_return;
try {
// 17. 执行 pthread 任务
thread_return = m->fn(m->arg);
} catch (ExitException &e) {
thread_return = e.value();
}
...
// 18. 任务执行完成后,进行结束时期的调度
ending_sched(&g);
}
void TaskGroup::ending_sched(TaskGroup **pg) {
TaskGroup *g = *pg;
bthread_t next_tid = 0;
// 19. 从任务队列中取出任务,假设取出了之前挂起的 bthread 任务
const bool popped = g->_rq.pop(&next_tid);
...
// 20. 执行调度,切换到 bthread 挂起的地方
sched_to(pg, next_meta);
}
bRPC 中有频繁的定时器的需求,主要是:
bRPC 的官方文档中详细介绍了这样的定时器设计的难点,以及 TimerThread
是如何解决这些难点的。这里直接看对应的代码实现:
struct TimerThreadOptions {
// Scheduling requests are hashed into different bucket to improve
// scalability. However bigger num_buckets may NOT result in more scalable
// schedule() because bigger values also make each buckets more sparse
// and more likely to lock the global mutex. You better not change
// this value, just leave it to us.
// Default: 13
size_t num_buckets;
// If this field is not empty, some bvar for reporting stats of TimerThread
// will be exposed with this prefix.
// Default: ""
std::string bvar_prefix;
// Constructed with default options.
TimerThreadOptions();
};
// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
public:
struct Task;
class Bucket;
typedef uint64_t TaskId;
const static TaskId INVALID_TASK_ID;
TimerThread();
~TimerThread();
// Start the timer thread.
// This method should only be called once.
// return 0 if success, errno otherwise.
// 启动 TimerThread 线程
int start(const TimerThreadOptions *options);
// Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
// 结束 TimerThread 线程
void stop_and_join();
// Schedule |fn(arg)| to run at realtime |abstime| approximately.
// Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
// 增加一个定时器任务
TaskId schedule(void (*fn)(void *), void *arg, const timespec &abstime);
// Prevent the task denoted by `task_id' from running. `task_id' must be
// returned by schedule() ever.
// Returns:
// 0 - Removed the task which does not run yet
// -1 - The task does not exist.
// 1 - The task is just running.
// 取消一个定时器任务
int unschedule(TaskId task_id);
// Get identifier of internal pthread.
// Returns (pthread_t)0 if start() is not called yet.
pthread_t thread_id() const { return _thread; }
private:
// the timer thread will run this method.
void run(); // TimerThread 线程函数
static void *run_this(void *arg);
bool _started; // whether the timer thread was started successfully.
butil::atomic<bool> _stop;
TimerThreadOptions _options;
Bucket *_buckets; // list of tasks to be run
internal::FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
int _nsignals;
pthread_t _thread; // all scheduled task will be run on this thread
};
void TimerThread::run() {
int64_t last_sleep_time = butil::gettimeofday_us();
// min heap of tasks (ordered by run_time)
// TimerThread 线程内通过最小堆维护定时任务序列
std::vector<Task *> tasks;
tasks.reserve(4096);
while (!_stop.load(butil::memory_order_relaxed)) {
{
BAIDU_SCOPED_LOCK(_mutex);
_nearest_run_time = std::numeric_limits<int64_t>::max();
}
// 从所有的 Bucket 的获取任务
for (size_t i = 0; i < _options.num_buckets; ++i) {
Bucket &bucket = _buckets[i];
for (Task *p = bucket.consume_tasks(); p != nullptr; ++nscheduled) {
Task *next_task = p->next;
// 对于已经取消的任务,不会加入到堆里,直接删除。这也是高性能的关键
if (!p->try_delete()) {
// 循环加入到堆里
tasks.push_back(p);
std::push_heap(tasks.begin(), tasks.end(), task_greater);
}
p = next_task;
}
}
bool pull_again = false;
while (!tasks.empty()) {
// 从最小堆中取出任务
Task *task1 = tasks[0];
if (task1->try_delete()) { // already unscheduled
std::pop_heap(tasks.begin(), tasks.end(), task_greater);
tasks.pop_back();
continue;
}
// 判断是否到时间了,没有则退出
if (butil::gettimeofday_us() < task1->run_time) { // not ready yet.
break;
}
{
BAIDU_SCOPED_LOCK(_mutex);
if (task1->run_time > _nearest_run_time) {
// 检查当前的 _nearest_run_time,确认是否有更新的任务
// 有的话则再次从 Bucket 中拉取
pull_again = true;
break;
}
}
std::pop_heap(tasks.begin(), tasks.end(), task_greater);
tasks.pop_back();
// 执行定时任务并删除
if (task1->run_and_delete()) {
++ntriggered;
}
}
if (pull_again) {
BT_VLOG << "pull again, tasks=" << tasks.size();
continue;
}
// The realtime to wait for.
int64_t next_run_time = std::numeric_limits<int64_t>::max();
if (tasks.empty()) {
next_run_time = std::numeric_limits<int64_t>::max();
} else {
next_run_time = tasks[0]->run_time;
}
// Similarly with the situation before running tasks, we check
// _nearest_run_time to prevent us from waiting on a non-earliest
// task. We also use the _nsignal to make sure that if new task
// is earlier that the realtime that we wait for, we'll wake up.
int expected_nsignals = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
if (next_run_time > _nearest_run_time) {
// a task is earlier that what we would wait for.
// We need to check buckets.
continue;
} else {
_nearest_run_time = next_run_time;
expected_nsignals = _nsignals;
}
}
timespec *ptimeout = NULL;
timespec next_timeout = {0, 0};
const int64_t now = butil::gettimeofday_us();
if (next_run_time != std::numeric_limits<int64_t>::max()) {
next_timeout = butil::microseconds_to_timespec(next_run_time - now);
ptimeout = &next_timeout;
}
busy_seconds += (now - last_sleep_time) / 1000000.0;
// 计算需要等待的时间,通过 futex 等待
futex_wait_private(&_nsignals, expected_nsignals, ptimeout);
last_sleep_time = butil::gettimeofday_us();
}
BT_VLOG << "Ended TimerThread=" << pthread_self();
}
TimerThread::Task *TimerThread::Bucket::consume_tasks() {
Task *head = NULL;
if (_task_head) { // NOTE: schedule() and consume_tasks() are sequenced
// by TimerThread._nearest_run_time and fenced by TimerThread._mutex.
// We can avoid touching the mutex and related cacheline when the
// bucket is actually empty.
BAIDU_SCOPED_LOCK(_mutex);
if (_task_head) {
// 获取任务时直接将链表指针传回
head = _task_head;
_task_head = NULL;
_nearest_run_time = std::numeric_limits<int64_t>::max();
}
}
return head;
}
bool TimerThread::Task::run_and_delete() {
const uint32_t id_version = version_of_task_id(task_id);
uint32_t expected_version = id_version;
// This CAS is rarely contended, should be fast.
// 通过 CAS 判定当前任务是否还需要做
if (version.compare_exchange_strong(expected_version, id_version + 1,
butil::memory_order_relaxed)) {
fn(arg); // 执行定时任务
// The release fence is paired with acquire fence in
// TimerThread::unschedule to make changes of fn(arg) visible.
version.store(id_version + 2, butil::memory_order_release);
butil::return_resource(slot_of_task_id(task_id));
return true;
} else if (expected_version == id_version + 2) {
// 对于取消的任务直接归还资源
butil::return_resource(slot_of_task_id(task_id));
return false;
} else {
// Impossible.
LOG(ERROR) << "Invalid version=" << expected_version << ", expecting "
<< id_version + 2;
return false;
}
}
TimerThread::TaskId TimerThread::schedule(void (*fn)(void *), void *arg,
const timespec &abstime) {
if (_stop.load(butil::memory_order_relaxed) || !_started) {
// Not add tasks when TimerThread is about to stop.
return INVALID_TASK_ID;
}
// Hashing by pthread id is better for cache locality.
// 新增任务时分片到不同的 Bucket 中
const Bucket::ScheduleResult result =
_buckets[butil::fmix64(pthread_numeric_id()) % _options.num_buckets]
.schedule(fn, arg, abstime);
// 如果有更早的唤醒时间
if (result.earlier) {
bool earlier = false;
const int64_t run_time = butil::timespec_to_microseconds(abstime);
{
BAIDU_SCOPED_LOCK(_mutex);
if (run_time < _nearest_run_time) {
_nearest_run_time = run_time;
++_nsignals;
earlier = true;
}
}
if (earlier) {
// 使用 futex 唤醒
futex_wake_private(&_nsignals, 1);
}
}
return result.task_id;
}
TimerThread::Bucket::schedule(void (*fn)(void *), void *arg,
const timespec &abstime) {
butil::ResourceId<Task> slot_id;
Task *task = butil::get_resource<Task>(&slot_id);
if (task == NULL) {
ScheduleResult result = {INVALID_TASK_ID, false};
return result;
}
task->next = NULL;
task->fn = fn;
task->arg = arg;
task->run_time = butil::timespec_to_microseconds(abstime);
uint32_t version = task->version.load(butil::memory_order_relaxed);
if (version == 0) { // skip 0.
// 分配的版本总是跳过 INVALID_TASK_ID
task->version.fetch_add(2, butil::memory_order_relaxed);
version = 2;
}
const TaskId id = make_task_id(slot_id, version);
task->task_id = id;
bool earlier = false;
{
BAIDU_SCOPED_LOCK(_mutex);
// 加锁的临界区很短
task->next = _task_head;
_task_head = task;
if (task->run_time < _nearest_run_time) {
_nearest_run_time = task->run_time;
earlier = true;
}
}
ScheduleResult result = {id, earlier};
return result;
}
TimerThread
有效的原因有以下几点:
Bucket
锁内链表增加任务的操作是 nearest_run_time
而唤醒线程的次数很少ResourcePool
和版本进行删除操作,不参与全局竞争TimerThread
自行维护小顶堆,不参与全局竞争TimerThread
醒来的频率大约是 RPC 超时的倒数,比如超时时间 100ms,TimerThread
一秒内大约醒 10 次,已经最优bthread 依赖 TimerThread
实现高效的 usleep
:
// Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds);
int bthread_usleep(uint64_t microseconds) {
bthread::TaskGroup *g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::usleep(&g, microseconds);
}
return ::usleep(microseconds);
}
// To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup **pg, uint64_t timeout_us) {
if (0 == timeout_us) {
yield(pg);
return 0;
}
TaskGroup *g = *pg;
// We have to schedule timer after we switched to next bthread otherwise
// the timer may wake up(jump to) current still-running context.
// 将定时任务打包为 remained 函数
SleepArgs e = {timeout_us, g->current_tid(), g->current_task(), g};
g->set_remained(_add_sleep_event, &e);
sched(pg); // 当前协程出让
g = *pg;
e.meta->current_sleep = 0;
if (e.meta->interrupted) {
// Race with set and may consume multiple interruptions, which are OK.
e.meta->interrupted = false;
// NOTE: setting errno to ESTOP is not necessary from bthread's
// pespective, however many RPC code expects bthread_usleep to set
// errno to ESTOP when the thread is stopping, and print FATAL
// otherwise. To make smooth transitions, ESTOP is still set instead
// of EINTR when the thread is stopping.
errno = (e.meta->stop ? ESTOP : EINTR);
return -1;
}
return 0;
}
void TaskGroup::_add_sleep_event(void *void_args) {
// Must copy SleepArgs. After calling TimerThread::schedule(), previous
// thread may be stolen by a worker immediately and the on-stack SleepArgs
// will be gone.
SleepArgs e = *static_cast<SleepArgs *>(void_args);
TaskGroup *g = e.group;
// 增加定时任务
TimerThread::TaskId sleep_id;
sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, void_args,
butil::microseconds_from_now(e.timeout_us));
...
}
static void ready_to_run_from_timer_thread(void *arg) {
CHECK(tls_task_group == NULL);
const SleepArgs *e = static_cast<const SleepArgs *>(arg);
// 到时间后,根据 tid 将任务重新加入队列
e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
}