0%

leveldb 源码阅读记录 - DBIter

DBIter·

Leveldb 数据库的 MemTable 和 sstable 文件的存储格式都是 InternalKey (userkey, seq, type) => uservalue。 DBIter 把同一个 userkey 在 DB 中的多条记录合并为一条,综合考虑了 userkey 的序号、删除标记、和写覆盖等等因素。DBIter 只会把 userkey 最新(seq 最大的就是最新的,相同 userkey 的老记录(seq 较小的)不会让上层看到)的一条记录展现给用户,另外如果这条最新的记录是删除类型,则会跳过该记录,否则,遍历时会把已删除的 key 列举出来。

1. 创建 DBIterator·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);
}

Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
Iterator* internal_iter, SequenceNumber sequence,
uint32_t seed) {
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
}

2. DBIterator 的基本定义 ·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DBIter: public Iterator {
enum Direction {
kForward,
kReverse
};
private:
DBImpl* db_;
const Comparator* const user_comparator_; //比较iter间userkey
Iterator* const iter_; // 是一个MergingIterator
SequenceNumber const sequence_; // DBIter只能访问到比sequence_小的kv对,这就方便了老版本(快照)数据库的遍历

Status status_;
std::string saved_key_; //用于方向遍历 direction_==kReverse时才有效
std::string saved_value_; //用于反向遍历 direction_==kReverse时才有效
Direction direction_;
bool valid_;

Random rnd_;
ssize_t bytes_counter_;
}

后面再说 DBIter 的作用,这里先介绍下 sequence_的作用。

值得一提的 sequence_·

在创建 DBIter 时,我们将系统的一个 snapshot 的 sequence_number 传递给了 DBIter:

1
2
3
4
5
6
return NewDBIterator(this, user_comparator(), iter,
(options.snapshot != nullptr //!! 这里
? static_cast<const SnapshotImpl*>(options.snapshot)
->sequence_number()
: latest_snapshot),
seed);

那它代表什么含义?

我们从一次 WriteBatch 出发,看看序列号是怎么使用及修改的。先给出结论图:

** 我们知道一个 WriteBatch 对一次批量写操作的封装。** 先看看一个 WriteBatch 的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class LEVELDB_EXPORT WriteBatch {
public:
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};

...

WriteBatch();
private:
friend class WriteBatchInternal;

std::string rep_; // See comment in write_batch.cc for the format of rep_
};

其内部只有一个 rep_的成员变量,其结构如下:

1
2
3
4
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]

由 8 字节序列号,4 字节 count,和实际数据 record 数组组成。

一次 Put 操作:

1
2
3
4
5
6
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}

Put 操作很简单,设置 count+1, 然后将 key value append 到 rep_末尾。

这里我们知道了 count 代表 rep_中有效的 kv pair 数量。

有了 batch,我们如何应用 batch(即如何将 batch 写入到 leveldb 系统中?)。对于一次写,除了写入到 Log 中来保证持久性以外,我们首先做的就是将数据插入到 memtable 中。这个过程如下函数:

注意,这里终于要说到 seequence_的意义了

1
2
3
4
5
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
...
status = WriteBatchInternal::InsertInto(write_batch, mem_);
...
}
1
2
3
4
5
6
7
8
9
10
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b); // !! 这里设置到了sequence number
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}

SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data())); // 取出 batch的rep_的前8个字节作为sequence number
}

现在在 MemTableInserter 中保存了一个 batch 最开始的 sequence number。(至于 batch 的 sequence number 是在哪儿初始化的,我们之后再说)。

再回到问题,如何将 batch 中的多个 kv pair 应用到系统中?核心就在 b->Iterate (&inserter) 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Status WriteBatch::Iterate(Handler* handler) const {
...
while (!input.empty()) {
...
switch (tag) {
case kTypeValue: // 普通值
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion: // 删除值
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}

注意到这就是一个循环遍历 batch 中的各 kv pair。然后调用 handler->Put 或 handler->Delete 来应用。而 handler 就是 MemTableInserter:

1
2
3
4
5
6
7
8
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++; // sequence的修改
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++; // sequence的修改
}

从这里终于可以看出,每插入一对 kv,就会递增一次 sequence

现在还剩最后一个问题,batch 的 sequence number 是如何初始化的?

在 WriteBatchInternal 类中有个 SetSequence 函数:

1
2
3
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}

在这里,将 rep_的前 8 个字节设置为这个 batch 的起始 sequence。那谁调用的这个函数?

在 DBImpl::Wirte 函数中:

1
2
3
4
5
6
7
8
9
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
...
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence(); // !! sequence从version中获得
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); // 注入sequence

