侧边栏壁纸
  • 累计撰写 10 篇文章
  • 累计创建 18 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

基于Hashicorp Raft实现一个分布式KV系统

Harry Yang
2024-10-11 / 1 评论 / 1 点赞 / 37 阅读 / 9032 字 / 正在检测是否收录...

概述

关于Raft算法和Hashicorp Raft,可以查看分布式协议和算法总结#Raft算法Hashicorp Raft源码阅读这两篇文章。本文将基于该开源项目,实现一个Raft协议的分布式KV系统。

架构设计

整体来说我们希望实现一个分布式的KV引擎,利用Raft协议提供的共识和一致性保证。

该分布式KV引擎,核心模块可以分为三层:

  1. 客户端接入层。

在第一个版本我打算采用简单的HTTP RESTful接口作为客户端接入协议。在这一层需要搭建一个HTTP的服务器,根据功能设计实现对应的接口。通过下一层的Raft获取集群和节点相关的信息。

  1. Raft协议层。

这一层直接采用Hashicorp Raft帮我们完成Raft集群搭建。

  1. 日志持久化层

对于日志项的持久化,我们可以采用Boltdb,作为LogStore和StableStore的实现。

除此之外,对于FSM,由于我们实现的是KV系统,所以我们可以使用Map,日志项会被提交给FSM后,会被存储在Map中。

对于SnapshotStore,我们可以使用Hashicorp Raft提供的FileSnapshotStore,文件形式保存日志快照即可。

那么大体的架构如下:

rkv-arch.png

其中蓝框内容是我们需要重点实现的内容

接口设计

首先是基本功能,也就是操作key的接口。

操作key

POST /key/{key}?val={val}

设置一个键,该请求只能发送至Leader,如果发送到了Follower,会返回307重定向。

GET /key/{key}

查询某个键的值。

DELETE /key/{key}

删除某个键。

GET /keys

查询所有键,不包括值。

POST /keys

批量写入键。请求体接收一个Map,存储键和值。防止同时插入可以过多,我们这里默认限制一次请求最多不超过100个键。


操作node

需要处理节点加入和移除功能。由于只有Leader才能处理节点加入,所以新节点启动时,需要访问Leader节点,将自己加入到集群。

这方面的接口

POST /node/{nodeId}?addr={nodeAddr}

将一个新节点加入集群。

DELETE /join/{nodeId}?addr={nodeAddr}

从集群中移除某个节点,这里的addr参数也需要填,双重保险防止删错节点。

运维和debug接口

预留几个运维或者debug的相关接口。

GET /health

健康检查接口,正常返回200,响应体为"health"

GET /servers

查询当前集群所有节点信息。

GET /stats

查询集群各个节点内部一些统计信息,比如最新日志项索引值,任期信息等。

GET /state

查询某个节点当前的角色。

代码

代码仓位置:rkv

我们只简单看下两个核心模块实现。

FSM

我们需要实现raft.FSM接口的三个方法:Apply、Snapshot和Restore。

另外我们需要定义Raft日志的组成,定义一个Command结构体代表一条日志项的内容。在写入键时,要将该结构体的实例写入Raft日志。Op代表操作,可以取set和delete。

type Command struct {
	Op    string `json:"op,omitempty"`
	Key   string `json:"key,omitempty"`
	Value string `json:"value,omitempty"`
}

在Apply方法中,需要取出Raft日志项,并运行日志项内容。

// Apply applies a Raft log entry to the key-value store.
func (f *FSM) Apply(l *raft.Log) interface{} {
	var c Command
	if err := json.Unmarshal(l.Data, &c); err != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
	}

	switch c.Op {
	case "set":
		return f.applySet(c.Key, c.Value)
	case "delete":
		return f.applyDelete(c.Key)
	default:
		panic(fmt.Sprintf("unrecognized command op: %s", c.Op))
	}
}

更多代码请参考:代码位置

客户端接入层

我们通过 gorilla/mux 搭建一个HTTP服务器,可以方便地设置路由,获取请求中的路径参数,并且也比较轻量级,我们希望整体的架构尽量简洁,因此不采用 gin 等功能更丰富的框架。

考虑未来可能会支持更多协议,比如gRPC、或者自定义的协议,所以这里我们抽象出来几个接口。代码位置

