本系列的上一篇介绍了 Sorted Table 的构建过程,本篇就继续分析 Sorted Table 的读取、解析过程。
根据依赖关系,首先来看 `table/format.cc` 中 `ReadBlock` 函数的实现:
// Result of ReadBlock(): the bytes of one block plus flags describing
// whether those bytes may be cached and who must free them.
struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
bool heap_allocated; // True iff caller should delete[] data.data()
};
// Reads the block described by `handle` from `file`. It verifies the trailing
// CRC when options.verify_checksums is set and decompresses the payload when
// the trailer's type byte says it is Snappy-compressed.
//
// On success fills *result and returns OK. On failure returns a non-OK status
// and leaves *result as an empty, non-owned, non-cachable block.
Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
                 const BlockHandle& handle, BlockContents* result) {
  result->data = Slice();
  result->cachable = false;
  result->heap_allocated = false;

  // On-disk layout: block bytes, then a trailer made of a one-byte type and
  // a fixed32 masked CRC (see table_builder.cc for the writer side).
  const size_t block_size = static_cast<size_t>(handle.size());
  const size_t read_size = block_size + kBlockTrailerSize;
  char* scratch = new char[read_size];

  Slice raw;
  Status status = file->Read(handle.offset(), read_size, &raw, scratch);
  if (!status.ok()) {
    delete[] scratch;
    return status;
  }
  if (raw.size() != read_size) {
    delete[] scratch;
    return Status::Corruption("truncated block read");
  }

  // The file may return a pointer into its own storage instead of scratch.
  const char* bytes = raw.data();

  if (options.verify_checksums) {
    // The CRC covers the block bytes plus the type byte.
    const uint32_t expected =
        crc32c::Unmask(DecodeFixed32(bytes + block_size + 1));
    const uint32_t computed = crc32c::Value(bytes, block_size + 1);
    if (computed != expected) {
      delete[] scratch;
      return Status::Corruption("block checksum mismatch");
    }
  }

  const char block_type = bytes[block_size];

  if (block_type == kNoCompression) {
    if (bytes == scratch) {
      // We own the buffer: hand it to the caller, who may cache it.
      result->data = Slice(scratch, block_size);
      result->heap_allocated = true;
      result->cachable = true;
    } else {
      // The file handed back memory it keeps alive while open; use it
      // directly and do not cache it a second time.
      delete[] scratch;
      result->data = Slice(bytes, block_size);
      result->heap_allocated = false;
      result->cachable = false;
    }
    return Status::OK();
  }

  if (block_type == kSnappyCompression) {
    size_t uncompressed_size = 0;
    if (!port::Snappy_GetUncompressedLength(bytes, block_size,
                                            &uncompressed_size)) {
      delete[] scratch;
      return Status::Corruption("corrupted compressed block contents");
    }
    char* uncompressed = new char[uncompressed_size];
    if (!port::Snappy_Uncompress(bytes, block_size, uncompressed)) {
      delete[] scratch;
      delete[] uncompressed;
      return Status::Corruption("corrupted compressed block contents");
    }
    delete[] scratch;
    result->data = Slice(uncompressed, uncompressed_size);
    result->heap_allocated = true;
    result->cachable = true;
    return Status::OK();
  }

  delete[] scratch;
  return Status::Corruption("bad block type");
}
对于已经存储的 Sorted Table 文件,提供 ReadOptions
和 BlockHandle
后,可以将 handle
对应的 Block 内容读取到 BlockContents
中。该结构体储存 Block 的字节流,以及能否缓存、是否需要手动清理的标记。ReadBlock
的实现非常直接,读取文件对应位置的字节流,进行必要的校验和解压缩。继续看 Block
的解析部分 table/block.h
:
struct BlockContents;
class Comparator;
// In-memory view of one table block. Layout as decoded by the constructor:
// entries, then a restart array of fixed32 offsets, then a trailing fixed32
// restart count.
class Block {
public:
// Initialize the block with the specified contents.
// The bytes are not copied. If contents.heap_allocated, the Block takes
// ownership and delete[]s them in its destructor.
explicit Block(const BlockContents& contents);
Block(const Block&) = delete;
Block& operator=(const Block&) = delete;
~Block();
// Size in bytes of the contents; 0 marks malformed contents.
size_t size() const { return size_; }
// Returns a newly allocated iterator over the entries (caller owns it), an
// empty iterator if there are no restart points, or an error iterator if
// the contents are malformed.
Iterator* NewIterator(const Comparator* comparator);
private:
class Iter;
// Reads the restart count stored in the last 4 bytes.
uint32_t NumRestarts() const;
const char* data_;
size_t size_;
uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[]
};
接口的核心部分自然是迭代器。迭代器提供 Block 中键值对数据的遍历和查找。继续看对应函数的实现:
// The final fixed32 of a block holds the number of entries in its restart
// array.
inline uint32_t Block::NumRestarts() const {
  assert(size_ >= sizeof(uint32_t));
  const char* const count_field = data_ + (size_ - sizeof(uint32_t));
  return DecodeFixed32(count_field);
}
// Wraps `contents` without copying. If contents.heap_allocated, the block
// takes ownership of the bytes and delete[]s them in ~Block().
//
// A block too small to hold its restart count, or whose restart count does
// not fit in the block, is marked malformed by setting size_ to 0.
// NewIterator() then reports corruption. restart_offset_ is initialized up
// front so that it is never left indeterminate on those error paths.
Block::Block(const BlockContents& contents)
    : data_(contents.data.data()),
      size_(contents.data.size()),
      restart_offset_(0),
      owned_(contents.heap_allocated) {
  if (size_ < sizeof(uint32_t)) {
    size_ = 0;  // Error marker
    return;
  }
  const uint32_t num_restarts = NumRestarts();
  const size_t max_restarts_allowed =
      (size_ - sizeof(uint32_t)) / sizeof(uint32_t);
  if (num_restarts > max_restarts_allowed) {
    // The size is too small for NumRestarts()
    size_ = 0;
    return;
  }
  // The restart array sits immediately before the trailing restart count.
  restart_offset_ = static_cast<uint32_t>(
      size_ - (1 + num_restarts) * sizeof(uint32_t));
}
// Frees the block bytes only when this Block owns them (that is, when the
// constructor saw contents.heap_allocated == true).
Block::~Block() {
  if (!owned_) return;
  delete[] data_;
}
// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively. Will not dereference past "limit".
//
// If any errors are detected, returns nullptr. Otherwise, returns a
// pointer to the key delta (just past the three decoded values).
//
// The final bounds check sums non_shared and value_length in 64 bits. Adding
// two untrusted uint32_t values in 32 bits can wrap around and would let a
// corrupted entry claim far more bytes than remain before "limit".
static inline const char* DecodeEntry(const char* p, const char* limit,
                                      uint32_t* shared, uint32_t* non_shared,
                                      uint32_t* value_length) {
  if (limit - p < 3) return nullptr;
  *shared = reinterpret_cast<const uint8_t*>(p)[0];
  *non_shared = reinterpret_cast<const uint8_t*>(p)[1];
  *value_length = reinterpret_cast<const uint8_t*>(p)[2];
  if ((*shared | *non_shared | *value_length) < 128) {
    // Fast path: all three values are encoded in one byte each
    p += 3;
  } else {
    if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
    if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
    if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr;
  }
  // p <= limit holds here, so the difference is non-negative.
  const uint64_t available = static_cast<uint64_t>(limit - p);
  const uint64_t needed =
      static_cast<uint64_t>(*non_shared) + static_cast<uint64_t>(*value_length);
  if (needed > available) {
    return nullptr;
  }
  return p;
}
按照之前描述的 Block 存储结构,最后 4 字节存储复活点的数量,如 `NumRestarts` 的实现所示。构造时进行必要的合法性判断,并在 `restart_offset_` 中记录复活点列表的起始偏移。解析条目时,首先假设 `shared`、`non_shared` 和 `value_length` 都小于 128,尝试按单字节读取长度以提高解析速度。大部分情况下该假设都成立;少数情况下出现超长的串时,会回退到普通的 `GetVarint32Ptr`。继续看迭代器的实现:
// Iterator over the key/value entries of a single Block.
//
// Keys are prefix-compressed. ParseNextKey rebuilds key_ by truncating the
// previous key to `shared` bytes and appending `non_shared` new bytes.
// Values are returned as Slices pointing straight into data_.
class Block::Iter : public Iterator {
private:
const Comparator* const comparator_;
const char* const data_; // underlying block contents
uint32_t const restarts_; // Offset of restart array (list of fixed32)
uint32_t const num_restarts_; // Number of uint32_t entries in restart array
// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
// NOTE: ParseNextKey only advances restart_index_ while the *next* restart
// point is strictly < current_. When current_ lands exactly on restart point
// i, it can therefore still hold i - 1.
uint32_t restart_index_; // Index of restart block in which current_ falls
std::string key_; // fully reconstructed key of the current entry
Slice value_; // points into data_; its end is where the next entry starts
Status status_;
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
// Return the offset in data_ just past the end of the current entry.
inline uint32_t NextEntryOffset() const {
return (value_.data() + value_.size()) - data_;
}
// Reads the index-th fixed32 of the restart array: an offset into data_.
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
// Resets the decoding state so that the next ParseNextKey() decodes the
// entry at restart point `index`.
void SeekToRestartPoint(uint32_t index) {
key_.clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
// ParseNextKey() starts at the end of value_, so set value_ accordingly
uint32_t offset = GetRestartPoint(index);
value_ = Slice(data_ + offset, 0);
}
public:
// Starts out invalid (current_ == restarts_) until positioned.
Iter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts)
: comparator_(comparator),
data_(data),
restarts_(restarts),
num_restarts_(num_restarts),
current_(restarts_),
restart_index_(num_restarts_) {
assert(num_restarts_ > 0);
}
bool Valid() const override { return current_ < restarts_; }
Status status() const override { return status_; }
Slice key() const override {
assert(Valid());
return key_;
}
Slice value() const override {
assert(Valid());
return value_;
}
void Next() override {
assert(Valid());
ParseNextKey();
}
// Entries can only be decoded forward, so Prev backs up to a restart point
// that starts before the current entry and rescans forward to the entry
// whose end touches `original`.
void Prev() override {
assert(Valid());
// Scan backwards to a restart point before current_
const uint32_t original = current_;
while (GetRestartPoint(restart_index_) >= original) {
if (restart_index_ == 0) {
// No more entries
current_ = restarts_;
restart_index_ = num_restarts_;
return;
}
restart_index_--;
}
SeekToRestartPoint(restart_index_);
do {
// Loop until end of current entry hits the start of original entry
} while (ParseNextKey() && NextEntryOffset() < original);
}
void SeekToFirst() override {
SeekToRestartPoint(0);
ParseNextKey();
}
// Starts at the last restart point and decodes forward until the entry
// whose end reaches the restart array, i.e. the final entry.
void SeekToLast() override {
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextKey() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}
private:
// Makes the iterator invalid and records a corruption status.
void CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.clear();
value_.clear();
}
// Decodes the entry that starts right after the current value.
// Returns false at the end of the entry region (iterator becomes invalid)
// or on a malformed entry (status_ becomes Corruption).
bool ParseNextKey() {
current_ = NextEntryOffset();
const char* p = data_ + current_;
const char* limit = data_ + restarts_; // Restarts come right after data
if (p >= limit) {
// No more entries to return. Mark as invalid.
current_ = restarts_;
restart_index_ = num_restarts_;
return false;
}
// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
// An entry cannot share more bytes than the previous key actually has.
if (p == nullptr || key_.size() < shared) {
CorruptionError();
return false;
} else {
key_.resize(shared);
key_.append(p, non_shared);
value_ = Slice(p + non_shared, value_length);
// Keep restart_index_ at the last restart point strictly before current_.
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
return true;
}
}
};
// Returns an iterator over this block. Malformed contents (marked by the
// constructor via a tiny size_) yield an error iterator, and a block with no
// restart points yields an empty iterator.
Iterator* Block::NewIterator(const Comparator* comparator) {
  if (size_ < sizeof(uint32_t)) {
    return NewErrorIterator(Status::Corruption("bad block contents"));
  }
  const uint32_t restart_count = NumRestarts();
  if (restart_count != 0) {
    return new Iter(comparator, data_, restart_offset_, restart_count);
  }
  return NewEmptyIterator();
}
先看成员变量:`comparator_` 是用于比较 Key 的比较器;Block 的原始字节存储在 `data_` 中;`restarts_` 和 `num_restarts_` 存储复活点列表的偏移和数量;`current_` 存储当前条目在 `data_` 中的偏移,`restart_index_` 存储 `current_` 所在区间对应的复活点下标(细节见后文);`key_` 和 `value_` 存储当前的键值对。注意 `key_` 是 `std::string`,因为有共享前缀,需要存储拼接恢复出来的完整 Key,而 `value_` 可以直接从 `data_` 中截取。函数 `NextEntryOffset` 根据当前 `value_` 的位置和大小计算下一个键值对的起始位置,因为每个条目最后存储的是 value。函数 `GetRestartPoint` 读取第 `index` 个复活点的偏移。函数 `SeekToRestartPoint` 将当前的 `key_` 清空、设定 `restart_index_`,并将 `value_` 设为指向该复活点处的空串,这样执行 `NextEntryOffset` 时恰好得到该复活点的偏移。再看函数 `ParseNextKey`:它将 `current_` 设为下一个键值对的起点,通过 `DecodeEntry` 解析得到需要的长度信息,然后恢复 `key_` 并读取 `value_`。
迭代器存储的状态信息包括 key_
存储的共享前缀,和 value_
存储的下个条目起点。当发生起始点的切换时,需要先执行函数 SeekToRestartPoint
清空当前存储的状态信息,再执行函数 ParseNextKey
解析下一个键值对。按照这个过程读函数 Prev
、SeekToFirst
和 SeekToLast
就非常轻松了。最后来看函数 Seek
:
void Seek(const Slice& target) override {
  // Phase 1: binary search over the restart array for the last restart
  // point whose key is < target (restart point 0 if there is none). Every
  // restart entry stores its full key (shared == 0), so it can be compared
  // directly.
  uint32_t lo = 0;
  uint32_t hi = num_restarts_ - 1;
  while (lo < hi) {
    // Round the midpoint up so that "lo = probe" always makes progress.
    const uint32_t probe = (lo + hi + 1) / 2;
    uint32_t shared, non_shared, value_length;
    const char* probe_key =
        DecodeEntry(data_ + GetRestartPoint(probe), data_ + restarts_,
                    &shared, &non_shared, &value_length);
    if (probe_key == nullptr || shared != 0) {
      CorruptionError();
      return;
    }
    if (Compare(Slice(probe_key, non_shared), target) < 0) {
      lo = probe;      // every restart block before probe is also < target
    } else {
      hi = probe - 1;  // probe and everything after it are >= target
    }
  }

  // Phase 2: scan forward from that restart point for the first key >=
  // target. A false ParseNextKey() leaves the iterator invalid (end/error).
  SeekToRestartPoint(lo);
  while (ParseNextKey()) {
    if (Compare(key_, target) >= 0) return;
  }
}
首先在复活点上做二分查找。由于每个复活点处的条目都存储完整的 Key(`shared` 为 0),可以直接比较。二分结束后,`left` 对应的是 Key 严格小于 `target` 的最后一个复活点;如果所有复活点的 Key 都不小于 `target`,则 `left` 为 0。随后跳到该复活点处,按顺序恢复每个条目的键值,直到找到第一个不小于 `target` 的 Key,或者到达末尾。
最后来看一个细节:`restart_index_` 的值只有在函数 `Prev` 中才真正派上用场,用于确定上一个复活点的位置。仔细观察函数 `ParseNextKey` 中关于 `restart_index_` 的更新:
// Advance restart_index_ while the *next* restart point lies strictly before
// current_. Because the comparison is `<`, landing exactly on restart point
// i with restart_index_ == i - 1 leaves restart_index_ at i - 1.
while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) {
++restart_index_;
}
如果这里 `current_` 刚好到达复活点 i,`restart_index_` 仍然会保持在 i - 1(循环条件使用的是严格小于),这样在执行 `Prev` 时 `while` 循环可以少做一次。Google 大佬实力可见一斑。但从语义的角度,这种做法比较容易让人困惑,我应该不会这么做(所以成不了 Google 大佬)。
上一节中分析了 Block 的迭代器实现。对一个 Sorted Table 来说,还需要其他几种迭代器共同组成迭代器链,以高效地完成对 Sorted Table 的遍历和查找。首先来看 `IteratorWrapper` 的实现 `table/iterator_wrapper.h`:
// An internal wrapper class with an interface similar to Iterator that
// caches the valid() and key() results for an underlying iterator.
// This can help avoid virtual function calls and also gives better
// cache locality.
//
// The wrapper owns the wrapped iterator and deletes it on destruction or on
// the next Set(). Copying is therefore disabled: an implicit copy would share
// iter_ between two wrappers and delete it twice.
class IteratorWrapper {
 public:
  IteratorWrapper() : iter_(nullptr), valid_(false) {}
  explicit IteratorWrapper(Iterator* iter) : iter_(nullptr), valid_(false) {
    Set(iter);
  }

  IteratorWrapper(const IteratorWrapper&) = delete;
  IteratorWrapper& operator=(const IteratorWrapper&) = delete;

  ~IteratorWrapper() { delete iter_; }

  Iterator* iter() const { return iter_; }

  // Takes ownership of "iter" and will delete it when destroyed, or
  // when Set() is invoked again.
  void Set(Iterator* iter) {
    delete iter_;
    iter_ = iter;
    if (iter_ == nullptr) {
      valid_ = false;
    } else {
      Update();
    }
  }

  // Iterator interface methods
  bool Valid() const { return valid_; }
  Slice key() const {
    assert(Valid());
    return key_;
  }
  Slice value() const {
    assert(Valid());
    return iter_->value();
  }

  // Methods below require iter() != nullptr
  Status status() const {
    assert(iter_);
    return iter_->status();
  }
  void Next() {
    assert(iter_);
    iter_->Next();
    Update();
  }
  void Prev() {
    assert(iter_);
    iter_->Prev();
    Update();
  }
  void Seek(const Slice& k) {
    assert(iter_);
    iter_->Seek(k);
    Update();
  }
  void SeekToFirst() {
    assert(iter_);
    iter_->SeekToFirst();
    Update();
  }
  void SeekToLast() {
    assert(iter_);
    iter_->SeekToLast();
    Update();
  }

 private:
  // Refreshes the cached validity flag and, when valid, the cached key.
  void Update() {
    valid_ = iter_->Valid();
    if (valid_) {
      key_ = iter_->key();
    }
  }

  Iterator* iter_;
  bool valid_;
  Slice key_;
};
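这是对迭代器的简单包装,缓存了 `key_` 和 `valid_` 属性。按照注释所说,它可以减少虚函数的调用,并且提供更好的缓存局部性。前者很好理解。后者大概是指:`valid_` 和 `key_`(一个指针加长度)直接存放在包装对象内部,而 `MergingIterator` 用连续数组(`new IteratorWrapper[n]`)保存这些包装对象。`FindSmallest` 等函数读取 `Valid()` 和 `key()` 时只需顺序访问这块连续内存,不必经虚函数跳转到各个子迭代器对象(Key 的字节内容本身仍在别处)。接着来看二级迭代器 `TwoLevelIterator` 的实现 `table/two_level_iterator.cc`: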
#include "leveldb/table.h"
#include "table/block.h"
#include "table/format.h"
#include "table/iterator_wrapper.h"
namespace leveldb {
namespace {
// Callback that turns the value of the current index entry into an iterator
// over the corresponding data block. For Table this value is an encoded
// BlockHandle. The void* is an opaque caller-supplied argument.
using BlockFunction = Iterator* (*)(void*, const ReadOptions&, const Slice&);
// Iterator built from two levels: index_iter_ walks an index whose values
// identify data blocks, and data_iter_ walks the entries of the block that
// block_function_ opened for the current index value. Empty or exhausted
// data blocks are skipped transparently.
class TwoLevelIterator : public Iterator {
public:
TwoLevelIterator(Iterator* index_iter, BlockFunction block_function,
void* arg, const ReadOptions& options);
~TwoLevelIterator() override;
void Seek(const Slice& target) override;
void SeekToFirst() override;
void SeekToLast() override;
void Next() override;
void Prev() override;
bool Valid() const override { return data_iter_.Valid(); }
Slice key() const override {
assert(Valid());
return data_iter_.key();
}
Slice value() const override {
assert(Valid());
return data_iter_.value();
}
// Reports, in priority order: an index error, an error from the current
// data iterator, or the first error saved from an earlier data iterator.
Status status() const override {
// It'd be nice if status() returned a const Status& instead of a Status
if (!index_iter_.status().ok()) {
return index_iter_.status();
} else if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) {
return data_iter_.status();
} else {
return status_;
}
}
private:
// Remembers only the first non-OK status.
void SaveError(const Status& s) {
if (status_.ok() && !s.ok()) status_ = s;
}
void SkipEmptyDataBlocksForward();
void SkipEmptyDataBlocksBackward();
void SetDataIterator(Iterator* data_iter);
void InitDataBlock();
BlockFunction block_function_;
void* arg_;
const ReadOptions options_;
Status status_;
IteratorWrapper index_iter_;
IteratorWrapper data_iter_; // May be nullptr
// If data_iter_ is non-null, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the data_iter_.
std::string data_block_handle_;
};
// Takes ownership of index_iter (its IteratorWrapper deletes it). No data
// block is opened until the first positioning call.
TwoLevelIterator::TwoLevelIterator(Iterator* index_iter,
                                   BlockFunction block_function, void* arg,
                                   const ReadOptions& options)
    : block_function_(block_function),
      arg_(arg),
      options_(options),
      status_(),
      index_iter_(index_iter),
      data_iter_(nullptr) {}

// Both wrapped iterators are released by their IteratorWrapper members.
TwoLevelIterator::~TwoLevelIterator() {}
// Positions the index on the block that may contain `target`, then seeks
// within that block. Blocks with no qualifying entry are skipped forward.
void TwoLevelIterator::Seek(const Slice& target) {
  index_iter_.Seek(target);
  InitDataBlock();
  if (data_iter_.iter()) {
    data_iter_.Seek(target);
  }
  SkipEmptyDataBlocksForward();
}

// Positions on the first entry of the first non-empty data block.
void TwoLevelIterator::SeekToFirst() {
  index_iter_.SeekToFirst();
  InitDataBlock();
  if (data_iter_.iter()) {
    data_iter_.SeekToFirst();
  }
  SkipEmptyDataBlocksForward();
}

// Positions on the last entry of the last non-empty data block.
void TwoLevelIterator::SeekToLast() {
  index_iter_.SeekToLast();
  InitDataBlock();
  if (data_iter_.iter()) {
    data_iter_.SeekToLast();
  }
  SkipEmptyDataBlocksBackward();
}

// Steps within the current block, crossing into the following block when
// the current one is exhausted.
void TwoLevelIterator::Next() {
  assert(Valid());
  data_iter_.Next();
  SkipEmptyDataBlocksForward();
}

// Steps back within the current block, crossing into the preceding block
// when the current one is exhausted.
void TwoLevelIterator::Prev() {
  assert(Valid());
  data_iter_.Prev();
  SkipEmptyDataBlocksBackward();
}
// Moves forward through the index until data_iter_ sits on an entry, opening
// each following block at its first entry. If the index runs out,
// data_iter_ is cleared and the iterator becomes invalid.
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
  while (!(data_iter_.iter() != nullptr && data_iter_.Valid())) {
    if (!index_iter_.Valid()) {
      SetDataIterator(nullptr);
      return;
    }
    // Move to the next block.
    index_iter_.Next();
    InitDataBlock();
    if (data_iter_.iter() != nullptr) {
      data_iter_.SeekToFirst();
    }
  }
}

// Mirror image of the forward version: moves backward through the index,
// opening each preceding block at its last entry.
void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
  while (!(data_iter_.iter() != nullptr && data_iter_.Valid())) {
    if (!index_iter_.Valid()) {
      SetDataIterator(nullptr);
      return;
    }
    // Move to the previous block.
    index_iter_.Prev();
    InitDataBlock();
    if (data_iter_.iter() != nullptr) {
      data_iter_.SeekToLast();
    }
  }
}
void TwoLevelIterator::SetDataIterator(Iterator* data_iter) {
if (data_iter_.iter() != nullptr) SaveError(data_iter_.status());
data_iter_.Set(data_iter);
}
void TwoLevelIterator::InitDataBlock() {
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
} else {
Slice handle = index_iter_.value();
if (data_iter_.iter() != nullptr &&
handle.compare(data_block_handle_) == 0) {
// data_iter_ is already constructed with this iterator, so
// no need to change anything
} else {
Iterator* iter = (*block_function_)(arg_, options_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
}
}
} // namespace
// Factory for the two-level iterator. The returned iterator takes ownership
// of index_iter and calls block_function(arg, options, index_value) to open
// the data block for each index entry it visits.
Iterator* NewTwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options) {
return new TwoLevelIterator(index_iter, block_function, arg, options);
}
Sorted Table 中存储了多个 Data Block,使用 Index Block 完成对 Data Block 的索引。二级迭代器的第一级 `index_iter_` 完成对 Index Block 的迭代,第二级 `data_iter_` 完成对 Data Block 的迭代。遍历时移动 `data_iter_`,并在 `data_iter_` 到达边界时使用函数 `SkipEmptyDataBlocksForward` 和 `SkipEmptyDataBlocksBackward` 实现 Data Block 的前后切换。查找时同样先在 `index_iter_` 上查找。Index Block 的每个条目以一个不小于该 Data Block 最大 Key 的分隔 Key 为键,以 Data Block 的位置和大小(BlockHandle)为值,因此可以二分。确定 `index_iter_` 的位置后,再读取对应的 Data Block 进一步二分查找。
总结来看 Sorted Table 使用的迭代器们组成的链路如下:
TwoLevelIterator -> IteratorWrapper(index_iter_) -> Block::Iter
-> IteratorWrapper(data_iter_) -> Block::Iter
另外还有一个合并迭代器 `MergingIterator`,将会在多个 Sorted Table 文件的遍历中使用到。它同时管理多个子迭代器,执行 `Next`、`Seek` 等操作时让它们齐头并进,每次选出当前 Key 最小(反向遍历时为最大)的那个作为当前位置。具体实现如下:
// Merges n child iterators into a single ordered stream. Each child is
// handed to an IteratorWrapper via Set(), which takes ownership, so deleting
// children_ deletes the children. current_ points at the child holding the
// smallest key when moving forward (kForward) or the largest when moving
// backward (kReverse); it is nullptr when no child is valid.
class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator),
children_(new IteratorWrapper[n]),
n_(n),
current_(nullptr),
direction_(kForward) {
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
}
}
~MergingIterator() override { delete[] children_; }
bool Valid() const override { return (current_ != nullptr); }
void SeekToFirst() override {
for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst();
}
FindSmallest();
direction_ = kForward;
}
void SeekToLast() override {
for (int i = 0; i < n_; i++) {
children_[i].SeekToLast();
}
FindLargest();
direction_ = kReverse;
}
void Seek(const Slice& target) override {
for (int i = 0; i < n_; i++) {
children_[i].Seek(target);
}
FindSmallest();
direction_ = kForward;
}
void Next() override {
assert(Valid());
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
// Step past an equal key so that the child ends up strictly after key().
if (child->Valid() &&
comparator_->Compare(key(), child->key()) == 0) {
child->Next();
}
}
}
direction_ = kForward;
}
current_->Next();
FindSmallest();
}
void Prev() override {
assert(Valid());
// Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kReverse) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid()) {
// Child is at first entry >= key(). Step back one to be < key()
child->Prev();
} else {
// Child has no entries >= key(). Position at last entry.
child->SeekToLast();
}
}
}
direction_ = kReverse;
}
current_->Prev();
FindLargest();
}
Slice key() const override {
assert(Valid());
return current_->key();
}
Slice value() const override {
assert(Valid());
return current_->value();
}
// Returns the first non-OK child status, or OK if every child is OK.
Status status() const override {
Status status;
for (int i = 0; i < n_; i++) {
status = children_[i].status();
if (!status.ok()) {
break;
}
}
return status;
}
private:
// Which direction is the iterator moving?
enum Direction { kForward, kReverse };
// Point current_ at the valid child with the smallest / largest key.
void FindSmallest();
void FindLargest();
// We might want to use a heap in case there are lots of children.
// For now we use a simple array since we expect a very small number
// of children in leveldb.
const Comparator* comparator_;
IteratorWrapper* children_;
int n_;
IteratorWrapper* current_;
Direction direction_;
};
void MergingIterator::FindSmallest() {
IteratorWrapper* smallest = nullptr;
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child->Valid()) {
if (smallest == nullptr) {
smallest = child;
} else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
smallest = child;
}
}
}
current_ = smallest;
}
void MergingIterator::FindLargest() {
IteratorWrapper* largest = nullptr;
for (int i = n_ - 1; i >= 0; i--) {
IteratorWrapper* child = &children_[i];
if (child->Valid()) {
if (largest == nullptr) {
largest = child;
} else if (comparator_->Compare(child->key(), largest->key()) > 0) {
largest = child;
}
}
}
current_ = largest;
}
最后来看 Sorted Table 的实现 table/table.cc
:
// Private state of a Table (pImpl).
//
// The destructor frees the filter reader, the filter block bytes (set only
// when they were heap-allocated) and the index block. `file` is NOT deleted
// here; its owner lives outside Table (for example, the TableCache entry
// deleter frees it). Pointer and id members default to nullptr/0, so
// destroying a Rep that was never fully populated is always safe.
struct Table::Rep {
  ~Rep() {
    delete filter;
    delete[] filter_data;
    delete index_block;
  }

  Options options;
  Status status;
  RandomAccessFile* file = nullptr;
  uint64_t cache_id = 0;
  FilterBlockReader* filter = nullptr;
  const char* filter_data = nullptr;  // delete[]d by ~Rep() when non-null

  BlockHandle metaindex_handle;  // Handle to metaindex_block: saved from footer
  Block* index_block = nullptr;
};
// Opens the sstable occupying the first `size` bytes of `file`. It reads
// and decodes the footer, loads the index block (checksum-verified under
// options.paranoid_checks), and then loads optional metadata via ReadMeta().
// On success *table owns the new Table; on failure *table is nullptr.
Status Table::Open(const Options& options, RandomAccessFile* file,
                   uint64_t size, Table** table) {
  *table = nullptr;
  if (size < Footer::kEncodedLength) {
    return Status::Corruption("file is too short to be an sstable");
  }

  // The footer occupies the final kEncodedLength bytes of the file.
  char footer_space[Footer::kEncodedLength];
  Slice footer_input;
  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
                        &footer_input, footer_space);
  if (!s.ok()) return s;

  Footer footer;
  s = footer.DecodeFrom(&footer_input);
  if (!s.ok()) return s;

  // Read the index block.
  ReadOptions index_read_options;
  if (options.paranoid_checks) {
    index_read_options.verify_checksums = true;
  }
  BlockContents index_block_contents;
  s = ReadBlock(file, index_read_options, footer.index_handle(),
                &index_block_contents);
  if (!s.ok()) return s;

  // Footer and index block are both in hand: ready to serve requests.
  Block* index_block = new Block(index_block_contents);
  Rep* rep = new Table::Rep;
  rep->options = options;
  rep->file = file;
  rep->metaindex_handle = footer.metaindex_handle();
  rep->index_block = index_block;
  rep->cache_id =
      options.block_cache != nullptr ? options.block_cache->NewId() : 0;
  rep->filter_data = nullptr;
  rep->filter = nullptr;
  *table = new Table(rep);
  (*table)->ReadMeta(footer);
  return s;
}
void Table::ReadMeta(const Footer& footer) {
if (rep_->options.filter_policy == nullptr) {
return; // Do not need any metadata
}
// TODO(sanjay): Skip this if footer.metaindex_handle() size indicates
// it is an empty block.
ReadOptions opt;
if (rep_->options.paranoid_checks) {
opt.verify_checksums = true;
}
BlockContents contents;
if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) {
// Do not propagate errors since meta info is not needed for operation
return;
}
Block* meta = new Block(contents);
Iterator* iter = meta->NewIterator(BytewiseComparator());
std::string key = "filter.";
key.append(rep_->options.filter_policy->Name());
iter->Seek(key);
if (iter->Valid() && iter->key() == Slice(key)) {
ReadFilter(iter->value());
}
delete iter;
delete meta;
}
void Table::ReadFilter(const Slice& filter_handle_value) {
Slice v = filter_handle_value;
BlockHandle filter_handle;
if (!filter_handle.DecodeFrom(&v).ok()) {
return;
}
// We might want to unify with ReadBlock() if we start
// requiring checksum verification in Table::Open.
ReadOptions opt;
if (rep_->options.paranoid_checks) {
opt.verify_checksums = true;
}
BlockContents block;
if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) {
return;
}
if (block.heap_allocated) {
rep_->filter_data = block.data.data(); // Will need to delete later
}
rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data);
}
// Frees the pImpl state; Rep's destructor releases the index and filter data.
Table::~Table() { delete rep_; }
开始部分依然是熟悉的 pImpl
范式,毕竟 Table
是对外的接口,需要保持稳定。Table::Open
时,首先读取文件尾部的 Footer
,根据 Footer::index_handle()
读取 index_block
到内存中,并按需读取 meta_block
和 filter
;这些析构时也会做对应的删除。接着看迭代器部分的实现:
static void DeleteCachedBlock(const Slice& key, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
}
static void ReleaseBlock(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
//
// `arg` is the Table*. With a block cache, the block is looked up under a
// (cache_id, block offset) key. On a miss it is read and, if cachable and
// options.fill_cache is set, inserted. The returned iterator then holds the
// cache handle and releases it through ReleaseBlock. Otherwise the iterator
// owns the Block and frees it through DeleteBlock, which is defined outside
// this excerpt. Decode or read failures produce an error iterator.
Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = nullptr;
Cache::Handle* cache_handle = nullptr;
BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.
if (s.ok()) {
BlockContents contents;
if (block_cache != nullptr) {
// 16-byte cache key: fixed64 table cache_id, then fixed64 block offset.
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer + 8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
// If not inserted, cache_handle stays nullptr and the iterator
// below takes ownership of the block instead.
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
}
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
}
}
}
Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
// Tie the block's lifetime to the iterator: either free it directly or
// release the cache handle that keeps it alive.
if (cache_handle == nullptr) {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}
// Returns a two-level iterator: the index block's iterator drives
// Table::BlockReader, which opens each data block on demand.
Iterator* Table::NewIterator(const ReadOptions& options) const {
  Iterator* index_iter =
      rep_->index_block->NewIterator(rep_->options.comparator);
  Table* self = const_cast<Table*>(this);  // BlockReader receives it as void*
  return NewTwoLevelIterator(index_iter, &Table::BlockReader, self, options);
}
Table::NewIterator
中会构造一个二级迭代器,第一级自然是 index_block
的迭代器,并且提供了第二级迭代器的创建函数 Table::BlockReader
。该函数的第一个参数实际上为 Table
对象的指针,第三个参数是 index_block
键值对中的 Value
,也就是对应的 Data Block Handle。如果不考虑缓存部分,代码还是很容易理解的:首先解析对应的 BlockHandle
,据此读取 block
,创建迭代器并且注册迭代器清理函数 DeleteBlock
,当删除迭代器时删除对应的 block
。当考虑缓存时,可以先回忆一下系列第一篇介绍的 `LRUCache`,再来看代码:使用 `cache_id` 和 `handle.offset` 构建缓存的 Key,将 `block` 作为缓存的 Value,其清理函数为 `DeleteCachedBlock`。`Lookup` 或 `Insert` 返回的 `cache_handle` 持有该 `block` 的一份引用,保证迭代器使用期间它不会被释放。迭代器析构时调用 `ReleaseBlock` 释放这份引用,减少缓存中 `block` 的引用计数。这样就非常合理且高效了。
LevelDB 中会使用 file_number
给 Sorted Table 编号。为了提高读取性能、简化使用,LevelDB 提供了 TableCache
用以缓存 Sorted Table 及对应的 .ldb
文件,定义于 db/table_cache.h
:
// Caches open Table objects, together with their files, keyed by file number.
//
// TableCache owns cache_ and deletes it in its destructor, so copying is
// disabled to prevent a double delete. options_ is stored by reference: the
// Options passed to the constructor must outlive this TableCache.
class TableCache {
 public:
  TableCache(const std::string& dbname, const Options& options, int entries);

  TableCache(const TableCache&) = delete;
  TableCache& operator=(const TableCache&) = delete;

  ~TableCache();

  // Return an iterator for the specified file number (the corresponding
  // file length must be exactly "file_size" bytes). If "tableptr" is
  // non-null, also sets "*tableptr" to point to the Table object
  // underlying the returned iterator, or to nullptr if no Table object
  // underlies the returned iterator. The returned "*tableptr" object is owned
  // by the cache and should not be deleted, and is valid for as long as the
  // returned iterator is live.
  Iterator* NewIterator(const ReadOptions& options, uint64_t file_number,
                        uint64_t file_size, Table** tableptr = nullptr);

  // If a seek to internal key "k" in specified file finds an entry,
  // call (*handle_result)(arg, found_key, found_value).
  Status Get(const ReadOptions& options, uint64_t file_number,
             uint64_t file_size, const Slice& k, void* arg,
             void (*handle_result)(void*, const Slice&, const Slice&));

  // Evict any entry for the specified file number
  void Evict(uint64_t file_number);

 private:
  // Looks up (or opens and inserts) the table for file_number; on success
  // *handle refers to a cache entry that the caller must release.
  Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);

  Env* const env_;
  const std::string dbname_;
  const Options& options_;
  Cache* cache_;
};
核心接口 TableCache::NewIterator
,只需要提供 file_number
和 file_size
,就可以返回对应的 Sorted Table 对象及其迭代器。TableCache 封装了缓存和清理的逻辑,其实现也非常简单:
struct TableAndFile {
RandomAccessFile* file;
Table* table;
};
static void DeleteEntry(const Slice& key, void* value) {
TableAndFile* tf = reinterpret_cast<TableAndFile*>(value);
delete tf->table;
delete tf->file;
delete tf;
}
static void UnrefEntry(void* arg1, void* arg2) {
Cache* cache = reinterpret_cast<Cache*>(arg1);
Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
cache->Release(h);
}
// Creates a table cache for database `dbname` backed by an LRU cache of
// capacity `entries`. FindTable inserts each table with charge 1, so this
// presumably bounds the number of open tables (TODO confirm). `options` is
// kept by reference.
TableCache::TableCache(const std::string& dbname, const Options& options,
int entries)
: env_(options.env),
dbname_(dbname),
options_(options),
cache_(NewLRUCache(entries)) {}
// Deletes the cache. Presumably the cache runs DeleteEntry on any remaining
// entries; verify against the Cache implementation.
TableCache::~TableCache() { delete cache_; }
// Returns, via *handle, a cache entry holding the open Table for
// file_number. On a miss it opens the file (trying TableFileName() first,
// then the older SSTTableFileName()) and the Table, then caches them.
// Failures are never cached. The caller must release *handle on success.
Status TableCache::FindTable(uint64_t file_number, uint64_t file_size,
                             Cache::Handle** handle) {
  char key_buf[sizeof(file_number)];
  EncodeFixed64(key_buf, file_number);
  const Slice key(key_buf, sizeof(key_buf));

  *handle = cache_->Lookup(key);
  if (*handle != nullptr) {
    return Status::OK();  // cache hit
  }

  RandomAccessFile* file = nullptr;
  Table* table = nullptr;
  Status s =
      env_->NewRandomAccessFile(TableFileName(dbname_, file_number), &file);
  if (!s.ok()) {
    const std::string old_fname = SSTTableFileName(dbname_, file_number);
    if (env_->NewRandomAccessFile(old_fname, &file).ok()) {
      s = Status::OK();
    }
  }
  if (s.ok()) {
    s = Table::Open(options_, file, file_size, &table);
  }

  if (!s.ok()) {
    assert(table == nullptr);
    delete file;
    // We do not cache error results so that if the error is transient,
    // or somebody repairs the file, we recover automatically.
    return s;
  }

  TableAndFile* entry = new TableAndFile;
  entry->file = file;
  entry->table = table;
  *handle = cache_->Insert(key, entry, 1, &DeleteEntry);
  return s;
}
// Returns an iterator over the table for file_number. The cache entry stays
// held until the iterator is destroyed (UnrefEntry cleanup). If tableptr is
// non-null it receives the Table, or nullptr when opening failed.
Iterator* TableCache::NewIterator(const ReadOptions& options,
                                  uint64_t file_number, uint64_t file_size,
                                  Table** tableptr) {
  if (tableptr != nullptr) *tableptr = nullptr;

  Cache::Handle* handle = nullptr;
  const Status s = FindTable(file_number, file_size, &handle);
  if (!s.ok()) return NewErrorIterator(s);

  Table* const table =
      static_cast<TableAndFile*>(cache_->Value(handle))->table;
  Iterator* const iter = table->NewIterator(options);
  iter->RegisterCleanup(&UnrefEntry, cache_, handle);
  if (tableptr != nullptr) *tableptr = table;
  return iter;
}
// Point lookup of internal key `k` in file_number's table. Matches are
// reported through handle_result. The cache entry is held only for the
// duration of the call.
Status TableCache::Get(const ReadOptions& options, uint64_t file_number,
                       uint64_t file_size, const Slice& k, void* arg,
                       void (*handle_result)(void*, const Slice&,
                                             const Slice&)) {
  Cache::Handle* handle = nullptr;
  Status s = FindTable(file_number, file_size, &handle);
  if (!s.ok()) return s;

  Table* const table =
      static_cast<TableAndFile*>(cache_->Value(handle))->table;
  s = table->InternalGet(options, k, arg, handle_result);
  cache_->Release(handle);
  return s;
}
void TableCache::Evict(uint64_t file_number) {
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
cache_->Erase(Slice(buf, sizeof(buf)));
}
TableCache::FindTable
中会根据 file_number
构建缓存的 Key,首先尝试在缓存中查找,如果找不到则手动的打开文件、构造 Table。对应迭代器的实现也很简单,只需要设定好对应的清理函数 DeleteEntry
和 UnrefEntry
,就可以放心使用了。每多加一层封装,就多屏蔽一分底层实现的细节,对使用者来说就更易用。
前后两篇完成了对 Sorted Table 代码的阅读和分析。当数据位于内存时,查找过程中随机访问的时间微乎其微;但当数据保存到硬盘后,将 Sorted Table 载入内存的 IO 时间就非常可观了。Sorted Table 使用多级迭代器来缓和这个问题,首先完整地读取了 Index Block 到内存中;进行查找时首先在 Index Block 上二分,确定 Data Block 的位置后再进行必要的 IO 读取,并且通过缓存 Data Block 的方式提升读取的性能。
仔细看 LevelDB 的代码,总觉得它在合适的地方完成了合适的事情,合理且高效。向 Google 大佬低头!