LevelDB 源码分析【4】—— SSTable

SSTable 是一种文件的存储格式,MemTable 中的数据,最终都会被序列化然后压缩持久化存储到稳定介质中去(磁盘);

SSTable 物理结构

SSTable 文件被划分成固定大小的块(一般每块为 4KB),这里我就有问题了:为什么一个文件要被划分出固定的块呢?

每一个块由三部分组成:

  • Data:经过 序列化 + 压缩 后的数据
  • Compression Type:压缩算法类型,leveldb 默认使用 Snappy 算法进行压缩
  • CRC:冗余校验校验码,校验范围包括 Data 和 Compression Type

图1

SSTable 逻辑结构

在逻辑上,根据功能不同,leveldb在逻辑上又将sstable分为:

  1. data block: 用来存储key value数据对;
  2. filter block: 用来存储一些过滤器相关的数据(布隆过滤器),但是若用户不指定leveldb使用过滤器,leveldb在该block中不会存储任何内容;
  3. meta Index block: 用来存储filter block的索引信息(索引信息指在该sstable文件中的偏移量以及数据长度);
  4. index block:index block中用来存储每个data block的索引信息;
  5. footer: 用来存储meta index block及index block的索引信息;

sstable1.PNG

注意,1-4类型的区块,其物理结构都是如1.1节所示,每个区块都会有自己的压缩信息以及CRC校验码信息。

想要了解各类 block 的具体建造方式,可以查看 table/block_builder.htable/block_builder.cc

data block

data block 中存储的数据是 leveldb 中的 key value 键值对。其中一个 data block 中的数据部分(不包括压缩类型、CRC校验码)按逻辑又以下图进行划分:

sstable2.PNG

第一部分用来存储 key value 数据。由于 sstable 中所有的 key value 对都是严格按序存储的,为了节省存储空间,leveldb 并不会为每一对 key value 对都存储完整的 key 值,而是存储与上一个 key 非共享的部分,避免了 key 重复内容的存储。

每间隔若干个 key value 对,将为该条记录重新存储一个完整的 key。重复该过程(默认间隔值为16),每个重新存储完整 key 的点称之为 Restart point;

leveldb设计Restart point的目的是在读取sstable内容时,加速查找的过程。

由于每个Restart point存储的都是完整的key值,因此在sstable中进行数据查找时,可以首先利用restart point点的数据进行键值比较,以便于快速定位目标数据所在的区域;

当确定目标数据所在区域时,再依次对区间内所有数据项逐项比较key值,进行细粒度地查找;

该思想有点类似于跳表中利用高层数据迅速定位,底层数据详细查找的理念,降低查找的复杂度。

sstable3.PNG

一个 entry 分为5部分内容:

  1. 与前一条记录 key 共享部分的长度;
  2. 与前一条记录 key 不共享部分的长度;
  3. value 长度;
  4. 与前一条记录 key 非共享的内容;
  5. value 内容;

那么当接收到一个 {key, value} pair 的时候,怎么把它编码成一个 Entry 呢?

代码展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= options_->block_restart_interval);
assert(buffer_.empty() // No values yet?
|| options_->comparator->Compare(key, last_key_piece) > 0);
size_t shared = 0;
if (counter_ < options_->block_restart_interval) {
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared;

// DOC: 制作 Entry
// Add "<shared><non_shared><value_size>" to buffer_
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());

// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());

// Update state
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}

我们可以把 BlockBuilder 看作是一个 Block 的建造者,它的职责就是建造一个 Block;

现在来看看它是怎么建造 data block 的:

  • 首先它会判断是否需要开启一个新的 restart point;如果是,那么它是一个完整的 key,因此 shared key length = 0;如果不是则需要确定共享 key 的长度;
  • 根据共享 key 的长度制作 Entry
  • 把 Entry 序列化到 buffer 中

当一个 data block 达到一定阈值的时候,就可以 restart points 以及 restart points length 也序列化到结尾,如下代码所示:

