Rust RDMA 编程「三、对象封装」

2024.07.30
SF-Zhou

1. 精简绑定

系列的上一篇中讲述了如何对 libibverbs 库进行绑定。后续开发过程中笔者意识到真正需要的 API 接口其实是有限的,所以我决定精简接口绑定的代码,build.rs 如下:

use std::collections::HashSet;
use std::env;
use std::path::PathBuf;

fn main() {
    let lib = pkg_config::Config::new()
        .statik(false)
        .probe("libibverbs")
        .unwrap_or_else(|_| panic!("please install libibverbs-dev and pkg-config"));

    let mut include_paths = lib.include_paths.into_iter().collect::<HashSet<_>>();
    include_paths.insert(PathBuf::from("/usr/include"));

    let builder = bindgen::Builder::default()
        .clang_args(include_paths.iter().map(|p| format!("-I{:?}", p)))
        .header_contents("header.h", "#include <infiniband/verbs.h>")
        .derive_copy(true)
        .derive_debug(true)
        .derive_default(true)
        .generate_comments(false)
        .prepend_enum_name(false)
        .formatter(bindgen::Formatter::Rustfmt)
        .size_t_is_usize(true)
        .translate_enum_integer_types(true)
        .layout_tests(false)
        .default_enum_style(bindgen::EnumVariation::Rust {
            non_exhaustive: false,
        })
        .opaque_type("pthread_cond_t")
        .opaque_type("pthread_mutex_t")
        .allowlist_type("ibv_access_flags")
        .allowlist_type("ibv_comp_channel")
        .allowlist_type("ibv_context")
        .allowlist_type("ibv_cq")
        .allowlist_type("ibv_device")
        .allowlist_type("ibv_gid")
        .allowlist_type("ibv_mr")
        .allowlist_type("ibv_pd")
        .allowlist_type("ibv_port_attr")
        .allowlist_type("ibv_qp")
        .allowlist_type("ibv_qp_attr_mask")
        .allowlist_type("ibv_qp_init_attr")
        .allowlist_type("ibv_send_flags")
        .allowlist_type("ibv_wc")
        .allowlist_type("ibv_wc_flags")
        .allowlist_type("ibv_wc_status")
        .allowlist_function("ibv_ack_cq_events")
        .allowlist_function("ibv_alloc_pd")
        .allowlist_function("ibv_close_device")
        .allowlist_function("ibv_create_comp_channel")
        .allowlist_function("ibv_create_cq")
        .allowlist_function("ibv_create_qp")
        .allowlist_function("ibv_dealloc_pd")
        .allowlist_function("ibv_dereg_mr")
        .allowlist_function("ibv_destroy_comp_channel")
        .allowlist_function("ibv_destroy_cq")
        .allowlist_function("ibv_destroy_qp")
        .allowlist_function("ibv_free_device_list")
        .allowlist_function("ibv_get_cq_event")
        .allowlist_function("ibv_get_device_guid")
        .allowlist_function("ibv_get_device_list")
        .allowlist_function("ibv_modify_qp")
        .allowlist_function("ibv_req_notify_cq")
        .allowlist_function("ibv_poll_cq")
        .allowlist_function("ibv_post_recv")
        .allowlist_function("ibv_post_send")
        .allowlist_function("ibv_query_gid")
        .allowlist_function("ibv_query_port")
        .allowlist_function("ibv_open_device")
        .allowlist_function("ibv_reg_mr")
        .bitfield_enum("ibv_access_flags")
        .bitfield_enum("ibv_send_flags")
        .bitfield_enum("ibv_wc_flags")
        .bitfield_enum("ibv_qp_attr_mask")
        .no_copy("ibv_qp")
        .no_copy("ibv_cq")
        .no_copy("ibv_context");

    builder
        .generate()
        .expect("Unable to generate bindings")
        .write_to_file(PathBuf::from(env::var("OUT_DIR").unwrap()).join("bindings.rs"))
        .expect("Couldn't write bindings!");
}

定义 verbs.rs 使用生成的绑定代码,并且实现需要使用的 static inline 函数:

#![allow(dead_code)]
#![allow(deref_nullptr)]
#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
#![allow(clippy::missing_safety_doc, clippy::too_many_arguments)]

use libc::{c_int, pthread_cond_t, pthread_mutex_t};
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

#[inline(always)]
pub unsafe fn ibv_req_notify_cq(cq: *mut ibv_cq, solicited_only: c_int) -> c_int {
    (*(*cq).context).ops.req_notify_cq.unwrap_unchecked()(cq, solicited_only)
}

#[inline(always)]
pub unsafe fn ibv_poll_cq(cq: *mut ibv_cq, num_entries: c_int, wc: *mut ibv_wc) -> c_int {
    (*(*cq).context).ops.poll_cq.unwrap_unchecked()(cq, num_entries, wc)
}

