1. Overview
Redis Cluster可以解决单机瓶颈,把负载分摊,并提高并发度。本文和下面几篇会详细分析Redis Cluster的关键源码。
这一篇主要讲Redis Cluster的搭建流程,而集群搭建主要分为下面3步:
- 节点准备
- 节点握手
- 分配槽位
2. 节点准备
启动集群,首先配置文件中cluster-enabled yes
要打开。
不过启动节点的时候,和普通Redis节点类似,只是增加了集群配置的初始化:
- 首先是
loadServerConfig
函数,载入配置文件的cluster-
配置; - 然后是在
initServer
函数中,调用clusterInit
初始化集群的状态。
其它部分和普通Redis节点基本一样。因此,主要看clusterInit
函数。不过在之前,先说明一下关于集群相关的数据结构:clusterNode
和clusterState
。
2.1. 集群数据结构
这里主要是clusterNode
和clusterState
。
首先是clusterNode
,它用来描述集群中的一个节点:
typedef struct clusterNode {
mstime_t ctime; // 节点对象创建时间
char name[CLUSTER_NAMELEN]; // 节点名称
int flags; // 节点标识(CLUSTER_NODE_)
uint64_t configEpoch; // 该节点观察到的上一次的config epoch
unsigned char slots[CLUSTER_SLOTS/8]; // 本结点的槽位图
int numslots; // 本节点的槽个数
int numslaves; // 本节点下属的从节点个数(若自己是主节点)
struct clusterNode **slaves; // 从节点数组
struct clusterNode *slaveof; // 若节点是从节点,该指针指向对应的主节点
mstime_t ping_sent; // 上一次发送ping的时间
mstime_t pong_received; // 上一次收到pong的时间
mstime_t fail_time; // 上一次设置FAIL下线的时间
mstime_t voted_time; // 上一次为从节点投票的时间
mstime_t repl_offset_time; // 上一次更新复制偏移量的时间
mstime_t orphaned_time; // 孤立主节点的迁移时间
long long repl_offset; // 本节点已知的复制偏移
char ip[NET_IP_STR_LEN]; // 本节点IP
int port; // 本节点端口
int cport; // 本节点的集群通信端口
clusterLink *link; // 与本节点相关的连接对象
list *fail_reports; // 下线报告链表
} clusterNode;
每个节点包含一个连接对象clusterLink
,维护与该节点相关的连接状态:
typedef struct clusterLink {
mstime_t ctime; // 连接创建时间
int fd; // 连接文件描述符
sds sndbuf; // 输出缓冲
sds rcvbuf; // 输入缓冲
struct clusterNode *node; // 与该连接相关的节点
} clusterLink;
然后是clusterState
,代表本节点视角下的集群的状态:
typedef struct clusterState {
clusterNode *myself; // 本节点实例
uint64_t currentEpoch; // 当前的epoch
int state; // 本节点视角的集群状态
int size; // 集群的节点个数(节点已分配槽)
dict *nodes; // name -> clusterNode的字典
dict *nodes_black_list; // 防止重复添加节点的黑名单
clusterNode *migrating_slots_to[CLUSTER_SLOTS]; // 导入槽数据到目标节点的数组
clusterNode *importing_slots_from[CLUSTER_SLOTS]; // 从目标节点槽数据的数组
clusterNode *slots[CLUSTER_SLOTS]; // 槽和负责该槽节点的映射
uint64_t slots_keys_count[CLUSTER_SLOTS]; // 每个槽的键个数
rax *slots_to_keys; // 槽映射到键的radix tree/trie
// The following fields are used to take the slave state on elections.
mstime_t failover_auth_time; // 之前/下一次选举的时间
int failover_auth_count; // 节点获得的票数
int failover_auth_sent; // 若为真,表示本节点向其它节点请求了投票
int failover_auth_rank; // 该从节点在当前请求中的排名
uint64_t failover_auth_epoch; // 当前选举的epoch
int cant_failover_reason; // 不能执行故障转移的原因,见CLUSTER_CANT_FAILOVER
// Manual failover state in common.
mstime_t mf_end; // 手动故障转移的时间限制,0代表没有进行手动故障转移
// Manual failover state of master.
clusterNode *mf_slave; // 手动故障转移时,对应的从节点
// Manual failover state of slave.
long long mf_master_offset; // 主节点的偏移量
int mf_can_start; // 若非0,说明手动故障转移能开始
// The followign fields are used by masters to take state on elections.
uint64_t lastVoteEpoch; // 上一次给节点投票的epoch
int todo_before_sleep; // 调用clusterBeforeSleep()所做的一些事
// Messages received and sent by type.
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; //发送的字节数,根据不同种类区分
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; // 接收到的字节数,根据不同种类区分
long long stats_pfail_nodes; // 被标记为PFAIL(潜在下线)的节点数量,不包括没有地址的节点
} clusterState;
2.2. clusterInit
初始化
在clusterInit
初始化的时候:
-
首先它会初始化
clusterState
数据结构 -
若有集群配置文件,优先加载它(用于启动和恢复已有集群);否则创建一个
myself
的clusterNode
实例,代表自己,并保存一个新的集群配置文件这个集群配置文件是通过Redis本身创建和更新的,不是用于我们手动修改的
- 初始化并监听集群通信相关的文件描述符,并注册回调
clusterAcceptHandler
- 其它初始化操作
下面就是具体的代码:
void clusterInit(void) {
int saveconf = 0;
server.cluster = zmalloc(sizeof(clusterState));
// ... 初始化clusterState字段...
clusterCloseAllSlots(); // 清除所有的槽
// 首先锁定cluster config文件
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);
// 优先加载原先有的cluster config配置
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
// 若没有,则创建一个自己节点的实例,添加到clusterState中
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
// 保存cluster config文件(如果之前不存在)
if (saveconf) clusterSaveConfigOrDie(1);
server.cfd_count = 0;
// ...
// 监听集群通信相关的fd,并注册回调clusterAcceptHandler
if (listenToPort(server.port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR) {
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
// ...
}
}
// 初始化slots -> keys映射的radix tree
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
// 设置端口,默认cport = port+CLUSTER_PORT_INCR
myself->port = server.port;
myself->cport = server.port+CLUSTER_PORT_INCR;
if (server.cluster_announce_port)
myself->port = server.cluster_announce_port;
if (server.cluster_announce_bus_port)
myself->cport = server.cluster_announce_bus_port;
server.cluster->mf_end = 0;
resetManualFailover(); // 重置手动故障转移
clusterUpdateMyselfFlags(); // 更新myself的标识,主要是CLUSTER_NODE_NOFAILOVER
}
3. 节点握手
创建好节点后,集群并没有真正建立,需要我们手动建立(比如通过客户端,或者redis-trib.rb
)。而核心的命令就是CLUSTER MEET <ip> <port> [<cport>]
。
这个命令的处理函数为clusterCommand
,这里取关键的分支,这里关键的就是和目标节点握手:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
long long port, cport;
// 1. 获取IP, 端口,集群端口
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
// ... error and return ...
}
if (c->argc == 5) {
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
// ... error and return ...
}
} else {
cport = port + CLUSTER_PORT_INCR; // 默认cport = port + CLUSTER_PORT_INCR
}
// 2. 和目标握手
if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
errno == EINVAL) {
// ... error and return ...
} else {
addReply(c,shared.ok);
}
}
// ...
}
可见,握手的关键函数就是clusterStartHandshake
,下面就分析这个函数。
3.1. 节点:准备握手
节点握手的准备就是上述的函数clusterStartHandshake
,它主要的工作就是:
- 检查节点地址信息
- 检查待握手的节点是否已在握手
- 创建
clusterNode
节点实例,将其添加到clusterState
中,准备握手
节点状态初始为
CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET
:
CLUSTER_NODE_HANDSHAKE
:未完成第一次握手(第一次ping
)CLUSTER_NODE_MEET
:需要给节点发送meet
消息
int clusterStartHandshake(char *ip, int port, int cport) {
clusterNode *n;
// 1. 检查地址
// ...
// 2. 检查是否正在握手
if (clusterHandshakeInProgress(norm_ip,port,cport)) {
errno = EAGAIN;
return 0;
}
// 3. 添加节点到clusterState中,即server.cluster->nodes字典
// CLUSTER_NODE_HANDSHAKE: 未完成第一次握手(发送第一次ping)
// CLUSTER_NODE_MEET: 需要给节点发送meet消息
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
// ...
clusterAddNode(n);
return 1;
}
3.2. 节点:给目标节点发送MEET
消息
这部分的流程是在clusterCron
函数中的,它每秒执行10次。
下面截取发送MEET
消息的代码,实际上,这块代码主要用于
- 重新建立断连的节点连接
- 发送
MEET
以和目标节点握手
void clusterCron(void) {
// ...
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000; // 超时最低1s
clusterUpdateMyselfFlags(); // 更新自己节点myself的标识
di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
// 遍历节点
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// 不关注自己和没有地址的节点
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
// 统计潜在下线的节点
if (node->flags & CLUSTER_NODE_PFAIL)
server.cluster->stats_pfail_nodes++;
// 若节点正在握手,且超时,则将节点从clusterState删除
// 即握手超时/失败,不将其加入集群
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
// 若没创建连接,则创建连接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
// 创建socket连接,并构建对应的clusterLink
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
if (fd == -1) {
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
// 注册读回调clusterReadHandler,用于接收对方的响应
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
old_ping_sent = node->ping_sent;
// 发送握手请求,初始一般是MEET消息
// 不过没有CLUSTER_NODE_MEET标识,则发送PING请求
// (例如A MEET B, B收到A并添加A,而B视角中还没和A连接,因此会落到这里)
// 这个函数是Gossip协议的重要一环,后面会说明
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
node->ping_sent = old_ping_sent;
}
// 到这步,就可以移除CLUSTER_NODE_MEET标识
// 这步没有移除CLUSTER_NODE_HANDSHAKE,因为它还在握手之中
node->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
}
dictReleaseIterator(di);
// ...
}
3.3. 目标节点:回复MEET
消息
从2.2.可知,集群中节点启动,会注册clusterAcceptHandler
回调。当有集群通信连接建立,该回调会被调用,然后被注册一个新回调clusterReadHandler
,用于接收对方发来的消息:
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// ...
if (server.masterhost == NULL && server.loading) return;
while(max--) {
// 接收连接,获得fd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
// ... error and return ...
}
// ...
// 创建clusterLink对象
link = createClusterLink(NULL);
link->fd = cfd;
// 注册clusterReadHandler回调,用于接受消息
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
这里处理对方请求的函数就是clusterReadHandler
,这里大体的步骤是:
- 一直读取消息,将其追加到
link->rcvbuf
缓冲 - 若消息头读取完整,则校验
- 若消息整体读取完整,处理消息,并清空
link->rcvbuf
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[sizeof(clusterMsg)];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = (clusterLink*) privdata;
unsigned int readlen, rcvbuflen;
UNUSED(el);
UNUSED(mask);
// 一直读取,直到没有为止
while(1) {
rcvbuflen = sdslen(link->rcvbuf);
// 检查消息头是否完整
if (rcvbuflen < 8) {
// 这里是不完整的,接下来读取8-rcvbuflen以使其完整
readlen = 8 - rcvbuflen;
} else {
// 这里是完整的
hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 8) {
// 检查头,若不是RCmb,则报错
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {
serverLog(LL_WARNING,
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
return;
}
}
// 获取读取整个完整消息还需要的长度
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}
// 读取请求,把整个消息全部读完
nread = read(fd,buf,readlen);
// 没有数据,返回
if (nread == -1 && errno == EAGAIN) return;
if (nread <= 0) {
// 读取错误,处理错误
serverLog(LL_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
return;
} else {
// 把数据添加到clusterLink的rcvbuf缓冲中
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread;
}
// 若整个消息以读取完全,则处理消息
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
// 这里clusterProcessPacket就是处理rcvbuf缓存的请求
if (clusterProcessPacket(link)) {
// 处理成功,则将缓冲清空
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
而最终处理接收缓冲消息的函数,就是clusterProcessPacket
了。这部分也是Gossip协议的重要部分,因此也非常长。这里截取相关的代码,这里主要是:
- 必要时根据
MEET
更新自己节点的地址信息 - 若发送方在自己视角下未知,且为
MEET
,则说明发送方是新加入集群的节点,则:- 将发送方添加到自己的
clusterState
中,状态为CLUSTER_NODE_HANDSHAKE
,之后的clusterCron
会创建与发送方的连接并与其握手 - 将消息Gossip到集群
- 将发送方添加到自己的
- 响应发送方
PONG
int clusterProcessPacket(clusterLink *link) {
// ...
// 获取发送方是否已知
sender = clusterLookupNode(hdr->sender);
// ...
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);
// 若消息是MEET,且自己的地址没有设置,则根据MEET消息更新自己的地址
if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
server.cluster_announce_ip == NULL) {
char ip[NET_IP_STR_LEN];
if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip)) {
memcpy(myself->ip,ip,NET_IP_STR_LEN);
serverLog(LL_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
// 若发送方未知,且类型是MEET
// 则在自己的视角下,创建发送方节点实例,并添加到自己的clusterState中
// 节点状态是CLUSTER_NODE_HANDSHAKE,因为它没有完成自己和发送方的握手
// 而发送方节点的flags、slaveof等等都没有设置,因为可从其他节点接收到PONG时获取到这些信息
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port);
node->cport = ntohs(hdr->cport);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
if (!sender && type == CLUSTERMSG_TYPE_MEET)
// 把MEET消息Gossip到其它节点上
clusterProcessGossipSection(hdr,link);
// 响应对方PONG,以完成发送方和自己的握手(反过来说是不对的)
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
// ...
}
3.4. 节点:处理目标节点的PONG
节点收到目标节点的PONG
,也是根据clusterReadHandler
这个回调处理,最终也会落入clusterProcessPacket
函数(参考3.2.节),注意本节点已经知道了目标节点。
这里也截取关键的代码,它主要工作是:
- 更新本节点视角下,目标节点的信息(如名字等)
- 清除
CLUSTER_NODE_HANDSHAKE
标识 - 清除故障转移的标识
- 清除潜在下线的标识,必要时清除下线标识(可能下线后重新上线)
int clusterProcessPacket(clusterLink *link) {
// ...
// 获取发送方是否存在
// 这里会找不到
// 因为目标节点的响应PONG携带的节点名是目标节点随机生成的,因此名字对不上,从而就找不到了
sender = clusterLookupNode(hdr->sender); // Is null when
// ...
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET) {
// ...
// 若与该连接的节点存在(这里会进入)
if (link->node) {
// 若节点正在握手
if (nodeInHandshake(link->node)) {
// 若发送方已知,则更新发送方信息,并清除该连接相关的节点(这里不会进入)
if (sender) {
serverLog(LL_VERBOSE,
"Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name);
if (nodeUpdateAddressIfNeeded(sender,link,hdr)) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
clusterDelNode(link->node);
return 0;
}
// 这里更新目标节点在本节点视角下的名字
clusterRenameNode(link->node, hdr->sender);
serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
// 清除CLUSTER_NODE_HANDSHAKE,表示本节点视角下,与目标节点握手完成
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else if (memcmp(link->node->name,hdr->sender,
CLUSTER_NAMELEN) != 0) {
// 若握手已完成,但是名字对不上
// 则本视角下,置该节点为CLUSTER_NODE_NOADDR,并清除地址和关联的连接对象
// ...
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}
// Set nofailover (clear failover) ...
// Update sender's info if PING and not handshaking ...
// 这里是PONG的处理
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = mstime(); // 更新pong时间戳,用其判断是否下线
link->node->ping_sent = 0; // 清除ping时间戳
// 清除PFAIL潜在下线标识
if (nodeTimedOut(link->node)) {
link->node->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
} else if (nodeFailed(link->node)) {
// 必要时清除下线标识
clearNodeFailureIfNeeded(link->node);
}
}
// ...
}
这步过后,本节点和目标节点的握手就完成了。
而目标节点和本节点的握手并没有完成,因为在目标节点视角下,本节点设置了
CLUSTER_NODE_HANDSHAKE
标识,但是没有设置CLUSTER_NODE_MEET
。因此目标节点会在下一轮事件循环中,向本节点发送PING
,本节点响应PONG
并被目标节点处理,此时目标节点才与本节点握手完成。
握手完成后,节点就会定期向其它节点发送PING
,以探测节点是否可用,代码在clusterCron
中:
void clusterCron(void) {
// ...
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
// ...
// 跳过自己、没地址和正在握手的节点
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
// ...
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2) {
// 定期发送ping
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// ...
}
// ...
}
3.5. Gossip协议
从上面可知,我们没有两两执行CLUSTER MEET
指令,但是新节点被集群内所有节点感知,这就是因为使用了Gossip协议。
Gossip协议的消息可以是PING
,MEET
和PONG
,消息的格式如下所示:
typedef struct {
char nodename[CLUSTER_NAMELEN]; // 节点名字(可以不是本节点)
uint32_t ping_sent; // 最近一次发送PING的时间戳
uint32_t pong_received; // 最后一次接受PONG的时间
char ip[NET_IP_STR_LEN]; // 节点IP
uint16_t port; // 节点端口
uint16_t cport; // 节点集群端口
uint16_t flags; // 节点标识
uint32_t notused1; // 未使用
} clusterMsgDataGossip;
而发送方可以发送很多的clusterMsgDataGossip
,即数组:
union clusterMsgData {
/* PING, MEET and PONG */
struct {
// Gossip消息数组
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
而Redis Cluster消息是有消息头和消息体组成,体部是上面的定义,下面定义的是整个消息的格式:
typedef struct { char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */ uint32_t totlen; /* Total length of this message */ uint16_t ver; /* Protocol version, currently set to 1. */ uint16_t port; /* TCP base port number. */ uint16_t type; /* Message type */ uint16_t count; /* Only used for some kind of messages. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. */ uint64_t offset; /* Master replication offset if node is a master or processed replication offset if node is a slave. */ char sender[CLUSTER_NAMELEN]; /* Name of the sender node */ unsigned char myslots[CLUSTER_SLOTS/8]; char slaveof[CLUSTER_NAMELEN]; char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */ char notused1[34]; /* 34 bytes reserved for future usage. */ uint16_t cport; /* Sender TCP cluster bus port */ uint16_t flags; /* Sender node flags */ unsigned char state; /* Cluster state from the POV of the sender */ unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */ union clusterMsgData data; // 这里就是定义的体部,如gossip消息 } clusterMsg;
当节点发送PING
,PONG
和MEET
时,会使用Gossip协议,这里使用的是clusterSendPing
函数,发送方会:
- 选取几个自己已知的节点,将这些节点的信息追加到
gossip
数组 - 把消息发送给目标接收方
- 发送方会将1~2步应用到其它所有已知节点上
而接收方:
- 处理Gossip消息,不仅感知发送方,而且感知了
gossip
数组中其它节点的信息
而发送怎么选取节点,可以参考clusterSendPing
函数,这里代码略,不过有2个重要变量需要说明:
freshnodes
:除了自己和目标节点之外,集群中其它所有节点个数,即freshnodes = dictSize(server.cluster->nodes)-2
wanted
:Gossip消息中,需要选取节点的个数,满足wanted = floor(dictSize(server.cluster->nodes)/10)
wanted
最小为3
4. 槽指派
组成集群后,实际上还是不能工作的,因为没有分配槽位(slot)。
而槽的分配可通过CLUSTER ADDSLOTS
命令指定。
4.1. 槽信息维护
如之前所述,每个节点的槽分配信息记录在clusterNode
里,如下所示:
slots
:实际上是一个bitmap,若某个槽位是自己管理的,对应的位会被置1,否则置0(因此它是一个长度为CLUSTER_SLOTS/8
的char
数组)numslots
:该节点分配到的槽个数
#define CLUSTER_SLOTS 16384
typedef struct clusterNode {
// ...
unsigned char slots[CLUSTER_SLOTS/8]; // 本结点的槽位图
int numslots; // 本节点的槽个数
// ...
}
此外,集群状态clusterState
也会有对应的槽位管理信息:
migrating_slots_to
:记录导出槽数据到的目标节点,即:- 本节点需要把第
i
槽的数据导出到migration_slots_to[i]
- 本节点需要把第
importing_slots_from
:记录导入数据的源节点,即:- 本节点需要把第
i
槽的数据从importing_slots_from[i]
导入进来
- 本节点需要把第
slots
:槽和节点的映射,即:slots[i]
的节点负责第i
槽
slots_keys_count
:每个槽的键个数
typedef struct clusterState {
// ...
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
uint64_t slots_keys_count[CLUSTER_SLOTS];
// ...
}
4.2. 处理槽位分配
客户端发起CLUSTER ADDSLOTS <slot> [slot ...]
,即可执行槽位分配。
这里入口函数依旧是clusterCommand
,这里截取关键分支,:
void clientCommand(client *c) {
// ...
else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3){
int j, slot;
// 创建本节点一个新slots分配图,存储请求参数的信息
unsigned char *slots = zmalloc(CLUSTER_SLOTS);
int del = !strcasecmp(c->argv[1]->ptr,"delslots");
memset(slots,0,CLUSTER_SLOTS);
// 遍历所有参数
for (j = 2; j < c->argc; j++) {
// 获取参数中的槽位置
if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
zfree(slots);
return;
}
if (del && server.cluster->slots[slot] == NULL) {
// 如果是删除操作,但是槽没有指定负责的节点,回复错误信息
addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
zfree(slots);
return;
} else if (!del && server.cluster->slots[slot]) {
// 如果是添加操作,但是槽已有指定负责的节点,也回复错误信息
addReplyErrorFormat(c,"Slot %d is already busy", slot);
zfree(slots);
return;
}
// 给对应slot的分配图+1,表示已分配
if (slots[slot]++ == 1) {
// 若重新分配多次,也返回错误
addReplyErrorFormat(c,"Slot %d specified multiple times",
(int)slot);
zfree(slots);
return;
}
}
// 遍历所有的槽(根据新创建的槽位图,即参数信息)
// 根据新槽位图(参数信息),更新槽位图(添加/删除)
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;
// 如果这个槽被设置为导入状态,那么取消该状态
if (server.cluster->importing_slots_from[j])
server.cluster->importing_slots_from[j] = NULL;
// 添加/删除槽位分配
retval = del ? clusterDelSlot(j) :
clusterAddSlot(myself,j);
serverAssertWithInfo(c,NULL,retval == C_OK);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
// ...
}
这里的代码还是很简单的:
- 遍历参数,并记录
- 若是删除操作,但是槽位没有指定负责的节点,回复错误信息
- 若是添加操作,但是槽位已经指定负责的节点,回复错误信息
- 若某个槽位值已经指定过多次了(在参数中指定了多次),回复错误信息
- 根据这个记录,添加/删除槽位分配
而本身添加和删除槽位也是很简单的:
- 添加槽位:
clusterAddSlot
,这里就是把自己节点的对应槽位1(clusterNode->slots
),并设置负责该槽的节点(clusterState->slots
),并更新其它信息 - 删除节点:
clusterDelSlot
,这里就是把负责该槽的节点的槽位0,并置空负责该槽的节点,并更新其它信息
4.3. 广播槽位信息
CLUSTER ADDSLOT
命令仅可以更改节点本视角下集群的槽位分配,因此需要将这个信息传播,以保证集群一致性。
4.3.1. 槽位信息发送
当每次发送消息时(不论什么类型的消息,因此不仅仅是Gossip了),发送的消息都遵守一定的格式,如3.5.所述。消息包含头部和体部,体部根据不同类型有所变化,但是头部格式是基本不变的。
而槽位的信息就包含在头部。每当节点向其他成员发消息时,都会赋上本节点负责的槽位图,代码如下所示:
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
// ...
// 附带本节点负责的槽位图到消息头上
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
// ...
}
4.3.2. 槽位信息接收和处理
和之前的一样,消息接收处理的最后进入clusterProcessPacket
函数,这里截取更新槽位信息的片段:
int clusterProcessPacket(clusterLink *link) {
sender = clusterLookupNode(hdr->sender); // 查找是否是已知节点
// ...
if(type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET) {
// ...
clusterNode *sender_master = NULL; /* Sender or its master if slave. */
int dirty_slots = 0; // 槽位图是否不一致
// 1. 判断本节点视角下,发送方节点槽位图和请求中的槽位图是否不一致
if (sender) {
sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
}
}
// 2. 若发送方是主节点,且出现了不一致,则需要更新本节点视角下发送方的槽位图
if (sender && nodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
// ... handling greater epoch ...
// ...
}
// ...
}
这里关键函数就是clusterUpdateSlotsConfigWith
:
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
// sender一定是主节点,slot是sender发来的分配到sender的槽位图
int j;
clusterNode *curmaster, *newmaster = NULL;
uint16_t dirty_slots[CLUSTER_SLOTS];
int dirty_slots_count = 0;
// 如果当前节点是主节点,那么获取当前节点
// 如果当前节点是从节点,那么获取当前从节点所从属的主节点
curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
if (sender == myself) {
serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
return;
}
// 遍历所有槽
for (j = 0; j < CLUSTER_SLOTS; j++) {
// 若请求中第j槽被分配
if (bitmapTestBit(slots,j)) {
// 若本节点视角下,第j槽本身就被sender负责,跳过
if (server.cluster->slots[j] == sender) continue;
// 若本节点视角下,第j槽需要从其它节点导入,也跳过
// 因为这个字段只能被redis-trib修改(如当重新分片时)
if (server.cluster->importing_slots_from[j]) continue;
// 当满足下面条件时,重新分配第j槽给sender(clusterState中):
// 1. 第j槽本身没有分配(clusterState中),或者请求的configEpoch比本节点的更大
// 2. 该槽的数据并没有执行导入
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch) {
// 记录脏槽
// 脏槽指: 原本属于自己的,且有数据的槽,该槽需要被分配给其它节点(即sender)
if (server.cluster->slots[j] == myself &&
countKeysInSlot(j) &&
sender != myself) {
dirty_slots[dirty_slots_count] = j;
dirty_slots_count++;
}
// 若执行这一步,说明第j槽原来是自己的主节点,现在由sender接管
// 即发生了故障转移
if (server.cluster->slots[j] == curmaster)
newmaster = sender;
// 删除当前被指定的槽
clusterDelSlot(j);
// 将该槽分配给sender
clusterAddSlot(sender,j);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
}
}
}
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return;
// 若发生了故障转移,且老的主节点已经不负责任何槽
// 则在本节点视角下,将sender提升为自己的主节点
if (newmaster && curmaster->numslots == 0) {
serverLog(LL_WARNING,
"Configuration change detected. Reconfiguring myself "
"as a replica of %.40s", sender->name);
clusterSetMaster(sender);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
} else if (dirty_slots_count) {
// 若检测到脏槽,那么需要把脏槽的数据清除
for (j = 0; j < dirty_slots_count; j++)
delKeysInSlot(dirty_slots[j]);
}
}
总结一下上面的函数,本节点发送更新槽位信息分为2种情况:
- 若本节点视角下,槽位没有被指定:直接分配给发送者
- 若本节点视角下,槽位已被指定,且
configEpoch
更大- 记录脏槽位(原本属于自己,但要分配给发送者)
- 若槽位原本属于自己/自己的主节点,那么还得记录故障转移信息
- 分配槽位给发送者
而分配完成后,还有一些事情要做:
- 若检测到故障转移的信息,即槽位从自己/自己的主节点变成了发送方,则设置本节点为发送方的从节点
- 若检测到脏槽位,则需要清除脏槽位的键数据
如果时间足够,每个主节点都会将自己负责的槽位信息告知给集群中的其他节点,于是,集群中的每一个节点都会知道16384
个槽分别指派给了集群中的哪个节点,达成最终一致。
达到最终一致后,集群就正式可用了。
5. 槽路由
计算键属于哪个槽,可通过CLUSTER KEYSLOT <key>
获得,内部起始调用的就是CRC16算法,代码如下:
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
而当请求的键所在的槽不属于本节点时,会返回客户端MOVE
错误,并捎带正确客户端的信息,客户端就可以重定向请求到正确节点。
6. 集群模式下的数据库
在单机模式下,Redis可拥有多个数据库。但是在集群模式下,只能使用0号数据库(即第一个)。
除此之外,Redis中clusterState
的slots_to_keys
字段,维护了一个槽和键的关系。
typedef struct clusterState {
// ...
rax *slots_to_keys;
// ...
} clusterState;
这个字段是一个radix tree,你可以将其看作一个压缩的trie,具体可以看这篇文章:https://my.oschina.net/yunqi/blog/3039132。
而slots_to_keys
存储了本节点上存储的所有的键,并且附带了键所在的槽号。
Redis是这样存的:
- 获取记录的键
key
,计算它所在的槽号id
- 然后将
id + key
作为整个字符串,存储到slots_to_keys
中
当客户端发起如CLUSTER GETKEYSINSLOT
时,由于槽号是存储在树的开头,因此能很容易的找到对应槽下所有键的子树($O(logN)$),然后再执行遍历。而之前版本使用skip list存储,虽然定位对应槽下所有键的时间复杂度也一样(也是$O(logN)$),但是相比radix tree更耗内存(因为radix tree重用字符且会压缩子节点)。