1
2
3
4
5
6
7
8
9
Slice BlockBuilder::Finish() {
// Append restart array
for (size_t i = 0; i < restarts_.size(); i++) {
PutFixed32(&buffer_, restarts_[i]);
}
PutFixed32(&buffer_, restarts_.size());
finished_ = true;
return Slice(buffer_);
}

那么什么时候调用 BlockBuilder::Finish 呢?从 TableBuilder::Add 中可见端倪:

1
2
3
4
5
6
7
8
void TableBuilder::Add(const Slice& key, const Slice& value) {
... ...
// DOC: 当 data_block 的大小超过设定值时,对它 Flush
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}

当 data_block 当前占用空间大小 >= 系统设定的阈值时,就进行 Flush,那么 Flush 会调用 WriteBlock,看一下它的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish(); // 1

Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) { // 2
case kNoCompression:
block_contents = raw;
break;

case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle); // 3
r->compressed_output.clear();
block->Reset();
}
  1. 它直接调用了 BlockBuilder::Finish,得到了序列化后的 data block;
  2. 然后对 data block 进行压缩(大名鼎鼎的 Leveldb 竟然也是用 switch case 的方式来判断压缩算法,然后去执行压缩的。。。显然违反开闭原则);
  3. 最后调用 WriteRawBlock

WriteRawBlock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}

可以看到它就是在 data block 后面追加了一个 compression type 和 CRC 校验码,就如图1所示;

Index Block

index block 用来存储所有 data block 的相关索引信息

index block 包含若干条 Entry,每一条 Entry 代表一个 data block 的索引信息。

一条索引包括以下内容:

  1. data block i 中最大的 key 值;
  2. 该 data block 起始地址在 sstable 中的偏移量;
  3. 该 data block 的大小;

这三部分内容最终会被制作成一条 Entry,并追加到 Index Block 中去;

sstable4.PNG

其中,data block i最大的key值还是index block中该条记录的key值。

如此设计的目的是,依次比较index block中记录信息的key值即可实现快速定位目标数据在哪个data block中。

在源码中,涉及到 Index Block 的数据变量有:

1
2
3
BlockBuilder index_block; // DOC: 就是 sstable 逻辑结构中的 Index Block
bool pending_index_entry; // DOC: 如果是 true,表示一个 data block 刚刚 flush 到 sstable,紧接着就要制作一个 index entry 用于索引该 data block
BlockHandle pending_handle; // DOC: 用于制作 Index Block

具体什么时候往 Index Block 追加索引 entry 呢?Leveldb 为了在 entry 中使用更短的 key,它将 index block entry 的制作时机推迟到下一个块 block data i + 1 插入第一个 key 的时候,这样它就能找到一个更短的 key 使得 key >= block data i 中最大的 key,并且 key < block data i + 1 中最小的 key;

注释:

// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys “the quick brown fox” and “the who”. We can use
// “the r” as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.

因此在 TableBuilder::Add 中进行 index block entry 的制作:

1
2
3
4
5
6
7
8
9
10
11
12
13
void TableBuilder::Add(const Slice& key, const Slice& value) {
... ...
// DOC: 如果是 data_block 中的第一个 key,制作 index block entry
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key); // 找到一个更短的 key;其中 last_key 是 data block i 中最大的 key,而 key 则是 data block i + 1 中第一个 key,即最小的 key;从这两个 key 中找到一个中间的最小的 key 就可以满足要求了;
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
... ...
}

这里 pending_handle 会在 TableBuilder::WriteRawBlock 函数调用时(即一个 data block 需要被 flush 到 sstable 时)记住每个 data block 在 是stable

中的偏移位置及其大小;

可以看到索引记录(handle_encoding)通过 BlockBuilder::Add 的方式追加到 Index Block 中,所以它也是被制作成了 Entry 格式的;

Filter Block

为了加快 sstable 中数据查询的效率,在直接查询 data block 中的内容之前,leveldb 首先根据 filter block 中的过滤数据判断指定的 data block 中是否有需要查询的数据,若判断不存在,则无需对这个 data block 进行数据查找

filter block 存储的是 data block 数据的一些过滤信息。这些过滤数据一般指代布隆过滤器的数据,用于加快查询的速度;

