LevelDB 源码分析【2】—— 数据变更 DBImpl::Write

leveldb::Slice 就是其实就是 c++17 中的 std::string_view,多了一个 ToString 的功能而已;

看一下一个 leveldb::DB::Put 的执行流程:

1
2
3
leveldb::DB::Put --> WriteBatch::Put --> DBImpl::Write(会被阻塞,直到 1. WAL 完成;2. 插入 memtable 完成)
DBImpl::Write 内部执行流程:
--> DBImpl::MakeRoomForWrite --> DBImpl::BuildBatchGroup --> Writer::AddRecord(执行 1. WAL) --> WriteBatchInternal::InsertInto(执行 2. 把数据插入 memtable)

DBImpl::Write 就是把对数据库的变更写入数据库,这里涉及到两步,也是 LSMTree 的核心:

  • 制作 WAL,并写入 disk,对应 Writer::AddRecord
  • 把数据变更插入到 memtable 中,对应 WriteBatchInternal::InsertInto

因为 Writer::AddRecord 是会 flush 数据到磁盘的,这里 leveldb 为了减少 flush 次数来降低延迟,它用 DBImpl::BuildBatchGroup 把一些 WriteBatch 整合成一个 WriteBatch,然后再调用 Writer::AddRecord;

具体看下代码吧

源码分析

WriteBatch::Put

1
2
3
4
5
6
7
8
9
void WriteBatch::Put(const Slice& key, const Slice& value) {
// 以第9个字节为起始地址,设置count(int32, 小端字节序)大小
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
// 第13个字节记录type, 0->delete 1->value
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key); // 第14个字节开始记录varint32(key的大小)+char*(key的data)
PutLengthPrefixedSlice(&rep_, value); // 继续在后面写入varint32(value的大小)+char*(value的data)
// 至此,WriteBatch的rep_中记录了一些信息,我画个图表示下
}

这里 WriteBatch 其实就是一个 buffer,或者 string;用于存储序列化后的操作,当然在最前面要有一个 header 来表示有多少个操作;

一个操作可以用 type key [value] 表示,key 和 value 都是字符串类型,其中 type 有两类:

  • put:修改/添加一个数据
  • delete:删除一个数据

value 根据具体的操作是可选的,对于 put 来说就必须要有 value;对于 delete 来说就不需要 value 了;

使用 varint 来表示字符串长度;这样的话整个 WriteBatch 的底层实际上就是一个序列化后的字节流,如下图所示:

WriteBatch0.png

DBImpl::BuildBatchGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
... ...
Writer* first = writers_.front();
WriteBatch* result = first->batch;
// 把能够整合的 WriteBatch 都整合到一个 batch 里面
auto max_size = // 自定义能整合得到的最大 batch 的 size
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) { // 这里是指不要把同步 write 的内容整合进来
// Do not include a sync write into a batch handled by a non-sync write.
break;
}

if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// 开始整合,就是把 其他的 WriteBatch 中的除了 header 的数据全部 append 到一个 WriteBatch 中去
// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

这里就是尽量把能整合的 WriteBatch 整合到一个 WriteBatch 中去,想来是为了减少 flush 的次数,降低延迟;

Writer::AddRecord

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();

// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}

// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;

RecordType type;
const bool end = (left == fragment_length);
// 根据 begin 和 end 来识别
// 1. 一条完整的记录
// 2. 第一条记录
// 3. 最后一条记录
// 4. 中间记录
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
// emitphysicalrecord 函数里面会 flush,因此这里可能很慢
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}

这个函数能够把 WriteBatch 制作成一条日志记录并放入一个 block 中,最终 flush 到磁盘上面去;磁盘和内存之间传输的最小数据单元的大小是 kBlockSize;日志记录的格式如下图所示:

log_record.png

EmitPhysicalRecord,这个函数就是具体制作日志记录的函数,可以看到传给了他 3 个参数:

  • type:表示 record 以什么形式存在 block 中,分开存储还是完整存储
  • ptr:指向 WriteBatch 字节流的第一个字节
  • fragment_length:适应一个 block 大小的 WriteBatch 字节流分片

根据 WriteBatch 的大小可以产生多种不同的存储形式:

  • WriteBatch > block 中可用空间的大小:这种情况,一条日志记录中的数据可能会存储在多个块上;具体分以下两种情况;
    • kMiddleType:存在中间数据存在整个块中的情况;
    • kFirstType,kLastType:这种情况是一条记录中的数据被分开存在了两个块上;
  • WriteBatch < block 中可用空间的大小:这种情况,日志记录作为一条完整的记录(kFullType)追加到块的末尾

当 WAL 完成之后,接下来就是更新 memtable 了

WriteBatchInternal::InsertInto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
class LEVELDB_EXPORT WriteBatch {
public:
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};
... ...
Status Iterate(Handler* handler) const;
};

感觉这里 leveldb 用了 visitor 模式WriteBatch::Iterate 接受一个 Handler,这个 Handler 可以是各种 visitor;例如 MemTableInserter 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_; // MemTable 中有一个 table_ 成员变量,其实就是 跳表

void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
} // namespace

WriteBatch::Iterate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}

input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}

这段代码,无非就是根据反序列化方式,遍历整合后的 WriteBatch 字节流,然后根据 type 是 put 还是 delete,提取出 {key,value} pairs 或 key 并应用到 memtable;

接下来看下 Memtable 是怎么插入一个数据变更的

从 Memtable 的成员变量中,发现它是由跳表实现的,因此重点在于 db/skiplist.h ,这里太长了,还是记录到下一篇 blog 中吧~

参考

  1. leveldb源码分析(4) memtable and log