bRPC Source Code Analysis, Part 2: Resource Management

2021.03.03
SF-Zhou

1. Object Pool

Most of the structs used frequently in the code base have a fixed size. Building on this observation, bRPC implements an object pool that simplifies the allocation and recycling of fixed-size objects and keeps memory operations within thread-local storage whenever possible. Acquiring an object follows the rules below (implemented by the BAIDU_OBJECT_POOL_GET macro):

  1. If the thread-local free list holds a recycled object, return one directly.
  2. Otherwise, try to grab a free chunk from the global pool; on success, return an object from it.
  3. Otherwise, if the thread-local memory block still has room, construct an object there and return it.
  4. Otherwise, allocate a new memory block for this thread and construct the object in it.

// object_pool.h
// Get an object typed |T|. The object should be cleared before usage.
// NOTE: T must be default-constructible.
template <typename T> inline T *get_object() {
  return ObjectPool<T>::singleton()->get_object();
}

// object_pool_inl.h
template <typename T> inline T *ObjectPool<T>::get_object() {
  LocalPool *lp = get_or_new_local_pool();  // fetch the thread-local LocalPool
  if (BAIDU_LIKELY(lp != NULL)) {
    return lp->get();  // allocate an object
  }
  return NULL;
}

template <typename T>
BAIDU_THREAD_LOCAL
    typename ObjectPool<T>::LocalPool *ObjectPool<T>::_local_pool = NULL;

template <typename T>
inline typename ObjectPool<T>::LocalPool *ObjectPool<T>::get_or_new_local_pool() {
  LocalPool *lp = _local_pool;
  if (BAIDU_LIKELY(lp != NULL)) {
    return lp;
  }
  lp = new (std::nothrow) LocalPool(this);
  if (NULL == lp) {
    return NULL;
  }
  // with C++11 thread_local, the locking and the destructor registration
  // below could be omitted
  BAIDU_SCOPED_LOCK(_change_thread_mutex); // avoid race with clear()
  _local_pool = lp;
  butil::thread_atexit(LocalPool::delete_local_pool, lp);
  _nlocal.fetch_add(1, butil::memory_order_relaxed);
  return lp;
}

class BAIDU_CACHELINE_ALIGNMENT LocalPool {
 public:
  explicit LocalPool(ObjectPool *pool)
      : _pool(pool), _cur_block(NULL), _cur_block_index(0) {
    _cur_free.nfree = 0;
  }

  ~LocalPool() {
    // Add to global _free if there're some free objects
    if (_cur_free.nfree) {
      _pool->push_free_chunk(_cur_free);
    }

    _pool->clear_from_destructor_of_local_pool();
  }

  static void delete_local_pool(void *arg) { delete (LocalPool *)arg; }

// We need following macro to construct T with different CTOR_ARGS
// which may include parenthesis because when T is POD, "new T()"
// and "new T" are different: former one sets all fields to 0 which
// we don't want.
#define BAIDU_OBJECT_POOL_GET(CTOR_ARGS)                                       \
  /* Fetch local free ptr */                                                   \
  if (_cur_free.nfree) {                                                       \
    BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1;                                      \
    return _cur_free.ptrs[--_cur_free.nfree];                                  \
  }                                                                            \
  /* Fetch a FreeChunk from global.                                            \
     TODO: Popping from _free needs to copy a FreeChunk which is               \
     costly, but hardly impacts amortized performance. */                      \
  if (_pool->pop_free_chunk(_cur_free)) {                                      \
    BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1;                                      \
    return _cur_free.ptrs[--_cur_free.nfree];                                  \
  }                                                                            \
  /* Fetch memory from local block */                                          \
  if (_cur_block && _cur_block->nitem < BLOCK_NITEM) {                         \
    T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS;     \
    if (!ObjectPoolValidator<T>::validate(obj)) {                              \
      obj->~T();                                                               \
      return NULL;                                                             \
    }                                                                          \
    ++_cur_block->nitem;                                                       \
    return obj;                                                                \
  }                                                                            \
  /* Fetch a Block from global */                                              \
  _cur_block = add_block(&_cur_block_index);                                   \
  if (_cur_block != NULL) {                                                    \
    T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS;     \
    if (!ObjectPoolValidator<T>::validate(obj)) {                              \
      obj->~T();                                                               \
      return NULL;                                                             \
    }                                                                          \
    ++_cur_block->nitem;                                                       \
    return obj;                                                                \
  }                                                                            \
  return NULL;

  inline T *get() { BAIDU_OBJECT_POOL_GET(); }

  template <typename A1> inline T *get(const A1 &a1) {
    BAIDU_OBJECT_POOL_GET((a1));
  }

  template <typename A1, typename A2>
  inline T *get(const A1 &a1, const A2 &a2) {
    BAIDU_OBJECT_POOL_GET((a1, a2));
  }

#undef BAIDU_OBJECT_POOL_GET

  inline int return_object(T *ptr) {
    // Return to local free list
    // below the per-thread threshold: keep caching the object locally
    if (_cur_free.nfree < ObjectPool::free_chunk_nitem()) {
      _cur_free.ptrs[_cur_free.nfree++] = ptr;
      BAIDU_OBJECT_POOL_FREE_ITEM_NUM_ADD1;
      return 0;
    }
    // Local free list is full, return it to global.
    // For copying issue, check comment in upper get()
    // threshold exceeded: hand the whole chunk back to the global pool
    if (_pool->push_free_chunk(_cur_free)) {
      _cur_free.nfree = 1;
      _cur_free.ptrs[0] = ptr;
      BAIDU_OBJECT_POOL_FREE_ITEM_NUM_ADD1;
      return 0;
    }
    return -1;
  }

 private:
  ObjectPool *_pool;
  Block *_cur_block;
  size_t _cur_block_index;
  FreeChunk _cur_free;
};

// Fetch a batch of objects from the global free list.
template <typename T>
bool ObjectPool<T>::pop_free_chunk(FreeChunk &c) {
  // Critical for the case where most return_object calls happen in
  // threads different from those calling get_object.
  if (_free_chunks.empty()) {
    return false;
  }
  pthread_mutex_lock(&_free_chunks_mutex);
  if (_free_chunks.empty()) {
    pthread_mutex_unlock(&_free_chunks_mutex);
    return false;
  }
  // pop one chunk of objects from the tail of _free_chunks
  DynamicFreeChunk *p = _free_chunks.back();
  _free_chunks.pop_back();
  pthread_mutex_unlock(&_free_chunks_mutex);
  c.nfree = p->nfree;
  memcpy(c.ptrs, p->ptrs, sizeof(*p->ptrs) * p->nfree);
  free(p);
  return true;
}
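
push_free_chunk is the counterpart: it copies the thread-local FreeChunk into a heap-allocated DynamicFreeChunk and appends it to _free_chunks under the lock. A condensed rendering (paraphrased from the same file; field layout assumed to match the real DynamicFreeChunk):

template <typename T>
bool ObjectPool<T>::push_free_chunk(const FreeChunk &c) {
  // allocate a variable-length chunk just big enough for c.nfree pointers
  DynamicFreeChunk *p = (DynamicFreeChunk *)malloc(
      offsetof(DynamicFreeChunk, ptrs) + sizeof(*c.ptrs) * c.nfree);
  if (!p) {
    return false;
  }
  p->nfree = c.nfree;
  memcpy(p->ptrs, c.ptrs, sizeof(*c.ptrs) * c.nfree);
  pthread_mutex_lock(&_free_chunks_mutex);
  _free_chunks.push_back(p);
  pthread_mutex_unlock(&_free_chunks_mutex);
  return true;
}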

// When a thread needs memory, it allocates a Block. To improve locality,
// items in the Block are only used by the thread.
// To support cache-aligned objects, align Block.items by cacheline.
struct BAIDU_CACHELINE_ALIGNMENT Block {
  char items[sizeof(T) * BLOCK_NITEM];
  size_t nitem;

  Block() : nitem(0) {}
};

// An Object addresses at most OP_MAX_BLOCK_NGROUP BlockGroups,
// each BlockGroup addresses at most OP_GROUP_NBLOCK blocks. So an
// object addresses at most OP_MAX_BLOCK_NGROUP * OP_GROUP_NBLOCK Blocks.
// Stores the addresses of a batch of Blocks.
struct BlockGroup {
  butil::atomic<size_t> nblock;
  butil::atomic<Block *> blocks[OP_GROUP_NBLOCK];

  BlockGroup() : nblock(0) {
    // We fetch_add nblock in add_block() before setting the entry,
    // thus address_resource() may see the unset entry. Initializing
    // all entries to NULL makes such address_resource() return NULL.
    memset(blocks, 0, sizeof(butil::atomic<Block *>) * OP_GROUP_NBLOCK);
  }
};

// Create a Block and append it to the right-most BlockGroup.
template <typename T>
typename ObjectPool<T>::Block *ObjectPool<T>::add_block(size_t *index) {
  Block *const new_block = new (std::nothrow) Block;
  if (NULL == new_block) {
    return NULL;
  }
  size_t ngroup;
  do {
    ngroup = _ngroup.load(butil::memory_order_acquire);
    if (ngroup >= 1) {
      BlockGroup *const g =
        _block_groups[ngroup - 1].load(butil::memory_order_consume);
      const size_t block_index =
        g->nblock.fetch_add(1, butil::memory_order_relaxed);  // atomic increment
      if (block_index < OP_GROUP_NBLOCK) {
        // the current group still has room: publish the new block
        g->blocks[block_index].store(new_block, butil::memory_order_release);
        *index = (ngroup - 1) * OP_GROUP_NBLOCK + block_index;
        return new_block;
      }
      // otherwise undo the increment and try to append a new group
      g->nblock.fetch_sub(1, butil::memory_order_relaxed);
    }
  } while (add_block_group(ngroup));

  // Fail to add_block_group.
  delete new_block;
  return NULL;
}

// Create a BlockGroup and append it to _block_groups.
// Shall be called infrequently because a BlockGroup is pretty big.
template <typename T>
bool ObjectPool<T>::add_block_group(size_t old_ngroup) {
  BlockGroup *bg = NULL;
  BAIDU_SCOPED_LOCK(_block_group_mutex);  // serialize group creation
  const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
  if (ngroup != old_ngroup) {
    // Other thread got lock and added group before this thread.
    // another thread added a block group first; the caller should retry
    return true;
  }
  if (ngroup < OP_MAX_BLOCK_NGROUP) {
    bg = new (std::nothrow) BlockGroup;
    if (NULL != bg) {
      // Release fence is paired with consume fence in add_block()
      // to avoid un-constructed bg to be seen by other threads.
      // block group successfully added
      _block_groups[ngroup].store(bg, butil::memory_order_release);
      _ngroup.store(ngroup + 1, butil::memory_order_release);
    }
  }
  return bg != NULL;
}

Returning an object performs no destruction and no deallocation: the pointer is simply appended to the thread-local free list, and once the list reaches a threshold the whole chunk is pushed into the global pool under a lock. Note that allocation carves objects out of larger memory blocks, so a returned pointer does not own its memory; it must be recycled through the pool's return_object interface. In the default implementation no object is ever destructed or freed, which means the memory footprint never shrinks, much like a std::vector.
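
A minimal usage sketch of these interfaces (MyObject is a hypothetical type; error handling elided):

#include "butil/object_pool.h"

struct MyObject {
  int x;
};

void object_pool_example() {
  // runs through the four-step lookup described at the top of this section
  MyObject *obj = butil::get_object<MyObject>();
  if (obj != NULL) {
    obj->x = 42;  // recycled objects may hold stale data: reset before use
    butil::return_object(obj);  // recycle; no destructor is run
  }
}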

2. Resource Pool

The resource pool's design is similar to the object pool above; the difference is the notion of a ResourceId, which denotes the resource's offset within the pool:

// resource_pool_inl.h
template <typename T> struct ResourceId {
  uint64_t value;

  operator uint64_t() const { return value; }

  template <typename T2> ResourceId<T2> cast() const {
    ResourceId<T2> id = {value};
    return id;
  }
};

template <typename T, size_t NITEM> struct ResourcePoolFreeChunk {
  size_t nfree;
  ResourceId<T> ids[NITEM];
};
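
The entry points mirror the object pool's, except that the caller also receives the ResourceId. A minimal hand-written sketch:

#include <cassert>
#include "butil/resource_pool.h"

struct Meta {
  int payload;
};

void resource_pool_example() {
  butil::ResourceId<Meta> id;
  Meta *m = butil::get_resource(&id);  // id.value is the offset within the pool
  if (m != NULL) {
    m->payload = 7;
    assert(m == butil::address_resource(id));  // O(1) reverse mapping
    butil::return_resource(id);  // recycle by id rather than by pointer
  }
}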

// We need following macro to construct T with different CTOR_ARGS
// which may include parenthesis because when T is POD, "new T()"
// and "new T" are different: former one sets all fields to 0 which
// we don't want.
#define BAIDU_RESOURCE_POOL_GET(CTOR_ARGS)                                     \
  /* Fetch local free id */                                                    \
  if (_cur_free.nfree) {                                                       \
    const ResourceId<T> free_id = _cur_free.ids[--_cur_free.nfree];            \
    *id = free_id;                                                             \
    BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1;                                    \
    return unsafe_address_resource(free_id);                                   \
  }                                                                            \
  /* Fetch a FreeChunk from global.                                            \
     TODO: Popping from _free needs to copy a FreeChunk which is               \
     costly, but hardly impacts amortized performance. */                      \
  if (_pool->pop_free_chunk(_cur_free)) {                                      \
    --_cur_free.nfree;                                                         \
    const ResourceId<T> free_id = _cur_free.ids[_cur_free.nfree];              \
    *id = free_id;                                                             \
    BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1;                                    \
    return unsafe_address_resource(free_id);                                   \
  }                                                                            \
  /* Fetch memory from local block */                                          \
  if (_cur_block && _cur_block->nitem < BLOCK_NITEM) {                         \
    id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem;            \
    T *p = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS;       \
    if (!ResourcePoolValidator<T>::validate(p)) {                              \
      p->~T();                                                                 \
      return NULL;                                                             \
    }                                                                          \
    ++_cur_block->nitem;                                                       \
    return p;                                                                  \
  }                                                                            \
  /* Fetch a Block from global */                                              \
  _cur_block = add_block(&_cur_block_index);                                   \
  if (_cur_block != NULL) {                                                    \
    id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem;            \
    T *p = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS;       \
    if (!ResourcePoolValidator<T>::validate(p)) {                              \
      p->~T();                                                                 \
      return NULL;                                                             \
    }                                                                          \
    ++_cur_block->nitem;                                                       \
    return p;                                                                  \
  }                                                                            \
  return NULL;
Note the formula that produces a ResourceId: _cur_block_index * BLOCK_NITEM + _cur_block->nitem. From it, the object's memory address can be recovered in O(1) time:

static inline T *address_resource(ResourceId<T> id) {
  const size_t block_index = id.value / BLOCK_NITEM;
  const size_t group_index = (block_index >> RP_GROUP_NBLOCK_NBIT);
  if (__builtin_expect(group_index < RP_MAX_BLOCK_NGROUP, 1)) {
    BlockGroup *bg =
      _block_groups[group_index].load(butil::memory_order_consume);
    if (__builtin_expect(bg != NULL, 1)) {
      Block *b = bg->blocks[block_index & (RP_GROUP_NBLOCK - 1)].load(
        butil::memory_order_consume);
      if (__builtin_expect(b != NULL, 1)) {
        const size_t offset = id.value - block_index * BLOCK_NITEM;
        if (__builtin_expect(offset < b->nitem, 1)) {
          return (T *)b->items + offset;
        }
      }
    }
  }

  return NULL;
}
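
As a concrete example (illustrative constants, since the real BLOCK_NITEM depends on sizeof(T)): with BLOCK_NITEM = 256, RP_GROUP_NBLOCK = 65536 and RP_GROUP_NBLOCK_NBIT = 16, id.value = 1000000 resolves as follows:

// block_index = 1000000 / 256        = 3906
// group_index = 3906 >> 16           = 0
// slot        = 3906 & (65536 - 1)   = 3906
// offset      = 1000000 - 3906 * 256 = 64
// address     = (T *)_block_groups[0]->blocks[3906]->items + 64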

In most scenarios a ResourceId is smaller than 2^32 (unless there are more than 4 billion live objects globally). bRPC exploits this by encoding the ResourceId into part of a 64-bit ID and placing the object's version in the remaining bits. The following description is quoted from reference 1.

Objects can be returned, but a returned object is neither deleted nor destructed; it merely enters the free list. A later allocation may hand out such a used object, which must be reset before use. Even after an object has been returned, it can still be reached through its offset: the ResourcePool is only responsible for memory allocation and does not solve the ABA problem.

Most bthread functions need to reach the TaskMeta behind a bthread_t in O(1) time, and once the bthread_t becomes invalid the lookup should return NULL so the function can report an error. The solution: a bthread_t consists of a 32-bit version plus a 32-bit offset. The version solves the ABA problem, while the offset is allocated by the ResourcePool. A lookup first obtains the TaskMeta via the offset and then checks the version; a mismatch means the bthread is no longer valid.

// task_group_inl.h
// Utilities to manipulate bthread_t
inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
  return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}

inline butil::ResourceId<TaskMeta> get_slot(bthread_t tid) {
  butil::ResourceId<TaskMeta> id = {(tid & 0xFFFFFFFFul)};
  return id;
}
inline uint32_t get_version(bthread_t tid) {
  return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}

inline TaskMeta *TaskGroup::address_meta(bthread_t tid) {
  // TaskMeta * m = address_resource<TaskMeta>(get_slot(tid));
  // if (m != NULL && m->version == get_version(tid)) {
  //     return m;
  // }
  // return NULL;
  return address_resource(get_slot(tid));
}

// task_group.cpp
// an example of how the version is used
bool TaskGroup::exists(bthread_t tid) {
  if (tid != 0) { // tid of bthread is never 0.
    TaskMeta *m = address_meta(tid);
    if (m != NULL) {
      return (*m->version_butex == get_version(tid));  // does the version match?
    }
  }
  return false;
}

3. IOBuf

bRPC uses IOBuf to handle byte-stream data. It is a non-contiguous, zero-copy buffer offering a std::string-like interface. The data structure closely resembles Tokio's Bytes, which the author analyzed previously; both achieve zero copy through reference counting.
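
Before diving into the internals, a brief usage sketch (hand-written, not from the bRPC docs): cutting bytes out of an IOBuf moves BlockRefs and adjusts reference counts instead of copying payload.

#include <iostream>
#include "butil/iobuf.h"

int main() {
  butil::IOBuf buf;
  buf.append("hello world");  // copies the bytes into a Block owned by the buffer
  butil::IOBuf head;
  buf.cutn(&head, 5);  // zero-copy: BlockRefs move, payload bytes stay put
  std::cout << head.to_string() << std::endl;  // "hello"
  std::cout << buf.to_string() << std::endl;   // " world"
  return 0;
}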

// iobuf.h
// IOBuf is a non-continuous buffer that can be cut and combined w/o copying
// payload. It can be read from or flushed into file descriptors as well.
// IOBuf is [thread-compatible]. Namely using different IOBuf in different
// threads simultaneously is safe, and reading a static IOBuf from different
// threads is safe as well.
// IOBuf is [NOT thread-safe]. Modifying a same IOBuf from different threads
// simultaneously is unsafe and likely to crash.
class IOBuf {
 public:
  static const size_t DEFAULT_BLOCK_SIZE = 8192;  // default block size: 8 KB

  // can't directly use `struct iovec' here because we also need to access the
  // reference counter(nshared) in Block*
  struct BlockRef {
    // NOTICE: first bit of `offset' is shared with BigView::start
    uint32_t offset;  // offset into the block (guaranteed offset <= INT_MAX)
    uint32_t length;  // length of the referenced range
    Block *block;
  };

  // IOBuf is essentially a tiny queue of BlockRefs.
  struct SmallView {
    BlockRef refs[2];  // references at most two blocks
  };

  // SmallView and BigView occupy the same number of bytes
  struct BigView {
    int32_t magic;
    uint32_t start;  // index of the first valid BlockRef
    BlockRef *refs;  // array of BlockRefs
    uint32_t nref;   // number of BlockRefs in use
    uint32_t cap_mask;
    size_t nbytes;  // total length in bytes

    const BlockRef &ref_at(uint32_t i) const {
      return refs[(start + i) & cap_mask];
    }

    BlockRef &ref_at(uint32_t i) { return refs[(start + i) & cap_mask]; }

    uint32_t capacity() const { return cap_mask + 1; }  // capacity of the ref array
  };

  bool _small() const { return _bv.magic >= 0; }  // negative magic means BigView

  size_t length() const {
    return _small() ? (_sv.refs[0].length + _sv.refs[1].length) : _bv.nbytes;
  }

 private:
  union {
    BigView _bv;  // _bv.magic shares its address with _sv.refs[0].offset; negative means BigView
    SmallView _sv;
  };
};


// iobuf.cpp: the flow of append
void IOBuf::append(const IOBuf &other) {
  const size_t nref = other._ref_num();
  for (size_t i = 0; i < nref; ++i) {
    _push_back_ref(other._ref_at(i));
  }
}

// the number of BlockRefs currently held
inline size_t IOBuf::_ref_num() const {
  return _small() ? (!!_sv.refs[0].block + !!_sv.refs[1].block) : _bv.nref;
}

inline void IOBuf::_push_back_ref(const BlockRef &r) {
  if (_small()) {
    return _push_or_move_back_ref_to_smallview<false>(r);
  } else {
    return _push_or_move_back_ref_to_bigview<false>(r);
  }
}

template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef &r) {
  BlockRef &back = _bv.ref_at(_bv.nref - 1);
  if (back.block == r.block && back.offset + back.length == r.offset) {
    // Merge ref
    back.length += r.length;
    _bv.nbytes += r.length;
    if (MOVE) {
      r.block->dec_ref();
    }
    return;
  }

  if (_bv.nref != _bv.capacity()) {
    // the ref array still has spare capacity: just take another reference to the block
    _bv.ref_at(_bv.nref++) = r;
    _bv.nbytes += r.length;  // accumulate the length
    if (!MOVE) {
      r.block->inc_ref();
    }
    return;
  }
  // resize, don't modify bv until new_refs is fully assigned
  // the ref array is full: double its capacity
  const uint32_t new_cap = _bv.capacity() * 2;
  BlockRef *new_refs = iobuf::acquire_blockref_array(new_cap);
  for (uint32_t i = 0; i < _bv.nref; ++i) {
    // copy the existing refs
    new_refs[i] = _bv.ref_at(i);
  }
  new_refs[_bv.nref++] = r;

  // Change other variables
  _bv.start = 0;
  // release the old ref array
  iobuf::release_blockref_array(_bv.refs, _bv.capacity());
  _bv.refs = new_refs;
  _bv.cap_mask = new_cap - 1;
  _bv.nbytes += r.length;
  if (!MOVE) {
    r.block->inc_ref();
  }
}

// The implementation of Block
void *(*blockmem_allocate)(size_t) = ::malloc;
void (*blockmem_deallocate)(void *) = ::free;

const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
typedef void (*UserDataDeleter)(void *);

struct UserDataExtension {
  UserDataDeleter deleter;
};

struct IOBuf::Block {
  butil::atomic<int> nshared;  // reference count
  uint16_t flags;  // whether a user-supplied deleter is used; see the comment on `data` below
  uint16_t abi_check; // original cap, never zero.
  uint32_t size;
  uint32_t cap;
  Block *portal_next;
  // When flag is 0, data points to `size` bytes starting at
  // `(char*)this+sizeof(Block)' When flag & IOBUF_BLOCK_FLAGS_USER_DATA is
  // non-0, data points to the user data and the deleter is put in
  // UserDataExtension at `(char*)this+sizeof(Block)'
  char *data;

  Block(char *data_in, uint32_t data_size)
      : nshared(1), flags(0), abi_check(0), size(0), cap(data_size),
        portal_next(NULL), data(data_in) {
    iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
    iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
                                butil::memory_order_relaxed);
  }

  Block(char *data_in, uint32_t data_size, UserDataDeleter deleter)
      : nshared(1), flags(IOBUF_BLOCK_FLAGS_USER_DATA), abi_check(0),
        size(data_size), cap(data_size), portal_next(NULL), data(data_in) {
    get_user_data_extension()->deleter = deleter;  // user-supplied deleter
  }

  // Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
  UserDataExtension *get_user_data_extension() {
    char *p = (char *)this;
    return (UserDataExtension *)(p + sizeof(Block));  // the extension right after the Block itself
  }

  void inc_ref() {
    check_abi();
    nshared.fetch_add(1, butil::memory_order_relaxed);
  }

  void dec_ref() {
    check_abi();
    if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
      butil::atomic_thread_fence(butil::memory_order_acquire);
      if (!flags) {
        iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
        iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
                                    butil::memory_order_relaxed);
        this->~Block();  // run the destructor without freeing the memory
        iobuf::blockmem_deallocate(this);  // free the memory
      } else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
        get_user_data_extension()->deleter(data);  // run the user-supplied deleter
        this->~Block();
        free(this);
      }
    }
  }

  int ref_count() const { return nshared.load(butil::memory_order_relaxed); }

  bool full() const { return size >= cap; }
  size_t left_space() const { return cap - size; }
};

// Create a block; the Block header and the payload live in one allocation
inline IOBuf::Block *create_block(const size_t block_size) {
  if (block_size > 0xFFFFFFFFULL) {
    LOG(FATAL) << "block_size=" << block_size << " is too large";
    return NULL;
  }
  char *mem = (char *)iobuf::blockmem_allocate(block_size);
  if (mem == NULL) {
    return NULL;
  }
  return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
                                block_size - sizeof(IOBuf::Block));
}

// Append user data; ownership is determined by the deleter passed in
int IOBuf::append_user_data(void *data, size_t size, void (*deleter)(void *)) {
  if (size > 0xFFFFFFFFULL - 100) {
    LOG(FATAL) << "data_size=" << size << " is too large";
    return -1;
  }
  char *mem = (char *)malloc(sizeof(IOBuf::Block) + sizeof(UserDataExtension));
  if (mem == NULL) {
    return -1;
  }
  if (deleter == NULL) {
    deleter = ::free;
  }
  IOBuf::Block *b = new (mem) IOBuf::Block((char *)data, size, deleter);
  const IOBuf::BlockRef r = {0, b->cap, b};
  _move_back_ref(r);
  return 0;
}
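
A short sketch of how append_user_data can wrap externally owned memory (hypothetical example; since the payload comes from malloc here, ::free would also work as the deleter):

static void my_deleter(void *data) {
  ::free(data);  // release the payload once the last BlockRef is gone
}

void wrap_external(butil::IOBuf *buf) {
  void *payload = ::malloc(4096);
  // ... fill payload ...
  // the IOBuf takes over the memory: my_deleter runs when the
  // reference count of the wrapping Block drops to zero
  buf->append_user_data(payload, 4096, my_deleter);
}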

IOBuf offers rich byte-stream handling, and the full implementation is long, exceeding 3k lines of code. The analysis of the remaining details is omitted here; read the source if you are interested.

4. Socket

bRPC manages connections with Socket objects, and refers to a Socket with a 64-bit SocketId so that connections are easy to use in multithreaded environments. Working with a Socket involves the following types:

  1. the Socket object itself;
  2. SocketUniquePtr, which actually behaves more like a std::shared_ptr: its destructor decrements the reference count;
  3. SocketId, which carries a version check and is used much like a std::weak_ptr.

The commonly used methods are (a lifecycle sketch follows the list):

  1. Create: construct a Socket object and return its SocketId.
  2. Address: obtain the Socket behind a SocketId, returned as a SocketUniquePtr.
  3. SetFailed: mark a Socket as failed; every subsequent Address on that SocketId returns a null pointer, and the Socket is recycled once its reference count drops to 0.
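
A minimal lifecycle sketch tying the three methods together (hand-written; the SocketOptions setup is elided):

#include "brpc/socket.h"

void socket_lifecycle() {
  brpc::SocketOptions options;
  // options.fd = ...;  // connection parameters go here
  brpc::SocketId id;
  if (brpc::Socket::Create(options, &id) != 0) {
    return;
  }
  {
    brpc::SocketUniquePtr ptr;
    if (brpc::Socket::Address(id, &ptr) == 0) {
      // the refcount is held while ptr is alive; Socket members are safe to use
    }
  }  // ptr destructs here -> DereferenceSocket -> refcount -1
  brpc::Socket::SetFailed(id);  // subsequent Address() calls on id will fail
}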

The official documentation also explains why the design is this elaborate (excerpted from reference 3):

Socket's distinctive SetFailed ensures that, when needed, a Socket can no longer be Address-ed so that its reference count eventually reaches 0; plain shared_ptr/weak_ptr cannot guarantee this. When a server wants to quit while requests keep pouring in, the reference counts of the corresponding Sockets may never drop to 0 and the server cannot exit. In addition, a weak_ptr cannot be stored directly as epoll data, whereas a SocketId can. These factors led us to design Socket.

Whether to store a SocketUniquePtr or a SocketId depends on whether a strong reference is needed. The Controller, which spans the entire RPC process and interacts heavily with the data inside the Socket, stores a SocketUniquePtr. epoll mainly signals that an event occurred on a fd; if the Socket has been recycled the event is dispensable, so epoll stores a SocketId.

As long as a SocketUniquePtr is valid, the data inside it does not change. This mechanism spares users from worrying about tricky race conditions and ABA problems, so they can safely operate on shared fds. The approach also avoids implicit reference counting; memory ownership is explicit, which gives strong guarantees about program quality. bRPC uses SocketUniquePtr and SocketId extensively, and they have genuinely simplified our development.

Some of these ideas will only sink in after the full bRPC request flow has been analyzed. First, the implementation of Socket itself:

class BAIDU_CACHELINE_ALIGNMENT Socket {
  struct Forbidden {};

 public:
  // Users must not construct Socket directly, yet ResourcePool needs access to
  // a public constructor; the private Forbidden tag implements this trick.
  Socket(Forbidden);

  // Create a Socket according to `options', put the identifier into `id'.
  // Returns 0 on success, -1 otherwise.
  static int Create(const SocketOptions &options, SocketId *id);

  // Place the Socket associated with identifier `id' into unique_ptr `ptr',
  // which will be released automatically when out of scope (w/o explicit
  // std::move). User can still access `ptr' after calling ptr->SetFailed()
  // before release of `ptr'.
  // This function is wait-free.
  // Returns 0 on success, -1 when the Socket was SetFailed().
  static int Address(SocketId id, SocketUniquePtr *ptr);

  // Mark this Socket or the Socket associated with `id' as failed.
  // Any later Address() of the identifier shall return NULL unless the
  // Socket was revived by HealthCheckThread. The Socket is NOT recycled
  // after calling this function, instead it will be recycled when no one
  // references it. Internal fields of the Socket are still accessible
  // after calling this function. Calling SetFailed() of a Socket more
  // than once is OK.
  // This function is lock-free.
  // Returns -1 when the Socket was already SetFailed(), 0 otherwise.
  int SetFailed();

 private:
  // unsigned 32-bit version + signed 32-bit referenced-count.
  // Meaning of version:
  // * Created version: no SetFailed() is called on the Socket yet. Must be
  //   same evenness with initial _versioned_ref because during lifetime of
  //   a Socket on the slot, the version is added with 1 twice. This is
  //   also the version encoded in SocketId.
  // * Failed version: = created version + 1, SetFailed()-ed but returned.
  // * Other versions: the socket is already recycled.
  // the version + reference count recorded by the Socket itself; 0 on first construction
  butil::atomic<uint64_t> _versioned_ref;

  // Flag used to mark whether additional reference has been decreased
  // by either `SetFailed' or `SetRecycle'
  // recycle flag
  butil::atomic<bool> _recycle_flag;
};

// SocketId = 32-bit version + 32-bit slot.
//   version: from version part of _versioned_nref, must be an EVEN number.
//   slot: designated by ResourcePool.
int Socket::Create(const SocketOptions &options, SocketId *id) {
  butil::ResourceId<Socket> slot;  // receives the ResourceId assigned by the pool
  Socket *const m = butil::get_resource(&slot, Forbidden());
  if (m == NULL) {
    LOG(FATAL) << "Fail to get_resource<Socket>";
    return -1;
  }
  g_vars->nsocket << 1;
  CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
  ...
  // nref can be non-zero due to concurrent AddressSocket().
  // _this_id will only be used in destructor/Destroy of referenced
  // slots, which is safe and properly fenced. Although it's better
  // to put the id into SocketUniquePtr.
  // build the SocketId; the refcount in _versioned_ref becomes 1, and the
  // version is 0 on first construction
  m->_this_id = MakeSocketId(VersionOfVRef(m->_versioned_ref.fetch_add(
                                 1, butil::memory_order_release)),
                             slot);
  ...
  *id = m->_this_id;  // return the socket id
  return 0;
}

// Utility functions to combine and extract SocketId.
BUTIL_FORCE_INLINE SocketId MakeSocketId(uint32_t version,
                                         butil::ResourceId<Socket> slot) {
  return SocketId((((uint64_t)version) << 32) | slot.value);
}

// Utility functions to combine and extract Socket::_versioned_ref
BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
  return (uint32_t)(vref >> 32);
}

BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
  butil::ResourceId<Socket> id = {(sid & 0xFFFFFFFFul)};
  return id;
}

BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
  return (int32_t)(vref & 0xFFFFFFFFul);
}
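
For intuition, the packing of _versioned_ref (illustrative arithmetic): keeping the version and the count in a single 64-bit atomic is what allows Address() to bump the count and read the version with one fetch_add, and allows SetFailed() and Dereference() to check both fields with one CAS.

// layout: [ 32-bit version | 32-bit refcount ]
// e.g. version = 4, refcount = 2:
//   vref = MakeVRef(4, 2) = (4ULL << 32) | 2 = 0x0000000400000002
//   VersionOfVRef(vref) == 4, NRefOfVRef(vref) == 2
// fetch_add(1) only touches the low 32 bits (until they overflow),
// so a single atomic op updates the count while preserving the version.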

inline int Socket::Address(SocketId id, SocketUniquePtr *ptr) {
  // extract the ResourceId
  const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
  // resolve the object's address
  Socket *const m = address_resource(slot);
  if (__builtin_expect(m != NULL, 1)) {
    // acquire fence makes sure this thread sees latest changes before
    // Dereference() or Revive().
    // refcount +1; note the returned vref1 is _versioned_ref *before* the increment
    const uint64_t vref1 =
        m->_versioned_ref.fetch_add(1, butil::memory_order_acquire);
    // extract the version
    const uint32_t ver1 = VersionOfVRef(vref1);
    if (ver1 == VersionOfSocketId(id)) {
      // the version matches the SocketId's version: success
      ptr->reset(m);
      return 0;
    }

    // version mismatch: undo the increment; the returned vref2 is
    // _versioned_ref *after* the earlier increment
    const uint64_t vref2 =
        m->_versioned_ref.fetch_sub(1, butil::memory_order_release);
    // extract the reference count
    const int32_t nref = NRefOfVRef(vref2);
    if (nref > 1) {
      return -1;
    } else if (__builtin_expect(nref == 1, 1)) {
      // no references left: read the version again
      const uint32_t ver2 = VersionOfVRef(vref2);
      // an odd version means SetFailed() was just called;
      // with no references left, trigger recycling of the socket
      if ((ver2 & 1)) {
        if (ver1 == ver2 || ver1 + 1 == ver2) {
          uint64_t expected_vref = vref2 - 1;  // expected _versioned_ref after the decrement
          // try CAS: atomically set _versioned_ref to version ver2 + 1 (= id_ver + 2), count 0
          if (m->_versioned_ref.compare_exchange_strong(
                  expected_vref, MakeVRef(ver2 + 1, 0),
                  butil::memory_order_acquire, butil::memory_order_relaxed)) {
            // CAS succeeded: recycle
            m->OnRecycle();  // perform the actual recycling
            return_resource(SlotOfSocketId(id));
          }
        } else {
          CHECK(false) << "ref-version=" << ver1 << " unref-version=" << ver2;
        }
      } else {
        CHECK_EQ(ver1, ver2);
        // Addressed a free slot.
      }
    } else {
      CHECK(false) << "Over dereferenced SocketId=" << id;
    }
  }
  return -1;
}

int Socket::SetFailed() { return SetFailed(EFAILEDSOCKET, NULL); }

int Socket::SetFailed(int error_code, const char *error_fmt, ...) {
  if (error_code == 0) {
    CHECK(false) << "error_code is 0";
    error_code = EFAILEDSOCKET;
  }
  const uint32_t id_ver = VersionOfSocketId(_this_id);  // version encoded in the socket id
  uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
  for (;;) { // need iteration to retry compare_exchange_strong
    // the version in _versioned_ref differs from the socket id's version: fail
    if (VersionOfVRef(vref) != id_ver) {
      return -1;
    }
    // Try to set version=id_ver+1 (to make later Address() return NULL),
    // retry on fail.
    // try CAS: atomically set _versioned_ref to version id_ver + 1, leaving the count unchanged
    if (_versioned_ref.compare_exchange_strong(
            vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
            butil::memory_order_release, butil::memory_order_relaxed)) {
      // after a successful CAS, Address() on the original socket id fails its
      // version check; the version in _versioned_ref is now an odd number
      ...

      // Deref additionally which is added at creation so that this
      // Socket's reference will hit 0(recycle) when no one addresses it.
      ReleaseAdditionalReference();
      // NOTE: This Socket may be recycled at this point, don't
      // touch anything.
      return 0;
    }
  }
}

int Socket::ReleaseAdditionalReference() {
  bool expect = false;
  // Use `relaxed' fence here since `Dereference' has `released' fence
  if (_recycle_flag.compare_exchange_strong(expect, true,
                                            butil::memory_order_relaxed,
                                            butil::memory_order_relaxed)) {
    return Dereference();  // refcount -1
  }
  return -1;
}

Whenever a SocketUniquePtr is destructed, DereferenceSocket is called to decrement the reference count, and the Socket object is recycled once the count reaches 0. Concurrent SetFailed and Address operations can produce some special counting situations that need dedicated handling; the interleavings discussed in the comments below are worth thinking through carefully.

// socket_id.h
typedef uint64_t SocketId;

const SocketId INVALID_SOCKET_ID = (SocketId)-1;

class Socket;

extern void DereferenceSocket(Socket *);

struct SocketDeleter {
  void operator()(Socket *m) const { DereferenceSocket(m); }
};

// RAII: destruction runs SocketDeleter() -> DereferenceSocket
typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;

// socket.cpp
void DereferenceSocket(Socket *s) {
  if (s) {
    // decrement the reference count
    s->Dereference();
  }
}

inline int Socket::Dereference() {
  const SocketId id = _this_id;
  // refcount -1
  const uint64_t vref =
      _versioned_ref.fetch_sub(1, butil::memory_order_release);
  const int32_t nref = NRefOfVRef(vref);  // extract the count
  if (nref > 1) {
    // other references remain: nothing more to do
    return 0;
  }
  if (__builtin_expect(nref == 1, 1)) {
    // no references left: the socket needs recycling
    const uint32_t ver = VersionOfVRef(vref);  // version stored in _versioned_ref
    const uint32_t id_ver = VersionOfSocketId(id);  // version encoded in the socket id
    // handling of the corner cases when SetFailed races with Address;
    // see the English comments below
    // Besides first successful SetFailed() adds 1 to version, one of
    // those dereferencing nref from 1->0 adds another 1 to version.
    // Notice "one of those": The wait-free Address() may make ref of a
    // version-unmatched slot change from 1 to 0 multiple times, we
    // have to use version as a guard variable to prevent returning the
    // Socket to pool more than once.
    //
    // Note: `ver == id_ver' means this socket has been `SetRecycle'
    // before rather than `SetFailed'; `ver == id_ver+1' means we
    // had `SetFailed' this socket before. We should destroy the
    // socket in both situations.
    if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
      // sees nref:1->0, try to set version=id_ver+2,--nref.
      // No retry: if version changes, the slot is already returned by
      // another one who sees nref:1->0 concurrently; if nref changes,
      // which must be non-zero, the slot will be returned when
      // nref changes from 1->0 again.
      // Example:
      //   SetFailed(): --nref, sees nref:1->0           (1)
      //                try to set version=id_ver+2      (2)
      //    Address():  ++nref, unmatched version        (3)
      //                --nref, sees nref:1->0           (4)
      //                try to set version=id_ver+2      (5)
      // 1,2,3,4,5 or 1,3,4,2,5:
      //            SetFailed() succeeds, Address() fails at (5).
      // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
      //            returned by (5) of Address()
      // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
      //            returned by (5) of Address().
      uint64_t expected_vref = vref - 1;
      if (_versioned_ref.compare_exchange_strong(
              expected_vref, MakeVRef(id_ver + 2, 0),
              butil::memory_order_acquire, butil::memory_order_relaxed)) {
        OnRecycle();  // perform the actual recycling
        return_resource(SlotOfSocketId(id));
        return 1;
      }
      return 0;
    }
    LOG(FATAL) << "Invalid SocketId=" << id;
    return -1;
  }
  LOG(FATAL) << "Over dereferenced SocketId=" << id;
  return -1;
}

References

  1. "bRPC Memory Management", incubator-brpc
  2. "bRPC IOBuf", incubator-brpc
  3. "bRPC IO", incubator-brpc