filter block存储的数据主要可以分为两部分:(1)过滤数据(2)索引数据。

其中索引数据中,filter i offset 表示第 i 个 filter data 在整个 filter block 中的起始偏移量,filter offset's offset 表示filter block的索引数据在 filter block 中的偏移量

在读取 filter block 中的内容时,可以首先读出 filter offset's offset 的值,然后依次读取 filter i offset,根据这些 offset 分别读出filter data

Base Lg 默认值为11,表示每 2KB 的数据,创建一个新的过滤器来存放过滤数据;

一个 sstable 只有一个 filter block,其内存储了所有 block 的 filter 数据;具体来说,filter data k 包含了所有起始位置处于 [basek, base(k+1)) 范围内的 block 的 key 的集合的 filter 数据,按数据大小而非 block 切分主要是为了尽量均匀,以应对存在一些 block 的 key 很多,另一些 block 的 key 很少的情况;

sstable5.PNG

在 leveldb 中,怎么建造一个 Filter Block 呢?它是通过 FilterBlockBuilder 这个类来实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class FilterBlockBuilder {
public:
explicit FilterBlockBuilder(const FilterPolicy*);

FilterBlockBuilder(const FilterBlockBuilder&) = delete;
FilterBlockBuilder& operator=(const FilterBlockBuilder&) = delete;

void StartBlock(uint64_t block_offset);
void AddKey(const Slice& key);
Slice Finish();

private:
void GenerateFilter();

const FilterPolicy* policy_;
std::string keys_; // Flattened key contents
std::vector<size_t> start_; // Starting index in keys_ of each key
std::string result_; // Filter data computed so far
std::vector<Slice> tmp_keys_; // policy_->CreateFilter() argument
std::vector<uint32_t> filter_offsets_;
};

为了可扩展性(支持多种过滤策略)它接受一个 FilterPolicy 指针;整个建造过程需要顺序调用 StartBlock、AddKey、Finish;

每次新建一个 data block 的时候都会调用 FilterBlockBuilder::StartBlock

1
2
3
4
5
6
7
8
9
10
static const size_t kFilterBaseLg = 11;
static const size_t kFilterBase = 1 << kFilterBaseLg;

void FilterBlockBuilder::StartBlock(uint64_t block_offset) {
uint64_t filter_index = (block_offset / kFilterBase);
assert(filter_index >= filter_offsets_.size());
while (filter_index > filter_offsets_.size()) {
GenerateFilter();
}
}

这段代码其实就是每 2KB 用 GenerateFilter 创建一个新的 Filter;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void FilterBlockBuilder::GenerateFilter() {
const size_t num_keys = start_.size(); // DOC: start_[i] 表示第 i 个 key 的 offset
if (num_keys == 0) {
// Fast path if there are no keys for this filter
filter_offsets_.push_back(result_.size());
return;
}

// Make list of keys from flattened key structure
start_.push_back(keys_.size()); // Simplify length computation
tmp_keys_.resize(num_keys);
for (size_t i = 0; i < num_keys; i++) {
const char* base = keys_.data() + start_[i];
size_t length = start_[i + 1] - start_[i];
tmp_keys_[i] = Slice(base, length);
}

// Generate filter for current set of keys and append to result_.
filter_offsets_.push_back(result_.size());
// DOC: 我估计 CreateFilter 才会去真正的创建 Filter Data i
// 将得到的 raw data 追加到 result_ 后面去
policy_->CreateFilter(&tmp_keys_[0], static_cast<int>(num_keys), &result_);

tmp_keys_.clear();
keys_.clear();
start_.clear();
}

FilterBlockBuilder::GenerateFilter 就是把 keys_ 中的内容追加到 result_ 中去,并且制作 Filter offset;其实这部分没有看明白~~之后再说

footer结构

footer 大小固定,为 48 字节,用来存储 meta index block 与 index block 在 sstable 中的索引信息,另外尾部还会存储一个 magic word,内容为:”http://code.google.com/p/leveldb/"字符串 sha1哈希的前 8 个字节。

参考

  1. sstable

  2. leveldb源码分析(5) sstable文件

  3. 庖丁解LevelDB之数据存储