[技术向] TinyKV 关键模块思路

20

个人历史文档沉淀, PingCAP 数据库训练营 (TinyKV) 的代码记录, 主要围绕 Raft 的实现和优化. 这个方案在学习营里获得二等奖, 对应三千元奖金.

项目地址: https://github.com/czt1999/tinykv-czt1999

项目结构

第一部分

kv/storage/standalone_storage

功能: 封装 badger.DB 的读/写事务操作, 对应 ReaderWrite 函数.

实现: 内部结构自然是需要一个 badger.DB 指针, 通过 conf 的参数创建实例. Reader 函数新建一个只读事务, 由 BadgerReader 负责后续读操作. Write 函数新建一个可写事务, 将 batch 中的操作按照 Put, Delete 类型分别调用 badger 事务的 Set, Delete 函数, 其中需要通过 engine_util 的辅助函数将 CF (列族) 和 key 连接得到新的 key.

func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
	// Your Code Here (1).
	txn := s.db.NewTransaction(false)
	reader := BadgerReader{txn: txn}
	return reader, nil
}

func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
	// Your Code Here (1).
	txn := s.db.NewTransaction(true)
	defer txn.Discard()
	for _, modify := range batch {
		switch modify.Data.(type) {
		case storage.Put:
			err := txn.Set(engine_util.KeyWithCF(modify.Cf(), modify.Key()), modify.Value())
			if err != nil {
				return err
			}
		case storage.Delete:
			err := txn.Delete(engine_util.KeyWithCF(modify.Cf(), modify.Key()))
			if err != nil {
				return err
			}
		}
	}
	err := txn.Commit()
	return err
}

kv/server

功能: 提供基础的 KV 服务端读写接口, 依赖上面实现的单机存储引擎.

实现:

  • RawGet - 获取 storage 的 Reader, 通过 GetCF 查询, 若没有错误且 value 为空, 则置 NotFound 为真.

  • RawPut, RawDelete - 两个函数的实现基本一样, 将请求的参数包装成 Put/Delete 结构, 再调用 storage 的 Write.

  • RawScan - 获取 storage 的 Reader, 通过 IterCF 得到底层键值对的迭代器, 定位到 StartKey 再遍历指定长度的数据.

第二部分 (A)

raft/raft.go

功能: 分布式共识层, 保证日志 (操作顺序) 的一致性.

实现:

