LevelDB 源码分析【7】—— Version Control

涉及到的源文件:

1
2
db/version_set.h
db/version_edit.h

Leveldb 每一次删除或增加 sstable 都会从一个版本升级到另一个版本,每次 sstable 文件的更替对于 leveldb 来说是一个最小的操作单元,具有原子性。

Leveldb 用 Version 表示一个版本的元数据,它的定义是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Version {
public:
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
};
... ...
// DOC: next_ 和 prev_ 用于双向链表
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
// DOC: refs_ 记录了被不同的 Version 引用的个数,保证被引用中的文件不会被删除
int refs_; // Number of live refs to this version

// DOC: 每一层的 sstable 文件元数据
std::vector<FileMetaData*> files_[config::kNumLevels];

// DOC: 这两个变量的作用是什么?
FileMetaData* file_to_compact_;
int file_to_compact_level_;

// DOC: 触发 compaction 的状态信息,这些信息会在读写请求或 compaction 的过程中更新
double compaction_score_;
int compaction_level_;
};

所以它本身就是双向链表中的一个节点,里面保存了某个版本的 Leveldb 的元数据:

  • 可以用 files_ 索引到每个 level 的所有 sstable 文件的元数据;
  • 用 file_to_compact_ 和 file_to_compact_level_ 记录下一个将要被 compaction 的 sstable;
  • 用 compaction_score_ 和 compaction_level_ 记录某个 level 是否需要进行 compaction;如果 compaction_score_ >= 1 则要进行 compaction;

然后就可以用这样的节点组成一个循环双向链表 VertionSet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class VersionSet {
... ...
Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
uint64_t last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version ; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_

// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
};

VersionSet 是一个 Version 构成的循环双向链表,这些 Version 按时间顺序先后产生,记录了当时的元信息,链表尾部是当前最新的 Version;每个 Version 自己会维护引用计数,当其被引用时不会被删除,其对应的 sstable 也得以保留;通过这种方式,使得 leveldb 可以在任意一个稳定的快照视图上(即任意一个未被删除的 Version 上)访问文件。

如何从 Version_i 升级到 Version_i+1

相邻 Version 之间的不同仅仅是一些文件被创建和另一些文件被删除。也就是说将文件变动应用在旧的 Version 上可以得到新的 Version,这也就是 Version 产生的方式。leveldb 用 VersionEdit 来表示这种相邻 Version 的差值;

version_control1.png

VersionEdit:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class VersionEdit {
... ...
private:
friend class VersionSet;

typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;
uint64_t next_file_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;

std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
};

Manifest

将 Version 和 VersionEdit 持久化存储得到的就是 Manifest 文件

manifest 文件专用于记录版本信息。leveldb 采用了增量式的存储方式,记录每一个版本相较于上一个版本的变化情况。

展开来说,一个 Manifest 文件中,包含了多条 Session Record。一个 Session Record 记录了从上一个版本至该版本的变化情况。

(1)新增了哪些sstable文件;

(2)删除了哪些sstable文件(由于compaction导致);

(3)最新的journal日志文件标号等;

借助这个 Manifest 文件,leveldb 启动时,可以根据一个初始的版本状态,不断地应用这些版本改动,使得系统的版本信息恢复到最近一次使用的状态。

一个 Manifest 文件的格式示意图如下所示:

version_control0.png

一个 Manifest 内部包含若干条 Session Record,其中第一条 Session Record 记载了当时 leveldb 的全量版本信息(即 Version?),其余若干条 Session Record 仅记录每次更迭的变化情况(即 VersionEdit?)。因此,每个 manifest 文件的第一条 Session Record 都是一个记录点(checkpoint or snapshot),记载了全量的版本信息,可以作为一个初始的状态进行版本恢复。

一个 Session Record 可能包含以下字段:

  • Comparer 的名称;
  • 最新的 journal 文件编号;
  • 下一个可以使用的文件编号;
  • 数据库已经持久化数据项中最大的 sequence number;
  • 新增的文件信息;
  • 删除的文件信息;
  • compaction 记录信息;

可以看到这些信息起始都是 VersionSet 和 VersionEdit 中的成员变量;

如何从一个 Manifest 文件恢复数据库

当 leveldb 要根据 manifest 进行恢复时,会读出最早的一个 Version 然后不断应用 VersionEdit 恢复到最近的状态,这就会产生一堆的中间 Version 状态,但这可能是不需要的,我们只需要最新最近的数据库状态;leveldb 引入VersionSet::Builder 来避免这种中间变量,方法是先将所有的 VersoinEdit 内容整理到 VersionBuilder 中,然后一次应用产生最终的 Version,这种实现上的优化如下图所示:

version_control2.png

VersionSet::Builder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class VersionSet::Builder {
private:
struct BySmallestKey {
const InternalKeyComparator* internal_comparator;
bool operator()(const FileMetaData*, const FileMetaData*) const;
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels]; // DOC: 每一层删了哪些旧文件,增加了哪些新文件
public:
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
base_->Ref();
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
// 引用所有的 *edit 到当前状态(由 current version 表示)
void Apply(const VersionEdit* edit);
// 保存当前状态到 Version v
void SaveTo(Version* v);
};

