1. Overview
Redis Cluster同样也是HA的,即它支持:
-
主从复制
- 故障检测
- 故障转移
下面分别分析这3个功能。
2. 集群主从复制
Redis Cluster要实现HA,复制功能是一切的基础。而集群模式下,复制只支持1层,即不支持级联复制,也就是说,从节点不允许拥有从节点。
开启复制,需要客户端向节点发送CLUSTER REPLICATE <node>
命令,节点会把节点node
设为自己的主节点,并开始数据同步的流程。设置节点成为节点node
的从节点的条件是:
- 目标节点必须知道节点
node
- 节点
node
不能是自己,且不能是一个从节点 - 目标节点若原本是主节点,但被分配了槽,或者存储了数据,则不能设置
node
成为目标节点的主节点
这一部分在clusterCommand
中,下面截取其片段:
void clusterCommand(client *c) {
// ...
else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
// 寻找待设置的主节点
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
// 新主节点必须必须已知
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
}
// 不能复制自己
if (n == myself) {
addReplyError(c,"Can't replicate myself");
return;
}
// 不能将一个从节点设置成自己的主节点(即不能级联复制)
if (nodeIsSlave(n)) {
addReplyError(c,"I can only replicate a master, not a replica.");
return;
}
// 若自己是一个主节点,且有被指定槽,或者存储了数据,就不能降级成从节点
// 不过自己可以切换自己的主节点为另一个
if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
return;
}
// 设置新的主节点,自己成为它的从节点
clusterSetMaster(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} // ...
// ...
}
从上可知关键设置目标节点成为节点node
的从节点,在于函数clusterSetMaster
,它主要做的事情如下所示:
void clusterSetMaster(clusterNode *n) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);
if (nodeIsMaster(myself)) {
// 若自己原来是主节点,则标记自己是CLUSTER_NODE_SLAVE,并清除importing和migrating状态
// 这里可以大胆做,因为节点是没有数据的
myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
myself->flags |= CLUSTER_NODE_SLAVE;
clusterCloseAllSlots();
} else {
// 若自己原来是从节点,则从原来主节点的从节点表中删除自己
if (myself->slaveof)
clusterNodeRemoveSlave(myself->slaveof,myself);
}
// 标记自己的主节点是节点node
myself->slaveof = n;
// 将自己加入到节点node的从节点表
clusterNodeAddSlave(n,myself);
// 初始化同步的状态,供replicationCron函数启动数据同步
replicationSetMaster(n->ip, n->port);
// 重置手动故障转移
resetManualFailover();
}
而replicationSetMaster
就是初始化数据同步的状态,最主要的是设置server.repl_state = REPL_STATE_CONNECT
),在replicationCron
中若观察到这个状态,就会开始进行和主节点的数据同步(而数据同步的部分可参考这里):
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
// 设置新主节点的地址信息
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); // 关闭阻塞的客户端
disconnectSlaves(); // 关闭下属从节点的连接
cancelReplicationHandshake(); // 取消握手
if (was_master) replicationCacheMasterUsingMyself();
// 初始化数据同步握手状态,当replicationCron观察到这个状态时开始进行数据同步
server.repl_state = REPL_STATE_CONNECT;
}
之后的内容之前已经讲过了,不多叙述。而这里,集群的状态只更新到了一个节点上,不够根据Gossip消息传播,这个变更会被同步到其它节点上。
3. 故障检测
集群中的节点会定期向其它部分节点发送PING
消息,以判断节点是否出现了下线问题。下线状态和哨兵检测类似,包含:
- 主观下线
- 客观下线
3.1. 主观下线检测
和哨兵系统类似,Redis Cluster中的节点会定期向其它节点发送PING
,响应PONG
。这是Gossip协议的一部分。这类信息不仅可以传播节点槽信息,还可以传播主从状态、节点故障信息等(自己和部分其它节点,参考之前的Redis Gossip部分)。因此故障检测也是基于此。
a) 定期向最有可能故障的节点发送PING
Redis Cluster节点会每隔1s,随机取出5个节点,向1个最有可能发生故障的节点发送PING
,这一部分在clusterCron
函数中执行:
void clusterCron(void) {
// ...
// clusterCron每秒10次,因此这部分每秒1次
if (!(iteration % 10)) {
int j;
// 随机选择5个节点
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
// 忽略断连的,已经发送PING但没收到PONG的
// (当PING发出后,没收到PONG,之前分析过,ping_sent不为0,而收到PONG后会置0)
if (this->link == NULL || this->ping_sent != 0) continue;
// 忽略自己和还没握手完成的
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
// 挑选收到PONG时间戳最小的,说明现在离收到PONG最久远,也最有可能发生故障
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// 向最有可能发生故障的节点发送PING
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// ...
}
b) 判断和处理主观下线
当节点发送了PING
(即ping_sent
不为0),但迟迟没有收到PONG
时,对方节点就很有可能下线了。因此当满足这个条件时,节点可以判断其主观下线,也即“疑似下线”,给该节点标记CLUSTER_NODE_PFAIL
。
这部分代码也在clusterCron
里:
void clusterCron(void) {
// ...
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
// 遍历所有已知节点
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
// 跳过自己,没地址的和没完成集群握手的节点
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
// 若自己是从节点,对应节点是主节点且没下线
// 判断孤立主节点,统计个数,之后会为孤立主节点迁移一个新的工作的从节点
// 孤立主节点是:
// 1. 没下线且被分配了槽的节点,它下属配置有从节点,但从节点全部不可用
// 2. 它被标记为导出状态
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0 &&
node->flags & CLUSTER_NODE_MIGRATE_TO) {
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}
// 若节点有连接,连接不是重连的,且发送了PING但没收到PONG,时间超过上限的一半
// 则先释放连接,可能之后会重连(怀疑网络原因,但节点还是存活的)
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > server.cluster_node_timeout/2) {
freeClusterLink(node->link);
}
// 若节点收到了PONG,但是收到的时间距离现在也有超时时限的一半,且之后没发过PING
// 再发送一次PING,这里一般是握手刚结束后,第一次发送PING
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2) {
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// 若自己是主节点,有一个从节点发起故障转移,那么也向该发起请求的节点发送PING
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link) {
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// 到这里,若检测到收到了PONG,跳过
if (node->ping_sent == 0) continue;
// 到这里,我们向节点发送了PING,但没收到PONG
// 计算结果了多少时间
delay = now - node->ping_sent;
// 若超时,则标记为CLUSTER_NODE_PFAIL
if (delay > server.cluster_node_timeout) {
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
// ...
}
当然这里也做了其它事情,如:
- 统计孤立主节点个数,之后会从其它主节点的从节点中,为孤立主节点迁移一个从节点
- 释放回复
PONG
过慢的连接,这个连接可能会重新建立,使得连接更健壮 - 节点第一次加入集群后,触发第一次
PING
(因为MEET
完,会收到PONG
,但没发过PING
) - 向请求故障转移的从节点发送
PING
- 最后故障检测,将疑似下线的节点标记为
CLUSTER_NODE_PFAIL
,默认超时15s
3.2. 客观下线检测
和哨兵一样,主观下线不代表一定下线,需要得到集群其它节点的同意/共识,才能让其客观下线。
客观下线检测依旧依赖PING/PONG
机制,不过这里利用了Gossip协议的特性,即消息中不仅有自己的状态,也带有其它节点的状态。
在之前的文章里已经提及了Redis的Gossip协议(见此文的3.5.节)。Redis Cluster节点会选择自己已知的1/10的节点,且数量最小是3,将这些信息包在Gossip消息体里,随者PING/PONG
发送给对方。而接收方,会在函数clusterProcessGossipSection
中,把Gossip消息体的消息抽出来,而在这里,也会做客观下线的检测工作:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
uint16_t count = ntohs(hdr->count);
// 获取PING/PONG消息的Gossip部分
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
// 获取发送方节点
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
// 遍历Gossip部分消息,每个消息都是集群中某一个节点的状态信息
while(count--) {
uint16_t flags = ntohs(g->flags);
clusterNode *node;
sds ci;
// ...
// 获取Gossip消息中对应的节点标识
node = clusterLookupNode(g->nodename);
if (node) {
// 若节点是自己知道的
// 若节点是主节点,且不是自己,则:
// 1. 若节点被发送方标记为FAIL/PFAIL,创建和更新错误报告
// 2. 判断节点是否客观下线
// 3. 若不满足1,则删除该节点的错误报告,认为其上线
if (sender && nodeIsMaster(sender) && node != myself) {
if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
if (clusterNodeAddFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
markNodeAsFailingIfNeeded(node);
} else {
if (clusterNodeDelFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
// 若节点是正常且没有错误报告,且该节点没有等待由PING发出后的PONG
// 则更新该节点的pong_received时间戳
if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
node->ping_sent == 0 &&
clusterNodeFailureReportsCount(node) == 0) {
mstime_t pongtime = ntohl(g->pong_received);
pongtime *= 1000;
if (pongtime <= (server.mstime+500) &&
pongtime > node->pong_received) {
node->pong_received = pongtime;
}
}
// 若自己知道该节点,但该节点不可用,且看到该节点地址变化
// 则更改该节点的地址,释放其连接,等待下一次连接(到新地址)的创建
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
!(flags & CLUSTER_NODE_NOADDR) &&
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
(strcasecmp(node->ip,g->ip) ||
node->port != ntohs(g->port) ||
node->cport != ntohs(g->cport))) {
// 释放连接
if (node->link) freeClusterLink(node->link);
// 更改地址,并消除CLUSTER_NODE_NOADDR标识
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
}
} else {
// 若该节点自己不知道(即不在本视角集群内),且有地址,不在黑名单内
// 则将其添加到本视角下的集群中,并准备握手(设置握手前的状态)
// 握手和黑名单在之前已经说明,这里略
if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename)) {
clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
}
}
g++; // 遍历到下一条Gossip消息
}
}
Gossip消息处理这块还是相对简单的,它将PING/PONG
携带的Gossip消息解析并遍历,对于每条Gossip消息:
-
消息中,节点是我知道的
- 假如节点是主节点,且不是自己
- 若消息中,节点被发送者打上
PFAIL/FAIL
(即下线标识),创建或更新错误报告,判断并标记客观下线 - 否则,删除该节点的错误报告
- 若消息中,节点被发送者打上
- 假如节点是正常的,没有错误报告
- 若节点此时没有因为发出
PING
而等待PONG
,则更新PONG
的接收时间
- 若节点此时没有因为发出
- 假如节点之前被标记了
PFAIL/FAIL
,而消息中节点是上线的,而且地址变化- 释放和清理旧连接,等待下一轮事件循环的重连
- 更新地址信息
- 消除
NOADDR
标识
- 假如节点是主节点,且不是自己
-
消息中,节点是我不知道的
- 添加节点到集群(但此时并没有真正加入,只是打了
HANDSHAKE
标识) - 设置并准备开始握手
- 添加节点到集群(但此时并没有真正加入,只是打了
客观下线的判断,首先需要收集错误报告,当节点被报告为FAIL/PFAIL
时,会调用clusterNodeAddFailureReport
函数创建一个错误报告到链表中,链表的个数就是集群中认为该节点下线的节点个数:
int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
list *l = failing->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;
// 首先遍历错误报告链表,若错误报告中已有发送方的报告,更新它即可
listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
if (fr->node == sender) {
fr->time = mstime();
return 0;
}
}
// 否则创建一个新的错误报告,附上发送方和时间戳,然后添加到链表fail_reports里
fr = zmalloc(sizeof(*fr));
fr->node = sender;
fr->time = mstime();
listAddNodeTail(l,fr);
return 1;
}
而客观下线这块判断,很明显就在函数markNodeAsFailingIfNeeded
,易知:
- 节点客观下线,需要集群中半数以上同意
- 节点标记为客观下线后,若自己是主节点,需要将其广播到其它节点上,以强制其它节点将该节点标记为
FAIL
(客观下线)
void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
// 客观下线需要的票数,即半数以上
int needed_quorum = (server.cluster->size / 2) + 1;
if (!nodeTimedOut(node)) return; // 若可达则直接返回
if (nodeFailed(node)) return; // 已标记为FAIL,直接返回
// 获取错误报告的个数,即集群中认为该节点下线的节点数
failures = clusterNodeFailureReportsCount(node);
if (nodeIsMaster(myself)) failures++; // 若自己是主节点还得将计数+1
// 若没超过半数,直接返回,不能判断其客观下线
if (failures < needed_quorum) return;
// 到这里,票数超过一半
serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);
// 标记节点为FAIL,即客观下线
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();
// 若自己是主节点,将客观下线的消息广播到集群其它节点,以强制
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
最后是广播下线消息,这部分比较简单,看下代码就行。代码里也标记了下线消息的格式,其实就是一个节点名字。广播之后,其它节点就知道该节点客观下线了:
typedef struct {
char nodename[CLUSTER_NAMELEN]; // 客观下线的节点名
} clusterMsgDataFail;
void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
// 构建FAIL的消息包包头
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
// 设置下线节点的名字
memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
// 发送给所有集群中的节点
clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}
4. 故障转移
当节点收到clusterMsgDataFail
消息后,会把消息内对应节点设置成客观下线(CLUSTER_NODE_FAIL
),并设置下线时间。
而到了下一轮周期时,在clusterCron
中,执行故障转移:
void clusterCron(void) {
// ...
// 若自己是从节点,则去检查故障并执行故障转移
if (nodeIsSlave(myself)) {
// 设置手动故障转移的状态
clusterHandleManualFailover();
if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
// 执行从节点的自动或手动故障转移,从节点获取其主节点的哈希槽,并传播新配置
clusterHandleSlaveFailover();
// 若存在孤立节点,且集群中某个主节点有超过1个从节点,且这个主节点就是自己的主节点
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
// 则给这个孤立主节点分配(通过迁移)一个从节点
clusterHandleSlaveMigration(max_slaves);
}
// ...
}
当自己是从节点时,需要检查自己的主节点是否客观下线,并和其它条件一起,判断自己是否需要执行故障转移。上面的代码主要有3步:
- 设置手动故障转移的状态
- 必要时,执行故障转移
- 为孤立主节点迁移从节点
这里最关键的是第2步,即函数clusterHandleSlaveFailover
,它是做故障转移的,它主要可以分为下面几大步:
- 检查是否能执行故障转移
- 发起选举,尝试被其它主节点提升为主节点
- 执行故障转移,并告知其它节点
4.1. 故障转移选举前的检查
由于故障转移需要选出一个新的主节点,这需要选举投票,因此本节点是否有资格执行故障转移,首先需要检查本节点是否有资格发起选举,这部分代码如下:
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
// 计算上次选举过去的时间
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
// 自己成为主节点所需要的票数,需要半数以上**主节点**同意
int needed_quorum = (server.cluster->size / 2) + 1;
// 手动故障转移的标识
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
// 投票超时时间: 2*cluster_node_timeout (至少2s)
// 投票重试超时时间: 2*auth_timeout (至少4s)
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;
// 自己执行故障转移函数的条件:
// 1. 自己是从节点
// 2. 自己的主节点被标记为客观下线,或者被设置了手动故障转移
// 3. 自己的主节点有负责槽位
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
(server.cluster_slave_no_failover && !manual_failover) ||
myself->slaveof->numslots == 0)
{
// 不满足条件,则置失败原因为CLUSTER_CANT_FAILOVER_NONE
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return; // 直接返回
}
// 计算自己和主节点连接断开/没有交互的时间,这里需要减去超时时间
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
// 判断这个从节点是不是比较新,若不是很新,则不能执行故障转移
// 手动故障转移除外
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
// ...
}
若自己要发起选举,首先需要满足下面的条件:
- 自己必须是从节点
- 自己的主节点必须客观下线/手动设置了故障转移
- 自己的主节点原先被分配了槽位
- 若配置了
cluster-slave-validity-factor
,则还需要自己的数据足够新
4.2. 发起选举的准备
若通过4.1.,那么本节点就有资格发起选举。而选举是基于Raft算法,下面的代码可以看到很多Raft算法的影子。
这时候需要更新选举开始时间,当到达这个时间后,才能执行后续的选举流程。
void clusterHandleSlaveFailover(void) {
// ...
// 若上次尝试选举超时,且重试时间已过
// 则设置下一次选举的时间
if (auth_age > auth_retry_time) {
// 下一次选举时间设置(类似Raft的超时时间)
server.cluster->failover_auth_time = mstime() +
500 +
random() % 500; // Raft算法中的timeout/delay是随机的
server.cluster->failover_auth_count = 0; // 收到的票数
server.cluster->failover_auth_sent = 0; // 是否发起投票
server.cluster->failover_auth_rank = clusterGetSlaveRank(); // 本节点的rank
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
// 这里是手动故障转移
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
// ...
// 将PONG发给其它从节点,带有自己的复制偏移量
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
// 若没有开始请求故障转移的投票
// 则获取本节点的最新rank,因为其它心跳包携带的复制偏移量会影响本节点的rank
// 若新rank靠后,那么还得延迟故障转移开始的时间,并更新到server.cluster->failover_auth_rank
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
// 如果还没有到故障转移选举的时间,直接返回
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
// 如果距离本轮发起故障转移投票的时间过了太久,那么也不执行故障转移,直接返回
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
// ...
}
这里总结一下:
- 若
auth_age > auth_retry_time
(超时且重试时间已过),那么表示这一次故障转移超时,那么:- 设置下一轮新的故障转移开始时间(随机延迟,这是Raft规定的),并初始化其它字段,准备请求新一轮的投票
- 向其它从节点发送
PONG
,并携带自己的复制偏移,用于给其它从节点计算rank
- 返回,等待下一轮事件循环以发起投票
- 更新并处理
rank
,若rank
靠后了,则需要延后下一次请求投票的时间(因为Raft中,延后发起投票将有更小几率成为主节点) - 若当前没到发起选举的时间,直接返回
- 若发起投票后超时,也直接返回,放弃故障转移(即不成为主节点,成为从节点)
总体而言出现了很多Raft的影子。
4.3. 发起投票选举
4.3.1. 发起投票请求
一轮一轮事件循环后,执行到这里,说明已经到了发起投票选举的时候,这时候本节点会发起一轮选举(即成为candidate,发起投票,等同于Raft的requestVote
):
void clusterHandleSlaveFailover(void) {
// ...
// 这时候,自己成为candidate,发起新一轮投票
if (server.cluster->failover_auth_sent == 0) {
// 根据Raft, epoch要自增
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
// 向所有节点发送FAILOVE_AUTH_REQUEST消息,类似Raft论文中的requestVote RPC
clusterRequestFailoverAuth();
// 标记本节点已经发起了投票请求
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
// 这里直接返回,下一轮事件循环是不会进入这里,因为failover_auth_sent == 1
return;
}
// 下面是处理投票响应...
// ...
}
而发起投票会发给其它所有的从节点,其消息会包含CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST
标识(clusterMsg->type
标识)。
void clusterRequestFailoverAuth(void) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
// 构建消息头,打上CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST标识
// 类似表示一个Raft的requestVote请求
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
// 若是手动故障转移,则还需要设置CLUSTERMSG_FLAG0_FORCEACK
// 表示即使主节点在线,也要认证故障转移
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
// 广播投票请求给所有节点
clusterBroadcastMessage(buf,totlen);
}
注意,发起投票请求后,函数就直接返回了。因此等待投票响应需要等待下一轮的事件循环,这部分可看上面代码注释的解释。
4.3.2. 响应投票
其它从节点收到投票请求(即Raft的requestVote
)后,需要判断是否把票投给它,把投票的结果返回给对方。
这里处理的函数是clusterSentFailoverAuthIfNeeded
,代码如下:
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
clusterNode *master = node->slaveof;
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;
// 若自己是从节点,或者自己负责的槽数为0,则无权投票
if (nodeIsSlave(myself) || myself->numslots == 0) return;
// 请求epoch小于自己维护的epoch,不投票给对方,直接返回
if (requestCurrentEpoch < server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
node->name,
(unsigned long long) requestCurrentEpoch,
(unsigned long long) server.cluster->currentEpoch);
return;
}
// 若本轮epoch我已经投过票了,也拒绝给对方投票,直接返回
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: already voted for epoch %llu",
node->name,
(unsigned long long) server.cluster->currentEpoch);
return;
}
// 拒绝给下面这些情况的投票:
// 1. 请求的节点是主节点
// 2. 请求的主节点未知
// 3. 请求的主节点没客观下线,且没设置手动故障转移
if (nodeIsMaster(node) || master == NULL ||
(!nodeFailed(master) && !force_ack)) {
if (nodeIsMaster(node)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: it is a master node",
node->name);
} else if (master == NULL) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: I don't know its master",
node->name);
} else if (!nodeFailed(master)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: its master is up",
node->name);
}
return;
}
// 在cluster_node_timeout * 2时间内,拒绝再次投票,这不影响正确性
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}
// 检查请求从节点负责的槽的config epoch
// 若请求的config epoch小于本节点视角下槽的config epoch
// 则也拒绝投票,理由和之前判断epoch一样
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
{
continue;
}
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"slot %d epoch (%llu) > reqEpoch (%llu)",
node->name, j,
(unsigned long long) server.cluster->slots[j]->configEpoch,
(unsigned long long) requestConfigEpoch);
return;
}
// 到这里,节点可以给这个从节点投票
// 更新lastVoteEpoch, voted_time
// 并响应FAILOVER_AUTH_ACK消息,把票给这个请求的从节点
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
clusterSendFailoverAuth(node);
serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
node->name, (unsigned long long) server.cluster->currentEpoch);
}
总结一下:
- 若要投票权,则需要:
- 自己是主节点
- 需要负责一定数量的槽
- 是否要投票给请求的节点,投票会带有
FAILOVER_AUTH_ACK
标记:- 请求的
epoch
要大于本视角下集群的epoch
(小于:过期;等于:已投过票) - 请求的节点不能是:
- 主节点
- 其主节点未知
- 没设置手动故障转移下,其主节点没有客观下线
- 本次请求时间距离上一次投票在2s以外
- 请求的
configEpoch
不小于其主节点负责槽的configEpoch
(小于:过期)
- 请求的
可见,上面的投票规则,尽管有一定的不同,但实现思想和Raft基本类似。
Raft也是通过
epoch
,logTerm
,logIndex
和voteFor
决定是否投票:
- Raft的
epoch
对应上面的currentEpoch
- Raft的
voteFor
对应上面currentEpoch
相等的情况(这里投票方不需要知道选出来的主节点到底是谁,所以省略了voteFor
字段)- Raft的
logTerm
和logIndex
对应上面槽的configEpoch
标准的Raft论文实现,可参考这里的第6节。
4.3.3. 处理投票响应
发起投票后,需要处理对方投票的响应。
由4.2.可知,若对方投票给自己,则会收到响应,否则就收不到响应。所以,当收到投票响应时,只要给当前epoch
的投票计数+1即可。这部分仍然在clusterProcessPacket
函数执行:
int clusterProcessPacket(clusterLink *link) {
// ...
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; // 忽略未知发送方
// 若投票方是主节点,且该节点负责一定数量的槽,投票响应的epoch不小于自己的epoch
// 则自己认可这次投票,直接给failover_auth_count计数加1,用于统计票数
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
server.cluster->failover_auth_count++;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
}
// ...
}
4.4. 执行故障转移/主节点替换
当投票请求发出,并接收到足够多的投票后,就可以判断自己是否可以成为主节点,从而执行故障转移。
这部分代码就在4.1.代码的后面,在发起投票请求的后几轮事件循环执行:
- 若本轮检查,得到的票数超过集群主节点的半数,则
- 更新
configEpoch
- 执行故障转移,接管旧主节点的槽
- 更新
- 若检查,票数不够,则不能执行故障转移,需要等待下一轮的票数检查
void clusterHandleSlaveFailover(void) {
// ... 上面是发起投票 ...
// ... 下面是处理投票响应 ...
// 若成为新主节点,则需要得到集群其它半数以上主节点的统一
if (server.cluster->failover_auth_count >= needed_quorum) {
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
// 更新configEpoch(若更大)
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
// 接管旧主节点的槽,即执行故障转移
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
}
从上面可见,当票数过半时,就可以执行故障转移了。故障转移的逻辑就在函数clusterFailoverReplaceYourMaster
中,主要的动作是:
- 取消和原主节点的复制,设置自己为主节点
- 接管原主节点的槽
- 更新集群状态,并写入文件
- 广播
PONG
,告知所有节点主节点已经切换 - 重置手动故障转移
void clusterFailoverReplaceYourMaster(void) {
int j;
clusterNode *oldmaster = myself->slaveof;
if (nodeIsMaster(myself) || oldmaster == NULL) return;
// 1. 将自己设置为主节点,并取消和旧主节点的复制和连接
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
// 2. 将旧主节点的槽指定到自己身上
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(oldmaster,j)) {
clusterDelSlot(j);
clusterAddSlot(myself,j);
}
}
// 3. 更新集群状态,并将状态写入集群配置文件
clusterUpdateState();
clusterSaveConfigOrDie(1);
// 4. 向其它所有的节点发送PONG,告知主节点已被切换
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
// 5. 重置手动故障转移的状态
resetManualFailover();
}
4.5. 广播故障转移/主节点替换结果
当新主节点接管了旧主节点的槽后,会广播PONG
给集群的所有节点,以告知主节点已经被切换。而其它节点接收到消息后,将变更应用到本视角的集群状态上,即:
- 将发送方(新主节点)提升为主节点
- 移除旧主节点和发送方的主从关系
void clusterProcessPacket(clusterLink *link) {
// ...
if (sender) {
// 消息体中,发送方说明自己是主节点
// 而本节点视角下,发送方是从节点
// 那么就移除本视角下旧主节和发送方的主从关系,并把发送方提升为主节点
if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
sizeof(hdr->slaveof)))
{
clusterSetNodeAsMaster(sender);
} else {
// ...
}
}
// ...
}
这样,通过广播,再经过Gossip传播,主从节点切换的变更会被同步到所有节点上。
而对于其它从节点,它们需要和新主节点同步。代码紧跟在上面代码的下方,这里是为了检查槽配置是否和消息中一致,很明显这里是不一致的,因此要更新槽配置:
void clusterProcessPacket(clusterLink *link) {
// ...
clusterNode *sender_master = NULL; /* Sender or its master if slave. */
int dirty_slots = 0; /* Sender claimed slots don't match my view? */
if (sender) {
// 这里sender是新提升的主节点
// 但是本视角下并没有为它分配槽,而消息中是有分配槽
// 所以不一致,dirty_slots置1
sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
}
}
// 因此,后面会进入clusterUpdateSlotsConfigWith函数,更新槽配置关系
if (sender && nodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
// ...
}
而更新槽配置的代码如下,可以发现这里处理了故障转移,并初始化了数据同步的状态。它也处理了旧主节点重新上线的情形。下面的注释解释了一些if
条件出现的情形:
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
int j;
// curmaster: 旧主节点
// newmaster: 新主节点,非空的条件:故障转移发生,旧主节点被替换成新主节点,且旧主节点是自己的旧主节的(即本节点和新主节点原本是旧主节点的从节点)
clusterNode *curmaster, *newmaster = NULL;
uint16_t dirty_slots[CLUSTER_SLOTS];
int dirty_slots_count = 0;
curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
if (sender == myself) {
serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
return;
}
// 更新槽信息,将对应槽设置为sender
// 这里消息的configEpoch肯定更大,因为经过新一轮的故障转移
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
if (server.cluster->slots[j] == sender) continue;
if (server.cluster->importing_slots_from[j]) continue;
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch)
{
// ...
if (server.cluster->slots[j] == curmaster)
// 故障转移时,本节点视角下该槽属于旧主节点,且旧主节点是自己原来的主节点
// 则标记newmaster为sender,即本节点需要把sender设为自己的新主节点
newmaster = sender;
// 更新槽信息
clusterDelSlot(j);
clusterAddSlot(sender,j);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
}
}
}
// ...
// 故障转移情况下,若自己和新主节点(sender)原本是旧主节点的从节点
// 则: newmaster不为空,而curmaster的槽数被减为0
// 因此这里会调用clusterSetMaster函数,设置新主节点同时,初始化数据同步
// 而对于其它主节点,newmaster为空,不会进入该代码块
// 而对于其它主节点的从节点,也是这样
if (newmaster && curmaster->numslots == 0) {
serverLog(LL_WARNING,
"Configuration change detected. Reconfiguring myself "
"as a replica of %.40s", sender->name);
clusterSetMaster(sender);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
} else if (dirty_slots_count) {
for (j = 0; j < dirty_slots_count; j++)
delKeysInSlot(dirty_slots[j]);
}
}
void clusterSetMaster(clusterNode *n) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);
if (nodeIsMaster(myself)) {
// 自己是主节点时,则把自己降级为从节点
// 这用于处理旧主节点重新上线的情况
myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
myself->flags |= CLUSTER_NODE_SLAVE;
clusterCloseAllSlots();
} else {
// 自己是从节点时,把自己旧主节点的联系移除
if (myself->slaveof)
clusterNodeRemoveSlave(myself->slaveof,myself);
}
myself->slaveof = n;
// 让自己作为sender的从节点
clusterNodeAddSlave(n,myself);
// 初始化数据同步的状态,待replicationCron中开始执行数据同步
replicationSetMaster(n->ip, n->port);
resetManualFailover();
}
从上面的代码可以看到,在更新槽的同时,也更新了主从关系(之前只是删除旧主节点和自己的主从关系):
- 若本节点和新主节点
sender
,原本是旧主节点的从节点:- 设自己的主节点为
sender
- 初始化数据同步
- 设自己的主节点为
- 若本节点是旧的主节点,且刚刚上线:
- 将自己降级,设自己的主节点为
sender
- 初始化数据同步
- 将自己降级,设自己的主节点为
- 其它主节点和下属的从节点:什么都不做
之后,等到数据同步完成,则新的主从关系就正式确立。
至此,故障转移就全部完成了。
4.6. 故障转移的总结
故障转移主要还是分为下面几步:
- 选举新的主节点:基于Raft算法,需要得到其它主节点半数以上的同意
- 新主节点接管旧主节的的槽
- 新主节点广播主从变更
- 其它节点处理变更并更新集群信息:
- 更新本视角下集群中的主从信息和关系
- 更新本视角下的槽位分配信息
- 若本节点和新主节点原本属于旧主节的从节点,或者旧主节点重新上线:建立新的主从联系并初始数据同步(以建立新的主从复制关系)
此外,可以看到,如果同时删掉一个主节点和它所有的从节点,那么这部分已分配的槽将不可用,这会导致Redis集群不可用,执行的命令会响应CLUSTERDOWN
。