下面的讨论都是建立在消息有效的前提下, 即消息发送方的 Term 大于等于接收方的 Term. 反之, 接收方会进行必要的反馈和拒绝, 且不会对自身状态进行任何修改.

  • [ 选举 ] 首先, electionElapsed 每一次重置都是取 [electionTimeout, 2*electionTimeout) 范围内的随机值, 减小选举碰撞概率. 此外, vote 规则, 按照原论文的描述, 要求: 1. Candidate 的日志至少与 Follower 一样新; 2. 当前 Term 内还未给其他节点投票. 在具体实现上, 规则 1 需要在投票前检查 TermVote 字段, 并在投票后更新 Vote, 规则 2 需要比对 m.LogTerm, m.Index 与当前节点的状态.

  • [ 心跳 ] 如果 Follower 收到心跳消息且 Term 至少与自己的一样大, 则重置选举计时并反馈自身日志状态; Candidate 则在心跳发送方的 Term 比自己大时直接转变为 Follower.

  • [ 复制 ] 日志复制采用的是推模型, 由 Leader 发送给 Follower, 这个操作在 sendAppend 中实现. Leader 在四种情况下会发送日志 —— 1. Propose, 若存在其他节点则广播新日志项; 2. 收到其他节点的 Append/Heartbeat 响应, 指示该节点的日志非最新, 需要发送缺少的日志项, 范围是 [Next, LastIndex]; 3. 更新了 commited, 需要通过 Append 消息通知其他节点; 4. TransferLead, 后面会提到. 作为接收方的 Follower 在收到日志后, 会检查正确性: 1. 收到的 Entries 与自身的日志之间是否有连续; 2. 自身日志的哪一部分与 Entries 存在冲突, 需要舍弃. 如果存在缺漏, 则令响应中的 Reject=true , 否则执行更新. 日志更新完毕后, 还需要更新 committed, 这里我用的是 max-min 规则 (raft.go#703). 响应接收方, 即 Leader, 会更新对应节点的 MatchNext, 如果变动后的 Match 大于 committed, 意味着一个新的日志项或许已经满足 majority 规则, 需调用 mayIncreaseCommitted 检查并进行必要的更新.

// Progress represents a follower’s progress in the view of the leader. Leader maintains
// progresses of all followers, and sends entries to the follower based on its progress.
type Progress struct {
	Match, Next  uint64
	recentActive bool
}

type Raft struct {
	RegionID uint64 // for debug
	id       uint64

	Term uint64
	Vote uint64

	// the log
	RaftLog *RaftLog

	// log replication progress of each peers
	Prs map[uint64]*Progress

	// this peer's role
	State StateType

	// votes records
	votes map[uint64]bool

	// msgs need to send
	msgs []pb.Message

	// the leader id
	Lead uint64

	// heartbeat interval, should send
	heartbeatTimeout int
	// baseline of election interval
	electionTimeout int
	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int
	// Ticks since it reached last electionTimeout when it is leader or candidate.
	// Number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in section 3.10 of Raft phd thesis.
	// (https://web.stanford.edu/~ouster/cgi-bin/papers/OngaroPhD.pdf)
	// (Used in 3A leader transfer)
	leadTransferee uint64

	// Only one conf change may be pending (in the log, but not yet
	// applied) at a time. This is enforced via PendingConfIndex, which
	// is set to a value >= the log index of the latest pending
	// configuration change (if any). Config changes are only allowed to
	// be proposed if the leader's applied index is greater than this
	// value.
	// (Used in 3A conf change)
	PendingConfIndex uint64

	lastSnap *pb.Snapshot

	noActCnt int
}

...

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer. Returns true if a message was sent.
func (r *Raft) sendAppend(to uint64) bool {
	// should send snapshot
	if r.Prs[to].Next < r.RaftLog.first {
		return r.sendSnapshot(to)
	}
	m := r.newMessage(pb.MessageType_MsgAppend, to)
	// prev log index and term
	m.Index = r.Prs[to].Next - 1
	m.LogTerm, _ = r.RaftLog.Term(m.Index)
	if r.Prs[to].Next <= r.RaftLog.LastIndex() {
		ents := r.RaftLog.EntsAfter(r.Prs[to].Next)
		m.Entries = make([]*pb.Entry, len(ents))
		for i := range ents {
			m.Entries[i] = &pb.Entry{
				EntryType: ents[i].EntryType,
				Term:      ents[i].Term,
				Index:     ents[i].Index,
				Data:      ents[i].Data,
			}
		}
	}
	//r.debug("sendAppend %v (len %v)", to, len(m.Entries))
	r.msgs = append(r.msgs, m)
	return true
}

raft/rawnode.go

功能: 封装数据操作层与共识层之间的交互.

实现: 此处的重点在 Ready, Advance 两个函数, 本质是状态管理, 有软状态 (易失的) 和硬状态 (持久化) 之分.

  • [ Ready ] 获取 unstableEntries (需持久化), nextEnts (可以 apply) 等日志信息和需要发送的 raft 消息. 需要注意的是 SoftState 与 HardState 当且仅当与上一次 Ready 存在差异时才需要传递, 因此需要在 RawNode 结构体中添加 lastSslastHs 记录上一次传递的状态, 以供比较.

  • [ HasReady ] 为了让上层应用快速判断是否有新的 Ready 需要处理从而提升效率, RawNode 结构体中还有其他辅助字段. lastAdvcCommitlastAdvc 用于记录最近一次 Advance 的结果, lastMsgLen 用于记录最近一次 Ready 的消息长度. 只要判断这几个字段不需要更新, 则认为不需要提供新的 Ready.

  • [ Advance ] 对于上层应用告知已经持久化的日志项, 更新 stabled; 对于已经上层应用已经确认提交状态的日志项, 更新 applied. 创建新的 Message 切片存放还未发送的 raft 消息, 以便 GC, 此外, 与 HasReady 对应, 需要更新几个辅助字段.


// Ready returns the current point-in-time state of this RawNode.
func (rn *RawNode) Ready() Ready {
	rd := Ready{}
	currSs := SoftState{Lead: rn.Raft.Lead, RaftState: rn.Raft.State}
	if !reflect.DeepEqual(rn.lastSs, currSs) {
		rd.SoftState = &currSs
	}
	currHs := rn.Raft.HardState()
	if !reflect.DeepEqual(rn.lastHs, currHs) {
		rd.HardState = currHs
	}
	if len(rn.Raft.msgs) > 0 {
		rd.Messages = rn.Raft.msgs
		rn.lastMsgLen = len(rn.Raft.msgs)
	}
	if rn.Raft.RaftLog.pendingSnapshot != nil {
		rd.Snapshot = *rn.Raft.RaftLog.pendingSnapshot
	}
	rd.Entries = rn.Raft.RaftLog.unstableEntries()
	rd.CommittedEntries = rn.Raft.RaftLog.nextEnts()
	return rd
}

// HasReady called when RawNode user need to check if any Ready pending.
func (rn *RawNode) HasReady() bool {
	return len(rn.Raft.msgs)-rn.lastMsgLen > 0 || rn.lastAdvc < rn.Raft.RaftLog.LastIndex() || rn.lastAdvcCommit < rn.Raft.RaftLog.committed
}

// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
	if len(rd.Entries) > 0 {
		rn.lastAdvc = rd.Entries[len(rd.Entries)-1].Index
		rn.Raft.RaftLog.stabled = max(rn.Raft.RaftLog.stabled, rn.lastAdvc)
		//rn.Raft.debug("RawNode Advcance update stable %v : %v", rn.Raft.RaftLog.stabled, rd.Entries)
	}
	if len(rd.CommittedEntries) > 0 {
		rn.lastAdvcCommit = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
		if rn.lastAdvcCommit > rn.Raft.RaftLog.committed {
			panic("rn.lastAdvcCommit > rn.Raft.RaftLog.committed")
		}
		rn.Raft.RaftLog.applied = max(rn.Raft.RaftLog.applied, rn.lastAdvcCommit)
	}
	if rd.SoftState != nil {
		rn.lastSs = *rd.SoftState
	}
	if !reflect.DeepEqual(rd.HardState, pb.HardState{}) {
		rn.lastHs = rd.HardState
	}
	if len(rd.Messages) > 0 {
		newMsgs := make([]pb.Message, len(rn.Raft.msgs)-len(rd.Messages))
		copy(newMsgs, rn.Raft.msgs[len(rd.Messages):])
		rn.Raft.msgs = newMsgs
		rn.lastMsgLen = 0
	}
	if rd.Snapshot.Metadata != nil && rn.Raft.RaftLog.pendingSnapshot != nil {
		snapshot := *rn.Raft.RaftLog.pendingSnapshot
		if snapshot.Metadata.Index == rd.Snapshot.Metadata.Index {
			rn.Raft.RaftLog.pendingSnapshot = nil
		}
		rn.Raft.RaftLog.stabled = max(rn.Raft.RaftLog.stabled, snapshot.Metadata.Index)
		rn.Raft.debug("RawNode advance snapshot applied (%v)", snapshot.Metadata.Index)
	}
	rn.Raft.RaftLog.maybeCompact()
}