当成这次 batch 的写入后,又会反过来更新 version 中的序列号:

1
2
3
4
5
6
last_sequence += WriteBatchInternal::Count(write_batch);	// version版本号 = 旧版本号+此次写入的batch的kv数量

...
if (write_batch == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence); // 应用

所以,究其根本,系统中插入的每对 kv 的序列号,最初是由当前系统 version 的序列号指定的,然后在后序插入的 kv 中逐渐递增。

有了上面对 sequence 的理解,我们就能更好的理解 DBIter 的作用了。

DBIter 的作用(功能)·

iter_是由 NewInternalIterator 创建的一个 MergingIterator,通过 MergingIterator 可以实现多个有序数据集合的归并操作。其中包含多个 child iterator 组成的集合。对 MergingIterator 的遍历会有序的遍历其 child iterator 中的每个元素。因为 iter_遍历的是数据库的每一条记录。它是以 InternalKey (userkey, seq, type) 为遍历粒度的,只要 InternalKey 中任意一个组成元素不同,MergingIterator 就认为他们是不同的 kv 对。 而 **DBIter 是以 userkey 为遍历粒度的,只要记录的 userkey 相同,那么 DBIter 就认为他们是一条记录(不同版本),sqe 越大代表该记录越新。每次迭代将跳到下一个不同 userkey 的记录,且 DBIter 在遍历一个 InternalKey 时仅会检索 InternalKey->seq 小于 DBIter 创建时所初始化的 seq 号。** 举个例子:

上面表示了 6 个 InternalKey,冒号前为 user_key, 冒号后为序列号。现假设创建 DBIter 时,所初始化的 seq 为 2. 则 DBIter 在从前往后遍历时,将会直接跳过 key1:6,key1:5,key1:4 和 key2:3. 只会从 key2:2 开始遍历。

下面我们来分析 DBIter 的各个操作。

3.FindNextUserEntry·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(direction_ == kForward);
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) { // 根据当前iter_的key封装成一个新ikey,同时要求ikey的序列号小于创建DBIter时传入的序列号
switch (ikey.type) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
SaveKey(ikey.user_key, skip);
skipping = true; // 如果当前ikey的类型是删除,将跳过整个 user_key = ikey.user_key()的nodes
break;
case kTypeValue:
if (skipping && // 跳过
user_comparator_->Compare(ikey.user_key, *skip) <= 0) { // 旧条目不再需要
// Entry hidden
} else { // 否则就是本 user_key 所在nodes中的最新节点
valid_ = true;
saved_key_.clear();
return;
}
break;
}
}
iter_->Next(); // 定位到下一个
} while (iter_->Valid());
saved_key_.clear();
valid_ = false;
}

用一个例子来解释这个函数的作用:

上图中, 每个节点由 3 个字段组成,由冒号:分隔,第 1 个为 user_key,第 2 个为版本号,第 3 个为节点类型(1 代表普通值,0 代表删除)。

假设当前要遍历所有 key,且 iter_目前指向 key1:6:1. 则:

  1. iter 往下走到 key1:5:1, 由于 key1::5:1 的 user_key = key1:6:1 的 user_key 即:

    1
    2
    3
    4
    if (skipping &&	// 跳过
    user_comparator_->Compare(ikey.user_key, *skip) <= 0) { // 旧条目不再需要
    // Entry hidden
    }

    所以跳过这个节点。

  2. iter 往下走到 key1:4:1, 由于 key1::4:1 的 user_key = key1:6:1 的 user_key,所以跳过。

  3. iter 往下走到 key2:3:0, 由于节点类型是 0,删除节点。所以:

    1
    2
    3
    4
    5
    6
    case kTypeDeletion:
    // Arrange to skip all upcoming entries for this key since
    // they are hidden by this deletion.
    SaveKey(ikey.user_key, skip);
    skipping = true; // 如果当前ikey的类型是删除,将跳过整个 user_key = ikey.user_key()的nodes
    break;

    注意之类设置了 skipping

  4. iter 往下走到 key2:2:1,执行到:

    1
    2
    3
    4
    if (skipping &&	// 跳过
    user_comparator_->Compare(ikey.user_key, *skip) <= 0) { // 旧条目不再需要
    // Entry hidden
    }

    所以跳过。

  5. 后面同理。

这次遍历的结果是,只得到了 key:1:6:1 (注意这是所有 key1 的节点中最新的节点),所有 key2 都跳过了。

