LevelDB 源码分析【3】—— 内存表 MemTable

这一篇重点分析 leveldb 的 MemTable 是怎么实现的,为什么要这么实现;

MemTable 使用到的技术:

  • 跳表

  • 引用计数

  • 迭代器

  • 比较器

  • 内存分配器 Arena(这个第一篇就分析过了)

一个个来分析下

跳表

代码在 db/skiplist.h,SkipList 是模板类:

  • 接受两个模板参数:Key 和 Comparator;分别表示,键的类型和如何对键进行比较;
  • Node,表示跳表中的每一个节点
  • Iterator,指向跳表节点

跳表中节点 Node 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}

Key const key;

// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].store(x, std::memory_order_release);
}

// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].store(x, std::memory_order_relaxed);
}

private:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
};

成员对象仅仅只有一个 key 以及 一个节点的 next_ 数组,用于表示第 i 层的指向下一个节点的指针;并提供对该对象的操作方法,主要分为 acquire_release 内存序 和 releax 内存序(有屏障和无屏障);

跳表中的 Iterator:

它是一个 双向 Iterator,支持 Next 和 Prev,但是时间复杂度 Next 是 O(1) 的,而 Prev 是O(n) 的,因为跳表是多层单向链表构成的;

它提供以下几种方法:

  • Valid:判断这个迭代器是否有效,如果无效,其对应的 Node 为空
  • Key:返回 Node 对应的 key
  • Next,Prev
  • Seek:让迭代器指向 key Node
  • SeekToFirst:让迭代器指向第一个节点
  • SeekToLast:让迭代器指向最后一个节点

跳表中最重要的就是,读和写了,这两个方法分别是 SkipList::InsertIterator::Seek

用户可以这样对跳表写:

1
2
3
4
5
SkipList<Key, Comparator> skip(cmp, &arena);
Key key;
if (!skip.Contains(key)) {
skip.Insert(key);
}

用户可以这样对跳表读:

1
2
3
4
5
6
SkipList<Key, Comparator> skip(cmp, &arena);
Key key;
if (skip.Contains(key)) {
SkipList<Key, Comparator>::Iterator iter(&skip);
assert(iter.Seek(key).Key() == key);
}

SkipList::Insert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// 获取插入节点的前继节点
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

assert(x == nullptr || !Equal(key, x->key));

// 给新节点安排一个随机高度,并完善 prev 数组
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
max_height_.store(height, std::memory_order_relaxed);
}

// 构造新节点,并链入跳表
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}

只要我们获得了新节点每一层的前继节点,那么我们就能将新节点链入跳表;

Iterator::Seek

1
2
3
4
template <typename Key, class Comparator>
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}

读写都依赖于 SkipList::FindGreaterOrEqual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) prev[level] = x;
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}

该方法有两种调用方式:

  • prev == nullptr:这种方式,仅仅为了得到第一个 >= key 的节点
  • prev != nullptr:这种方式,在前者的基础上,还有记录下每一层往下走的那个节点

什么叫往下走的节点?其实就是 key 节点的每一层的前继节点;

注意:leveldb 的跳表没有实现 delete 节点的方法,因为它根本不需要,他对 MemTable 的变更只需要不断追加就可以了;

引用计数

使用引用计数的方式,记录有多少用户在使用该 MemTable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MemTable {
... ...
// Increase reference count.
void Ref() { ++refs_; }

// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
... ...
int refs_;
};

当调用 Unref 发现 refs_ 为 0 时,将自动删除整个 MemTable;为什么要用这种方式?

迭代器

这里 MemTable 自己又定义了一个迭代器,用于访问跳表,其实就是继承了一个通用的 Iterator 基类,然后对跳表的迭代器功能分装了一下;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MemTableIterator : public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) {}

MemTableIterator(const MemTableIterator&) = delete;
MemTableIterator& operator=(const MemTableIterator&) = delete;

~MemTableIterator() override = default;

bool Valid() const override { return iter_.Valid(); }
void Seek(const Slice& k) override { iter_.Seek(EncodeKey(&tmp_, k)); }
void SeekToFirst() override { iter_.SeekToFirst(); }
void SeekToLast() override { iter_.SeekToLast(); }
void Next() override { iter_.Next(); }
void Prev() override { iter_.Prev(); }
Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
Slice value() const override {
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}

Status status() const override { return Status::OK(); }

private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
};

这里所继承的 Iterator 仅仅提供了纯虚函数;然后通过 iter_ 来实现功能;

比较器

MemTable 中的跳表模板参数是这样的:

1
2
3
4
5
6
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* a, const char* b) const;
};
typedef SkipList<const char*, KeyComparator> Table;

其中 Key = const char*,Comparator = KeyComparator;key 仅仅是一个字符串首地址,这个字符串其实就是经过序列化后的 {key, value} pair;为了对字符串进行比较而不是对 const char* 进行比较,首先要对序列化后的 {key, value} pair 反序列化得到对应的 key,然后就可以比较了;

1
2
3
4
5
6
7
int MemTable::KeyComparator::operator()(const char* aptr,
const char* bptr) const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr); // 先反序列化 {key, value} pair,得到 a
Slice b = GetLengthPrefixedSlice(bptr); // 先反序列化 {key, value} pair,得到 b
return comparator.Compare(a, b); // 然后就可以比较 a,b 了
}

接下来具体分下,MemTable Add 和 Get 的方法

MemTable::Add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len); // 这里没用对齐分配啊
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}

最后一行代码 table_.Insert(buf) 可以看到是往跳表里插入了一个 key,而这个 key 是一个 buffer,里面存的就是经过序列化后的 {key,value} pair,每一条记录(Entry)的格式如下图所示:

node_entry.PNG

其中需要注意的一点是:编码的时候,tag 被直接编码在了 key 之后,并且 tag 占固定的 8 个 bytes;调用 Get 进行解码时注意这一点即可;

MemTable::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}

这里的整个流程是这样的:

  • 找到 memtable_key(就是 key + tag),这样就找到了记录(Entry)
  • 比较 key 是否和 user_key 一致
  • 取出 tag,如果是 kTypeValue,那么就设置对应的 value;如果是 kTypeDeletion 那么说明该 key 应该被删除,状态码返回 NotFound

这里有一个关键还没有分析好,那就是 LookupKey,因为可能 MemTable 中有多个冗余的 user_key 存在,只是因为 sequence 不同,memtable_key (key + tag)都不一样罢了;但是查找的时候我们希望找到最新的 memtable_key 这时候该怎么办呢?我猜是根据 sequence 来实现的;sequence 这一部分涉及了 snapshot 以及 version 部分的代码,之后再写;