第二部分 (B)

kv/raftstore/peer_storage.go

功能: 负责 raft 和 kv 状态的持久化.

实现: 此处的重点在 SaveReadyStateAppend 两个函数.

  • [ SaveReadyState ] 通过 WriteBatch 将 ready 中的状态进行相应的持久化, 大体可分为 snapshot, append, apply 三部分逻辑, 分别对应 ApplySnapshot, Append, applyEntries 三个函数.

  • [ Append ] 首先, 裁剪掉 entries 在 applyState.AppliedIndex 之前的部分; 其次, 删除掉已经持久化, 但与当前 entries 存在冲突的那部分; 最后, 写入 entries (缓存在 WriteBatch 中, 不是真正落盘), 此处通过 meta.RaftLogKey 得到日志项的 key.

  • [ applyEntries ] 我自己提取出来的 apply 逻辑. 这里暂不考虑 AdminRequest 和 region 的问题, 主要是遍历 entries 并执行 Set/Delete, 比较简单.

kv/raftstore/peer_msg_handler.go

功能: 各节点消息处理的"中枢".

实现: 此处的重点在 proposeRaftCommand, HandleRaftReady 以及提取出来的相关逻辑.

  • [ proposeRaftCommand ] 暂不考虑 AdminRequest 和 region 的问题, 只需追加一个新的 proposal 并调用下层 RawNode 的 Propose.

  • [ HandleRaftReady ] 获取下层 RawNode 的 Ready (如有), 先调用 peerStorage.SaveReadyState 持久化, 接着调用 sendRaftMessage 发送 raft 消息. 注意持久化必须在发送消息之前, 才能保证各 raft 节点收到的消息是"可信"的, 这也是原论文强调的点. 最后调用 postprocessApplied 处理这些已被持久层 apply 的日志项.

  • [ postprocessApplied ] 首先, 通过 checkApplied 函数对 entries 建立 index 索引. 然后遍历 proposal: 1. 若 proposal 的 index, term 与 applied 一致, 则调用 processCmdReq, 根据请求的类型执行相应的回调, 这一步可以异步执行; 2. 若 applied 中存在对应 index 的 Entry, 但 term 与 proposal 不一致, 说明该请求已经过时, 返回 ErrRespStaleCommand; 3. 若 index 大于 highestCmtIdx, 说明下层 raft 或许还在处理对应的请求, 将该 proposal 加入到 remainedProposals 中, 等待后续处理; 4. 除以上 3 种情况外的 proposal 则直接丢弃, 通常情况下客户端会因超时而自动重试. 最后, 将 proposals 替换成 remainedProposals.

  • [ processCmdReq ] 根据请求的类型生成对应的响应. 对于 Snap, 需要手动创建一个只读事务.