4.Seek·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void DBIter::Seek(const Slice& target) {
direction_ = kForward;
// 首先在saved_key中封装一个internal key
ClearSavedValue();
saved_key_.clear();
AppendInternalKey(&saved_key_,
ParsedInternalKey(target, sequence_, kValueTypeForSeek));
// 注意之类的iter_是上一篇文章介绍过的InternalIterator.
iter_->Seek(saved_key_); // seek到合适位置
if (iter_->Valid()) { // 通过FindNextUserEntry找到当前最新key或者过滤掉已经被删除的key
FindNextUserEntry(false, &saved_key_ /* temporary storage */);
} else {
valid_ = false;
}
}

5. Next·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void DBIter::Next() {
...
// Store in saved_key_ the current key so we skip it below.
SaveKey(ExtractUserKey(iter_->key()), &saved_key_); // 为后序FindNextUserEntry函数中进行跳过无用节点做铺垫

// iter_ is pointing to current key. We can now safely move to the next to
// avoid checking current key.
iter_->Next(); // 定位到下一个节点
if (!iter_->Valid()) {
valid_ = false;
saved_key_.clear();
return;
}

FindNextUserEntry(true, &saved_key_);
}

还是用例子来说:

假设当前 iter_ 指向 key0:8:1。则 Next 函数的工作如下:

  1. 在 saved_key_中保存 key0:8:1
  2. iter 指向 key0:7:1
  3. 执行 FindNextUserEntry
    1. 由于指定了 FindNextUserEntry 中的 skipping 为 true,并且 key0:7:1 的 user key = key0:8:1 的 user key,所以直接跳过 key0:7:1。
    2. 现在 iter 来到了 key1:6:1,有效,直接返回。

6. FindPrevUserEntry·

向前遍历和向后遍历基本类似,但是由于是越靠前越新,所以向前遍历时,需要多往前尝试,直到找到一个新的不同的 user_key.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void DBIter::FindPrevUserEntry() {
assert(direction_ == kReverse);

ValueType value_type = kTypeDeletion;
if (iter_->Valid()) {
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
if ((value_type != kTypeDeletion) && // 不是删除节点
user_comparator_->Compare(ikey.user_key, saved_key_) < 0) { // 遇到了新user_key
// We encountered a non-deleted value in entries for previous keys,
break;
}
value_type = ikey.type;
if (value_type == kTypeDeletion) { // 删除节点,清空saved_key和saved_value
saved_key_.clear();
ClearSavedValue();
} else { // 正常节点,保存saved_key和saved_value
Slice raw_value = iter_->value();
if (saved_value_.capacity() > raw_value.size() + 1048576) {
std::string empty;
swap(empty, saved_value_);
}
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
saved_value_.assign(raw_value.data(), raw_value.size());
}
}
iter_->Prev();
} while (iter_->Valid());
}

if (value_type == kTypeDeletion) { // 结束位置,无法再继续往前走
// End
valid_ = false;
saved_key_.clear();
ClearSavedValue();
direction_ = kForward;
} else { // 正常情况走到这里
valid_ = true;
}
}

假设当前 iter 指向 key1:4:1, 则 FindPrevUserEntry 的工作为:

  1. 保存 key1:4:1,

    1
    2
    3
    4
    5
    6
    7
    8
    Slice raw_value = iter_->value();
    if (saved_value_.capacity() > raw_value.size() + 1048576) {
    std::string empty;
    swap(empty, saved_value_);
    }
    SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
    saved_value_.assign(raw_value.data(), raw_value.size());
    }
  2. iter 指向 key1:5:0,当前是删除节点,清空 saved_key 和 saved_value:

    1
    2
    3
    4
    if (value_type == kTypeDeletion) {	// 删除节点,清空saved_key和saved_value
    saved_key_.clear();
    ClearSavedValue();
    }
  3. iter 指向 key1:6:1, 保存.

  4. iter 指向 key0:7:1, 遇到不同 user_key 且不是删除几点,跳出:

    1
    2
    3
    4
    5
    if ((value_type != kTypeDeletion) &&		// 不是删除节点
    user_comparator_->Compare(ikey.user_key, saved_key_) < 0) { // 遇到了新user_key
    // We encountered a non-deleted value in entries for previous keys,
    break;
    }

这样就从后往前遍历到当前 user_key 的最新版本。

7. Prev·

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void DBIter::Prev() {
assert(valid_);

if (direction_ == kForward) { // Switch directions?
// iter_ is pointing at the current entry. Scan backwards until
// the key changes so we can use the normal reverse scanning code.
assert(iter_->Valid()); // Otherwise valid_ would have been false
SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
while (true) {
iter_->Prev();
if (!iter_->Valid()) {
valid_ = false;
saved_key_.clear();
ClearSavedValue();
return;
}
if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) <
0) {
break;
}
}
direction_ = kReverse;
}

FindPrevUserEntry();
}

