Tokio Source Code Analysis, Part 3: The Runtime

2021.04.25
SF-Zhou

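At its core, Tokio is an M:N coroutine runtime. Underneath, it is driven by Rust's coroutines (async/await) and Mio; on top, it supports HTTP / RPC applications. This part begins the analysis of the Runtime, based on code version v1.5.0.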

Tokio architecture diagram, from tokio.rs

1. Overview

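Before reading the core code, here are the key terms in asynchronous programming:

Asynchrony: refers to the occurrence of events independent of the main program flow, and to the ways of handling such events. These may be external events such as signals, or actions initiated by the program that take place concurrently with program execution, without the program blocking to wait for results. In short, the event happens on a thread other than the caller's. Synchronous versus asynchronous is about whether the event is handled on the calling thread.

Non-blocking: an operation that does not block the program from continuing. Blocking versus non-blocking is about the caller's state while it waits for the result. The two distinctions are orthogonal, so all four combinations exist: synchronous blocking, synchronous non-blocking, asynchronous blocking, and asynchronous non-blocking.

Resumable Function: a function that can suspend its execution and return to its caller, and can later be resumed from the point where it was suspended. Resumable functions are the cornerstone of coroutines.

Asynchronous Runtime: the integration of the concepts above. Through resumable functions, instrumented suspension points, and user-space scheduling, it implements non-preemptive switching between user-space threads, which are called coroutines. Points that would otherwise perform synchronous blocking IO are generally used as the default suspension points.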
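To make the idea of a resumable function concrete, here is a minimal sketch that uses only the standard library. The names `Countdown`, `NoopWaker` and `drive` are hypothetical and exist only for this illustration; they are not part of Tokio. Each call to `poll` either suspends by returning `Poll::Pending` or finishes with `Poll::Ready`, and the state kept in the struct is what lets the function resume where it left off.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical resumable function: suspends `remaining` times, then finishes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        // Record the progress made so far, then suspend by returning to the caller.
        self.remaining -= 1;
        Poll::Pending
    }
}

/// A waker that does nothing; sufficient when the future is driven by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` in a busy loop until it completes; returns the poll count and the output.
fn drive<F: Future>(future: F) -> (usize, F::Output) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = future.as_mut().poll(&mut cx) {
            return (polls, out);
        }
    }
}

fn main() {
    let (polls, out) = drive(Countdown { remaining: 2 });
    println!("{} after {} polls", out, polls);
}
```

A real runtime does not busy-poll like `drive`; it parks the task until its waker fires, which is the job of the scheduler discussed later.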

The Tokio code base is very large: the tokio/src directory contains 256 source files totalling more than 50,000 lines. The core code is laid out as follows:

tokio/src
├── blocking.rs
├── coop.rs
├── fs
├── future
├── io
├── lib.rs
├── loom
├── macros
├── net
├── park
├── process
├── runtime
├── signal
├── sync
├── task
├── time
└── util

Now let's look at the example from the official documentation:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = [0; 1024];

            // In a loop, read data from the socket and write the data back.
            loop {
                let n = match socket.read(&mut buf).await {
                    // socket closed
                    Ok(n) if n == 0 => return,
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("failed to read from socket; err = {:?}", e);
                        return;
                    }
                };

                // Write the data back
                if let Err(e) = socket.write_all(&buf[0..n]).await {
                    eprintln!("failed to write to socket; err = {:?}", e);
                    return;
                }
            }
        });
    }
}

If you have worked with coroutines and network programming before, you will appreciate how efficient and concise the Echo Server above is.

2. tokio/runtime/task

Tokio's Runtime provides the following capabilities:

  • An I/O event loop, called the driver, which drives I/O resources and dispatches I/O events to tasks that depend on them.
  • A scheduler to execute tasks that use these I/O resources.
  • A timer for scheduling work to run after a set period of time.

This section looks at the abstractions around tasks, located in tokio/src/runtime/task. We start with the task state in state.rs:

// State itself is an atomic unsigned integer
pub(super) struct State {
    val: AtomicUsize,
}

// Snapshot is a value read from State
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);

// Result of updating State; on success it carries the updated Snapshot
type UpdateResult = Result<Snapshot, Snapshot>;

// The actual encoding of State, which also includes a reference count
const RUNNING: usize = 0b0001;       // the task is currently running
const COMPLETE: usize = 0b0010;      // the task has completed
const LIFECYCLE_MASK: usize = 0b11;
const NOTIFIED: usize = 0b100;       // the task has been pushed onto a run queue
const JOIN_INTEREST: usize = 0b1_000;  // a JoinHandle still exists
const JOIN_WAKER: usize = 0b10_000;    // a join waker has been set
const CANCELLED: usize = 0b100_000;  // the task has been cancelled
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;  // all state-related bits

const REF_COUNT_MASK: usize = !STATE_MASK;   // bits used by the reference count
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;  // one unit of the reference count

const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;  // initial state: the task is referenced by both the scheduler and the `JoinHandle`

impl State {
    pub(super) fn new() -> State {
        State {
            val: AtomicUsize::new(INITIAL_STATE),
        }
    }

    // Read the current state with Acquire ordering
    pub(super) fn load(&self) -> Snapshot {
        Snapshot(self.val.load(Acquire))
    }

    // Thread-safe state transition implemented with CAS; if `f` rejects the transition, the current state is returned as Err
    fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
    where
        F: FnMut(Snapshot) -> Option<Snapshot>,
    {
        let mut curr = self.load();

        loop {
            let next = match f(curr) {
                Some(next) => next,
                None => return Err(curr),
            };

            let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);

            match res {
                Ok(_) => return Ok(next),
                Err(actual) => curr = Snapshot(actual),
            }
        }
    }

    // Try to transition to the running state
    pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
        self.fetch_update(|curr| {
            assert!(curr.is_notified());

            let mut next = curr;

            if !next.is_idle() {
                return None;
            }

            if ref_inc {
                next.ref_inc();
            }

            next.set_running();
            next.unset_notified();
            Some(next)
        })
    }

    // Increment the reference count
    pub(super) fn ref_inc(&self) {
        use std::process;
        use std::sync::atomic::Ordering::Relaxed;

        let prev = self.val.fetch_add(REF_ONE, Relaxed);

        // If the reference count overflowed, abort.
        if prev > isize::max_value() as usize {
            process::abort();
        }
    }
}

// Snapshot provides the interface for reading and writing the state bits
impl Snapshot {
    pub(super) fn is_running(self) -> bool {
        self.0 & RUNNING == RUNNING
    }

    fn set_running(&mut self) {
        self.0 |= RUNNING;
    }
    ...
}
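The encoding above packs flag bits and a reference count into a single word so that both can be updated with one atomic operation. The following is a simplified sketch of that technique, not Tokio's code: `PackedState`, `try_start` and the two-flag layout are assumptions made for illustration (Tokio's real `State` uses six flag bits and more transitions).

```rust
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};

// Hypothetical two-flag layout: the low two bits are flags, the rest is the ref count.
const RUNNING: usize = 0b01;
const NOTIFIED: usize = 0b10;
const REF_SHIFT: usize = 2;
const REF_ONE: usize = 1 << REF_SHIFT;

struct PackedState(AtomicUsize);

impl PackedState {
    /// Starts with one reference and the NOTIFIED flag set.
    fn new() -> Self {
        PackedState(AtomicUsize::new(REF_ONE | NOTIFIED))
    }

    fn ref_count(&self) -> usize {
        self.0.load(Acquire) >> REF_SHIFT
    }

    /// Adding REF_ONE bumps the count without touching the flag bits.
    fn ref_inc(&self) {
        self.0.fetch_add(REF_ONE, Relaxed);
    }

    /// CAS loop: NOTIFIED -> RUNNING. Returns Err with the observed value
    /// if the state is already running or was never notified.
    fn try_start(&self) -> Result<usize, usize> {
        let mut curr = self.0.load(Acquire);
        loop {
            if curr & RUNNING != 0 || curr & NOTIFIED == 0 {
                return Err(curr);
            }
            let next = (curr | RUNNING) & !NOTIFIED;
            match self.0.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => return Ok(next),
                // Another thread changed the word; retry with the fresh value.
                Err(actual) => curr = actual,
            }
        }
    }
}

fn main() {
    let state = PackedState::new();
    state.ref_inc();
    println!("refs = {}, start = {:?}", state.ref_count(), state.try_start());
}
```

Because the flags and the count live in one `AtomicUsize`, a transition such as "set running and take a reference" can be a single compare-and-swap, which is what `transition_to_running(ref_inc)` relies on.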

Next, the core data structures of a task, in core.rs:

// Task Cell: holds the metadata of a task. Note the #[repr(C)] attribute here:
// header must be the first field, because Header pointers get cast to Cell pointers
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data
    pub(super) header: Header,

    /// Either the future or output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data
    pub(super) trailer: Trailer,
}

// The task header, which holds the task state; the other fields are covered when we run into them later
#[repr(C)]
pub(crate) struct Header {
    /// Task state
    pub(super) state: State,

    pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,

    /// Pointer to next task, used with the injection queue
    pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Pointer to the next task in the transfer stack
    pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,
}

pub(super) struct Scheduler<S> {
    scheduler: UnsafeCell<Option<S>>,
}

pub(super) struct CoreStage<T: Future> {
    stage: UnsafeCell<Stage<T>>,
}

// The core part of a task
// scheduler is the scheduler the task is bound to
// stage holds either the future or its output
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future
    pub(super) scheduler: Scheduler<S>,

    /// Either the future or the output
    pub(super) stage: CoreStage<T>,
}
// The concrete definition of stage: three states, where Consumed means the data has been taken and can no longer be used
pub(super) enum Stage<T: Future> {
    Running(T),
    Finished(super::Result<T::Output>),
    Consumed,
}

// The task trailer, which holds a waker
pub(super) struct Trailer {
    /// Consumer task waiting on completion of this task.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}

// Build a Cell from a future and a state
impl<T: Future, S: Schedule> Cell<T, S> {
    /// Allocates a new task cell, containing the header, trailer, and core
    /// structures.
    pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
        Box::new(Cell {
            header: Header {
                state,
                owned: UnsafeCell::new(linked_list::Pointers::new()),
                queue_next: UnsafeCell::new(None),
                stack_next: UnsafeCell::new(None),
                vtable: raw::vtable::<T, S>(),
            },
            core: Core {
                scheduler: Scheduler {
                    scheduler: UnsafeCell::new(None),
                },
                stage: CoreStage {
                    stage: UnsafeCell::new(Stage::Running(future)),
                },
            },
            trailer: Trailer {
                waker: UnsafeCell::new(None),
            },
        })
    }
}
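The reason `header` has to come first under `#[repr(C)]` is that a pointer to the whole cell and a pointer to its first field then share the same address, so one can be cast to the other. Here is a minimal sketch of that cast; the `Header`, `Cell`, `erase`, `restore` and `roundtrip` definitions below are simplified stand-ins written for this example, not Tokio's types.

```rust
use std::ptr::NonNull;

#[repr(C)]
struct Header {
    id: u32,
}

#[repr(C)]
struct Cell<T> {
    header: Header, // must stay the first field
    payload: T,
}

/// Forgets the concrete `T` and keeps only a pointer to the header.
fn erase<T>(cell: Box<Cell<T>>) -> NonNull<Header> {
    let raw: *mut Cell<T> = Box::into_raw(cell);
    // With #[repr(C)], the first field sits at offset 0 of the struct.
    // Safety: Box::into_raw never returns null.
    unsafe { NonNull::new_unchecked(raw.cast::<Header>()) }
}

/// Recovers the full cell.
///
/// Safety: `ptr` must come from `erase::<T>` with the same `T`,
/// and must be restored at most once.
unsafe fn restore<T>(ptr: NonNull<Header>) -> Box<Cell<T>> {
    unsafe { Box::from_raw(ptr.cast::<Cell<T>>().as_ptr()) }
}

fn roundtrip() -> (u32, String) {
    let cell = Box::new(Cell {
        header: Header { id: 7 },
        payload: String::from("future"),
    });
    let ptr = erase(cell);
    // The header is readable without knowing `T`.
    let id = unsafe { ptr.as_ref().id };
    // Only code that knows `T` can get the payload back.
    let cell = unsafe { restore::<String>(ptr) };
    (id, cell.payload)
}

fn main() {
    println!("{:?}", roundtrip());
}
```

Without `#[repr(C)]` the compiler is free to reorder fields, and the cast from `*mut Header` back to `*mut Cell<T>` would no longer be sound.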

core.rs also contains the implementations of the Cell-related functions; we skip them for now and turn to raw.rs:

// RawTask is essentially a pointer to a Cell (stored as a pointer to its Header)
pub(super) struct RawTask {
    ptr: NonNull<Header>,
}

// A hand-built vtable, referenced through Header.vtable
pub(super) struct Vtable {
    /// Poll the future
    pub(super) poll: unsafe fn(NonNull<Header>),

    /// Deallocate the memory
    pub(super) dealloc: unsafe fn(NonNull<Header>),

    /// Read the task output, if complete
    pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),

    /// The join handle has been dropped
    pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),

    /// Scheduler is being shutdown
    pub(super) shutdown: unsafe fn(NonNull<Header>),
}

// Returns a static vtable
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
    &Vtable {
        poll: poll::<T, S>,
        dealloc: dealloc::<T, S>,
        try_read_output: try_read_output::<T, S>,
        drop_join_handle_slow: drop_join_handle_slow::<T, S>,
        shutdown: shutdown::<T, S>,
    }
}

// The functions the vtable points to; they delegate to the implementations in Harness
unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
    let harness = Harness::<T, S>::from_raw(ptr);
    harness.poll();
}
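The hand-built vtable is a form of type erasure: generic functions are instantiated once per concrete type and stored as plain function pointers next to an untyped pointer. The sketch below shows the same pattern in isolation. `Erased`, `describe` and the two-entry `Vtable` are hypothetical names chosen for this example and are much smaller than Tokio's actual table.

```rust
use std::fmt::Debug;
use std::ptr::NonNull;

/// Hand-built vtable: one function pointer per operation.
struct Vtable {
    describe: unsafe fn(NonNull<()>) -> String,
    dealloc: unsafe fn(NonNull<()>),
}

unsafe fn describe<T: Debug>(ptr: NonNull<()>) -> String {
    // Safety: the caller guarantees `ptr` points to a live `T`.
    let value: &T = unsafe { ptr.cast::<T>().as_ref() };
    format!("{:?}", value)
}

unsafe fn dealloc<T>(ptr: NonNull<()>) {
    // Safety: the caller guarantees `ptr` came from `Box::<T>::into_raw`.
    drop(unsafe { Box::from_raw(ptr.cast::<T>().as_ptr()) });
}

/// Returns the static vtable instantiated for `T`.
fn vtable<T: Debug + 'static>() -> &'static Vtable {
    &Vtable {
        describe: describe::<T>,
        dealloc: dealloc::<T>,
    }
}

/// A value whose concrete type is known only to its vtable.
struct Erased {
    ptr: NonNull<()>,
    vtable: &'static Vtable,
}

impl Erased {
    fn new<T: Debug + 'static>(value: T) -> Erased {
        let raw = Box::into_raw(Box::new(value));
        // Safety: Box::into_raw never returns null.
        let ptr = unsafe { NonNull::new_unchecked(raw) }.cast::<()>();
        Erased { ptr, vtable: vtable::<T>() }
    }

    fn describe(&self) -> String {
        // Safety: `ptr` and `vtable` were created together for the same `T`.
        unsafe { (self.vtable.describe)(self.ptr) }
    }
}

impl Drop for Erased {
    fn drop(&mut self) {
        // Safety: `ptr` is still live and is freed exactly once, here.
        unsafe { (self.vtable.dealloc)(self.ptr) }
    }
}

fn main() {
    let items = vec![Erased::new(vec![1, 2]), Erased::new("hi")];
    for item in &items {
        println!("{}", item.describe());
    }
}
```

Compared with `Box<dyn Trait>`, this layout keeps the handle one pointer wide when the vtable reference is stored inside the pointee, which is what Tokio does by placing `vtable` in `Header`.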

Following this thread, we continue with harness.rs:

// Harness is a non-null pointer to a Cell
pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

// Converts the Header pointer into a Cell pointer, giving access to the header / trailer / core
impl<T, S> Harness<T, S>
where
    T: Future,
    S: 'static,
{
    pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
        Harness {
            cell: ptr.cast::<Cell<T, S>>(),
        }
    }

    fn header(&self) -> &Header {
        unsafe { &self.cell.as_ref().header }
    }

    fn trailer(&self) -> &Trailer {
        unsafe { &self.cell.as_ref().trailer }
    }

    fn core(&self) -> &Core<T, S> {
        unsafe { &self.cell.as_ref().core }
    }
}

// Result of a poll operation
enum PollFuture<T> {
    Complete(Result<T, JoinError>, bool),
    DropReference,
    Notified,
    None,
}

// Implementation of poll
impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
    pub(super) fn poll(self) {
        match self.poll_inner() {
            PollFuture::Notified => {
                // Signal yield
                self.core().scheduler.yield_now(Notified(self.to_task()));
                // The ref-count was incremented as part of
                // `transition_to_idle`.
                self.drop_reference();
            }
            PollFuture::DropReference => {
                self.drop_reference();
            }
            PollFuture::Complete(out, is_join_interested) => {
                self.complete(out, is_join_interested);
            }
            PollFuture::None => (),
        }
    }

    fn poll_inner(&self) -> PollFuture<T::Output> {
        let snapshot = match self.scheduler_view().transition_to_running() {
            TransitionToRunning::Ok(snapshot) => snapshot,
            TransitionToRunning::DropReference => return PollFuture::DropReference,
        };

        // The transition to `Running` done above ensures that a lock on the
        // future has been obtained. This also ensures the `*mut T` pointer
        // contains the future (as opposed to the output) and is initialized.

        let waker_ref = waker_ref::<T, S>(self.header());
        let cx = Context::from_waker(&*waker_ref);
        poll_future(self.header(), &self.core().stage, snapshot, cx)
    }

    fn scheduler_view(&self) -> SchedulerView<'_, S> {
        SchedulerView {
            header: self.header(),
            scheduler: &self.core().scheduler,
        }
    }
}


enum TransitionToRunning {
    Ok(Snapshot),
    DropReference,
}

struct SchedulerView<'a, S> {
    header: &'a Header,
    scheduler: &'a Scheduler<S>,
}

impl<'a, S> SchedulerView<'a, S>
where
    S: Schedule,
{
    fn to_task(&self) -> Task<S> {
        // SAFETY The header is from the same struct containing the scheduler `S` so  the cast is safe
        unsafe { Task::from_raw(self.header.into()) }
    }

    /// Returns true if the task should be deallocated.
    fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
        let ref_dec = if self.scheduler.is_bound() {
            if let Some(task) = self.scheduler.release(self.to_task()) {
                mem::forget(task);
                true
            } else {
                false
            }
        } else {
            false
        };

        // This might deallocate
        let snapshot = self
            .header
            .state
            .transition_to_terminal(!is_join_interested, ref_dec);

        snapshot.ref_count() == 0
    }

    fn transition_to_running(&self) -> TransitionToRunning {
        // The task is bound to a scheduler the first time it runs
        let is_not_bound = !self.scheduler.is_bound();

        // Transition the task to the running state.
        //
        // A failure to transition here indicates the task has been cancelled
        // while in the run queue pending execution.
        let snapshot = match self.header.state.transition_to_running(is_not_bound) {
            Ok(snapshot) => snapshot,
            Err(_) => {
                // The task was shutdown while in the run queue. At this point,
                // we just hold a ref counted reference. Since we do not have access to it here
                // return `DropReference` so the caller drops it.
                return TransitionToRunning::DropReference;
            }
        };

        if is_not_bound {
            // Ensure the task is bound to a scheduler instance. Since this is
            // the first time polling the task, a scheduler instance is pulled
            // from the local context and assigned to the task.
            //
            // The scheduler maintains ownership of the task and responds to
            // `wake` calls.
            //
            // The task reference count has been incremented.
            //
            // Safety: Since we have unique access to the task so that we can
            // safely call `bind_scheduler`.
            self.scheduler.bind_scheduler(self.to_task());
        }
        TransitionToRunning::Ok(snapshot)
    }
}

fn poll_future<T: Future>(
    header: &Header,
    core: &CoreStage<T>,
    snapshot: Snapshot,
    cx: Context<'_>,
) -> PollFuture<T::Output> {
    if snapshot.is_cancelled() {
        PollFuture::Complete(Err(JoinError::cancelled()), snapshot.is_join_interested())
    } else {
        let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            struct Guard<'a, T: Future> {
                core: &'a CoreStage<T>,
            }

            impl<T: Future> Drop for Guard<'_, T> {
                fn drop(&mut self) {
                    self.core.drop_future_or_output();
                }
            }

            let guard = Guard { core };

            let res = guard.core.poll(cx);

            // prevent the guard from dropping the future
            mem::forget(guard);

            res
        }));
        match res {
            Ok(Poll::Pending) => match header.state.transition_to_idle() {
                // The future is still pending, so switch to the idle state first
                Ok(snapshot) => {
                    if snapshot.is_notified() {
                        PollFuture::Notified
                    } else {
                        PollFuture::None
                    }
                }
                Err(_) => PollFuture::Complete(Err(cancel_task(core)), true),
            },
            Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()),
            Err(err) => {
                PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested())
            }
        }
    }
}

impl<S: Schedule> Scheduler<S> {
    /// Bind the scheduler
    pub(super) fn bind_scheduler(&self, task: Task<S>) {
        debug_assert!(!self.is_bound());

        // Bind the task to the scheduler
        let scheduler = S::bind(task);
        self.scheduler.with_mut(|ptr| unsafe {
            *ptr = Some(scheduler);
        });
    }

    pub(super) fn is_bound(&self) -> bool {
        self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
    }

    pub(super) fn yield_now(&self, task: Notified<S>) {
        self.scheduler.with(|ptr| {
            // Safety: Can only be called after initial `poll`, which is the
            // only time the field is mutated.
            match unsafe { &*ptr } {
                Some(scheduler) => scheduler.yield_now(task),
                None => panic!("no scheduler set"),
            }
        });
    }
}

impl<T: Future> CoreStage<T> {
    /// Poll the future
    pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
        let res = {
            self.stage.with_mut(|ptr| {
                // The caller guarantees thread safety
                let future = match unsafe { &mut *ptr } {
                    Stage::Running(future) => future,
                    _ => unreachable!("unexpected stage"),
                };

                // Safety: The caller ensures the future is pinned.
                let future = unsafe { Pin::new_unchecked(future) };

                // The `poll` interface provided by Rust's `Future` trait
                future.poll(&mut cx)
            })
        };

        if res.is_ready() {
            self.drop_future_or_output();
        }

        res
    }

    /// Drop the future
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn drop_future_or_output(&self) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Consumed);
        }
    }
}
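The `Guard` inside `poll_future` is a drop guard: if the future panics, unwinding drops the guard and the cleanup runs; if the poll returns normally, `mem::forget` disarms it. Below is a minimal standalone sketch of that pattern. `run_guarded` and the `cleaned` flag are hypothetical names for this illustration; in Tokio the cleanup is `drop_future_or_output`.

```rust
use std::mem;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

/// Drop guard: runs the cleanup only if it is actually dropped.
struct Guard<'a> {
    cleaned: &'a AtomicBool,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.cleaned.store(true, SeqCst);
    }
}

/// Runs `work`, turning a panic into an `Err` and cleaning up only on panic.
fn run_guarded(cleaned: &AtomicBool, work: impl FnOnce() -> u32) -> Result<u32, String> {
    let res = panic::catch_unwind(AssertUnwindSafe(|| {
        let guard = Guard { cleaned };
        let out = work();
        // Normal return: disarm the guard so the cleanup does not run.
        mem::forget(guard);
        out
    }));
    res.map_err(|payload| match payload.downcast_ref::<&str>() {
        Some(msg) => msg.to_string(),
        None => String::from("non-string panic payload"),
    })
}

fn main() {
    let cleaned = AtomicBool::new(false);
    println!("{:?}", run_guarded(&cleaned, || 42));
    println!("{:?}", run_guarded(&cleaned, || -> u32 { panic!("boom") }));
    println!("cleaned = {}", cleaned.load(SeqCst));
}
```

This is why a panicking task does not take down the worker thread: the panic is caught at the poll boundary and surfaces as a `JoinError` instead.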

Finally, mod.rs:

// The definition of Task, which is a wrapper around RawTask.
// PhantomData ties in the otherwise unused type parameter S.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
    raw: RawTask,
    _p: PhantomData<S>,
}

pub(crate) type Result<T> = std::result::Result<T, JoinError>;

// The scheduling trait
pub(crate) trait Schedule: Sync + Sized + 'static {
    // Bind a task to the executor
    fn bind(task: Task<Self>) -> Self;
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
    fn schedule(&self, task: Notified<Self>);

    // The yield operation, which triggers a coroutine switch
    fn yield_now(&self, task: Notified<Self>) {
        self.schedule(task);
    }
}

// Build a task and its JoinHandle
cfg_rt! {
    /// Create a new task with an associated join handle
    pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
    where
        T: Future + Send + 'static,
        S: Schedule,
    {
        let raw = RawTask::new::<_, S>(task);

        let task = Task {
            raw,
            _p: PhantomData,
        };

        let join = JoinHandle::new(raw);

        (Notified(task), join)
    }
}

// Constructing a Task<Schedule>
impl<S: 'static> Task<S> {
    pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
        Task {
            raw: RawTask::from_raw(ptr),
            _p: PhantomData,
        }
    }

    pub(crate) fn header(&self) -> &Header {
        self.raw.header()
    }
}

// Notified<Schedule> is still just a Task<Schedule> underneath
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}

// Constructing a Notified<Schedule>
cfg_rt_multi_thread! {
    impl<S: 'static> Notified<S> {
        pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
            Notified(Task::from_raw(ptr))
        }

        pub(crate) fn header(&self) -> &Header {
            self.0.header()
        }
    }
}

// Notified<S>::run() essentially performs a poll
impl<S: Schedule> Notified<S> {
    /// Run the task
    pub(crate) fn run(self) {
        self.0.raw.poll();
        mem::forget(self);
    }

    /// Pre-emptively cancel the task as part of the shutdown process.
    pub(crate) fn shutdown(self) {
        self.0.shutdown();
    }
}

Let's put it all together with the help of the unit test task.rs:

use crate::runtime::task::{self, Schedule, Task};
use crate::util::linked_list::{Link, LinkedList};
use crate::util::TryLock;

use std::collections::VecDeque;
use std::sync::Arc;

#[test]
fn schedule() {
    with(|rt| {
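        // Build a task; note that type inference relies on the `rt.schedule` call below
        let (task, _) = task::joinable(async {
            // Call yield_now once; it is defined in tokio/src/task/yield_now.rs
            crate::task::yield_now().await;
        });

        // Schedule the task on rt
        rt.schedule(task);

        // Once scheduled, the task is the only entry in the queue; it is popped and run, so the count is 1.
        // Awaiting crate::task::yield_now() pushes the task back onto the queue, so it is popped and run
        // a second time and the final count is 2.
        assert_eq!(2, rt.tick());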
    })
}

// Implementation of yield_now
cfg_rt! {
    /// Yields execution back to the Tokio runtime.
    ///
    /// A task yields by awaiting on `yield_now()`, and may resume when that
    /// future completes (with no output.) The current task will be re-added as
    /// a pending task at the _back_ of the pending queue. Any other pending
    /// tasks will be scheduled. No other waking is required for the task to
    /// continue.
    ///
    /// See also the usage example in the [task module](index.html#yield_now).
    #[must_use = "yield_now does nothing unless polled/`await`-ed"]
    pub async fn yield_now() {
        /// Yield implementation
        struct YieldNow {
            yielded: bool,
        }

        impl Future for YieldNow {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
                if self.yielded {
                    // Second poll: return Ready directly
                    return Poll::Ready(());
                }

                // First poll: set yielded to true and push the current task back onto the run queue
                self.yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        YieldNow { yielded: false }.await
    }
}

fn with(f: impl FnOnce(Runtime)) {
    struct Reset;

    impl Drop for Reset {
        fn drop(&mut self) {
            // On exit, release the reference held by CURRENT
            let _rt = CURRENT.try_lock().unwrap().take();
        }
    }

    // RAII
    let _reset = Reset;

    // Build the Runtime
    let rt = Runtime(Arc::new(Inner {
        released: task::TransferStack::new(),
        core: TryLock::new(Core {
            queue: VecDeque::new(),
            tasks: LinkedList::new(),
        }),
    }));

    // Use CURRENT to hold one reference to the runtime instance currently in use
    *CURRENT.try_lock().unwrap() = Some(rt.clone());
    f(rt)
}

#[derive(Clone)]
struct Runtime(Arc<Inner>);

struct Inner {
    released: task::TransferStack<Runtime>,
    core: TryLock<Core>,
}

struct Core {
    queue: VecDeque<task::Notified<Runtime>>,
    tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>,
}

static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);

impl Runtime {
    fn tick(&self) -> usize {
        self.tick_max(usize::max_value())
    }

    // The run-queue loop
    fn tick_max(&self, max: usize) -> usize {
        let mut n = 0;

        while !self.is_empty() && n < max {
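            // Fetch a task and run it.
            // The first iteration gets the task defined in `schedule`, which runs yield_now and pushes itself back onto the queue.
            // The second iteration gets the same task again; it completes, the queue is empty, and the loop exits.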
            let task = self.next_task();
            n += 1;
            task.run();
        }

        self.0.maintenance();

        n
    }

    fn is_empty(&self) -> bool {
        self.0.core.try_lock().unwrap().queue.is_empty()
    }

    fn next_task(&self) -> task::Notified<Runtime> {
        self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
    }

    fn shutdown(&self) {
        let mut core = self.0.core.try_lock().unwrap();

        for task in core.tasks.iter() {
            task.shutdown();
        }

        while let Some(task) = core.queue.pop_back() {
            task.shutdown();
        }

        drop(core);

        while !self.0.core.try_lock().unwrap().tasks.is_empty() {
            self.0.maintenance();
        }
    }
}

impl Inner {
    fn maintenance(&self) {
        use std::mem::ManuallyDrop;

        for task in self.released.drain() {
            let task = ManuallyDrop::new(task);

            // safety: see worker.rs
            unsafe {
                let ptr = task.header().into();
                self.core.try_lock().unwrap().tasks.remove(ptr);
            }
        }
    }
}

impl Schedule for Runtime {
    fn bind(task: Task<Self>) -> Runtime {
        let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
        rt.0.core.try_lock().unwrap().tasks.push_front(task);
        rt
    }

    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        // safety: copying worker.rs
        let task = unsafe { Task::from_raw(task.header().into()) };
        self.0.released.push(task);
        None
    }

    fn schedule(&self, task: task::Notified<Self>) {
        // Push the task onto the run queue
        self.0.core.try_lock().unwrap().queue.push_back(task);
    }
}
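The interplay between `yield_now`, the waker, and the run queue in the test above can be reproduced with the standard library alone. The sketch below is a deliberately tiny single-queue executor written for this article; `Task`, `Queue`, `spawn` and `tick` are hypothetical names that only mirror the roles of Tokio's types, with `Mutex` standing in for Tokio's lock-free state machine. Waking a task pushes it back onto the queue, so a task that yields once is popped twice.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Queue = Arc<Mutex<VecDeque<Arc<Task>>>>;

/// A task owns its future and knows which queue to rejoin when woken.
struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    queue: Queue,
}

impl Wake for Task {
    /// Waking a task means scheduling it again.
    fn wake(self: Arc<Self>) {
        self.queue.lock().unwrap().push_back(self.clone());
    }
}

/// Same shape as the YieldNow future shown above.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

fn spawn(queue: &Queue, future: impl Future<Output = ()> + Send + 'static) {
    let task = Arc::new(Task {
        future: Mutex::new(Box::pin(future)),
        queue: queue.clone(),
    });
    queue.lock().unwrap().push_back(task);
}

/// Runs tasks until the queue is empty; returns how many polls were performed.
fn tick(queue: &Queue) -> usize {
    let mut n = 0;
    loop {
        // The queue lock is released at the end of this statement,
        // so a wake-up during the poll can lock the queue again.
        let next = queue.lock().unwrap().pop_front();
        let task = match next {
            Some(task) => task,
            None => return n,
        };
        n += 1;
        let waker = Waker::from(task.clone());
        let mut cx = Context::from_waker(&waker);
        let mut slot = task.future.lock().unwrap();
        let _ = slot.as_mut().poll(&mut cx);
    }
}

fn main() {
    let queue: Queue = Arc::new(Mutex::new(VecDeque::new()));
    spawn(&queue, async { YieldNow { yielded: false }.await });
    println!("ticks = {}", tick(&queue));
}
```

The count of two polls matches the `assert_eq!(2, rt.tick())` in the unit test: one poll that yields and one that completes.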

References

  1. "Announcing Tokio 1.0", tokio.rs
  2. "Asynchrony (computer programming)", Wikipedia
  3. "Asynchronous I/O", Wikipedia
  4. "async is not nonblocking", Wikipedia