Most of the structs used frequently in the codebase have the same fixed size. Based on this observation, bRPC implements an object pool that simplifies the allocation and recycling strategy for fixed-size objects and keeps memory operations in thread-local storage as much as possible. Acquiring an object follows the rules of the BAIDU_OBJECT_POOL_GET macro, reached via get_object():
// object_pool.h
// Get an object typed |T|. The object should be cleared before usage.
// NOTE: T must be default-constructible.
template <typename T> inline T *get_object() {
return ObjectPool<T>::singleton()->get_object();
}
// object_pool_inl.h
template <typename T>
inline T *ObjectPool<T>::get_object() {
LocalPool *lp = get_or_new_local_pool(); // fetch the thread-local LocalPool singleton
if (BAIDU_LIKELY(lp != NULL)) {
return lp->get(); // allocate an object
}
return NULL;
}
template <typename T>
BAIDU_THREAD_LOCAL
typename ObjectPool<T>::LocalPool *ObjectPool<T>::_local_pool = NULL;
template <typename T>
inline typename ObjectPool<T>::LocalPool *ObjectPool<T>::get_or_new_local_pool() {
LocalPool *lp = _local_pool;
if (BAIDU_LIKELY(lp != NULL)) {
return lp;
}
lp = new (std::nothrow) LocalPool(this);
if (NULL == lp) {
return NULL;
}
// with C++11 thread_local, the locking here and the atexit registration could be omitted
BAIDU_SCOPED_LOCK(_change_thread_mutex); // avoid race with clear()
_local_pool = lp;
butil::thread_atexit(LocalPool::delete_local_pool, lp);
_nlocal.fetch_add(1, butil::memory_order_relaxed);
return lp;
}
class BAIDU_CACHELINE_ALIGNMENT LocalPool {
public:
explicit LocalPool(ObjectPool *pool)
: _pool(pool), _cur_block(NULL), _cur_block_index(0) {
_cur_free.nfree = 0;
}
~LocalPool() {
// Add to global _free if there're some free objects
if (_cur_free.nfree) {
_pool->push_free_chunk(_cur_free);
}
_pool->clear_from_destructor_of_local_pool();
}
static void delete_local_pool(void *arg) { delete (LocalPool *)arg; }
// We need following macro to construct T with different CTOR_ARGS
// which may include parenthesis because when T is POD, "new T()"
// and "new T" are different: former one sets all fields to 0 which
// we don't want.
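// To see the difference in isolation (a hypothetical POD, illustration only):
//   struct Pod { int x; };
//   new (buf) Pod;   // default-init: x left indeterminate, no zeroing cost
//   new (buf) Pod(); // value-init: x == 0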
#define BAIDU_OBJECT_POOL_GET(CTOR_ARGS) \
/* Fetch local free ptr */ \
if (_cur_free.nfree) { \
BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1; \
return _cur_free.ptrs[--_cur_free.nfree]; \
} \
/* Fetch a FreeChunk from global. \
TODO: Popping from _free needs to copy a FreeChunk which is \
costly, but hardly impacts amortized performance. */ \
if (_pool->pop_free_chunk(_cur_free)) { \
BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1; \
return _cur_free.ptrs[--_cur_free.nfree]; \
} \
/* Fetch memory from local block */ \
if (_cur_block && _cur_block->nitem < BLOCK_NITEM) { \
T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ObjectPoolValidator<T>::validate(obj)) { \
obj->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return obj; \
} \
/* Fetch a Block from global */ \
_cur_block = add_block(&_cur_block_index); \
if (_cur_block != NULL) { \
T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ObjectPoolValidator<T>::validate(obj)) { \
obj->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return obj; \
} \
return NULL;
inline T *get() { BAIDU_OBJECT_POOL_GET(); }
template <typename A1> inline T *get(const A1 &a1) {
BAIDU_OBJECT_POOL_GET((a1));
}
template <typename A1, typename A2>
inline T *get(const A1 &a1, const A2 &a2) {
BAIDU_OBJECT_POOL_GET((a1, a2));
}
#undef BAIDU_OBJECT_POOL_GET
inline int return_object(T *ptr) {
// Return to local free list
// the number of objects cached in this thread is below the threshold: keep it local
if (_cur_free.nfree < ObjectPool::free_chunk_nitem()) {
_cur_free.ptrs[_cur_free.nfree++] = ptr;
BAIDU_OBJECT_POOL_FREE_ITEM_NUM_ADD1;
return 0;
}
// Local free list is full, return it to global.
// For copying issue, check comment in upper get()
// threshold exceeded: return the cached objects to the global pool
if (_pool->push_free_chunk(_cur_free)) {
_cur_free.nfree = 1;
_cur_free.ptrs[0] = ptr;
BAIDU_OBJECT_POOL_FREE_ITEM_NUM_ADD1;
return 0;
}
return -1;
}
private:
ObjectPool *_pool;
Block *_cur_block;
size_t _cur_block_index;
FreeChunk _cur_free;
};
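// To summarize LocalPool::get(): (1) pop from the thread-local free list;
// (2) otherwise grab a whole FreeChunk from the global pool; (3) otherwise
// placement-new on the current thread-local Block; (4) otherwise allocate
// a fresh Block from the global BlockGroups.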
// fetch a batch of objects from the global cache
template <typename T>
bool ObjectPool<T>::pop_free_chunk(FreeChunk &c) {
// Critical for the case that most return_object calls happen in
// threads different from those calling get_object.
if (_free_chunks.empty()) {
return false;
}
pthread_mutex_lock(&_free_chunks_mutex);
if (_free_chunks.empty()) {
pthread_mutex_unlock(&_free_chunks_mutex);
return false;
}
// pop a chunk of objects from the tail of _free_chunks
DynamicFreeChunk *p = _free_chunks.back();
_free_chunks.pop_back();
pthread_mutex_unlock(&_free_chunks_mutex);
c.nfree = p->nfree;
memcpy(c.ptrs, p->ptrs, sizeof(*p->ptrs) * p->nfree);
free(p);
return true;
}
// When a thread needs memory, it allocates a Block. To improve locality,
// items in the Block are only used by the thread.
// To support cache-aligned objects, align Block.items by cacheline.
struct BAIDU_CACHELINE_ALIGNMENT Block {
char items[sizeof(T) * BLOCK_NITEM];
size_t nitem;
Block() : nitem(0) {}
};
// An Object addresses at most OP_MAX_BLOCK_NGROUP BlockGroups,
// each BlockGroup addresses at most OP_GROUP_NBLOCK blocks. So an
// object addresses at most OP_MAX_BLOCK_NGROUP * OP_GROUP_NBLOCK Blocks.
// stores the addresses of a batch of Blocks
struct BlockGroup {
butil::atomic<size_t> nblock;
butil::atomic<Block *> blocks[OP_GROUP_NBLOCK];
BlockGroup() : nblock(0) {
// We fetch_add nblock in add_block() before setting the entry,
// thus address_resource() may see an unset entry. Initializing
// all entries to NULL makes such an address_resource() return NULL.
memset(blocks, 0, sizeof(butil::atomic<Block *>) * OP_GROUP_NBLOCK);
}
};
// Create a Block and append it to right-most BlockGroup.
template <typename T>
typename ObjectPool<T>::Block *ObjectPool<T>::add_block(size_t *index) {
Block *const new_block = new (std::nothrow) Block;
if (NULL == new_block) {
return NULL;
}
size_t ngroup;
do {
ngroup = _ngroup.load(butil::memory_order_acquire);
if (ngroup >= 1) {
BlockGroup *const g =
_block_groups[ngroup - 1].load(butil::memory_order_consume);
const size_t block_index =
g->nblock.fetch_add(1, butil::memory_order_relaxed); // atomic increment
if (block_index < OP_GROUP_NBLOCK) {
// the current group still has room; the block is added successfully
g->blocks[block_index].store(new_block, butil::memory_order_release);
*index = (ngroup - 1) * OP_GROUP_NBLOCK + block_index;
return new_block;
}
// otherwise undo the increment and add a new group
g->nblock.fetch_sub(1, butil::memory_order_relaxed);
}
} while (add_block_group(ngroup));
// Fail to add_block_group.
delete new_block;
return NULL;
}
// Create a BlockGroup and append it to _block_groups.
// Shall be called infrequently because a BlockGroup is pretty big.
template <typename T>
bool ObjectPool<T>::add_block_group(size_t old_ngroup) {
BlockGroup *bg = NULL;
BAIDU_SCOPED_LOCK(_block_group_mutex); // serialize group creation
const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
if (ngroup != old_ngroup) {
// Other thread got lock and added group before this thread.
// another thread won the race and added a group; the caller retries
return true;
}
if (ngroup < OP_MAX_BLOCK_NGROUP) {
bg = new (std::nothrow) BlockGroup;
if (NULL != bg) {
// Release fence is paired with consume fence in add_block()
// to avoid un-constructed bg to be seen by other threads.
// block group added successfully
_block_groups[ngroup].store(bg, butil::memory_order_release);
_ngroup.store(ngroup + 1, butil::memory_order_release);
}
}
return bg != NULL;
}
Recycling an object performs no destruction or deallocation: the pointer is merely appended to the thread-local cache list, and once a threshold is exceeded the batch is pushed, under a lock, into the global cache. Note that allocation carves objects out of whole memory blocks, so a returned object does not own its underlying memory and must be recycled through the pool's return_object interface. In the default implementation no object is ever destructed or freed, which means memory usage never falls back, much like std::vector.
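A minimal usage sketch under these semantics (DemoObject is a hypothetical type; get_object/return_object are the public helpers from butil/object_pool.h shown above):

// object pool usage (illustrative)
#include "butil/object_pool.h"
struct DemoObject {
    int payload;
};
void demo() {
    // may hand back a previously used object: clear it before use
    DemoObject *obj = butil::get_object<DemoObject>();
    obj->payload = 42;
    // return the pointer to the pool; no destructor runs, memory is kept
    butil::return_object(obj);
}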
The resource pool is designed much like the object pool above; the difference is the notion of a ResourceId, the offset of the resource within the pool:
// resource_pool_inl.h
template <typename T> struct ResourceId {
uint64_t value;
operator uint64_t() const { return value; }
template <typename T2> ResourceId<T2> cast() const {
ResourceId<T2> id = {value};
return id;
}
};
template <typename T, size_t NITEM> struct ResourcePoolFreeChunk {
size_t nfree;
ResourceId<T> ids[NITEM];
};
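// Note: unlike ObjectPool's FreeChunk, which caches raw pointers, the
// resource pool's free chunk caches ResourceIds, so every cached resource
// remains addressable through its offset.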
// We need following macro to construct T with different CTOR_ARGS
// which may include parenthesis because when T is POD, "new T()"
// and "new T" are different: former one sets all fields to 0 which
// we don't want.
#define BAIDU_RESOURCE_POOL_GET(CTOR_ARGS) \
/* Fetch local free id */ \
if (_cur_free.nfree) { \
const ResourceId<T> free_id = _cur_free.ids[--_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
/* Fetch a FreeChunk from global. \
TODO: Popping from _free needs to copy a FreeChunk which is \
costly, but hardly impacts amortized performance. */ \
if (_pool->pop_free_chunk(_cur_free)) { \
--_cur_free.nfree; \
const ResourceId<T> free_id = _cur_free.ids[_cur_free.nfree]; \
*id = free_id; \
BAIDU_RESOURCE_POOL_FREE_ITEM_NUM_SUB1; \
return unsafe_address_resource(free_id); \
} \
/* Fetch memory from local block */ \
if (_cur_block && _cur_block->nitem < BLOCK_NITEM) { \
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T *p = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
/* Fetch a Block from global */ \
_cur_block = add_block(&_cur_block_index); \
if (_cur_block != NULL) { \
id->value = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; \
T *p = new ((T *)_cur_block->items + _cur_block->nitem) T CTOR_ARGS; \
if (!ResourcePoolValidator<T>::validate(p)) { \
p->~T(); \
return NULL; \
} \
++_cur_block->nitem; \
return p; \
} \
return NULL;
Note how a ResourceId is computed: _cur_block_index * BLOCK_NITEM + _cur_block->nitem. Given such an id, address_resource() can locate the object in O(1):
static inline T *address_resource(ResourceId<T> id) {
const size_t block_index = id.value / BLOCK_NITEM;
const size_t group_index = (block_index >> RP_GROUP_NBLOCK_NBIT);
if (__builtin_expect(group_index < RP_MAX_BLOCK_NGROUP, 1)) {
BlockGroup *bg =
_block_groups[group_index].load(butil::memory_order_consume);
if (__builtin_expect(bg != NULL, 1)) {
Block *b = bg->blocks[block_index & (RP_GROUP_NBLOCK - 1)].load(
butil::memory_order_consume);
if (__builtin_expect(b != NULL, 1)) {
const size_t offset = id.value - block_index * BLOCK_NITEM;
if (__builtin_expect(offset < b->nitem, 1)) {
return (T *)b->items + offset;
}
}
}
}
return NULL;
}
In most scenarios the ResourceId stays well below 2^32, so it fits into the lower 32 bits of a 64-bit identifier, which is exactly how bthread uses it below. An object can be returned to the pool, but on return it is neither deleted nor destructed; it simply enters the free list. A later allocation may hand out such a used object, which has to be reset before use. A returned object also remains reachable through its offset: ResourcePool is only responsible for memory allocation and does not solve the ABA problem.
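A round-trip sketch of those semantics (DemoRes is a hypothetical type; get_resource/address_resource/return_resource are the helpers from butil/resource_pool.h):

// resource pool round trip (illustrative)
#include "butil/resource_pool.h"
struct DemoRes {
    int payload;
};
void demo() {
    butil::ResourceId<DemoRes> id;
    DemoRes *r = butil::get_resource(&id); // allocate; the offset lands in `id`
    r->payload = 1;
    butil::return_resource(id); // back to the free list; not destructed
    // the stale id still addresses valid memory: callers must layer their own
    // version check on top to guard against ABA, as bthread does below
    DemoRes *stale = butil::address_resource(id);
    (void)stale;
}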
Most bthread functions need to reach the TaskMeta behind a bthread_t in O(1), and once the bthread_t becomes invalid the lookup should return NULL so that functions can fail with an error. The solution: a bthread_t consists of a 32-bit version and a 32-bit offset. The version solves the ABA problem; the offset is allocated by ResourcePool. A lookup first fetches the TaskMeta through the offset, then checks the version; a mismatch means the bthread is no longer alive.
// task_group_inl.h
// Utilities to manipulate bthread_t
inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}
inline butil::ResourceId<TaskMeta> get_slot(bthread_t tid) {
butil::ResourceId<TaskMeta> id = {(tid & 0xFFFFFFFFul)};
return id;
}
inline uint32_t get_version(bthread_t tid) {
return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}
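// a quick numeric check of the packing (illustrative values):
//   make_tid(3, {7})                   == 0x0000000300000007
//   get_version(0x0000000300000007)    == 3
//   get_slot(0x0000000300000007).value == 7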
inline TaskMeta *TaskGroup::address_meta(bthread_t tid) {
// TaskMeta * m = address_resource<TaskMeta>(get_slot(tid));
// if (m != NULL && m->version == get_version(tid)) {
// return m;
// }
// return NULL;
return address_resource(get_slot(tid));
}
// task_group.cpp
// an example of the version check
bool TaskGroup::exists(bthread_t tid) {
if (tid != 0) { // tid of bthread is never 0.
TaskMeta *m = address_meta(tid);
if (m != NULL) {
return (*m->version_butex == get_version(tid)); // do the versions match?
}
}
return false;
}
bRPC handles byte-stream data with IOBuf, a non-contiguous zero-copy buffer that offers a std::string-like interface. The structure closely resembles Tokio's Bytes, which the author analyzed previously; both achieve zero copy through reference counting.
// iobuf.h
// IOBuf is a non-continuous buffer that can be cut and combined w/o copying
// payload. It can be read from or flushed into file descriptors as well.
// IOBuf is [thread-compatible]. Namely using different IOBuf in different
// threads simultaneously is safe, and reading a static IOBuf from different
// threads is safe as well.
// IOBuf is [NOT thread-safe]. Modifying a same IOBuf from different threads
// simultaneously is unsafe and likely to crash.
class IOBuf {
public:
static const size_t DEFAULT_BLOCK_SIZE = 8192; // default block size: 8KB
// can't directly use `struct iovec' here because we also need to access the
// reference counter(nshared) in Block*
struct BlockRef {
// NOTICE: first bit of `offset' is shared with BigView::start
uint32_t offset; // offset into the block; guaranteed offset <= max_int
uint32_t length; // length of the referenced slice
Block *block;
};
// IOBuf is essentially a tiny queue of BlockRefs.
struct SmallView {
BlockRef refs[2]; // references at most two blocks
};
// SmallView and BigView have the same size
struct BigView {
int32_t magic;
uint32_t start; // index of the first ref
BlockRef *refs; // array of block references
uint32_t nref; // number of refs in the array
uint32_t cap_mask;
size_t nbytes; // current total length in bytes
const BlockRef &ref_at(uint32_t i) const {
return refs[(start + i) & cap_mask];
}
BlockRef &ref_at(uint32_t i) { return refs[(start + i) & cap_mask]; }
uint32_t capacity() const { return cap_mask + 1; } // capacity of the ref array
};
bool _small() const { return _bv.magic >= 0; } // negative magic means BigView
size_t length() const {
return _small() ? (_sv.refs[0].length + _sv.refs[1].length) : _bv.nbytes;
}
private:
union {
BigView _bv; // _bv.magic shares its address with _sv.refs[0].offset; negative means BigView
SmallView _sv;
};
};
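// a usage sketch (not part of iobuf.h; illustrative only): appending one
// IOBuf to another copies BlockRefs instead of payload bytes
static void iobuf_usage_demo() {
    butil::IOBuf a;
    a.append("hello "); // payload copied into a Block owned by `a`
    butil::IOBuf b;
    b.append("world");
    a.append(b); // zero-copy: adds BlockRefs and bumps reference counts
    // now a.to_string() == "hello world" while `b` still holds its data
}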
// iobuf.cpp: the append path
void IOBuf::append(const IOBuf &other) {
const size_t nref = other._ref_num();
for (size_t i = 0; i < nref; ++i) {
_push_back_ref(other._ref_at(i));
}
}
// the number of Blocks referenced
inline size_t IOBuf::_ref_num() const {
return _small() ? (!!_sv.refs[0].block + !!_sv.refs[1].block) : _bv.nref;
}
inline void IOBuf::_push_back_ref(const BlockRef &r) {
if (_small()) {
return _push_or_move_back_ref_to_smallview<false>(r);
} else {
return _push_or_move_back_ref_to_bigview<false>(r);
}
}
template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef &r) {
BlockRef &back = _bv.ref_at(_bv.nref - 1);
if (back.block == r.block && back.offset + back.length == r.offset) {
// Merge ref
back.length += r.length;
_bv.nbytes += r.length;
if (MOVE) {
r.block->dec_ref();
}
return;
}
if (_bv.nref != _bv.capacity()) {
// the ref array still has room: simply add another reference to the block
_bv.ref_at(_bv.nref++) = r;
_bv.nbytes += r.length; // accumulate the length
if (!MOVE) {
r.block->inc_ref();
}
return;
}
// resize, don't modify bv until new_refs is fully assigned
// the ref array is full: double its capacity
const uint32_t new_cap = _bv.capacity() * 2;
BlockRef *new_refs = iobuf::acquire_blockref_array(new_cap);
for (uint32_t i = 0; i < _bv.nref; ++i) {
// copy the original refs
new_refs[i] = _bv.ref_at(i);
}
new_refs[_bv.nref++] = r;
// Change other variables
_bv.start = 0;
// release the original ref array
iobuf::release_blockref_array(_bv.refs, _bv.capacity());
_bv.refs = new_refs;
_bv.cap_mask = new_cap - 1;
_bv.nbytes += r.length;
if (!MOVE) {
r.block->inc_ref();
}
}
// the implementation of Block
void *(*blockmem_allocate)(size_t) = ::malloc;
void (*blockmem_deallocate)(void *) = ::free;
const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
typedef void (*UserDataDeleter)(void *);
struct UserDataExtension {
UserDataDeleter deleter;
};
struct IOBuf::Block {
butil::atomic<int> nshared; // reference counter
uint16_t flags; // whether a user-supplied deleter is used; see the comment below
uint16_t abi_check; // original cap, never be zero.
uint32_t size;
uint32_t cap;
Block *portal_next;
// When flag is 0, data points to `size` bytes starting at
// `(char*)this+sizeof(Block)' When flag & IOBUF_BLOCK_FLAGS_USER_DATA is
// non-0, data points to the user data and the deleter is put in
// UserDataExtension at `(char*)this+sizeof(Block)'
char *data;
Block(char *data_in, uint32_t data_size)
: nshared(1), flags(0), abi_check(0), size(0), cap(data_size),
portal_next(NULL), data(data_in) {
iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
butil::memory_order_relaxed);
}
Block(char *data_in, uint32_t data_size, UserDataDeleter deleter)
: nshared(1), flags(IOBUF_BLOCK_FLAGS_USER_DATA), abi_check(0),
size(data_size), cap(data_size), portal_next(NULL), data(data_in) {
get_user_data_extension()->deleter = deleter; // user-supplied deleter
}
// Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
UserDataExtension *get_user_data_extension() {
char *p = (char *)this;
return (UserDataExtension *)(p + sizeof(Block)); // the extension placed right after this Block
}
void inc_ref() {
check_abi();
nshared.fetch_add(1, butil::memory_order_relaxed);
}
void dec_ref() {
check_abi();
if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
butil::atomic_thread_fence(butil::memory_order_acquire);
if (!flags) {
iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
butil::memory_order_relaxed);
this->~Block(); // run the destructor without freeing the memory
iobuf::blockmem_deallocate(this); // then free the memory
} else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
get_user_data_extension()->deleter(data); // run the user-supplied deleter
this->~Block();
free(this);
}
}
}
int ref_count() const { return nshared.load(butil::memory_order_relaxed); }
bool full() const { return size >= cap; }
size_t left_space() const { return cap - size; }
};
// create a block; the Block header and the actual payload share one allocation
inline IOBuf::Block *create_block(const size_t block_size) {
if (block_size > 0xFFFFFFFFULL) {
LOG(FATAL) << "block_size=" << block_size << " is too large";
return NULL;
}
char *mem = (char *)iobuf::blockmem_allocate(block_size);
if (mem == NULL) {
return NULL;
}
return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
block_size - sizeof(IOBuf::Block));
}
// append user data; ownership is governed by the supplied deleter
int IOBuf::append_user_data(void *data, size_t size, void (*deleter)(void *)) {
if (size > 0xFFFFFFFFULL - 100) {
LOG(FATAL) << "data_size=" << size << " is too large";
return -1;
}
char *mem = (char *)malloc(sizeof(IOBuf::Block) + sizeof(UserDataExtension));
if (mem == NULL) {
return -1;
}
if (deleter == NULL) {
deleter = ::free;
}
IOBuf::Block *b = new (mem) IOBuf::Block((char *)data, size, deleter);
const IOBuf::BlockRef r = {0, b->cap, b};
_move_back_ref(r);
return 0;
}
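A hedged usage sketch of append_user_data: the buffer below is malloc-ed by the caller, ownership moves into the IOBuf, and the deleter (::free here, the default when NULL is passed) runs once the Block's reference count reaches zero:

// append_user_data (illustrative)
#include "butil/iobuf.h"
#include <cstdlib>
#include <cstring>
void demo() {
    char *payload = (char *)malloc(4096);
    memset(payload, 'x', 4096);
    butil::IOBuf buf;
    // zero-copy: the IOBuf references `payload` directly and frees it
    // through the deleter when the last reference is dropped
    buf.append_user_data(payload, 4096, NULL);
}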
IOBuf offers rich byte-stream processing facilities and the implementation is long, over 3k lines in total; the remaining details are omitted here and left to interested readers.
bRPC manages connections with Socket and refers to a Socket object through a 64-bit SocketId, which makes connections convenient to use in multithreaded environments. Using a Socket involves the following pieces: the Socket object itself; SocketUniquePtr, which actually behaves more like a std::shared_ptr in that its destructor decrements the reference count; and SocketId, which carries a version check and is used like a std::weak_ptr. The commonly used methods are:
Create: construct a Socket object and return its SocketId;
Address: fetch the Socket object behind a SocketId, returning a SocketUniquePtr;
SetFailed: mark a Socket object as failed, after which every Address of that SocketId returns a null pointer; the Socket object is recycled once its use count drops to 0.
The official documentation also explains why the design is this elaborate (excerpted from reference 3):
Socket's unique SetFailed can, when needed, guarantee that the Socket cannot be Addressed any further so that its reference count eventually drops to 0; plain shared_ptr/weak_ptr cannot guarantee this. When a server needs to quit while requests keep pouring in, the reference count of the corresponding Socket may never reach 0 and the server can never exit. Besides, a weak_ptr cannot serve directly as epoll data, whereas a SocketId can. These factors led us to design Socket.
Whether to store a SocketUniquePtr or a SocketId depends on whether a strong reference is needed. The Controller, for instance, runs through the entire RPC process and interacts heavily with the data inside Socket, so it stores a SocketUniquePtr. epoll merely notifies that an event occurred on some fd; if the Socket has already been recycled, that event is dispensable, so epoll stores a SocketId.
Since the data inside a SocketUniquePtr does not change as long as the pointer is valid, this mechanism frees users from troublesome race conditions and ABA problems, letting them operate on the shared fd with confidence. The approach also avoids implicit reference counting, keeps memory ownership explicit, and gives the program well-guarded quality. bRPC uses SocketUniquePtr and SocketId extensively; they have indeed simplified our development.
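Before reading the implementation, a sketch of the typical call pattern, assuming an already-connected fd (SocketOptions has many more fields; see socket.h):

// Socket usage (illustrative)
#include "brpc/socket.h"
void demo(int fd) {
    brpc::SocketOptions options;
    options.fd = fd; // wrap an existing connected fd
    brpc::SocketId id;
    if (brpc::Socket::Create(options, &id) != 0) {
        return;
    }
    {
        brpc::SocketUniquePtr ptr;
        if (brpc::Socket::Address(id, &ptr) == 0) {
            // strong reference held: the Socket cannot be recycled here
        }
    } // ptr destructs, the reference count drops
    brpc::Socket::SetFailed(id); // later Address() calls return NULL
}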
Some of these ideas will only fully register after we have walked through bRPC's complete RPC flow. First, the implementation of Socket itself:
class BAIDU_CACHELINE_ALIGNMENT Socket {
struct Forbidden {};
public:
Socket(Forbidden); // users must not construct Socket directly, yet ResourcePool needs a public constructor; the private Forbidden tag achieves both
// Create a Socket according to `options', put the identifier into `id'.
// Returns 0 on success, -1 otherwise.
static int Create(const SocketOptions &options, SocketId *id);
// Place the Socket associated with identifier `id' into unique_ptr `ptr',
// which will be released automatically when out of scope (w/o explicit
// std::move). User can still access `ptr' after calling ptr->SetFailed()
// before release of `ptr'.
// This function is wait-free.
// Returns 0 on success, -1 when the Socket was SetFailed().
static int Address(SocketId id, SocketUniquePtr *ptr);
// Mark this Socket or the Socket associated with `id' as failed.
// Any later Address() of the identifier shall return NULL unless the
// Socket was revived by HealthCheckThread. The Socket is NOT recycled
// after calling this function, instead it will be recycled when no one
// references it. Internal fields of the Socket are still accessible
// after calling this function. Calling SetFailed() of a Socket more
// than once is OK.
// This function is lock-free.
// Returns -1 when the Socket was already SetFailed(), 0 otherwise.
int SetFailed();
private:
// unsigned 32-bit version + signed 32-bit referenced-count.
// Meaning of version:
// * Created version: no SetFailed() is called on the Socket yet. Must be
// same evenness with initial _versioned_ref because during lifetime of
// a Socket on the slot, the version is added with 1 twice. This is
// also the version encoded in SocketId.
// * Failed version: = created version + 1, SetFailed()-ed but returned.
// * Other versions: the socket is already recycled.
// the version + reference count recorded by the Socket itself; 0 when first constructed
butil::atomic<uint64_t> _versioned_ref;
// Flag used to mark whether additional reference has been decreased
// by either `SetFailed' or `SetRecycle'
// recycling flag
butil::atomic<bool> _recycle_flag;
};
// SocketId = 32-bit version + 32-bit slot.
// version: from the version part of _versioned_ref, must be an EVEN number.
// slot: designated by ResourcePool.
int Socket::Create(const SocketOptions &options, SocketId *id) {
butil::ResourceId<Socket> slot; // receives the resource id
Socket *const m = butil::get_resource(&slot, Forbidden());
if (m == NULL) {
LOG(FATAL) << "Fail to get_resource<Socket>";
return -1;
}
g_vars->nsocket << 1;
CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
...
// nref can be non-zero due to concurrent AddressSocket().
// _this_id will only be used in destructor/Destroy of referenced
// slots, which is safe and properly fenced. Although it's better
// to put the id into SocketUniquePtr.
// build the SocketId; fetch_add bumps the reference count by 1; the version is 0 at first construction
m->_this_id = MakeSocketId(VersionOfVRef(m->_versioned_ref.fetch_add(
1, butil::memory_order_release)),
slot);
...
*id = m->_this_id; // hand the socket id back
return 0;
}
// Utility functions to combine and extract SocketId.
BUTIL_FORCE_INLINE SocketId MakeSocketId(uint32_t version,
butil::ResourceId<Socket> slot) {
return SocketId((((uint64_t)version) << 32) | slot.value);
}
// Utility functions to combine and extract Socket::_versioned_ref
BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
return (uint32_t)(vref >> 32);
}
BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
butil::ResourceId<Socket> id = {(sid & 0xFFFFFFFFul)};
return id;
}
BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
return (int32_t)(vref & 0xFFFFFFFFul);
}
inline int Socket::Address(SocketId id, SocketUniquePtr *ptr) {
// extract the slot from the SocketId
const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
// locate the Socket object
Socket *const m = address_resource(slot);
if (__builtin_expect(m != NULL, 1)) {
// acquire fence makes sure this thread sees latest changes before
// Dereference() or Revive().
// bump the refcount; note that vref1 is the value *before* the increment
const uint64_t vref1 =
m->_versioned_ref.fetch_add(1, butil::memory_order_acquire);
// extract the version
const uint32_t ver1 = VersionOfVRef(vref1);
if (ver1 == VersionOfSocketId(id)) {
// the version matches the one in the SocketId: success
ptr->reset(m);
return 0;
}
// version mismatch: undo the increment; vref2 is the value *after* the decrement
const uint64_t vref2 =
m->_versioned_ref.fetch_sub(1, butil::memory_order_release);
// extract the reference count
const int32_t nref = NRefOfVRef(vref2);
if (nref > 1) {
return -1;
} else if (__builtin_expect(nref == 1, 1)) {
// no references left: read the version again
const uint32_t ver2 = VersionOfVRef(vref2);
// an odd version means SetFailed() has just been called,
// and with nobody referencing the socket, recycling is triggered here
if ((ver2 & 1)) {
if (ver1 == ver2 || ver1 + 1 == ver2) {
uint64_t expected_vref = vref2 - 1; // the expected _versioned_ref after our decrement
// try CAS: atomically set _versioned_ref to version ver2 + 1 (the created version + 2), count 0
if (m->_versioned_ref.compare_exchange_strong(
expected_vref, MakeVRef(ver2 + 1, 0),
butil::memory_order_acquire, butil::memory_order_relaxed)) {
// CAS succeeded: perform the recycling
m->OnRecycle(); // the actual recycling work
return_resource(SlotOfSocketId(id));
}
} else {
CHECK(false) << "ref-version=" << ver1 << " unref-version=" << ver2;
}
} else {
CHECK_EQ(ver1, ver2);
// Addressed a free slot.
}
} else {
CHECK(false) << "Over dereferenced SocketId=" << id;
}
}
return -1;
}
int Socket::SetFailed() { return SetFailed(EFAILEDSOCKET, NULL); }
int Socket::SetFailed(int error_code, const char *error_fmt, ...) {
if (error_code == 0) {
CHECK(false) << "error_code is 0";
error_code = EFAILEDSOCKET;
}
const uint32_t id_ver = VersionOfSocketId(_this_id); // the version recorded in the socket id
uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
for (;;) { // need iteration to retry compare_exchange_strong
// the version in _versioned_ref differs from the socket id's version: fail
if (VersionOfVRef(vref) != id_ver) {
return -1;
}
// Try to set version=id_ver+1 (to make later Address() return NULL),
// retry on fail.
// try CAS: atomically set the version of _versioned_ref to id_ver + 1, keeping the count
if (_versioned_ref.compare_exchange_strong(
vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
butil::memory_order_release, butil::memory_order_relaxed)) {
// after a successful CAS, Address() on the original socket id fails the version check,
// because the version in _versioned_ref has become an odd number
...
// Deref additionally which is added at creation so that this
// Socket's reference will hit 0(recycle) when no one addresses it.
ReleaseAdditionalReference();
// NOTE: This Socket may be recycled at this point, don't
// touch anything.
return 0;
}
}
}
int Socket::ReleaseAdditionalReference() {
bool expect = false;
// Use `relaxed' fence here since `Dereference' has `released' fence
if (_recycle_flag.compare_exchange_strong(expect, true,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
return Dereference(); // 引用计数 -1
}
return -1;
}
Every time a SocketUniquePtr is destructed, DereferenceSocket is called to decrement the reference count, and the Socket object is recycled once the count reaches 0. Concurrent SetFailed and Address calls can produce some special counting situations that need dedicated handling; the code below rewards careful study.
// socket_id.h
typedef uint64_t SocketId;
const SocketId INVALID_SOCKET_ID = (SocketId)-1;
class Socket;
extern void DereferenceSocket(Socket *);
struct SocketDeleter {
void operator()(Socket *m) const { DereferenceSocket(m); }
};
// RAII: destruction runs SocketDeleter() -> DereferenceSocket
typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;
// socket.cpp
void DereferenceSocket(Socket *s) {
if (s) {
// decrement the reference count
s->Dereference();
}
}
inline int Socket::Dereference() {
const SocketId id = _this_id;
// decrement the reference count
const uint64_t vref =
_versioned_ref.fetch_sub(1, butil::memory_order_release);
const int32_t nref = NRefOfVRef(vref); // the count before our decrement
if (nref > 1) {
// other references remain: nothing more to do
return 0;
}
if (__builtin_expect(nref == 1, 1)) {
// we dropped the last reference: the socket needs recycling
const uint32_t ver = VersionOfVRef(vref); // the version in _versioned_ref
const uint32_t id_ver = VersionOfSocketId(id); // the version in the socket id
// handling of the races between SetFailed and Address; the original
// English comments below walk through the scenarios
// Besides first successful SetFailed() adds 1 to version, one of
// those dereferencing nref from 1->0 adds another 1 to version.
// Notice "one of those": The wait-free Address() may make ref of a
// version-unmatched slot change from 1 to 0 for multiple times, we
// have to use version as a guard variable to prevent returning the
// Socket to pool more than once.
//
// Note: `ver == id_ver' means this socket has been `SetRecycle'
// before rather than `SetFailed'; `ver == id_ver+1' means we
// had `SetFailed' this socket before. We should destroy the
// socket under both situation
if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
// sees nref:1->0, try to set version=id_ver+2,--nref.
// No retry: if version changes, the slot is already returned by
// another one who sees nref:1->0 concurrently; if nref changes,
// which must be non-zero, the slot will be returned when
// nref changes from 1->0 again.
// Example:
// SetFailed(): --nref, sees nref:1->0 (1)
// try to set version=id_ver+2 (2)
// Address(): ++nref, unmatched version (3)
// --nref, sees nref:1->0 (4)
// try to set version=id_ver+2 (5)
// 1,2,3,4,5 or 1,3,4,2,5:
// SetFailed() succeeds, Address() fails at (5).
// 1,3,2,4,5: SetFailed() fails with (2), the slot will be
// returned by (5) of Address()
// 1,3,4,5,2: SetFailed() fails with (2), the slot is already
// returned by (5) of Address().
uint64_t expected_vref = vref - 1;
if (_versioned_ref.compare_exchange_strong(
expected_vref, MakeVRef(id_ver + 2, 0),
butil::memory_order_acquire, butil::memory_order_relaxed)) {
OnRecycle(); // the actual recycling work
return_resource(SlotOfSocketId(id));
return 1;
}
return 0;
}
LOG(FATAL) << "Invalid SocketId=" << id;
return -1;
}
LOG(FATAL) << "Over dereferenced SocketId=" << id;
return -1;
}
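To make the counting concrete, here is one possible life of a Socket, writing _versioned_ref as (version, nref) and assuming the created version is 0:

// Create():                     (0, 0) -> (0, 1)  extra reference held since creation
// Address(id):                  (0, 1) -> (0, 2)  version 0 matches the id: success
// SocketUniquePtr destructs:    (0, 2) -> (0, 1)  Dereference()
// SetFailed():                  (0, 1) -> (1, 1)  version goes odd: later Address() fails
// ReleaseAdditionalReference(): (1, 1) -> (1, 0)  sees nref 1->0, ver == id_ver + 1
// final CAS:                    (1, 0) -> (2, 0)  OnRecycle(), slot returned to ResourcePool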