通过 Builder,我们可以先把所有的 VersionEdit 通过 Builder::Apply 保存到 Builder::levels_ 中;当所有的 VersionEdit 都添加完之后再调用 Builder::SaveTo 一次性从 Version i 升级到 Version i + 1;

compaction 后对 VersionSet 的变更

看着一部前先可以先看下 compaction 的流程LevelDB 源码分析【8】—— Compaction

  • compaction 的调用者首先有责任填充 VersionEdit
  • 通过 VersionSet::LogAndApply,把 VersionEdit 序列化追加到 manifest 文件中去当作一条 session record,然后通过 VersionSet::Builder 和当前状态生成一个最先的版本 v,并让 current_ = v
  • 把 current_ 插入 VersionSet 循环双向链表的尾部,更新 VersionSet 的状态(例如 **log_number_prev_log_number_**)

所以整个流程中,串起 compaction 和 VersionSet 以及 Version 的关键类就是 VersionEdit

看下 compaction 是怎么创建一个 VersionEdit 的,首先得了解下 compaction 的函数调用栈:

1
2
3
4
5
6
7
8
DBImpl::Write->DBImpl::MakeRoomForWrite
DBImpl::MaybeScheduleCompaction
DBImpl::BGWork
DBImpl::BackgroundCall
DBImpl::BackgroundCompaction // 这里就去做真正的压缩了!
DBImlp::CompactMemTable // minor compaction
VersionSet::CompactRange // manual compaction
VersionSet::PickCompaction // major compaction

对于 minor compaction 来说是怎么创建 VersionEdit 的?

先写下我的猜想:

肯定是创建一个 VersionEdit 对象 edit,然后让 DBImlp::CompactMemTable 去填充这个 edit;edit 中最重要的三个成员就是 deleted_files_new_files_compact_pointers_ 了吧!

对于 minor compaction 它只会增加新的 sstable 文件,而不会删除旧的 sstable 文件,并且只会往 level 0 层增加文件;

那么 VersionEdit::deleted_files_ 自然为空,VersionEdit::new_files_ 则是新添加的 sstable 文件对应的元数据;

因为 level 0~i 层 compaction 时对于输入文件的选择是通过轮转的方式进行的,因此需要记住本次 compaction 后输出文件中最大的 key 值,保存到 VersionEdit::compact_pointers_ 中去;

看下代码:

DBImlp::CompactMemTable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);

// Save the contents of the memtable as a new Table
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
... ...
// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
}

if (s.ok()) {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.store(false, std::memory_order_release);
RemoveObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}

正如我所预测的,它首先拿到一个 VersionEdit 对象 edit;其次让 WriteLevel0Table 根据 immutable memtable 去填充 edit;然后调用 VersionSet::LogAndApply 将该 edit 应用到当前状态生成一个全新的 leveldb 版本并插入循环双向链表尾部;最后就可以删除 immutable memtable 了;

所以对于 minor compaction 来说是通过 WriteLevel0Table 来创建 VersionEdit 的

DBImlp::WriteLevel0Table

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {

FileMetaData meta;
meta.number = versions_->NewFileNumber();
... ...
Iterator* iter = mem->NewIterator();

Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); // 创建 sstable 并把元数据存入 meta
mutex_.Lock();
}
... ...

edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
... ...
return s;
}

简单看下这段代码,它其实就是创建一个 FileMetaData 对象 meta,也就是 sstable 的元数据,怎么填充它呢?把填充任务交给 BuildTable,因为它会持久化 immutable memtable 中的数据变成 sstable,那么它最懂怎么填充 meta!

因为是 minor compaction 因此只会新增一个 sstable,因此 edit 仅仅调用以下 AddFile 就可以了!至此 edit 就填充完全了,就可以后续的更新 Version 和 VersionSet 了!

对于 major compaction 来说是怎么创建 VersionEdit 的?

1
2
3
4
DBImpl::BackgroundCompaction
VersionSet::PickCompaction // major compaction
DBImpl::DoCompactionWork

这个流程在LevelDB 源码分析【8】—— Compaction中进行了分析:

  1. 获取输入文件
  2. 扩大输入文件集合
  3. 多路归并
  4. 积分计算

VersionSet::PickCompaction 主要做 1 和 2 两步,把需要 compact 的所有 sstable 文件放到 Compaction 对象中返回;

DBImpl::DoCompactionWork 主要做 3 和 4 两步;

VersionSet::PickCompaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);

// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
// DOC: 从 level i 层选择输入文件
if (compact_pointer_[level].empty() || // DOC: 如果 level i 层第一次做 compaction
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { // DOC: 如果 sstable 的最大的 key > compact_pointer_
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}

c->input_version_ = current_;
c->input_version_->Ref();

// Files in level 0 may overlap each other, so pick up all overlapping ones
// DOC: level 0 层可能出现 sstable 之间 overlap 的情况,因此在 level 0 层就可以扩大输入文件集合
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}

// DOC: 在 level i + 1 层扩到输入文件集合
SetupOtherInputs(c);

return c;
}

参考

  1. 版本控制
  2. 庖丁解LevelDB之版本控制