Folly 的 Hazard Pointer 实现中有一段注释,详细描述了 C++ 里几种主流的安全内存回收方法,列表如下:
优点 | 缺点 | 场景 | |
---|---|---|---|
Locking | 易用 | 读高开销 / 抢占式 / 死锁 | 非性能敏感 |
Reference Couting | 自动回收 / 线程无关 / 免于死锁 | 读高开销 / 抢占式 | 需要自动回收 |
Read-copy-update (RCU) | 简单 / 高速 / 可拓展 | 对阻塞敏感 | 性能敏感 |
Hazard Pointer | 高速 / 可拓展 / 阻塞场景可用 | 性能依赖 TLS | 性能敏感 / 读多写少 |
C++ 标准库中提供了锁和引用计数方案。锁的缺点很明显,无论是哪种锁,在读的时候都会产生较大的开销。引用计数则相对好一些,但每次读取都需要修改引用计数,高并发场景下这样的原子操作也会成为性能瓶颈,毕竟原子加对应的 CPU 指令 lock add
也可以看成是微型锁。
Linux 内核中提供了 RCU 方法,笔者目前对此还没有太多的了解。本文主要介绍 Hazard Pointer,一种无锁编程中广泛使用的安全内存回收方法,适用于需要高性能读、读多写少的场景。其论文可参考文献 1,标准草案可参考文献 2,代码实现可参考 Folly 中的 HazPtr
。
首先回忆下引用计数的做法:
#include <atomic>
#include <memory>
template <class T>
class ReferenceCount {
public:
ReferenceCount(std::unique_ptr<T> ptr) : ptr_(std::move(ptr)), cnt_(1) {}
T *Ptr() const { return ptr_.get(); }
ReferenceCount *Ref() {
++cnt_;
return this;
}
void Deref() {
if (--cnt_ == 0) {
delete this;
}
}
private:
std::unique_ptr<T> ptr_;
std::atomic_uint32_t cnt_;
};
仔细观察可以发现:
总结起来,当对象正在使用时,就不能回收内存。每一个“正在使用”都需要对应一个标记,引用计数使用的标记是计数数值一,对应的原子操作性能问题就成为它无法摆脱的原罪。而 Hazard Pointer 使用的标记更为轻巧,一般通过在链表中标记该对象的指针实现,回收时如果发现链表中有对应的指针就不进行内存回收,将标记的复杂度转移到回收部分,也就更适合读多写少的场景。Hazard Pointer 的简单实现(在线执行):
#include <atomic>
#include <memory>
#include <unordered_set>
template <class T>
struct HazardPointer {
public:
class Holder {
public:
explicit Holder(HazardPointer<T> *pointer) : pointer_(pointer) {}
Holder(const HazardPointer &) = delete;
~Holder() { pointer_->Release(); }
T *get() const { return pointer_->target_.load(std::memory_order_acquire); }
operator bool() const { return get(); }
T *operator->() const { return get(); }
T &operator*() const { return *get(); }
private:
HazardPointer<T> *pointer_;
};
public:
~HazardPointer() {
if (next_) {
delete next_;
}
}
void Release() {
target_.store(nullptr, std::memory_order_release);
active_.clear(std::memory_order_release);
}
static Holder Acquire(const std::atomic<T *> &target) {
auto pointer = HazardPointer<T>::Alloc();
do {
pointer->target_ = target.load(std::memory_order_acquire);
} while (pointer->target_.load(std::memory_order_acquire) !=
target.load(std::memory_order_acquire));
return Holder(pointer);
}
static void Update(std::atomic<T *> &target, T *new_target) {
auto old = target.exchange(new_target);
Retire(old);
}
static void Reclaim() {
// collect in-use pointers
std::unordered_set<T *> in_use;
for (auto p = head_list_.load(std::memory_order_acquire); p; p = p->next_) {
in_use.insert(p->target_);
}
// delete useless pointers
List *retire_head = nullptr;
List *retire_tail = nullptr;
auto p = retire_list_.exchange(nullptr);
while (p != nullptr) {
auto next = p->next;
if (in_use.count(p->target.get()) == 0) {
delete p;
} else {
p->next = retire_head;
retire_head = p;
if (retire_tail == nullptr) {
retire_tail = p;
}
}
p = next;
}
if (retire_head) {
// push to retired list again
auto &tail = retire_tail->next;
do {
tail = retire_list_.load(std::memory_order_acquire);
} while (!retire_list_.compare_exchange_weak(tail, retire_head));
}
}
private:
static HazardPointer<T> *Alloc() {
for (auto p = head_list_.load(std::memory_order_acquire); p; p = p->next_) {
if (!p->active_.test_and_set()) {
return p;
}
}
auto p = new HazardPointer<T>();
p->active_.test_and_set();
do {
p->next_ = head_list_.load(std::memory_order_acquire);
} while (!head_list_.compare_exchange_weak(p->next_, p));
return p;
}
static void Retire(T *ptr) {
auto p = new List;
p->target.reset(ptr);
do {
p->next = retire_list_.load(std::memory_order_acquire);
} while (!retire_list_.compare_exchange_weak(p->next, p));
if (++retire_count_ == 1000) {
retire_count_ = 0;
Reclaim();
}
}
private:
struct List {
std::unique_ptr<T> target{nullptr};
List *next = nullptr;
};
private:
std::atomic<T *> target_{nullptr};
HazardPointer<T> *next_;
std::atomic_flag active_ = ATOMIC_FLAG_INIT;
static std::atomic<HazardPointer<T> *> head_list_;
static std::atomic<uint32_t> retire_count_;
static std::atomic<List *> retire_list_;
};
template <class T>
std::atomic<HazardPointer<T> *> HazardPointer<T>::head_list_{nullptr};
template <class T>
std::atomic<uint32_t> HazardPointer<T>::retire_count_{0};
template <class T>
std::atomic<typename HazardPointer<T>::List *> HazardPointer<T>::retire_list_{
nullptr};
// example
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>
std::atomic<int> g_count{0};
class A {
public:
explicit A(int value) : value_(value) { ++g_count; }
~A() { --g_count; }
int Value() { return value_; }
private:
int value_;
};
int main() {
std::atomic<A *> target{new A(0)};
constexpr int N = 8;
constexpr int M = 1000000;
constexpr int W = 1000;
std::vector<std::thread> threads;
std::atomic<uint64_t> sum{0};
for (int t = 0; t < N; ++t) {
threads.emplace_back([&, t] {
for (int i = 0; i < M; ++i) {
if (i % W == 0) {
// write less
HazardPointer<A>::Update(target, new A(t * M + i));
} else {
// read more
auto holder = HazardPointer<A>::Acquire(target);
sum.fetch_add(holder->Value());
}
}
});
}
// wait finish
for (auto &thread : threads) {
thread.join();
}
HazardPointer<A>::Reclaim();
printf("Remain: %d\n", g_count.load());
}
简单解释下原理。Hazard Pointer 申请读取时,会在对象链表中申请一个空位置,将对象的指针写入该位置中,读取结束时将该位置重新置空即可;而发生更新时,将更新替换下来的旧指针加入退休列表里,退休列表积攒到一定程度时则检查哪些对象已经不在对象链表中,不再使用的则可以执行删除。
如果使用 std::shared_ptr
实现上述逻辑,你会发现它的执行速度还要高于上述代码。原因在于这里实现的 Hazard Pointer 没有使用非对称内存屏障和线程本地存储优化。如果仔细观察,可以发现 Acquire
函数中使用顺序一致性内部屏障 pointer->target_ = ...
,x86 平台上会翻译为 mfence
指令,与 lock add
指令相比也不遑多让。在读多写少的前提下,可以将读写两边的屏障替换为非对称内存屏障,将读部分的开销转移到写部分中。可参考先前内存屏障博文中的介绍。
另一方面,Hazard Pointer 的高性能依赖于平台上线程本地存储(TLS)的性能。单纯使用 CAS 更新全局的对象链表和退休列表的性能太低,可以使用 TLS 做为缓冲层,这样大部分时间都是更新本线程的数据。这部分可以参考 Folly 中的 ThreadLocalPtr
,其中实现了遍历所有线程 TLS 的黑魔法。