第二部分 (C)

raft/raft.go

功能: 增加对快照的支持以应对日志压缩.

实现: 此处的重点在 sendSnapshothandleSnapshot 两个函数. 另外, log 也有一些新增的字段和函数, 其关键是跟踪和适配持久层的 FirstIndex (如今它是经常变化的), 在读写 entries 时需对 index 做相应的 offset 调整.

  • [ sendSnapshot ] 在 Leader 调用 sendAppend 时, 若发现 Next 小于 RaftLog.first, 则转移到 sendSnapshot, 通过 storage 的 Snapshot 函数生成快照信息并发送消息. 这里有两个优化: 1. 为了避免 Leader 频繁生成快照, 新增了一个字段 lastSnap 记录最近一次调用 Snapshot 的结果, 若 lastSnap 可用于此次发送 (i.e. Metadata.Index > Next), 则直接用它而无需再次生成一个新的; 2. 为了避免 Leader 向隔离节点频繁发送快照, Progress 结构中新增了一个字段 recentActive, 每次向一个节点发送快照后都会将对应的 recentActive 置为 false, 相应地, 若一个节点的 recentActive 为 false, 则 sendSnapshot 不会执行任何操作. recentActive 在 Leader 收到节点返回的 Append/Heartbeat 响应后重置为 true. 该辅助字段也用于避免隔离节点作为 Leader 干扰正常集群中的 Leader, 具体操作是: Leader 在发送心跳前, 会将该节点对应的 recentActive 置为 false, 若它发现上一轮广播心跳后所有其他节点的 recentActive 还是 false (意味着它没有收到任何 Follower 的响应), 则主动回退为 Follower.

  • [ handleSnapshot ] 根据接收到的有效 Snapshot 消息更新本地状态, 容易出错的地方在于日志的 compact (log.go#105), 在裁剪 entries 切片后, 对 stabled, committed, first 等字段都要进行正确的更新.

raft/rawnode.go

功能: 增加对快照的支持.

实现: 在 Ready 时需要考虑 log 的 pendingSnapshot, 对应地, 在 Advance 时也要根据 Snapshot 进行更新, 末尾调用 RaftLog.maybeCompact, 这是因为持久层可能执行了 GC.

kv/raftstore/peer_storage.go

功能: 增加对快照, GC 指令的支持.

实现: 此处的重点在 ApplySnapshot 函数. 首先对快照的新旧程度 (index, region) 做必要的检查. 其次删除旧数据 (前提是已经初始化). 然后更新 raftState, applyState, region. 最后生成并发送 RegionTaskApply 任务, 等待其执行完毕. 此外, 在 applyEntries 函数中也新增了对 CompactLog 指令的处理, 即更新 TruncatedState, 并置 needGC 为 true (这里是后续 GC 和 Snapshot 过程的起始点).

kv/raftstore/peer_msg_handler.go

实现: 我新增了 SaveReadyState 函数的返回参数, 此时该函数的第二个返回参数表示 TruncatedState 是否在 apply 过程中发生了更新 (即上面说到的 needGC). 在 HandleRaftReady 处理的结尾, 若检查到 needGC 为 true, 则调用 ScheduleCompactLog 生成并发送 GC 任务. GC 的实际处理过程不在作业的考虑范围内. GC 处理完成后, 持久层的 FirstIndex 改变了, 于是有了 P2-C-raft, P2-C-raw_node 的改动.

第三部分 (A)

第三部分的目标在于实现分区, 每个分区有各自的 raft group, 独立运作.

raft/raft.go

功能: 增加对 TransferLeaderConfChange 的支持.

实现:

  • [ TransferLeader ] 关键是 handleTransferLeader 函数: 首先做必要的检查, 比如当前 Leader 是否已被移出集群, transfer 的目标节点是否在集群中...然后检查目标节点的日志是否最新, 若不是则调用 sendAppend 向其补齐日志项, 接着向目标节点发送 TimeoutNow 消息, 并将 leadTransferee 设为目标节点 id, 重置 electionElapsed (因为需等待一轮选举). 正常情况下, 目标节点会接收到最新的日志项并更新, 然后发起一轮选举, 成为新的 Leader. 若这一过程没有顺利完成, 则原有的 Leader 在下一次 handleHup 时会将 leadTransferee 重置为 None, 意味着本次 transfer 失败. 另一个新增的函数 PeerMaybeTransferee 用于向上层应用提供一个除本节点之外具有最大 Match 的节点, 作为 transferee (若需要).

  • [ ConfChange ] 关键是 addNode, removeNode 两个函数, 更新 Prs. 对于 removeNode, 减小了 commit 的同步数量下界, 因此需要调用 mayIncreaseCommitted 检查并进行必要的更新. 此外, 还有一个需要注意的地方, 由于新增的节点是 replicate 产生的, 在接收 Leader 发来的 Snapshot 之前没有任何有效的状态, 所以在 handleHup 时需要新增一条 r.Prs[r.id] == nil 的判断, 若为真, 则意味着该节点是刚刚由 ConfChange 新增的, 后面的 becomeCandidate 等步骤一概跳过. 此外, 在 handlePropose 中也要新增对 ConfChange 这一 Entry 类型的处理: 将 PendingConfIndex 设为 LastIndex + 1.

第三部分 (B)

kv/raftstore/peer_msg_handler.go

功能: 增加对 TransferLeader, ChangePeerSplit 指令的支持.

实现:

  • [ handleTransferLeader ] 若 proposeRaftCommand 收到 TransferLeader 类型的指令, 会转到 handleTransferLeader 函数, 调用下层 RawNode 的 TransferLeader 函数, 然后直接返回.

  • [ handleChangePeer ] 若 proposeRaftCommand 收到 ChangePeer 类型的指令, 会转到 handleChangePeer 函数, 先做一些检查: 1. 若下层 raft 的 PendingConfIndex 大于持久层的 AppliedIndex, 意味着上一个 ChangePeer 请求还未处理完成, 因此返回一个错误响应; 2. 若 PeersStartPendingTime 中含有目标节点, 同样返回一个错误响应, 这是考虑到, 一个新增的节点还未正式参与到集群中就被移除, 可能会产生一些难以预料的错误; 3. 这是一个 RemoveNode 指令, 且目标节点是本 Leader, 此时需要执行 transfer, 即调用下层 raft 的 PeerMaybeTransferee 获取一个合理的 transferee, 然后调用 RawNode 的 TransferLeader, 直接返回. 原本的 ChangePeer 请求则会因超时而重试, 正常情况下, transfer 会成功, 收到 RemoveNode 的将是另一个节点, 得以正常执行后续流程. 上述检查通过后, 将类型, 目标节点等信息封装在 ConfChange 中, 调用 RawNode 的 ProposeConfChange, 交给下层的 raft 去备份, 后面会在 apply 时执行相应操作, 这一步就要看 mayApplyConfChange 函数.

  • [ mayApplyConfChange ] 若遇到 ConfChange 类型的 Entry, 先调用 IsEpochStale 检查是否过时. 具体的 apply 操作是: 更新 自身的 region, peerCache 信息并落盘, 更新 storeMeta 信息, 最后调用下层 RawNode 的 ApplyConfChange. 需要注意的是, 如果发现需要移除自己 (不是 Leader), 则直接返回 true, 告知调用者, 本节点需要 destory (若是 Leader, 视该指令为无效, 无需做任何操作).

  • [ Split ] 若 proposeRaftCommand 收到 ChangePeer 类型的指令, 会转到 handleSplit 函数, 调用 ExceedEndKey 做一些必要的检查, 然后交给下层的 raft 去备份, 后面会在 apply 时执行相应操作, 这一步要看 mayApplySplit 函数.

  • [ mayApplySplit ] 先检查: 1. Header 的 Version 是否等于当前 Version; 2. split 的 NewPeerIds 长度是否等于当前的 Peers 长度; 3. SplitKey 是否在当前 region 范围内. 检查通过后, 更新原 region 的信息, 创建新 region, 更新 storeMeta, 持久化...一系列操作与 mayApplyConfChange 类似. 不同的地方在于需要手动创建新的 peer 并注册到 router 中, 再发送一条 Start 消息令其启动. 为了让 client 侧的 region 及时更新以避免不必要的 RegionNotFound, Leader 节点在执行完上述更新后, 最好调用自己和新 peer 的 HeartbeatScheduler 函数, 发送最新的 region 信息. 最后, 这里还有个坑: 由于网络的不可靠或分区, 本应该由本节点 split 得到的新节点, 有可能先一步被其他节点通过 replicate 创建. 因此, 在执行与新 region 有关的操作之前, 需要检查 storeMeta 中的信息, 若 split.NewRegionId 在 storeMeta 的 regions 中已存在, 则只需要保存更新后的原 region 信息, 后续操作一概跳过. (peer_msg_handler.go#232)

第三部分 (C)

scheduler/server/cluster.go

需要完善的是 processRegionHeartbeat 函数. 按照文档的指示, 依次检查入参的 Version, ConfVer 是否至少与当前同一 id 的 region 信息一样新, 其次再与重叠的 region 比较. 若检查通过了, 就调用 core.PutRegion 和 updateStoreStatusLocked 更新信息.

scheduler/server/schedulers/balance_region.go

需要完善的是 Schedule 函数. 按照文档指示, 先对 stores 按照 region size 从大到小排序, 遍历过程中跳过 !IsUp 和 DownTime > maxDownTime 的, 按 PendingRegion, Follower, Leader 的优先顺序查找是否有合适的 region 以供迁出. 若找到了可迁出的 region, 还需检查 maxReplicas 条件. 确定可迁出后, 再按照 region size 从小到大的顺序, 找到合适的 store 以供迁入. 最后还要检查两个 store 之间的 region size 差额是否足够大 (即此次迁移操作是否有必要).

一系列检查完成后, 调用 AllocPeer 分配一个 peer id, 生成 balance operator 并返回.

第四部分

实现并发访问下的事务功能, TinyKV 采用的事务模型是 Percolator, 是两阶段提交 (2PC) 协议, 同时使用 MVCC 机制实现快照隔离级别.

kv/transaction/mvcc/transaction.go

MVCC 的关键在于 TS 决定数据是否可见. 首先要明确的是 Default 和 Write 列的 key 都是含有 timestamp 的, Lock 列则不含 (lock 结构本身包含事务的版本号). 明白这一点, PutWrite, GetLock, PutLock, DeleteLock, PutValue, DeleteValue 都很好实现, 这里不表. 剩下还有三个函数比较复杂:

  • [ GetValue ] 需要生成一个 Write 列的迭代器并定位到 key (含有 StartTS), 找到 user key 一致且 commitTS 小于等于 startTS 的 write 记录, 进而根据 write 的 StartTS 找到目标 value 值. 若迭代过程中发现 user key 不一致, 则可以断定 KeyNotFound.

  • [ CurrentWrite ] 同样是生成一个 Write 列的迭代器, 并定位到 key (含有 TsMax, 因而会定位到该 user key 对应的最新键值对), 然后返回 user key 一致且 commitTS 小于等于 startTS 的 write 记录.

  • [ MostRecentWrite ] 类似 CurrentWrite 但稍微简单些, 并定位到 key (含有 TsMax) 后, 直接返回最近的一条 write 记录.

kv/server/server.go

  • [ KvGet ] 逻辑与 raw_api 基本一致, 不同之处在于使用的是上一小节所实现的 MVCC 事务, 此外需要判断 key 是否被上锁.

  • [ KvPrewrite ] 遍历 Mutations, 检查是否被上锁, 是否存在写冲突 (即存在一条 Write 记录的 commitTS 大于当前事务的 startTS). 若没有, 则对该 key 上锁, 并写入 value.

  • [ KvCommit ] 遍历 Keys, 通过 MostRecentWrite 获取最近的 Write 记录, 检查是否被回滚, 是否存在写冲突, 是否无锁或被其他事务上了锁, 以上任何一种情况都意味着错误, 需要记录相应的信息. 若没有, 则删除原有的锁, 并写入 Write 记录, 表示成功提交.

  • [ KvScan ] 通过 NewScanner 获取一个可供逻辑上遍历 key (即跳过那些 user key 一致但 ts 更小的 key) 的迭代器. 对于每个由 Next 得到的键值对, 检查是否有锁. 达到 Limit 数量后则返回.

  • [ KvCheckTxnStatus ] 通过 GetLock 和 MostRecentWrite 获取锁和 Write 记录. 1. 锁和 Write 记录都不存在, 对应 LockNotExistRollback, 需要新增一条回滚记录, 然后返回; 2. 锁存在但超时, 对应 TTLExpireRollback, 将事务的 StartTS 置为锁对应的 ts, 然后删除锁, 删除 value, 新增一条回滚记录; 3. 其余情况则对应 NoAction. 另外, 若存在 Write 记录且为 Put, 则需要在响应中记录对应的 commitTS.

  • [ KvBatchRollback ] 遍历 Keys, 检查是否已被提交, 若是则返回 Abort 错误. 检查是否已被回滚, 若是则跳过这条记录. 检查是否有对应当前事务的锁, 若有则释放该锁, 删除 value. 最后新增一条回滚记录.

  • [ KvResolveLock ] 需要生成一个 Lock 列的迭代器, 对每一条记录都要 ParseLock 获取锁的信息. 检查该锁的 ts 是否与 StartVersion 相等, 若是, 则释放该锁. 然后根据 CommitVersion 是否为零, 决定是要提交还是回滚, 写入相应的 Write 记录.