注释说得很明白,首先从后往前遍历到和当前 user_key 不同的节点,设这个节点为节点 k。然后通过 FindPrevUserEntry 找到节点 k 的最新版本。

8. key & value·

1
2
3
4
5
6
7
8
Slice key() const override {
assert(valid_);
return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
}
Slice value() const override {
assert(valid_);
return (direction_ == kForward) ? iter_->value() : saved_value_;
}

这个很简单了,不过这里也终于解释了 saved_key_,saved_value_已经 direction 变量的作用。因为在反向遍历时,会出现这种的情况:

反向遍历时,需要的 key 应该时 key1:6:1, 但是 iter_已经指向了 key0:7:1。 正向遍历则不存在这个问题,所以 DBIter 使用了三个额外的变量 (direction,saved_key,saved_value) 来区分正向遍历和反向遍历。

9. ParseKey·

看完前面的几个函数分析,其实已经能够了解 DBIter 的所有操作了,但是这里还要提一下 ParseKey 这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
Slice k = iter_->key();

size_t bytes_read = k.size() + iter_->value().size();
while (bytes_until_read_sampling_ < bytes_read) {
bytes_until_read_sampling_ += RandomCompactionPeriod();
db_->RecordReadSample(k);
}
assert(bytes_until_read_sampling_ >= bytes_read);
bytes_until_read_sampling_ -= bytes_read;

if (!ParseInternalKey(k, ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter");
return false;
} else {
return true;
}
}

ParseKey 的主要职责从 iter_->key () 返回的 string 中解析出一个 InternalKey 出来,封装在 ikey 中。这部分很简单,我想说的是:

1
2
3
4
5
6
7
size_t bytes_read = k.size() + iter_->value().size();
while (bytes_until_read_sampling_ < bytes_read) {
bytes_until_read_sampling_ += RandomCompactionPeriod();
db_->RecordReadSample(k);
}
assert(bytes_until_read_sampling_ >= bytes_read);
bytes_until_read_sampling_ -= bytes_read;

这部分代码的意义是在 DBIter 的 scan 过程中,对整个 sstable 进行 compaction。当然这个 compaction 肯定有个执行间隔,这个间隔就是由 bytes_until_read_sampling_来控制。它由一个随机数初始化:

1
2
3
4
5
6
7
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;

// Picks the number of bytes that can be read until a compaction is scheduled.
size_t RandomCompactionPeriod() {
return rnd_.Uniform(2 * config::kReadBytesPeriod);
}

每次增加的间隔也是也是一个基于均匀分布的随机数。

再看看 db_->RecordReadSample (k);

1
2
3
4
5
6
void DBImpl::RecordReadSample(Slice key) {
MutexLock l(&mutex_);
if (versions_->current()->RecordReadSample(key)) {
MaybeScheduleCompaction();
}
}

MaybeScheduleCompaction 是 Compaction 过程,详情参考 Compaction 章节,这里不赘述。重点看 RecordReadSample_:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
bool Version::RecordReadSample(Slice internal_key) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
return false;
}

struct State {
GetStats stats; // Holds first matching file
int matches;

static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);
state->matches++;
if (state->matches == 1) {
// Remember first match.
state->stats.seek_file = f;
state->stats.seek_file_level = level;
}
// We can stop iterating once we have a second match.
return state->matches < 2;
}
};

State state;
state.matches = 0;
ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);

// Must have at least two matches since we want to merge across
// files. But what if we have a single file that contains many
// overwrites and deletions? Should we have another mechanism for
// finding such files?
if (state.matches >= 2) {
// 1MB cost is about 1 seek (see comment in Builder::Apply).
return UpdateStats(state.stats);
}
return false;
}

这里是不是看着比较熟悉?整体框架和基于 Seek 的 Compaction 完全相同。只不过 Match 函数采用了 matches 次数来记录 seek_file 指针。

一旦匹配到两次以上,就执行 UpdateStats,在 UpdateStats 内部会对 seek_file 所指向的 file 的 allowed_seek–。 当 allowed_seek 减到 <=0 时,就可以设置用来触发 seek compaction 的标志了。

总结 ·

DBIter 是对 InternelIter 的封装,两者针对的粒度不同。InternelIter 以 InternelKey 为粒度,一个 InternelKey 由 (user_key,sequence,type) 组成,只要这三者有一个不同,InternelIter 就认为它们是不同 key。但对用户来说,只关心 user_key 是否相同,所以诞生了 DBIter,用来在具有相同 user_key 的 internelkey 中,找到最新版本的那个并返回给用户。

到这里,leveldb 中的所有 iterator 都已介绍完毕。

文章对你有帮助?打赏一下作者吧