Rust Future & Pin

2024.10.17
SF-Zhou

Rust 异步编程中,Future 和 Pin 及其相关的概念是比较让人困惑的。本文试图阐述为什么需要这些设计。

1. 为什么需要 Pin?

因为存在自引用数据结构和侵入式数据结构。异步函数返回的 Future 是典型的自引用数据结构。为了保证在 Future 执行过程中自引用的指针的合法性,最简单的办法就是在它的生命周期内将它的地址死死地固定在一个确定的位置上。

2. Pin 是如何实现的?

本质上就是限制 T 类型的对象使其无法移动。以 Pin<Box<T>> 为例,只要它不暴露 &mut T 以及 Box<T> 出来,那么该对象就不可能被移动。限制 &mut T 是因为 std::mem::swap 等函数可以通过 &mut T 移动对象的位置。Pin 如禁字法一般,不杀敌,只克己。

3. 什么是 Future?

Future 表示一个可能尚未完成计算的值。Rust 中 Future 的定义如下:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Future 是异步编程的核心。所有的异步函数的返回值都是一个实现 Future trait 的对象,通过运行时不断地执行 poll 来推进 Future 的计算。除了异步函数的返回值外,用户也可以为自定义的数据结构实现 Future trait。

4. 为什么需要 Unpin?

如上一节所述,用户可以为自己的数据结构实现 Future trait,而用户自己的数据结构绝大部分是不存在自引用数据结构的,如果全部按照 Pin 的要求无法返回 &mut T,那么它将会非常不易使用。而 Future trait 肯定是固定的,传入的参数始终是 Pin<&mut T>。所以 Pin 的设计保留了方便之门,它仅保证在 T: !Unpin 时将对象固定住,而 T: Unpin 时可以返回 &mut T 方便修改和使用。

可能的类似问题:为什么我的 Pin<P<T>> 没有将对象 Pin 住?

5. 为什么需要 pin!

有些时候我们希望直接将栈上的变量 Pin 住,而不希望将其移动到堆上再操作。对于 !Unpin 的类型,如果希望对栈上的对象直接构造一个 Pin 指针就得保证该指针 drop 时栈上的对象也同时完成 drop。所以官方提供了这个宏,帮助创建 Pin 指针的同时,将原对象的生命周期绑定到这个 Pin 指针上:

#[allow_internal_unstable(unsafe_pin_internals)]
pub macro pin($value:expr $(,)?) {
  $crate::pin::Pin::<&mut _> { __pointer: &mut { $value } }
}

这里有一个小技巧,{ $value } 会吃掉传入的参数,将其变成一个临时变量;但同时构造 Pin 的时候传入了临时变量的引用,延长了它的生命周期,这就保证了该对象拥有和 Pin 指针一致的生命周期。为了实现这样的小技巧,PR 的作者将 Pin 内的 __pointer 设为 pub 属性,并且增加了 unstable feature 约束用户无法直接访问该字段。有兴趣可以看看这个修改 rust-lang/rust#93176

6. 为什么异步函数返回的 Future 存在自引用数据结构?

可以参考如下代码,我自定义了一个 Breakpoint 结构,它引用了异步函数“栈”上的一个变量,在执行 await 操作后,此时的 future 对象就是一个自引用的状态(point.t 引用 x)。代码如下(在线执行):

use std::{
    future::Future,
    pin::{pin, Pin},
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};

pub struct Breakpoint<'a, T> {
    t: &'a T,
    ready: bool,
}

impl<'a, T> Breakpoint<'a, T> {
    pub fn new(t: &'a T) -> Self {
        println!("construct breakpoint, t: {:p}", t as *const _);
        Self { t, ready: false }
    }
}

impl<'a, T> Future for Breakpoint<'a, T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        match this.ready {
            true => {
                println!(
                    "breakpoint@{:p} ready, t: {:p}",
                    this as *const _, this.t as *const _
                );
                Poll::Ready(())
            }
            false => {
                println!(
                    "breakpoint@{:p} not ready, t: {:p}",
                    this as *const _, this.t as *const _
                );
                this.ready = true;
                Poll::Pending
            }
        }
    }
}

async fn run() {
    let x = 0u32;
    println!("x: {:p}", &x as *const _);
    let point = Breakpoint::new(&x);
    point.await;
    println!("x: {:p}", &x as *const _);
}

fn dummy_raw_waker() -> RawWaker {
    unsafe fn no_op(_data: *const ()) {}
    unsafe fn clone(_data: *const ()) -> RawWaker {
        dummy_raw_waker()
    }
    let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
    RawWaker::new(std::ptr::null(), vtable)
}

fn main() {
    let waker = unsafe { Waker::from_raw(dummy_raw_waker()) };
    let mut cx = Context::from_waker(&waker);

    let mut f = pin!(run());
    println!("start!");
    while let Poll::Pending = Pin::new(&mut f).poll(&mut cx) {
        println!("pending, try again");
    }
    println!("finish!");
}

/*
start!
x: 0x7ffe8527be38
construct breakpoint, t: 0x7ffe8527be38
breakpoint@0x7ffe8527be28 not ready, t: 0x7ffe8527be38
pending, try again
breakpoint@0x7ffe8527be28 ready, t: 0x7ffe8527be38
x: 0x7ffe8527be38
finish!
*/

7. 为什么 Pin::new_unchecked 是 unsafe 的?

如果我不遵守 Pin 的规则,那么该自引用的状态可以被破坏(在线执行):

fn main() {
    let waker = unsafe { Waker::from_raw(dummy_raw_waker()) };
    let mut cx = Context::from_waker(&waker);

    let mut f = run();
    println!("sizeof future: {}", std::mem::size_of_val(&f));
    println!("start!");

    {
        let p = unsafe { Pin::new_unchecked(&mut f) };
        if p.poll(&mut cx) == Poll::Pending {
            println!("pending, try again");
        }
    }

    {
        let mut p = Box::pin(f); // move from stack to heap.
        if p.as_mut().poll(&mut cx) == Poll::Pending {
            println!("pending, try again");
        }
    }

    println!("finish!");
}

/*
sizeof future: 24
start!
x: 0x7ffe1590ed80
construct breakpoint, t: 0x7ffe1590ed80
breakpoint@0x7ffe1590ed70 not ready, t: 0x7ffe1590ed80
pending, try again
breakpoint@0x61c1df7a3b80 ready, t: 0x7ffe1590ed80
x: 0x61c1df7a3b90
finish!
*/

从输出中,我们可以得出以下结论:

  1. 异步函数中获取引用/地址的操作是实时计算的
  2. 注意 xt 地址的变化,异常的移动会破坏自引用结构。这间接要求了构造 Pin 时需要严格限制
  3. future 结构在首次执行 poll 前,是不会构造自引用数据的,此时它是可以合法移动的

总结

可以查看参考中列出的 Issue 和文章,Pin 和 Future 的设计是经过社区讨论不断演进的,理解了它的底层需求会发现这些设计还是自然、合理的。

References

  1. Rust std::pin
  2. rust-lang/rust#2349 Standard library API for immovable types
  3. Rust RFC 2394 async_await
  4. Pin, withoutboats
  5. rust-lang/rust#93176 Add a stack-pin!-ning macro to core::pin