Tokio Source Code Analysis, Part 2: Byte Streams with Bytes

2020.11.12
SF-Zhou

The core of a networked application is handling byte streams. This post looks at bytes, the foundational library Tokio uses for byte streams; the code read here is v0.6.0.

Tokio architecture diagram, from tokio.rs

1. Overview

Bytes packages up the common operations on byte streams; its core feature is a memory-safe string_view built on reference counting. The file layout under the src directory is:

src
├── buf                  # buffer implementations
│   ├── buf_impl.rs
│   ├── buf_mut.rs
│   ├── chain.rs
│   ├── iter.rs
│   ├── limit.rs
│   ├── mod.rs
│   ├── reader.rs
│   ├── take.rs
│   ├── uninit_slice.rs
│   ├── vec_deque.rs
│   └── writer.rs
├── bytes.rs             # Bytes implementation
├── bytes_mut.rs         # mutable bytes
├── fmt                  # output formatting
│   ├── debug.rs
│   ├── hex.rs
│   └── mod.rs
├── lib.rs
├── loom.rs              # atomic primitives used for reference counting
└── serde.rs             # serde serialization support

Let's look at a unit test first:

#[test]
fn slice() {
    let a = Bytes::from(&b"hello world"[..]);

    let b = a.slice(3..5);
    assert_eq!(b, b"lo"[..]);

    let b = a.slice(0..0);
    assert_eq!(b, b""[..]);

    let b = a.slice(3..3);
    assert_eq!(b, b""[..]);

    let b = a.slice(a.len()..a.len());
    assert_eq!(b, b""[..]);

    let b = a.slice(..5);
    assert_eq!(b, b"hello"[..]);

    let b = a.slice(3..);
    assert_eq!(b, b"lo world"[..]);
}

2. Buf

Buf is similar to Slice in LevelDB. The trait is defined in src/buf/buf_impl.rs and relies on reader.rs / take.rs / chain.rs; let's go through those dependencies in order:

// reader.rs

/// A `Buf` adapter which implements `io::Read` for the inner value.
#[derive(Debug)]
pub struct Reader<B> {
    buf: B,
}

pub fn new<B>(buf: B) -> Reader<B> {
    Reader { buf }
}

impl<B: Buf + Sized> io::Read for Reader<B> {
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        let len = cmp::min(self.buf.remaining(), dst.len());

        Buf::copy_to_slice(&mut self.buf, &mut dst[0..len]);
        Ok(len)
    }
}

// related functions in buf_impl.rs
pub trait Buf {
    #[cfg(feature = "std")]
    fn reader(self) -> Reader<Self>
    where
        Self: Sized,
    {
        reader::new(self)
    }

    fn copy_to_slice(&mut self, dst: &mut [u8]) {
        let mut off = 0;

        assert!(self.remaining() >= dst.len());

        while off < dst.len() {
            let cnt;

            unsafe {
                let src = self.bytes();
                cnt = cmp::min(src.len(), dst.len() - off);

                ptr::copy_nonoverlapping(src.as_ptr(), dst[off..].as_mut_ptr(), cnt);

                off += cnt;
            }

            self.advance(cnt);
        }
    }
}

Reader wraps a buf: B object. When io::Read::read is called, it copies memory from buf straight into dst; during Buf::copy_to_slice the internal cursor is advanced past the copied region. The call Buf::copy_to_slice(&mut self.buf, ...) could equally be written as self.buf.copy_to_slice(...). The ptr::copy_nonoverlapping used in the copy is unsafe; it is semantically equivalent to memcpy and requires that the regions pointed to by src and dst do not overlap.
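
As a quick illustration (a sketch of my own, not code from the crate), wrapping a Buf in a Reader lets any std::io::Read consumer drain it:

use bytes::Buf;
use std::io::Read;

// `&[u8]` implements `Buf`, so `.reader()` adapts it into an `io::Read`.
let mut reader = (&b"hello world"[..]).reader();
let mut dst = [0u8; 5];
// `read_exact` is a default method of `io::Read`; it bottoms out in `Reader::read`,
// which copies via `Buf::copy_to_slice` and advances the underlying buffer.
reader.read_exact(&mut dst).unwrap();
assert_eq!(&dst[..], &b"hello"[..]);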

// take.rs

/// A `Buf` adapter which limits the bytes read from an underlying buffer.
#[derive(Debug)]
pub struct Take<T> {
    inner: T,
    limit: usize,
}

pub fn new<T>(inner: T, limit: usize) -> Take<T> {
    Take { inner, limit }
}

impl<T: Buf> Buf for Take<T> {
    fn remaining(&self) -> usize {
        cmp::min(self.inner.remaining(), self.limit)
    }

    fn bytes(&self) -> &[u8] {
        let bytes = self.inner.bytes();
        &bytes[..cmp::min(bytes.len(), self.limit)]
    }

    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.limit);
        self.inner.advance(cnt);
        self.limit -= cnt;
    }
}

// related functions in buf_impl.rs
pub trait Buf {
    /// Creates an adaptor which will read at most `limit` bytes from `self`.
    fn take(self, limit: usize) -> Take<Self>
    where
        Self: Sized,
    {
        take::new(self, limit)
    }
}

/// # Examples
///
/// ```
/// use bytes::{Buf, BufMut};
///
/// let mut buf = b"hello world"[..].take(5);
/// let mut dst = vec![];
///
/// dst.put(&mut buf);
/// assert_eq!(dst, b"hello");
///
/// let mut buf = buf.into_inner();
/// dst.clear();
/// dst.put(&mut buf);
/// assert_eq!(dst, b" world");
/// ```

Take wraps an inner: T object plus a limit on how many bytes may be read. Interestingly, a Take over a Buf is itself a Buf, which means buf.take(limit) can be applied recursively.
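
A small sketch of that recursion (my own example): the outer Take simply clamps the inner one further.

use bytes::Buf;

// Take<Take<&[u8]>> is still a Buf: the effective limit is min(8, 5) = 5.
let nested = b"hello world"[..].take(8).take(5);
assert_eq!(nested.remaining(), 5);
assert_eq!(nested.bytes(), &b"hello"[..]);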

// chain.rs

/// A `Chain` sequences two buffers.
#[derive(Debug)]
pub struct Chain<T, U> {
    a: T,
    b: U,
}

impl<T, U> Buf for Chain<T, U>
where
    T: Buf,
    U: Buf,
{
    fn remaining(&self) -> usize {
        self.a.remaining() + self.b.remaining()
    }

    fn bytes(&self) -> &[u8] {
        if self.a.has_remaining() {
            self.a.bytes()
        } else {
            self.b.bytes()
        }
    }

    fn advance(&mut self, mut cnt: usize) {
        let a_rem = self.a.remaining();

        if a_rem != 0 {
            if a_rem >= cnt {
                self.a.advance(cnt);
                return;
            }

            // Consume what is left of a
            self.a.advance(a_rem);

            cnt -= a_rem;
        }

        self.b.advance(cnt);
    }

    #[cfg(feature = "std")]
    fn bytes_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        let mut n = self.a.bytes_vectored(dst);
        n += self.b.bytes_vectored(&mut dst[n..]);
        n
    }
}

A Chain links two Buf objects and presents a continuous view across the two buffers. In other words, Buf objects can be chained indefinitely: a.chain(b).chain(c) yields a value of type Chain<Chain<A, B>, C>.
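
For example (a minimal sketch of my own), draining a Chain walks through a first and then b:

use bytes::Buf;

let a: &[u8] = b"hello ";
let b: &[u8] = b"world";

// The chained value is itself a Buf with remaining() == a.remaining() + b.remaining().
let mut chained = a.chain(b);
let mut out = vec![0u8; chained.remaining()];
chained.copy_to_slice(&mut out);
assert_eq!(out, &b"hello world"[..]);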

With the dependencies out of the way, we can turn to buf_impl.rs. The Buf trait has only the following three required methods (its "pure virtual functions"):

pub trait Buf {
    /// Returns the number of bytes between the current position and the end of
    /// the buffer.
    fn remaining(&self) -> usize;

    /// Returns a slice starting at the current position and of length between 0
    /// and `Buf::remaining()`. Note that this *can* return a shorter slice (this
    /// allows a non-contiguous internal representation).
    fn bytes(&self) -> &[u8];

    /// Advances the internal cursor of the Buf by `cnt` bytes, so that the next
    /// call to `bytes` starts that much further into the underlying buffer.
    fn advance(&mut self, cnt: usize);
}

Implementing these three methods is enough to implement the Buf trait, for example:

impl Buf for &[u8] {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn bytes(&self) -> &[u8] {
        self
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        *self = &self[cnt..];
    }
}

With just these interfaces, an iterator over a Buf can be built:

// iter.rs

/// Iterator over the bytes contained by the buffer.
#[derive(Debug)]
pub struct IntoIter<T> {
    inner: T,
}

impl<T: Buf> Iterator for IntoIter<T> {
    type Item = u8;

    fn next(&mut self) -> Option<u8> {
        if !self.inner.has_remaining() {
            return None;
        }

        let b = self.inner.bytes()[0];
        self.inner.advance(1);

        Some(b)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let rem = self.inner.remaining();
        (rem, Some(rem))
    }
}
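
Bytes builds its IntoIterator impl on top of this IntoIter (a sketch of my own, assuming that impl), so a Bytes can be iterated byte by byte:

use bytes::Bytes;

// IntoIterator for Bytes yields u8 items via the IntoIter adapter above.
let buf = Bytes::from_static(b"abc");
let collected: Vec<u8> = buf.into_iter().collect();
assert_eq!(collected, &b"abc"[..]);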

Buf also has a writable counterpart, BufMut. Its implementation follows the same pattern, so it is not repeated here.
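
For completeness, a tiny sketch of the writable side (my own example): Vec<u8> implements BufMut, so bytes can be appended through the trait.

use bytes::BufMut;

// BufMut mirrors Buf: instead of consuming bytes it appends them.
let mut dst: Vec<u8> = Vec::new();
dst.put_u8(b'h');
dst.put_slice(b"ello");
assert_eq!(dst, b"hello");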

3. Bytes

Bytes is a cheaply cloneable, sliceable chunk of contiguous memory. Reference counting keeps the underlying block alive, while <ptr, len> records where the current view starts and how long it is. A Bytes object has the following layout:

/// A cheaply cloneable and sliceable chunk of contiguous memory.
pub struct Bytes {
    ptr: *const u8,
    len: usize,
    // inlined "trait object"
    data: AtomicPtr<()>,
    vtable: &'static Vtable,
}

pub(crate) struct Vtable {
    /// fn(data, ptr, len)
    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
    /// fn(data, ptr, len)
    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}

Naturally, Bytes implements the Buf trait:

impl Buf for Bytes {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }

    #[inline]
    fn bytes(&self) -> &[u8] {
        self.as_slice()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(
            cnt <= self.len(),
            "cannot advance past `remaining`: {:?} <= {:?}",
            cnt,
            self.len(),
        );

        unsafe {
            self.inc_start(cnt);
        }
    }

    fn copy_to_bytes(&mut self, len: usize) -> crate::Bytes {
        if len == self.remaining() {
            core::mem::replace(self, Bytes::new())
        } else {
            let ret = self.slice(..len);
            self.advance(len);
            ret
        }
    }
}
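
In practice (a small sketch of my own), advance and copy_to_bytes only move ptr/len around; the payload itself is never copied:

use bytes::{Buf, Bytes};

let mut buf = Bytes::from_static(b"hello world");
buf.advance(6);                   // skip "hello "; ptr moves forward, len shrinks
assert_eq!(buf.as_ref(), &b"world"[..]);

let mut buf = Bytes::from_static(b"hello world");
let head = buf.copy_to_bytes(5);  // "hello", still backed by the same static storage
assert_eq!(head, b"hello"[..]);
assert_eq!(buf, b" world"[..]);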

The slice operation on Bytes is also simple: clone to get a new object, then adjust that object's pointer and length:

impl Bytes {
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Bytes {
        use core::ops::Bound;

        let len = self.len();

        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n + 1,
            Bound::Unbounded => 0,
        };

        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };

        assert!(
            begin <= end,
            "range start must not be greater than end: {:?} <= {:?}",
            begin,
            end,
        );
        assert!(
            end <= len,
            "range end out of bounds: {:?} <= {:?}",
            end,
            len,
        );

        if end == begin {
            return Bytes::new();
        }

        let mut ret = self.clone();

        ret.len = end - begin;
        ret.ptr = unsafe { ret.ptr.offset(begin as isize) };

        ret
    }
}
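
The effect is visible from the outside (a sketch of my own): the sub-slice points into the same storage at an offset, and no bytes are copied.

use bytes::Bytes;

let a = Bytes::from_static(b"hello world");
let b = a.slice(6..);
assert_eq!(b, b"world"[..]);
// Same backing storage: b's pointer is a's pointer offset by 6.
assert_eq!(b.as_ref().as_ptr(), a.as_ref().as_ptr().wrapping_add(6));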

To keep clones cheap, the clone operation has several implementations. For static memory blocks known at compile time, copying ptr and len is enough; no destructor bookkeeping is needed:

// ===== impl StaticVtable =====
const STATIC_VTABLE: Vtable = Vtable {
    clone: static_clone,
    drop: static_drop,
};

unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let slice = slice::from_raw_parts(ptr, len);
    Bytes::from_static(slice)
}

unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
    // nothing to drop for &'static [u8]
}

impl Bytes {
    pub fn from_static(bytes: &'static [u8]) -> Bytes {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }
}
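
A quick sketch of my own showing what "no destructor bookkeeping" means in practice: cloning and dropping a static-backed Bytes never touches an allocator or a refcount.

use bytes::Bytes;

let a = Bytes::from_static(b"hello world");
let b = a.clone();   // static_clone: just copies ptr/len
drop(a);             // static_drop: a no-op, the &'static data outlives everything
assert_eq!(b, b"hello world"[..]);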

For dynamically allocated memory blocks, reference counting is required:

struct Shared {
    // holds vec for drop, but otherwise doesnt access it
    _vec: Vec<u8>,
    ref_cnt: AtomicUsize,
}

const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignment of `Shared` is divisible by 2.

static SHARED_VTABLE: Vtable = Vtable {
    clone: shared_clone,
    drop: shared_drop,
};

unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Relaxed);
    shallow_clone_arc(shared as _, ptr, len)
}

unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
    data.with_mut(|shared| {
        release_shared(*shared as *mut Shared);
    });
}

unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes {
    let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed);

    if old_size > usize::MAX >> 1 {
        crate::abort();
    }

    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(shared as _),
        vtable: &SHARED_VTABLE,
    }
}

unsafe fn release_shared(ptr: *mut Shared) {
    // `Shared` storage... follow the drop steps from Arc.
    if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
        return;
    }

    atomic::fence(Ordering::Acquire);
    Box::from_raw(ptr);
}

The trickier part of the implementation is converting a Vec<u8> into a Bytes. On conversion, the atomic pointer data is set to the Vec<u8>'s heap pointer, with the lowest bit set to 1 (KIND_VEC) to distinguish it from a pointer to a Shared object. On the first clone, a Shared object is built around that heap memory and data is switched to the new Shared via compare-and-swap, which guarantees the promotion happens exactly once. The from_raw / into_raw / mem::forget tricks used here for raw memory handling are well worth studying.

impl From<Vec<u8>> for Bytes {
    fn from(vec: Vec<u8>) -> Bytes {
        // into_boxed_slice doesn't return a heap allocation for empty vectors,
        // so the pointer isn't aligned enough for the KIND_VEC stashing to
        // work.
        if vec.is_empty() {
            return Bytes::new();
        }

        let slice = vec.into_boxed_slice();
        let len = slice.len();
        let ptr = slice.as_ptr();
        drop(Box::into_raw(slice));  // leak the Box: dropping a raw pointer does not free the allocation

        if ptr as usize & 0x1 == 0 {
            let data = ptr as usize | KIND_VEC;
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(data as *mut _),
                vtable: &PROMOTABLE_EVEN_VTABLE,
            }
        } else {
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(ptr as *mut _),
                vtable: &PROMOTABLE_ODD_VTABLE,
            }
        }
    }
}

static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
    clone: promotable_even_clone,
    drop: promotable_even_drop,
};

unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;

    if kind == KIND_ARC {
        shallow_clone_arc(shared as _, ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        let buf = (shared as usize & !KIND_MASK) as *mut u8;
        shallow_clone_vec(data, shared, buf, ptr, len)
    }
}

unsafe fn rebuild_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) -> Box<[u8]> {
    let cap = (offset as usize - buf as usize) + len;
    Box::from_raw(slice::from_raw_parts_mut(buf, cap))
}

#[cold]
unsafe fn shallow_clone_vec(
    atom: &AtomicPtr<()>,
    ptr: *const (),
    buf: *mut u8,
    offset: *const u8,
    len: usize,
) -> Bytes {
    // If the buffer is still tracked in a `Vec<u8>`, it is time to
    // promote the vec to an `Arc`. This could potentially be called
    // concurrently, so some care must be taken.

    // First, allocate a new `Shared` instance containing the
    // `Vec` fields. It's important to note that `ptr`, `len`,
    // and `cap` cannot be mutated without having `&mut self`.
    // This means that these fields will not be concurrently
    // updated and since the buffer hasn't been promoted to an
    // `Arc`, those three fields still are the components of the
    // vector.
    let vec = rebuild_boxed_slice(buf, offset, len).into_vec();
    let shared = Box::new(Shared {
        _vec: vec,
        // Initialize refcount to 2. One for this reference, and one
        // for the new clone that will be returned from
        // `shallow_clone`.
        ref_cnt: AtomicUsize::new(2),
    });

    let shared = Box::into_raw(shared);

    // The pointer should be aligned, so this assert should
    // always succeed.
    debug_assert!(
        0 == (shared as usize & KIND_MASK),
        "internal: Box<Shared> should have an aligned pointer",
    );

    // Try compare & swapping the pointer into the `arc` field.
    // `Release` is used synchronize with other threads that
    // will load the `arc` field.
    //
    // If the `compare_and_swap` fails, then the thread lost the
    // race to promote the buffer to shared. The `Acquire`
    // ordering will synchronize with the `compare_and_swap`
    // that happened in the other thread and the `Shared`
    // pointed to by `actual` will be visible.
    let actual = atom.compare_and_swap(ptr as _, shared as _, Ordering::AcqRel);

    if actual as usize == ptr as usize {
        // The upgrade was successful, the new handle can be
        // returned.
        return Bytes {
            ptr: offset,
            len,
            data: AtomicPtr::new(shared as _),
            vtable: &SHARED_VTABLE,
        };
    }

    // The upgrade failed, a concurrent clone happened. Release
    // the allocation that was made in this thread, it will not
    // be needed.
    let shared = Box::from_raw(shared);
    mem::forget(*shared);

    // Buffer already promoted to shared storage, so increment ref
    // count.
    shallow_clone_arc(actual as _, offset, len)
}
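
Putting the pieces together, a short sketch of my own showing the whole path: From<Vec<u8>> stashes the raw pointer, the first clone promotes it to a Shared with ref_cnt == 2, and further clones and drops just adjust the count until the allocation is released.

use bytes::Bytes;

let a = Bytes::from(b"hello".to_vec());  // promotable vtable, data stashes the Vec's pointer
let b = a.clone();                       // shallow_clone_vec: promote to Shared, ref_cnt == 2
let c = b.slice(1..3);                   // shallow_clone_arc: ref_cnt == 3, no copy
drop(a);
drop(b);                                 // ref_cnt falls back to 1; the buffer stays alive for `c`
assert_eq!(c, b"el"[..]);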