客户端接入层需要向下调用Raft和FSM,用来写入或查询数据,因此我们包装一个结构体,表示HTTP服务器:

type HServ struct {
	logger *log.Logger

	// HTTP Server bind address
	bindAddr string

	// Mux Router, dispatch requests
	router *mux.Router

	// Hold raft operation
	store store.Store

	// Query keys
	fsm *fsm.FSM
}

定义好各个API的路由分发:

func (h *HServ) Dispatch() {
	h.router.HandleFunc("/keys/{key}", h.HandleKey)
	h.router.HandleFunc("/nodes/{id}", h.HandleJoin)
	h.router.HandleFunc("/keys", h.handleKeys)

	h.router.HandleFunc("/health", h.handleHealth)

	h.router.HandleFunc("/servers", h.HandleServers)
	h.router.HandleFunc("/state", h.HandleState)
	h.router.HandleFunc("/stats", h.handleStats)
}

批量操作key的Handler代码示例:

func (h *HServ) handleKeys(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		keys, err := json.Marshal(h.fsm.Keys())
		if err != nil {
			h.logger.Println("Error marshalling keys: ", err)
		} else {
			w.WriteHeader(http.StatusOK)
			_, err = w.Write(keys)
			if err != nil {
				h.logger.Println("Error writing keys: ", err)
			}
		}
	case http.MethodPost:
		m := map[string]string{}
		if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		if len(m) > defaultMaxKeysInRequest {
			w.WriteHeader(http.StatusRequestEntityTooLarge)
			return
		}
		for k, v := range m {
			if err := h.store.Set(k, v); err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				return
			}
		}
		w.WriteHeader(http.StatusOK)
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

首先区分请求谓词对不同请求做处理。

对于GET请求,直接调用FSM查询所有键,得到所有键的列表。

对于POST请求,需要取出请求体中的键值,并调用Raft,将操作写入日志。

更多代码请参考:代码位置

调试

启动一个单节点rkv集群

在将项目构建成一个二进制文件后。只需要通过`./rkvd --id 1`即可启动一个单节点集群。

# go build -o bin/rkvd cmd/server/main.go
# ./bin/rkvd --id 1
2024-10-11T11:59:48.397+0800 [INFO]  raft: initial configuration: index=1 servers="[{Suffrage:Voter ID:1 Address:127.0.0.1:10001}]"
2024-10-11T11:59:48.397+0800 [INFO]  raft: entering follower state: follower="Node at 127.0.0.1:10001 [Follower]" leader-address= leader-id=
2024/10/11 11:59:48 [INFO] rkv: raft server listening at 127.0.0.1:10001/127.0.0.1:10002
2024-10-11T11:59:50.397+0800 [WARN]  raft: heartbeat timeout reached, starting election: last-leader-addr= last-leader-id=
2024-10-11T11:59:50.397+0800 [INFO]  raft: entering candidate state: node="Node at 127.0.0.1:10001 [Candidate]" term=5
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-voting for self: term=5 id=1
2024-10-11T11:59:50.397+0800 [DEBUG] raft: calculated votes needed: needed=1 term=5
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-vote received: from=1 term=5 tally=0
2024-10-11T11:59:50.397+0800 [DEBUG] raft: pre-vote granted: from=1 term=5 tally=1
2024-10-11T11:59:50.397+0800 [INFO]  raft: pre-vote successful, starting election: term=5 tally=1 refused=0 votesNeeded=1
2024-10-11T11:59:50.406+0800 [DEBUG] raft: voting for self: term=5 id=1
2024-10-11T11:59:50.423+0800 [DEBUG] raft: vote granted: from=1 term=5 tally=1
2024-10-11T11:59:50.424+0800 [INFO]  raft: election won: term=5 tally=1
2024-10-11T11:59:50.424+0800 [INFO]  raft: entering leader state: leader="Node at 127.0.0.1:10001 [Leader]"

启动一个三节点rkv集群

我提供了startup.sh和docker-compose,可以快速构建镜像,并启动一个三节点集群。只需要运行`./hack/startup.sh`即可。也可以通过二进制方式手工启动。

可以参考代码仓的README

TODO

未来将考虑完成客户端SDK和命令行工具的实现。

1

评论区