At its core, Tokio is an M:N coroutine runtime: underneath, it is driven by Rust's async machinery and Mio, and on top it supports HTTP / RPC applications. This article begins the analysis of the Runtime; the code is at version v1.5.0.
Before reading the core code, let's define the key terms of asynchronous programming:
Asynchrony: events occur independently of the main program flow and are handled independently of it. Such events may be external, like signals, or actions initiated by the program itself; they happen concurrently with program execution, and the program does not block waiting for their results. In short, the event is handled on a thread other than the caller's. Synchronous vs. asynchronous is about whether the event is handled on the calling thread.
Non-blocking: an operation that does not prevent the program from continuing. Blocking vs. non-blocking is about the caller's state while waiting for a result. This axis is orthogonal to synchronous vs. asynchronous, so all four combinations exist: synchronous-blocking, synchronous-non-blocking, asynchronous-blocking, and asynchronous-non-blocking.
Resumable Function: a function that can suspend its execution, return to its caller, and later resume from where it left off. Resumable functions are the foundation of coroutines.
Asynchronous Runtime: the integration of the concepts above. Through resumable functions, instrumented suspension points, and user-space scheduling, it implements non-preemptive user-space context switching, i.e. coroutines. The places where synchronous blocking I/O would otherwise occur are typically used as the default suspension points.
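In Rust, the resumable function materializes as the `Future` trait: `poll` returns `Pending` at a suspension point, the function's progress lives in a struct rather than on the call stack, and the next `poll` resumes from that stored state. A minimal sketch (hypothetical types, not Tokio code) that drives such a future by hand with a no-op waker:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A future that suspends once, then completes: a resumable function in
// trait form. Its progress lives in the struct, not on the call stack.
pub struct SuspendOnce {
    pub resumed: bool,
}

impl Future for SuspendOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.resumed {
            Poll::Ready(42) // resumed past the suspension point
        } else {
            self.resumed = true; // record progress, then suspend
            Poll::Pending
        }
    }
}

// A waker that does nothing: just enough to call poll() by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll a future to completion, counting how many polls it took.
pub fn run_counting_polls<F: Future>(fut: F) -> (u32, F::Output) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (polls, out);
        }
    }
}
```

A real runtime differs only in scale: instead of spinning in a loop, it parks the task and lets the waker re-enqueue it.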
The Tokio codebase is large: the tokio/src directory contains 256 source files, totaling more than 50,000 lines. The core file layout is:
tokio/src
├── blocking.rs
├── coop.rs
├── fs
├── future
├── io
├── lib.rs
├── loom
├── macros
├── net
├── park
├── process
├── runtime
├── signal
├── sync
├── task
├── time
└── util
Now let's look at the example from the official documentation:
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = [0; 1024];
// In a loop, read data from the socket and write the data back.
loop {
let n = match socket.read(&mut buf).await {
// socket closed
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return;
}
};
// Write the data back
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket; err = {:?}", e);
return;
}
}
});
}
}
If you have worked with coroutines and network programming before, you will appreciate how efficient and concise the echo server above is.
tokio/runtime/task
Tokio's Runtime provides the following capabilities:
- An I/O event loop, called the driver, which drives I/O resources and dispatches I/O events to tasks that depend on them.
- A scheduler to execute tasks that use these I/O resources.
- A timer for scheduling work to run after a set period of time.
This section covers the task abstractions, located at tokio/src/runtime/task. We start with the task state in state.rs:
// State itself is an atomic unsigned integer
pub(super) struct State {
val: AtomicUsize,
}
// Snapshot is a value read from State
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
// The Result of a State update; on success, carries the updated Snapshot
type UpdateResult = Result<Snapshot, Snapshot>;
// The actual encoding of State, including a reference count
const RUNNING: usize = 0b0001; // the task is currently running
const COMPLETE: usize = 0b0010; // the task has completed
const LIFECYCLE_MASK: usize = 0b11;
const NOTIFIED: usize = 0b100; // the task has been pushed onto a run queue
const JOIN_INTEREST: usize = 0b1_000;
const JOIN_WAKER: usize = 0b10_000;
const CANCELLED: usize = 0b100_000; // the task has been cancelled
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; // all state-related bits
const REF_COUNT_MASK: usize = !STATE_MASK; // the bits used by the reference count
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
const REF_ONE: usize = 1 << REF_COUNT_SHIFT; // the value 1 in the reference-count field
const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; // the initial state, referenced by both the scheduler and the `JoinHandle`
impl State {
pub(super) fn new() -> State {
State {
val: AtomicUsize::new(INITIAL_STATE),
}
}
// Load the current state, with Acquire ordering
pub(super) fn load(&self) -> Snapshot {
Snapshot(self.val.load(Acquire))
}
// Thread-safe state transition via a CAS loop; on failure, returns the current state
fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
where
F: FnMut(Snapshot) -> Option<Snapshot>,
{
let mut curr = self.load();
loop {
let next = match f(curr) {
Some(next) => next,
None => return Err(curr),
};
let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
match res {
Ok(_) => return Ok(next),
Err(actual) => curr = Snapshot(actual),
}
}
}
// Attempt the transition to the running state
pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_notified());
let mut next = curr;
if !next.is_idle() {
return None;
}
if ref_inc {
next.ref_inc();
}
next.set_running();
next.unset_notified();
Some(next)
})
}
// Increment the reference count
pub(super) fn ref_inc(&self) {
use std::process;
use std::sync::atomic::Ordering::Relaxed;
let prev = self.val.fetch_add(REF_ONE, Relaxed);
// If the reference count overflowed, abort.
if prev > isize::max_value() as usize {
process::abort();
}
}
}
// Snapshot provides the interface for reading and writing the state bits
impl Snapshot {
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
fn set_running(&mut self) {
self.0 |= RUNNING;
}
...
}
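To make the bit layout concrete, here is a standalone sketch that mirrors the constants above as plain functions (an illustrative copy, not the real tokio types) and checks what INITIAL_STATE encodes: a reference count of 2, with NOTIFIED and JOIN_INTEREST set and RUNNING clear.

```rust
// Mirror of state.rs's bit layout (illustrative copy, not the tokio types).
const RUNNING: usize = 0b0001;
const NOTIFIED: usize = 0b100;
const JOIN_INTEREST: usize = 0b1_000;
const JOIN_WAKER: usize = 0b10_000;
const CANCELLED: usize = 0b100_000;
const LIFECYCLE_MASK: usize = 0b11;
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
const REF_COUNT_MASK: usize = !STATE_MASK; // all bits above the six flag bits
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; // = 6
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

pub fn initial_state() -> usize {
    // Two references: one held by the scheduler, one by the JoinHandle.
    (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED
}

// Extract the reference count from a packed state word.
pub fn ref_count(state: usize) -> usize {
    (state & REF_COUNT_MASK) >> REF_COUNT_SHIFT
}

// Pure version of the update applied by transition_to_running:
// set RUNNING, clear NOTIFIED, leave the reference count untouched.
pub fn to_running(state: usize) -> usize {
    (state | RUNNING) & !NOTIFIED
}
```

Packing flags and count into one word is what lets every transition be a single CAS in `fetch_update`.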
Next come the core task data structures, in core.rs:
// The task Cell, holding the task's metadata; note the #[repr(C)] attribute:
// header must be the first field, because Header pointers get cast to Cell pointers
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
/// Hot task state data
pub(super) header: Header,
/// Either the future or output, depending on the execution stage.
pub(super) core: Core<T, S>,
/// Cold data
pub(super) trailer: Trailer,
}
// The task header, containing the task state; the other fields are covered as we encounter them
#[repr(C)]
pub(crate) struct Header {
/// Task state
pub(super) state: State,
pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,
/// Pointer to next task, used with the injection queue
pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,
/// Pointer to the next task in the transfer stack
pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,
/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable,
}
pub(super) struct Scheduler<S> {
scheduler: UnsafeCell<Option<S>>,
}
pub(super) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
// The core part of the task:
// scheduler is the scheduler the task is bound to
// stage is either the future or its output
pub(super) struct Core<T: Future, S> {
/// Scheduler used to drive this future
pub(super) scheduler: Scheduler<S>,
/// Either the future or the output
pub(super) stage: CoreStage<T>,
}
// The definition of Stage: three states, where Consumed means the data has already been consumed and is no longer usable
pub(super) enum Stage<T: Future> {
Running(T),
Finished(super::Result<T::Output>),
Consumed,
}
// The task trailer, holding the waker of the consumer task
pub(super) struct Trailer {
/// Consumer task waiting on completion of this task.
pub(super) waker: UnsafeCell<Option<Waker>>,
}
// Given a future and a state, construct a Cell
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
Box::new(Cell {
header: Header {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
stack_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
},
core: Core {
scheduler: Scheduler {
scheduler: UnsafeCell::new(None),
},
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
},
trailer: Trailer {
waker: UnsafeCell::new(None),
},
})
}
}
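Why must header be the first field? With #[repr(C)], a struct's address equals its first field's address, so a type-erased Header pointer can be cast back to the concrete Cell. A small sketch with simplified stand-in types (not tokio's) demonstrates the cast that harness.rs performs below:

```rust
use std::ptr::NonNull;

// Simplified stand-ins for tokio's Header / Cell (not the real types).
#[repr(C)]
pub struct Header {
    pub state: usize,
}

#[repr(C)]
pub struct Cell {
    pub header: Header, // must remain the first field
    pub payload: &'static str,
}

// Recover the concrete Cell from a type-erased Header pointer.
// Sound only because Cell is #[repr(C)] with header as field 0,
// so the Cell and its header share the same address.
pub unsafe fn from_raw(ptr: NonNull<Header>) -> NonNull<Cell> {
    ptr.cast::<Cell>()
}

pub fn demo() -> (usize, &'static str) {
    let cell = Box::new(Cell {
        header: Header { state: 140 },
        payload: "future",
    });
    let cell_ptr = NonNull::from(&*cell);
    let header_ptr = cell_ptr.cast::<Header>(); // erase the concrete type
    let recovered = unsafe { from_raw(header_ptr) };
    let recovered = unsafe { recovered.as_ref() };
    (recovered.header.state, recovered.payload)
}
```

Without #[repr(C)], Rust is free to reorder fields and this cast would be undefined behavior.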
core.rs also contains the implementations of Cell's methods, which we skip for now. Next, raw.rs:
// RawTask is essentially a pointer to a Cell
pub(super) struct RawTask {
ptr: NonNull<Header>,
}
// A hand-rolled vtable, invoked through Header.vtable
pub(super) struct Vtable {
/// Poll the future
pub(super) poll: unsafe fn(NonNull<Header>),
/// Deallocate the memory
pub(super) dealloc: unsafe fn(NonNull<Header>),
/// Read the task output, if complete
pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
/// The join handle has been dropped
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),
/// Scheduler is being shutdown
pub(super) shutdown: unsafe fn(NonNull<Header>),
}
// Returns a static vtable
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
shutdown: shutdown::<T, S>,
}
}
// The functions the vtable points to; they delegate to the Harness implementation
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll();
}
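The same trick in miniature: monomorphized free functions are stored as plain function pointers, so a non-generic handle can drive values of any type, just as RawTask drives any Cell<T, S> through the Vtable. A hedged sketch with made-up operations (`describe` standing in for `poll`, `dealloc` kept):

```rust
use std::ptr::NonNull;

// Non-generic view of an erased value, like RawTask (simplified, not tokio's types).
pub struct Vtable {
    describe: unsafe fn(NonNull<()>) -> String,
    dealloc: unsafe fn(NonNull<()>),
}

// One monomorphized vtable per concrete type, like raw::vtable::<T, S>().
fn vtable<T: std::fmt::Debug>() -> &'static Vtable {
    &Vtable {
        describe: describe::<T>,
        dealloc: dealloc::<T>,
    }
}

unsafe fn describe<T: std::fmt::Debug>(ptr: NonNull<()>) -> String {
    format!("{:?}", ptr.cast::<T>().as_ref())
}

unsafe fn dealloc<T>(ptr: NonNull<()>) {
    drop(Box::from_raw(ptr.cast::<T>().as_ptr()));
}

// An erased, heap-allocated value plus the vtable that knows its real type.
pub struct Erased {
    ptr: NonNull<()>,
    vtable: &'static Vtable,
}

impl Erased {
    pub fn new<T: std::fmt::Debug>(value: T) -> Erased {
        let ptr = NonNull::from(Box::leak(Box::new(value))).cast::<()>();
        Erased { ptr, vtable: vtable::<T>() }
    }
    pub fn describe(&self) -> String {
        unsafe { (self.vtable.describe)(self.ptr) }
    }
}

impl Drop for Erased {
    fn drop(&mut self) {
        unsafe { (self.vtable.dealloc)(self.ptr) }
    }
}
```

This is essentially a hand-written trait object; tokio rolls its own so it can pack the vtable next to the state word and control layout precisely.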
Following this thread, we continue with harness.rs:
// Harness is a non-null pointer to a Cell
pub(super) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
// Casts a Header pointer back to a Cell pointer, giving access to the header / trailer / core
impl<T, S> Harness<T, S>
where
T: Future,
S: 'static,
{
pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
Harness {
cell: ptr.cast::<Cell<T, S>>(),
}
}
fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
fn trailer(&self) -> &Trailer {
unsafe { &self.cell.as_ref().trailer }
}
fn core(&self) -> &Core<T, S> {
unsafe { &self.cell.as_ref().core }
}
}
// The result of a poll operation
enum PollFuture<T> {
Complete(Result<T, JoinError>, bool),
DropReference,
Notified,
None,
}
// The implementation of poll
impl<T, S> Harness<T, S>
where
T: Future,
S: Schedule,
{
pub(super) fn poll(self) {
match self.poll_inner() {
PollFuture::Notified => {
// Signal yield
self.core().scheduler.yield_now(Notified(self.to_task()));
// The ref-count was incremented as part of
// `transition_to_idle`.
self.drop_reference();
}
PollFuture::DropReference => {
self.drop_reference();
}
PollFuture::Complete(out, is_join_interested) => {
self.complete(out, is_join_interested);
}
PollFuture::None => (),
}
}
fn poll_inner(&self) -> PollFuture<T::Output> {
let snapshot = match self.scheduler_view().transition_to_running() {
TransitionToRunning::Ok(snapshot) => snapshot,
TransitionToRunning::DropReference => return PollFuture::DropReference,
};
// The transition to `Running` done above ensures that a lock on the
// future has been obtained. This also ensures the `*mut T` pointer
// contains the future (as opposed to the output) and is initialized.
let waker_ref = waker_ref::<T, S>(self.header());
let cx = Context::from_waker(&*waker_ref);
poll_future(self.header(), &self.core().stage, snapshot, cx)
}
fn scheduler_view(&self) -> SchedulerView<'_, S> {
SchedulerView {
header: self.header(),
scheduler: &self.core().scheduler,
}
}
}
enum TransitionToRunning {
Ok(Snapshot),
DropReference,
}
struct SchedulerView<'a, S> {
header: &'a Header,
scheduler: &'a Scheduler<S>,
}
impl<'a, S> SchedulerView<'a, S>
where
S: Schedule,
{
fn to_task(&self) -> Task<S> {
// SAFETY The header is from the same struct containing the scheduler `S` so the cast is safe
unsafe { Task::from_raw(self.header.into()) }
}
/// Returns true if the task should be deallocated.
fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
let ref_dec = if self.scheduler.is_bound() {
if let Some(task) = self.scheduler.release(self.to_task()) {
mem::forget(task);
true
} else {
false
}
} else {
false
};
// This might deallocate
let snapshot = self
.header
.state
.transition_to_terminal(!is_join_interested, ref_dec);
snapshot.ref_count() == 0
}
fn transition_to_running(&self) -> TransitionToRunning {
// On the first poll, the task gets bound to a scheduler
let is_not_bound = !self.scheduler.is_bound();
// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
// while in the run queue pending execution.
let snapshot = match self.header.state.transition_to_running(is_not_bound) {
Ok(snapshot) => snapshot,
Err(_) => {
// The task was shutdown while in the run queue. At this point,
// we just hold a ref counted reference. Since we do not have access to it here
// return `DropReference` so the caller drops it.
return TransitionToRunning::DropReference;
}
};
if is_not_bound {
// Ensure the task is bound to a scheduler instance. Since this is
// the first time polling the task, a scheduler instance is pulled
// from the local context and assigned to the task.
//
// The scheduler maintains ownership of the task and responds to
// `wake` calls.
//
// The task reference count has been incremented.
//
// Safety: Since we have unique access to the task so that we can
// safely call `bind_scheduler`.
self.scheduler.bind_scheduler(self.to_task());
}
TransitionToRunning::Ok(snapshot)
}
}
fn poll_future<T: Future>(
header: &Header,
core: &CoreStage<T>,
snapshot: Snapshot,
cx: Context<'_>,
) -> PollFuture<T::Output> {
if snapshot.is_cancelled() {
PollFuture::Complete(Err(JoinError::cancelled()), snapshot.is_join_interested())
} else {
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a, T: Future> {
core: &'a CoreStage<T>,
}
impl<T: Future> Drop for Guard<'_, T> {
fn drop(&mut self) {
self.core.drop_future_or_output();
}
}
let guard = Guard { core };
let res = guard.core.poll(cx);
// prevent the guard from dropping the future
mem::forget(guard);
res
}));
match res {
Ok(Poll::Pending) => match header.state.transition_to_idle() {
// The future must wait; transition to the idle state first
Ok(snapshot) => {
if snapshot.is_notified() {
PollFuture::Notified
} else {
PollFuture::None
}
}
Err(_) => PollFuture::Complete(Err(cancel_task(core)), true),
},
Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()),
Err(err) => {
PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested())
}
}
}
}
impl<S: Schedule> Scheduler<S> {
/// Bind the scheduler
pub(super) fn bind_scheduler(&self, task: Task<S>) {
debug_assert!(!self.is_bound());
// Bind the task to the scheduler
let scheduler = S::bind(task);
self.scheduler.with_mut(|ptr| unsafe {
*ptr = Some(scheduler);
});
}
pub(super) fn is_bound(&self) -> bool {
self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
}
pub(super) fn yield_now(&self, task: Notified<S>) {
self.scheduler.with(|ptr| {
// Safety: Can only be called after initial `poll`, which is the
// only time the field is mutated.
match unsafe { &*ptr } {
Some(scheduler) => scheduler.yield_now(task),
None => panic!("no scheduler set"),
}
});
}
}
impl<T: Future> CoreStage<T> {
/// Poll the future
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.with_mut(|ptr| {
// The caller guarantees thread safety
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
_ => unreachable!("unexpected stage"),
};
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };
// The poll interface of Rust's Future trait
future.poll(&mut cx)
})
};
if res.is_ready() {
self.drop_future_or_output();
}
res
}
/// Drop the future
///
/// # Safety
///
/// The caller must ensure it is safe to mutate the `stage` field.
pub(super) fn drop_future_or_output(&self) {
// Safety: the caller ensures mutual exclusion to the field.
unsafe {
self.set_stage(Stage::Consumed);
}
}
}
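The Guard + catch_unwind + mem::forget combination in poll_future is a reusable pattern: run user code under a drop guard so that a panic still releases the resource, and forget the guard on success so the resource survives. A self-contained sketch (hypothetical names; a thread-local flag stands in for drop_future_or_output):

```rust
use std::cell::Cell;
use std::panic;

thread_local! {
    // Records whether the guarded resource was cleaned up
    // (stand-in for drop_future_or_output).
    static CLEANED: Cell<bool> = Cell::new(false);
}

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        // Runs during unwinding if the closure below panics.
        CLEANED.with(|c| c.set(true));
    }
}

// Run `f` with panic protection: on panic the guard cleans up;
// on success mem::forget skips the cleanup, exactly like poll_future.
pub fn guarded<R>(f: impl FnOnce() -> R + panic::UnwindSafe) -> Result<R, ()> {
    CLEANED.with(|c| c.set(false));
    let res = panic::catch_unwind(|| {
        let guard = Guard;
        let out = f();
        std::mem::forget(guard); // success: keep the resource alive
        out
    });
    res.map_err(|_| ())
}

pub fn was_cleaned() -> bool {
    CLEANED.with(|c| c.get())
}
```

The runtime cannot let a panicking future leak: the task slot is reused, so the guard guarantees the future is destroyed on every exit path.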
Finally, mod.rs:
// The definition of Task: a thin wrapper around RawTask.
// PhantomData binds the otherwise-unused type parameter S.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
raw: RawTask,
_p: PhantomData<S>,
}
pub(crate) type Result<T> = std::result::Result<T, JoinError>;
// The scheduling trait
pub(crate) trait Schedule: Sync + Sized + 'static {
// Bind a task to an executor
fn bind(task: Task<Self>) -> Self;
fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
fn schedule(&self, task: Notified<Self>);
// The yield operation, which triggers a task switch
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}
}
// Construct a task and its corresponding JoinHandle
cfg_rt! {
/// Create a new task with an associated join handle
pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
where
T: Future + Send + 'static,
S: Schedule,
{
let raw = RawTask::new::<_, S>(task);
let task = Task {
raw,
_p: PhantomData,
};
let join = JoinHandle::new(raw);
(Notified(task), join)
}
}
// Construction of Task<S>
impl<S: 'static> Task<S> {
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Task {
raw: RawTask::from_raw(ptr),
_p: PhantomData,
}
}
pub(crate) fn header(&self) -> &Header {
self.raw.header()
}
}
// Notified<S> is really just a Task<S>
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}
// Construction of Notified<S>
cfg_rt_multi_thread! {
impl<S: 'static> Notified<S> {
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
Notified(Task::from_raw(ptr))
}
pub(crate) fn header(&self) -> &Header {
self.0.header()
}
}
}
// Notified<S>::run(), which essentially performs a poll
impl<S: Schedule> Notified<S> {
/// Run the task
pub(crate) fn run(self) {
self.0.raw.poll();
mem::forget(self);
}
/// Pre-emptively cancel the task as part of the shutdown process.
pub(crate) fn shutdown(self) {
self.0.shutdown();
}
}
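These wrappers are free at runtime: PhantomData occupies no space, and #[repr(transparent)] guarantees that Task<S> and Notified<S> have exactly the layout of the inner RawTask. A sketch with simplified stand-ins (not tokio's definitions) makes the layout claim checkable:

```rust
use std::marker::PhantomData;
use std::mem::size_of;
use std::ptr::NonNull;

// Simplified stand-ins for the types above (not tokio's definitions).
pub struct RawTask {
    pub ptr: NonNull<()>,
}

#[repr(transparent)]
pub struct Task<S: 'static> {
    pub raw: RawTask,
    pub _p: PhantomData<S>, // binds S at zero runtime cost
}

#[repr(transparent)]
pub struct Notified<S: 'static>(pub Task<S>);

// All three layers collapse to a single pointer in memory.
pub fn layers_are_pointer_sized<S: 'static>() -> bool {
    size_of::<RawTask>() == size_of::<*const ()>()
        && size_of::<Task<S>>() == size_of::<RawTask>()
        && size_of::<Notified<S>>() == size_of::<RawTask>()
}
```

repr(transparent) permits the extra PhantomData field because it is a zero-sized type with alignment 1.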
Let's tie it all together with the unit test file task.rs:
use crate::runtime::task::{self, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::TryLock;
use std::collections::VecDeque;
use std::sync::Arc;
#[test]
fn schedule() {
with(|rt| {
// Construct a task; note that type inference here relies on `rt.schedule` below
let (task, _) = task::joinable(async {
// Perform one yield_now, defined in tokio/src/task/yield_now.rs
crate::task::yield_now().await;
});
// Schedule the task on rt
rt.schedule(task);
// After scheduling, the queue holds exactly this task; popping and running it is tick 1.
// crate::task::yield_now().await re-enqueues the task, so the second tick completes it, for a total of 2.
assert_eq!(2, rt.tick());
})
}
// The implementation of yield_now
cfg_rt! {
/// Yields execution back to the Tokio runtime.
///
/// A task yields by awaiting on `yield_now()`, and may resume when that
/// future completes (with no output.) The current task will be re-added as
/// a pending task at the _back_ of the pending queue. Any other pending
/// tasks will be scheduled. No other waking is required for the task to
/// continue.
///
/// See also the usage example in the [task module](index.html#yield_now).
#[must_use = "yield_now does nothing unless polled/`await`-ed"]
pub async fn yield_now() {
/// Yield implementation
struct YieldNow {
yielded: bool,
}
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
// Second poll: return Ready immediately
return Poll::Ready(());
}
// First poll: set yielded to true and re-enqueue the current task
self.yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
YieldNow { yielded: false }.await
}
}
fn with(f: impl FnOnce(Runtime)) {
struct Reset;
impl Drop for Reset {
fn drop(&mut self) {
// On exit, release the runtime reference held by CURRENT
let _rt = CURRENT.try_lock().unwrap().take();
}
}
// RAII
let _reset = Reset;
// Construct the Runtime
let rt = Runtime(Arc::new(Inner {
released: task::TransferStack::new(),
core: TryLock::new(Core {
queue: VecDeque::new(),
tasks: LinkedList::new(),
}),
}));
// Store a reference to the runtime instance currently in use in CURRENT
*CURRENT.try_lock().unwrap() = Some(rt.clone());
f(rt)
}
#[derive(Clone)]
struct Runtime(Arc<Inner>);
struct Inner {
released: task::TransferStack<Runtime>,
core: TryLock<Core>,
}
struct Core {
queue: VecDeque<task::Notified<Runtime>>,
tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>,
}
static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
impl Runtime {
fn tick(&self) -> usize {
self.tick_max(usize::max_value())
}
// The task-queue loop
fn tick_max(&self, max: usize) -> usize {
let mut n = 0;
while !self.is_empty() && n < max {
// Pop a task and run it.
// The first iteration runs the task created in `schedule`; its yield_now re-enqueues it.
// The second iteration runs the same task again, which then completes.
let task = self.next_task();
n += 1;
task.run();
}
self.0.maintenance();
n
}
fn is_empty(&self) -> bool {
self.0.core.try_lock().unwrap().queue.is_empty()
}
fn next_task(&self) -> task::Notified<Runtime> {
self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
}
fn shutdown(&self) {
let mut core = self.0.core.try_lock().unwrap();
for task in core.tasks.iter() {
task.shutdown();
}
while let Some(task) = core.queue.pop_back() {
task.shutdown();
}
drop(core);
while !self.0.core.try_lock().unwrap().tasks.is_empty() {
self.0.maintenance();
}
}
}
impl Inner {
fn maintenance(&self) {
use std::mem::ManuallyDrop;
for task in self.released.drain() {
let task = ManuallyDrop::new(task);
// safety: see worker.rs
unsafe {
let ptr = task.header().into();
self.core.try_lock().unwrap().tasks.remove(ptr);
}
}
}
}
impl Schedule for Runtime {
fn bind(task: Task<Self>) -> Runtime {
let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
rt.0.core.try_lock().unwrap().tasks.push_front(task);
rt
}
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
// safety: copying worker.rs
let task = unsafe { Task::from_raw(task.header().into()) };
self.0.released.push(task);
None
}
fn schedule(&self, task: task::Notified<Self>) {
// Push the task onto the task queue
self.0.core.try_lock().unwrap().queue.push_back(task);
}
}
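The schedule test above can be reproduced end to end with a tiny standalone executor (a hedged sketch built on std's `Wake` trait, using none of tokio's internals): `wake_by_ref` pushes the task back onto a VecDeque run queue, and draining the queue yields exactly two ticks for a yield_now-style future.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Shared run queue: woken tasks are pushed to the back, as in the test above.
type Queue = Arc<Mutex<VecDeque<Arc<Task>>>>;

struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    queue: Queue,
}

impl Wake for Task {
    // wake_by_ref() inside YieldNow lands here: re-enqueue the task.
    fn wake(self: Arc<Self>) {
        let q = self.queue.clone();
        q.lock().unwrap().push_back(self);
    }
}

// YieldNow: Pending + self-wake on the first poll, Ready on the second.
pub struct YieldNow {
    pub yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // put ourselves back in the queue
        Poll::Pending
    }
}

// Drain the queue, returning how many times a task was run (the "tick" count).
pub fn tick(fut: impl Future<Output = ()> + Send + 'static) -> usize {
    let queue: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let task = Arc::new(Task {
        future: Mutex::new(Box::pin(fut)),
        queue: queue.clone(),
    });
    queue.lock().unwrap().push_back(task);
    let mut n = 0;
    loop {
        let next = queue.lock().unwrap().pop_front();
        let task = match next {
            Some(t) => t,
            None => return n,
        };
        n += 1;
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let _ = task.future.lock().unwrap().as_mut().poll(&mut cx);
    }
}
```

This is the whole yield mechanism in ~60 lines; tokio's version adds the packed state word, work stealing, and the I/O and timer drivers on top of the same skeleton.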