概述
关于Raft算法和Hashicorp Raft,可以查看分布式协议和算法总结#Raft算法和Hashicorp Raft源码阅读这两篇文章。本文将基于该开源项目,实现一个Raft协议的分布式KV系统。
架构设计
整体来说我们希望实现一个分布式的KV引擎,利用Raft协议提供的共识和一致性保证。
该分布式KV引擎,核心模块可以分为三层:
客户端接入层。
在第一个版本我打算采用简单的HTTP RESTful接口作为客户端接入协议。在这一层需要搭建一个HTTP的服务器,根据功能设计实现对应的接口。通过下一层的Raft获取集群和节点相关的信息。
Raft协议层。
这一层直接采用Hashicorp Raft帮我们完成Raft集群搭建。
日志持久化层
对于日志项的持久化,我们可以采用Boltdb,作为LogStore和StableStore的实现。
除此之外,对于FSM,由于我们实现的是KV系统,所以我们可以使用Map,日志项会被提交给FSM后,会被存储在Map中。
对于SnapshotStore,我们可以使用Hashicorp Raft提供的FileSnapshotStore,文件形式保存日志快照即可。
那么大体的架构如下:
其中蓝框内容是我们需要重点实现的内容
接口设计
首先是基本功能,也就是操作key的接口。
操作key
POST /key/{key}?val={val}
设置一个键,该请求只能发送至Leader,如果发送到了Follower,会返回307重定向。
GET /key/{key}
查询某个键的值。
DELETE /key/{key}
删除某个键。
GET /keys
查询所有键,不包括值。
POST /keys
批量写入键。请求体接收一个Map,存储键和值。防止同时插入可以过多,我们这里默认限制一次请求最多不超过100个键。
操作node
需要处理节点加入和移除功能。由于只有Leader才能处理节点加入,所以新节点启动时,需要访问Leader节点,将自己加入到集群。
这方面的接口
POST /node/{nodeId}?addr={nodeAddr}
将一个新节点加入集群。
DELETE /join/{nodeId}?addr={nodeAddr}
从集群中移除某个节点,这里的addr参数也需要填,双重保险防止删错节点。
运维和debug接口
预留几个运维或者debug的相关接口。
GET /health
健康检查接口,正常返回200,响应体为"health"
GET /servers
查询当前集群所有节点信息。
GET /stats
查询集群各个节点内部一些统计信息,比如最新日志项索引值,任期信息等。
GET /state
查询某个节点当前的角色。
代码
代码仓位置:rkv
我们只简单看下两个核心模块实现。
FSM
我们需要实现raft.FSM接口的三个方法:Apply、Snapshot和Restore。
另外我们需要定义Raft日志的组成,定义一个Command结构体代表一条日志项的内容。在写入键时,要将该结构体的实例写入Raft日志。Op代表操作,可以取set和delete。
type Command struct {
Op string `json:"op,omitempty"`
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
}
在Apply方法中,需要取出Raft日志项,并运行日志项内容。
// Apply applies a Raft log entry to the key-value store.
func (f *FSM) Apply(l *raft.Log) interface{} {
var c Command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
}
switch c.Op {
case "set":
return f.applySet(c.Key, c.Value)
case "delete":
return f.applyDelete(c.Key)
default:
panic(fmt.Sprintf("unrecognized command op: %s", c.Op))
}
}
更多代码请参考:代码位置
客户端接入层
我们通过 gorilla/mux 搭建一个HTTP服务器,可以方便地设置路由,获取请求中的路径参数,并且也比较轻量级,我们希望整体的架构尽量简洁,因此不采用 gin 等功能更丰富的框架。
考虑未来可能会支持更多协议,比如gRPC、或者自定义的协议,所以这里我们抽象出来几个接口。代码位置
客户端接入层需要向下调用Raft和FSM,用来写入或查询数据,因此我们包装一个结构体,表示HTTP服务器:
type HServ struct {
logger *log.Logger
// HTTP Server bind address
bindAddr string
// Mux Router, dispatch requests
router *mux.Router
// Hold raft operation
store store.Store
// Query keys
fsm *fsm.FSM
}
定义好各个API的路由分发:
func (h *HServ) Dispatch() {
h.router.HandleFunc("/keys/{key}", h.HandleKey)
h.router.HandleFunc("/nodes/{id}", h.HandleJoin)
h.router.HandleFunc("/keys", h.handleKeys)
h.router.HandleFunc("/health", h.handleHealth)
h.router.HandleFunc("/servers", h.HandleServers)
h.router.HandleFunc("/state", h.HandleState)
h.router.HandleFunc("/stats", h.handleStats)
}
批量操作key的Handler代码示例:
func (h *HServ) handleKeys(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
keys, err := json.Marshal(h.fsm.Keys())
if err != nil {
h.logger.Println("Error marshalling keys: ", err)
} else {
w.WriteHeader(http.StatusOK)
_, err = w.Write(keys)
if err != nil {
h.logger.Println("Error writing keys: ", err)
}
}
case http.MethodPost:
m := map[string]string{}
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if len(m) > defaultMaxKeysInRequest {
w.WriteHeader(http.StatusRequestEntityTooLarge)
return
}
for k, v := range m {
if err := h.store.Set(k, v); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
首先区分请求谓词对不同请求做处理。
对于GET请求,直接调用FSM查询所有键,得到所有键的列表。
对于POST请求,需要取出请求体中的键值,并调用Raft,将操作写入日志。
更多代码请参考:代码位置
调试
启动一个单节点rkv集群
在将项目构建成一个二进制文件后。只需要通过`./rkvd --id 1`即可启动一个单节点集群。
# go build -o bin/rkvd cmd/server/main.go
# ./bin/rkvd --id 1
2024-10-11T11:59:48.397+0800 [INFO] raft: initial configuration: index=1 servers="[{Suffrage:Voter ID:1 Address:127.0.0.1:10001}]"
2024-10-11T11:59:48.397+0800 [INFO] raft: entering follower state: follower="Node at 127.0.0.1:10001 [Follower]" leader-address= leader-id=
2024/10/11 11:59:48 [INFO] rkv: raft server listening at 127.0.0.1:10001/127.0.0.1:10002
2024-10-11T11:59:50.397+0800 [WARN] raft: heartbeat timeout reached, starting election: last-leader-addr= last-leader-id=
2024-10-11T11:59:50.397+0800 [INFO] raft: entering candidate state: node="Node at 127.0.0.1:10001 [Candidate]" term=5
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-voting for self: term=5 id=1
2024-10-11T11:59:50.397+0800 [DEBUG] raft: calculated votes needed: needed=1 term=5
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-vote received: from=1 term=5 tally=0
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-vote granted: from=1 term=5 tally=1
2024-10-11T11:59:50.397+0800 [INFO] raft: pre-vote successful, starting election: term=5 tally=1 refused=0 votesNeeded=1
2024-10-11T11:59:50.406+0800 [DEBUG] raft: voting for self: term=5 id=1
2024-10-11T11:59:50.423+0800 [DEBUG] raft: vote granted: from=1 term=5 tally=1
2024-10-11T11:59:50.424+0800 [INFO] raft: election won: term=5 tally=1
2024-10-11T11:59:50.424+0800 [INFO] raft: entering leader state: leader="Node at 127.0.0.1:10001 [Leader]"
启动一个三节点rkv集群
我提供了startup.sh和docker-compose,可以快速构建镜像,并启动一个三节点集群。只需要运行`./hack/startup.sh`即可。也可以通过二进制方式手工启动。
可以参考代码仓的README
TODO
未来将考虑完成客户端SDK和命令行工具的实现。
评论区