#[inline(always)]
pub unsafe fn ibv_post_send(
    qp: *mut ibv_qp,
    wr: *mut ibv_send_wr,
    bad_wr: *mut *mut ibv_send_wr,
) -> c_int {
    (*(*qp).context).ops.post_send.unwrap_unchecked()(qp, wr, bad_wr)
}

#[inline(always)]
pub unsafe fn ibv_post_recv(
    qp: *mut ibv_qp,
    wr: *mut ibv_recv_wr,
    bad_wr: *mut *mut ibv_recv_wr,
) -> c_int {
    (*(*qp).context).ops.post_recv.unwrap_unchecked()(qp, wr, bad_wr)
}

2. 对象封装

libibverbs 提供的接口中,有很多返回的是对象的指针,并且对象需要显式地执行释放,使用对象时需要传入裸指针。指针自然不适合 Rust,这里对这类对象进行封装。定义 wrapper.rs

pub trait Deleter {
    unsafe fn delete(ptr: *mut Self) -> i32;
}

pub struct Wrapper<T: 'static + Deleter + ?Sized>(*mut T);

impl<T: 'static + Deleter + ?Sized> Wrapper<T> {
    pub fn new(v: *mut T) -> Self {
        Self(v)
    }

    #[inline(always)]
    pub fn as_mut_ptr(&self) -> *mut T {
        self.0
    }
}

impl<T: 'static + Deleter + ?Sized> Drop for Wrapper<T> {
    fn drop(&mut self) {
        match unsafe { Deleter::delete(self.0) } {
            0 => tracing::debug!("release {} succ", std::any::type_name::<T>()),
            r => tracing::error!("release {} failed: {}", std::any::type_name::<T>(), r),
        }
    }
}

impl<T: 'static + Deleter + ?Sized> std::ops::Deref for Wrapper<T> {
    type Target = T;

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.0 }
    }
}

unsafe impl<T: 'static + Deleter + ?Sized> Send for Wrapper<T> {}
unsafe impl<T: 'static + Deleter + ?Sized> Sync for Wrapper<T> {}

Wrapper 对不同类型的对象裸指针进行封装,要求对象本身实现自定义的释放接口 Deleter::delete。封装举例:

pub type Device = utils::Wrapper<ibv_device>;

impl Device {
    pub fn name(&self) -> Cow<str> {
        unsafe { CStr::from_ptr(self.name.as_ptr()).to_string_lossy() }
    }

    pub fn guid(&self) -> u64 {
        u64::from_be(unsafe { ibv_get_device_guid(self.as_mut_ptr()) })
    }
}

impl utils::Deleter for ibv_device {
    unsafe fn delete(ptr: *mut Self) -> i32 {
        unreachable!("invalid deletion to Device {ptr:?}!")
    }
}

impl std::fmt::Debug for Device {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let guid = crate::utils::bytes_to_hex_string(&self.guid().to_be_bytes());
        f.debug_struct("Device")
            .field("name", &name)
            .field("guid", &guid)
            .field("node_type", &self.node_type)
            .field("transport_type", &self.transport_type)
            .finish()
    }
}

pub type DeviceList = utils::Wrapper<[Device]>;

impl DeviceList {
    pub fn available() -> Result<Self> {
        let mut num_devices: c_int = 0;
        let arr = unsafe { ibv_get_device_list(&mut num_devices) };
        if arr.is_null() {
            return Err(Error::with_errno(ErrorKind::IBGetDeviceListFail));
        }
        if num_devices == 0 {
            return Err(Error::new(ErrorKind::IBDeviceNotFound));
        }

        Ok(Self::new(
            std::ptr::slice_from_raw_parts_mut(arr, num_devices as usize) as _,
        ))
    }
}

impl utils::Deleter for [Device] {
    unsafe fn delete(ptr: *mut Self) -> i32 {
        ibv_free_device_list(ptr as _);
        0
    }
}

impl std::fmt::Debug for DeviceList {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("DeviceList").field(&self.deref()).finish()
    }
}

这里封装的对象类型有:

  1. CompChannel:完成事件通道,当完成队列(CQ)有新的完成事件时,通过对应绑定的 CompChannel 进行通知
  2. CompQueue:完成队列,当提交的工作请求(WR)完成时添加到该队列,并提供该 WR 完成的详细信息(状态、长度等)
  3. Context:设备上下文,主要接口的必要参数
  4. DeviceList:设备列表,获取设备指针后可以创建 Context
  5. MemoryRegion:保护域(PD)相关的内存区域,管理 RDMA 设备允许读写的内存区域
  6. ProtectionDomain:保护域,管理 RDMA 设备允许使用的资源
  7. QueuePair:发送队列(Send Queue)和接收队列(Recv Queue),相当于 TCP 编程中的 Socket