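At the core of a network application is the handling of byte streams. This post looks at bytes, the foundational library Tokio uses for that purpose; the code read here is v0.6.0.
The bytes crate wraps the common operations on byte streams, and its key feature is a memory-safe string_view built on reference counting. The layout of the src directory is: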
src
├── buf # buffer implementation
│   ├── buf_impl.rs
│   ├── buf_mut.rs
│   ├── chain.rs
│   ├── iter.rs
│   ├── limit.rs
│   ├── mod.rs
│   ├── reader.rs
│   ├── take.rs
│   ├── uninit_slice.rs
│   ├── vec_deque.rs
│   └── writer.rs
├── bytes.rs # Bytes implementation
├── bytes_mut.rs # mutable bytes
├── fmt # output formatting
│   ├── debug.rs
│   ├── hex.rs
│   └── mod.rs
├── lib.rs
├── loom.rs # atomic types used by the reference counting (swappable with loom for concurrency testing)
└── serde.rs # serde serialization support
Start with a unit test:
#[test]
fn slice() {
    let a = Bytes::from(&b"hello world"[..]);
    let b = a.slice(3..5);
    assert_eq!(b, b"lo"[..]);
    let b = a.slice(0..0);
    assert_eq!(b, b""[..]);
    let b = a.slice(3..3);
    assert_eq!(b, b""[..]);
    let b = a.slice(a.len()..a.len());
    assert_eq!(b, b""[..]);
    let b = a.slice(..5);
    assert_eq!(b, b"hello"[..]);
    let b = a.slice(3..);
    assert_eq!(b, b"lo world"[..]);
}
Buf is similar to Slice in LevelDB. Its trait is defined in src/buf/buf_impl.rs, which depends on reader.rs, take.rs, and chain.rs. We go through these dependencies in turn:
// reader.rs
/// A `Buf` adapter which implements `io::Read` for the inner value.
#[derive(Debug)]
pub struct Reader<B> {
    buf: B,
}
pub fn new<B>(buf: B) -> Reader<B> {
    Reader { buf }
}
impl<B: Buf + Sized> io::Read for Reader<B> {
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        let len = cmp::min(self.buf.remaining(), dst.len());
        Buf::copy_to_slice(&mut self.buf, &mut dst[0..len]);
        Ok(len)
    }
}
// related functions in buf_impl.rs
pub trait Buf {
    #[cfg(feature = "std")]
    fn reader(self) -> Reader<Self>
    where
        Self: Sized,
    {
        reader::new(self)
    }
    fn copy_to_slice(&mut self, dst: &mut [u8]) {
        let mut off = 0;
        assert!(self.remaining() >= dst.len());
        while off < dst.len() {
            let cnt;
            unsafe {
                let src = self.bytes();
                cnt = cmp::min(src.len(), dst.len() - off);
                ptr::copy_nonoverlapping(src.as_ptr(), dst[off..].as_mut_ptr(), cnt);
                off += cnt;
            }
            self.advance(cnt);
        }
    }
}
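Reader wraps a buf: B object, so io::Read::read can copy memory straight from buf into dst. Buf::copy_to_slice advances the buffer's cursor past the copied bytes as it goes. The call Buf::copy_to_slice(&mut self.buf, ...) could equally be written self.buf.copy_to_slice(...). The ptr::copy_nonoverlapping used for the copy is unsafe; it is semantically equivalent to memcpy and requires that the memory regions behind src and dst do not overlap.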
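The adapter idea can be tried out without the crate. The following is a minimal sketch using only the standard library; MiniBuf and MiniReader are hypothetical stand-ins for Buf and Reader, and the copy uses the safe copy_from_slice rather than ptr::copy_nonoverlapping.

```rust
use std::cmp;
use std::io::{self, Read};

/// Hypothetical, stripped-down stand-in for the `Buf` trait.
trait MiniBuf {
    fn remaining(&self) -> usize;
    fn bytes(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);
}

impl MiniBuf for &[u8] {
    fn remaining(&self) -> usize {
        self.len()
    }
    fn bytes(&self) -> &[u8] {
        self
    }
    fn advance(&mut self, cnt: usize) {
        *self = &self[cnt..];
    }
}

/// Hypothetical stand-in for `Reader`: exposes a `MiniBuf` through `io::Read`.
struct MiniReader<B> {
    buf: B,
}

impl<B: MiniBuf> Read for MiniReader<B> {
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        // Never copy more than the buffer holds or the destination can take.
        let len = cmp::min(self.buf.remaining(), dst.len());
        let mut off = 0;
        while off < len {
            let src = self.buf.bytes();
            let cnt = cmp::min(src.len(), len - off);
            dst[off..off + cnt].copy_from_slice(&src[..cnt]);
            // Move the cursor past the bytes that were just copied.
            self.buf.advance(cnt);
            off += cnt;
        }
        Ok(len)
    }
}
```

Because read returns 0 once the buffer is exhausted, the usual io::Read helpers such as read_to_end terminate as expected.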
// take.rs
/// A `Buf` adapter which limits the bytes read from an underlying buffer.
#[derive(Debug)]
pub struct Take<T> {
    inner: T,
    limit: usize,
}
pub fn new<T>(inner: T, limit: usize) -> Take<T> {
    Take { inner, limit }
}
impl<T: Buf> Buf for Take<T> {
    fn remaining(&self) -> usize {
        cmp::min(self.inner.remaining(), self.limit)
    }
    fn bytes(&self) -> &[u8] {
        let bytes = self.inner.bytes();
        &bytes[..cmp::min(bytes.len(), self.limit)]
    }
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.limit);
        self.inner.advance(cnt);
        self.limit -= cnt;
    }
}
// related functions in buf_impl.rs
pub trait Buf {
    /// Creates an adaptor which will read at most `limit` bytes from `self`.
    fn take(self, limit: usize) -> Take<Self>
    where
        Self: Sized,
    {
        take::new(self, limit)
    }
}
/// # Examples
///
/// ```
/// use bytes::{Buf, BufMut};
///
/// let mut buf = b"hello world"[..].take(5);
/// let mut dst = vec![];
///
/// dst.put(&mut buf);
/// assert_eq!(dst, b"hello");
///
/// let mut buf = buf.into_inner();
/// dst.clear();
/// dst.put(&mut buf);
/// assert_eq!(dst, b" world");
/// ```
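Take wraps an inner: T object together with a limit that caps the readable length. Notably, Take<T> with T: Buf is itself a Buf, so buf.take(limit) can be applied repeatedly, nesting one Take inside another.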
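To see the nesting in action, here is a small standard-library-only sketch. MiniBuf and MiniTake are hypothetical stand-ins that mirror the shape of the code above; they are not the crate's types.

```rust
use std::cmp;

/// Hypothetical, stripped-down stand-in for the `Buf` trait.
trait MiniBuf {
    fn remaining(&self) -> usize;
    fn bytes(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);

    /// Provided method, so every implementor (including `MiniTake`) gets it.
    fn take(self, limit: usize) -> MiniTake<Self>
    where
        Self: Sized,
    {
        MiniTake { inner: self, limit }
    }
}

impl MiniBuf for &[u8] {
    fn remaining(&self) -> usize {
        self.len()
    }
    fn bytes(&self) -> &[u8] {
        self
    }
    fn advance(&mut self, cnt: usize) {
        *self = &self[cnt..];
    }
}

/// Hypothetical stand-in for `Take`: caps how much of `inner` is visible.
struct MiniTake<T> {
    inner: T,
    limit: usize,
}

impl<T: MiniBuf> MiniBuf for MiniTake<T> {
    fn remaining(&self) -> usize {
        cmp::min(self.inner.remaining(), self.limit)
    }
    fn bytes(&self) -> &[u8] {
        let bytes = self.inner.bytes();
        &bytes[..cmp::min(bytes.len(), self.limit)]
    }
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.limit);
        self.inner.advance(cnt);
        // The budget shrinks together with the cursor.
        self.limit -= cnt;
    }
}
```

With these definitions, (&b"hello world"[..]).take(8).take(3) has type MiniTake<MiniTake<&[u8]>>, and the tightest limit in the stack wins.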
// chain.rs
/// A `Chain` sequences two buffers.
#[derive(Debug)]
pub struct Chain<T, U> {
    a: T,
    b: U,
}
impl<T, U> Buf for Chain<T, U>
where
    T: Buf,
    U: Buf,
{
    fn remaining(&self) -> usize {
        self.a.remaining() + self.b.remaining()
    }
    fn bytes(&self) -> &[u8] {
        if self.a.has_remaining() {
            self.a.bytes()
        } else {
            self.b.bytes()
        }
    }
    fn advance(&mut self, mut cnt: usize) {
        let a_rem = self.a.remaining();
        if a_rem != 0 {
            if a_rem >= cnt {
                self.a.advance(cnt);
                return;
            }
            // Consume what is left of a
            self.a.advance(a_rem);
            cnt -= a_rem;
        }
        self.b.advance(cnt);
    }
    #[cfg(feature = "std")]
    fn bytes_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        let mut n = self.a.bytes_vectored(dst);
        n += self.b.bytes_vectored(&mut dst[n..]);
        n
    }
}
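Chain links two Buf objects and presents them as a single sequential view across both buffers. In other words, Buf objects can be chained without limit, for example a.chain(b).chain(c), whose resulting type is Chain<Chain<A, B>, C> for inputs of types A, B, and C.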
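The chaining behaviour can be reproduced in a few lines of standard-library Rust. In this sketch MiniBuf, MiniChain, and drain are hypothetical names; the advance logic follows the listing above.

```rust
/// Hypothetical, stripped-down stand-in for the `Buf` trait.
trait MiniBuf {
    fn remaining(&self) -> usize;
    fn bytes(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);
}

impl MiniBuf for &[u8] {
    fn remaining(&self) -> usize {
        self.len()
    }
    fn bytes(&self) -> &[u8] {
        self
    }
    fn advance(&mut self, cnt: usize) {
        *self = &self[cnt..];
    }
}

/// Hypothetical stand-in for `Chain`: reads `a` to the end, then `b`.
struct MiniChain<A, B> {
    a: A,
    b: B,
}

impl<A: MiniBuf, B: MiniBuf> MiniBuf for MiniChain<A, B> {
    fn remaining(&self) -> usize {
        self.a.remaining() + self.b.remaining()
    }
    fn bytes(&self) -> &[u8] {
        // Only one underlying chunk is exposed at a time.
        if self.a.remaining() > 0 {
            self.a.bytes()
        } else {
            self.b.bytes()
        }
    }
    fn advance(&mut self, mut cnt: usize) {
        let a_rem = self.a.remaining();
        if a_rem != 0 {
            if a_rem >= cnt {
                self.a.advance(cnt);
                return;
            }
            // Consume what is left of `a`, then spill over into `b`.
            self.a.advance(a_rem);
            cnt -= a_rem;
        }
        self.b.advance(cnt);
    }
}

/// Collects every remaining byte, one chunk at a time.
fn drain<B: MiniBuf>(mut buf: B) -> Vec<u8> {
    let mut out = Vec::new();
    while buf.remaining() > 0 {
        let chunk = buf.bytes();
        let n = chunk.len();
        out.extend_from_slice(chunk);
        buf.advance(n);
    }
    out
}
```

Note that bytes() on the chain returns only the first non-empty chunk, which is why the trait documentation allows a slice shorter than remaining().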
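With the dependencies covered, we can return to buf_impl.rs. Only the following three methods of the Buf trait are required (the "pure virtual functions", in C++ terms):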
pub trait Buf {
    /// Returns the number of bytes between the current position and the end of
    /// the buffer.
    fn remaining(&self) -> usize;
    /// Returns a slice starting at the current position and of length between 0
    /// and `Buf::remaining()`. Note that this *can* return shorter slice (this allows
    /// non-continuous internal representation)
    fn bytes(&self) -> &[u8];
    /// Advance the internal cursor of the Buf
    fn advance(&mut self, cnt: usize);
}
Implementing these three required methods is all it takes to implement the Buf trait, for example:
impl Buf for &[u8] {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }
    #[inline]
    fn bytes(&self) -> &[u8] {
        self
    }
    #[inline]
    fn advance(&mut self, cnt: usize) {
        *self = &self[cnt..];
    }
}
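The slice implementation advances by shrinking the slice itself. As a contrasting sketch, a type that owns its data can keep an explicit position instead. MiniBuf and VecCursor below are hypothetical names used only for illustration.

```rust
/// Hypothetical, stripped-down stand-in for the `Buf` trait.
trait MiniBuf {
    fn remaining(&self) -> usize;
    fn bytes(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);
}

/// Hypothetical owned buffer with an explicit read position.
struct VecCursor {
    data: Vec<u8>,
    pos: usize,
}

impl MiniBuf for VecCursor {
    fn remaining(&self) -> usize {
        self.data.len() - self.pos
    }
    fn bytes(&self) -> &[u8] {
        // Everything from the cursor to the end is one contiguous chunk.
        &self.data[self.pos..]
    }
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.remaining(), "cannot advance past the end");
        self.pos += cnt;
    }
}
```

Either way, the three methods fully describe the read side: how much is left, where the next chunk is, and how to consume it.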
These methods are enough to build an iterator over a Buf:
// iter.rs
/// Iterator over the bytes contained by the buffer.
#[derive(Debug)]
pub struct IntoIter<T> {
    inner: T,
}
impl<T: Buf> Iterator for IntoIter<T> {
    type Item = u8;
    fn next(&mut self) -> Option<u8> {
        if !self.inner.has_remaining() {
            return None;
        }
        let b = self.inner.bytes()[0];
        self.inner.advance(1);
        Some(b)
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        let rem = self.inner.remaining();
        (rem, Some(rem))
    }
}
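Buf also has a writable counterpart, BufMut. Its implementation is much the same and is not covered here.
Bytes is a cheaply cloneable and sliceable chunk of contiguous memory. Reference counting keeps the underlying allocation alive, and a <ptr, len> pair records the position and size of the current view. The layout of a Bytes object is: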
/// A cheaply cloneable and sliceable chunk of contiguous memory.
pub struct Bytes {
    ptr: *const u8,
    len: usize,
    // inlined "trait object"
    data: AtomicPtr<()>,
    vtable: &'static Vtable,
}
pub(crate) struct Vtable {
    /// fn(data, ptr, len)
    pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
    /// fn(data, ptr, len)
    pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}
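The "inlined trait object" comment refers to storing a data word next to a reference to a static table of function pointers, instead of going through Box<dyn Trait>. A minimal safe sketch of that pattern follows; Handle, describe, and both vtable instances are hypothetical and only illustrate the dispatch mechanism, not the crate's clone and drop logic.

```rust
/// Table of operations; one static instance exists per storage strategy.
struct Vtable {
    describe: fn(data: usize) -> String,
}

fn describe_static(_data: usize) -> String {
    "static: nothing to release".to_string()
}

fn describe_counted(data: usize) -> String {
    format!("counted: {} owner(s)", data)
}

static STATIC_VTABLE: Vtable = Vtable {
    describe: describe_static,
};

static COUNTED_VTABLE: Vtable = Vtable {
    describe: describe_counted,
};

/// A data word plus a pointer to its operations, stored inline.
struct Handle {
    data: usize,
    vtable: &'static Vtable,
}

impl Handle {
    fn describe(&self) -> String {
        // Manual dynamic dispatch: call through the function pointer.
        (self.vtable.describe)(self.data)
    }
}
```

The handle stays two machine words wide and needs no heap allocation of its own, and the strategy is picked at construction time simply by choosing which static table to point at.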
Bytes naturally implements the Buf trait:
impl Buf for Bytes {
    #[inline]
    fn remaining(&self) -> usize {
        self.len()
    }
    #[inline]
    fn bytes(&self) -> &[u8] {
        self.as_slice()
    }
    #[inline]
    fn advance(&mut self, cnt: usize) {
        assert!(
            cnt <= self.len(),
            "cannot advance past `remaining`: {:?} <= {:?}",
            cnt,
            self.len(),
        );
        unsafe {
            self.inc_start(cnt);
        }
    }
    fn copy_to_bytes(&mut self, len: usize) -> crate::Bytes {
        if len == self.remaining() {
            core::mem::replace(self, Bytes::new())
        } else {
            let ret = self.slice(..len);
            self.advance(len);
            ret
        }
    }
}
Slicing a Bytes is also straightforward: clone produces a new handle, whose pointer and length are then adjusted:
impl Bytes {
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Bytes {
        use core::ops::Bound;
        let len = self.len();
        let begin = match range.start_bound() {
            Bound::Included(&n) => n,
            Bound::Excluded(&n) => n + 1,
            Bound::Unbounded => 0,
        };
        let end = match range.end_bound() {
            Bound::Included(&n) => n.checked_add(1).expect("out of range"),
            Bound::Excluded(&n) => n,
            Bound::Unbounded => len,
        };
        assert!(
            begin <= end,
            "range start must not be greater than end: {:?} <= {:?}",
            begin,
            end,
        );
        assert!(
            end <= len,
            "range end out of bounds: {:?} <= {:?}",
            end,
            len,
        );
        if end == begin {
            return Bytes::new();
        }
        let mut ret = self.clone();
        ret.len = end - begin;
        ret.ptr = unsafe { ret.ptr.offset(begin as isize) };
        ret
    }
}
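The clone-then-adjust approach to slicing can be imitated with safe standard-library types. The sketch below swaps in Arc<Vec<u8>> plus an offset for the raw pointer and hand-written reference count; SharedView and its methods are hypothetical names.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Hypothetical ref-counted view: a shared buffer plus a window into it.
#[derive(Clone)]
struct SharedView {
    buf: Arc<Vec<u8>>,
    start: usize,
    len: usize,
}

impl SharedView {
    fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        SharedView { buf: Arc::new(data), start: 0, len }
    }

    fn as_slice(&self) -> &[u8] {
        &self.buf[self.start..self.start + self.len]
    }

    /// Clone the handle (bumping the count), then narrow the window.
    fn slice(&self, range: Range<usize>) -> SharedView {
        assert!(range.start <= range.end, "range start must not be greater than end");
        assert!(range.end <= self.len, "range end out of bounds");
        let mut ret = self.clone();
        ret.start += range.start;
        ret.len = range.end - range.start;
        ret
    }

    /// Number of live handles that keep the buffer alive.
    fn owners(&self) -> usize {
        Arc::strong_count(&self.buf)
    }
}
```

No bytes are copied by slice; every view shares one allocation, which is freed when the last handle is dropped.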
To keep clone cheap, it has several implementations. For static memory known at compile time, copying ptr and len is enough, and no destructor bookkeeping is needed:
// ===== impl StaticVtable =====
const STATIC_VTABLE: Vtable = Vtable {
    clone: static_clone,
    drop: static_drop,
};
unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let slice = slice::from_raw_parts(ptr, len);
    Bytes::from_static(slice)
}
unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
    // nothing to drop for &'static [u8]
}
impl Bytes {
    pub fn from_static(bytes: &'static [u8]) -> Bytes {
        Bytes {
            ptr: bytes.as_ptr(),
            len: bytes.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VTABLE,
        }
    }
}
Dynamically allocated memory, in contrast, needs reference counting:
struct Shared {
    // holds vec for drop, but otherwise doesnt access it
    _vec: Vec<u8>,
    ref_cnt: AtomicUsize,
}
const _: [(); 0 - mem::align_of::<Shared>() % 2] = []; // Assert that the alignment of `Shared` is divisible by 2.
static SHARED_VTABLE: Vtable = Vtable {
    clone: shared_clone,
    drop: shared_drop,
};
unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Relaxed);
    shallow_clone_arc(shared as _, ptr, len)
}
unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
    data.with_mut(|shared| {
        release_shared(*shared as *mut Shared);
    });
}
unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes {
    let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed);
    if old_size > usize::MAX >> 1 {
        crate::abort();
    }
    Bytes {
        ptr,
        len,
        data: AtomicPtr::new(shared as _),
        vtable: &SHARED_VTABLE,
    }
}
unsafe fn release_shared(ptr: *mut Shared) {
    // `Shared` storage... follow the drop steps from Arc.
    if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
        return;
    }
    atomic::fence(Ordering::Acquire);
    Box::from_raw(ptr);
}
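The counting protocol above (Relaxed increment, Release decrement, Acquire fence before freeing) is the same one Arc uses. Here is a self-contained sketch of it; the Shared struct is re-declared locally, and acquire and release are hypothetical helper names, with release returning whether it freed the allocation so the behaviour can be observed.

```rust
use std::sync::atomic::{fence, AtomicUsize, Ordering};

/// Local stand-in for the shared header: payload plus reference count.
struct Shared {
    _data: Vec<u8>,
    ref_cnt: AtomicUsize,
}

/// Registers one more owner.
///
/// Safety: `ptr` must point to a live `Shared` created by `Box::into_raw`.
unsafe fn acquire(ptr: *mut Shared) {
    let shared: &Shared = unsafe { &*ptr };
    // Relaxed suffices: the caller already holds a reference, so the
    // object cannot be freed while this increment is in flight.
    shared.ref_cnt.fetch_add(1, Ordering::Relaxed);
}

/// Gives up one owner; returns true if this call freed the allocation.
///
/// Safety: same as `acquire`, and the caller must not use `ptr` afterwards.
unsafe fn release(ptr: *mut Shared) -> bool {
    let shared: &Shared = unsafe { &*ptr };
    // Release publishes this owner's earlier writes to whoever frees.
    if shared.ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
        return false;
    }
    // Acquire pairs with the Release decrements of all other owners,
    // so their writes are visible before the memory is reclaimed.
    fence(Ordering::Acquire);
    drop(unsafe { Box::from_raw(ptr) });
    true
}
```

Only the owner that observes the count dropping from 1 to 0 performs the free, so the payload is released exactly once.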
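The more involved part is converting a Vec<u8> into a Bytes. On conversion, the atomic pointer data is pointed at the Vec<u8>'s heap buffer, and its lowest bit is set to 1 (KIND_VEC) to distinguish it from a pointer to a Shared object. If the buffer address is already odd, it is stored unchanged and a separate vtable (PROMOTABLE_ODD_VTABLE) is used. On the first clone, a Shared object is built from the heap buffer that data refers to, and data is switched to the new Shared object with a CAS, which guarantees that this promotion happens only once. The from_raw / into_raw / mem::forget calls here are worth studying as examples of low-level, manual memory management.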
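The low-bit trick relies on the fact that a pointer aligned to at least 2 bytes always has a zero lowest bit, so that bit is free to carry a flag. A small sketch follows; the constants mirror the names used in the listing below, while tag_as_vec, kind_of, and untag are hypothetical helpers.

```rust
const KIND_ARC: usize = 0b0;
const KIND_VEC: usize = 0b1;
const KIND_MASK: usize = 0b1;

/// Stashes the "still a plain Vec" flag in the lowest bit of an even address.
fn tag_as_vec(ptr: *mut u8) -> usize {
    debug_assert_eq!(ptr as usize & KIND_MASK, 0, "pointer must be even");
    ptr as usize | KIND_VEC
}

/// Reads the flag back without touching the address bits.
fn kind_of(word: usize) -> usize {
    word & KIND_MASK
}

/// Clears the flag to recover the original address.
fn untag(word: usize) -> *mut u8 {
    (word & !KIND_MASK) as *mut u8
}
```

A byte buffer has alignment 1, so its address may be odd. In that case the lowest bit cannot be borrowed and later cleared, which is why the conversion below picks a different vtable for odd addresses.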
impl From<Vec<u8>> for Bytes {
    fn from(vec: Vec<u8>) -> Bytes {
        // into_boxed_slice doesn't return a heap allocation for empty vectors,
        // so the pointer isn't aligned enough for the KIND_VEC stashing to
        // work.
        if vec.is_empty() {
            return Bytes::new();
        }
        let slice = vec.into_boxed_slice();
        let len = slice.len();
        let ptr = slice.as_ptr();
        // into_raw gives up ownership without freeing the buffer;
        // dropping the returned raw pointer is a no-op.
        drop(Box::into_raw(slice));
        if ptr as usize & 0x1 == 0 {
            let data = ptr as usize | KIND_VEC;
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(data as *mut _),
                vtable: &PROMOTABLE_EVEN_VTABLE,
            }
        } else {
            Bytes {
                ptr,
                len,
                data: AtomicPtr::new(ptr as *mut _),
                vtable: &PROMOTABLE_ODD_VTABLE,
            }
        }
    }
}
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
    clone: promotable_even_clone,
    drop: promotable_even_drop,
};
unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
    let shared = data.load(Ordering::Acquire);
    let kind = shared as usize & KIND_MASK;
    if kind == KIND_ARC {
        shallow_clone_arc(shared as _, ptr, len)
    } else {
        debug_assert_eq!(kind, KIND_VEC);
        let buf = (shared as usize & !KIND_MASK) as *mut u8;
        shallow_clone_vec(data, shared, buf, ptr, len)
    }
}
unsafe fn rebuild_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) -> Box<[u8]> {
    let cap = (offset as usize - buf as usize) + len;
    Box::from_raw(slice::from_raw_parts_mut(buf, cap))
}
#[cold]
unsafe fn shallow_clone_vec(
    atom: &AtomicPtr<()>,
    ptr: *const (),
    buf: *mut u8,
    offset: *const u8,
    len: usize,
) -> Bytes {
    // If the buffer is still tracked in a `Vec<u8>`. It is time to
    // promote the vec to an `Arc`. This could potentially be called
    // concurrently, so some care must be taken.
    // First, allocate a new `Shared` instance containing the
    // `Vec` fields. It's important to note that `ptr`, `len`,
    // and `cap` cannot be mutated without having `&mut self`.
    // This means that these fields will not be concurrently
    // updated and since the buffer hasn't been promoted to an
    // `Arc`, those three fields still are the components of the
    // vector.
    let vec = rebuild_boxed_slice(buf, offset, len).into_vec();
    let shared = Box::new(Shared {
        _vec: vec,
        // Initialize refcount to 2. One for this reference, and one
        // for the new clone that will be returned from
        // `shallow_clone`.
        ref_cnt: AtomicUsize::new(2),
    });
    let shared = Box::into_raw(shared);
    // The pointer should be aligned, so this assert should
    // always succeed.
    debug_assert!(
        0 == (shared as usize & KIND_MASK),
        "internal: Box<Shared> should have an aligned pointer",
    );
    // Try compare & swapping the pointer into the `arc` field.
    // `Release` is used synchronize with other threads that
    // will load the `arc` field.
    //
    // If the `compare_and_swap` fails, then the thread lost the
    // race to promote the buffer to shared. The `Acquire`
    // ordering will synchronize with the `compare_and_swap`
    // that happened in the other thread and the `Shared`
    // pointed to by `actual` will be visible.
    let actual = atom.compare_and_swap(ptr as _, shared as _, Ordering::AcqRel);
    if actual as usize == ptr as usize {
        // The upgrade was successful, the new handle can be
        // returned.
        return Bytes {
            ptr: offset,
            len,
            data: AtomicPtr::new(shared as _),
            vtable: &SHARED_VTABLE,
        };
    }
    // The upgrade failed, a concurrent clone happened. Release
    // the allocation that was made in this thread, it will not
    // be needed.
    let shared = Box::from_raw(shared);
    mem::forget(*shared);
    // Buffer already promoted to shared storage, so increment ref
    // count.
    shallow_clone_arc(actual as _, offset, len)
}
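The promote-once pattern in shallow_clone_vec can be reduced to a small standalone sketch: allocate optimistically, try to install the pointer with a CAS, and discard the local allocation if another thread won. The promote function and the u64 payload are hypothetical, and the sketch uses compare_exchange, the replacement for compare_and_swap, which has been deprecated since Rust 1.50.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Returns the shared value behind `slot`, creating it on first use.
/// A null `slot` means "not promoted yet".
fn promote(slot: &AtomicPtr<u64>, initial: u64) -> *mut u64 {
    let current = slot.load(Ordering::Acquire);
    if !current.is_null() {
        return current;
    }
    // Allocate optimistically; this may turn out to be wasted work.
    let fresh = Box::into_raw(Box::new(initial));
    match slot.compare_exchange(
        ptr::null_mut(),
        fresh,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        // We installed our allocation: every later caller will see it.
        Ok(_) => fresh,
        // Another thread promoted first: free ours and adopt theirs.
        Err(winner) => {
            // Safety: `fresh` came from Box::into_raw above and was
            // never published, so this thread still owns it.
            unsafe { drop(Box::from_raw(fresh)) };
            winner
        }
    }
}
```

However many threads race, exactly one allocation survives and every caller ends up with the same pointer, which is the property the promotion from Vec to Shared depends on.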