源码阅读-Redis集群: 集群(3)

Posted by keys961 on December 5, 2019

1. Overview

Redis Cluster同样也是HA的,即它支持:

  • 主从复制

  • 故障检测
  • 故障转移

下面分别分析这3个功能。

2. 集群主从复制

Redis Cluster要实现HA,复制功能是一切的基础。而集群模式下,复制只支持1层,即不支持级联复制,也就是说,从节点不允许拥有从节点。

开启复制,需要客户端向节点发送CLUSTER REPLICATE <node>命令,节点会把节点node设为自己的主节点,并开始数据同步的流程。设置节点成为节点node的从节点的条件是:

  • 目标节点必须知道节点node
  • 节点node不能是自己,且不能是一个从节点
  • 目标节点若原本是主节点,但被分配了槽,或者存储了数据,则不能设置node成为目标节点的主节点

这一部分在clusterCommand中,下面截取其片段:

void clusterCommand(client *c) {
    // ...
    else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
        // 寻找待设置的主节点
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
        // 新主节点必须必须已知
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }
		// 不能复制自己
        if (n == myself) {
            addReplyError(c,"Can't replicate myself");
            return;
        }
		// 不能将一个从节点设置成自己的主节点(即不能级联复制)
        if (nodeIsSlave(n)) {
            addReplyError(c,"I can only replicate a master, not a replica.");
            return;
        }
        // 若自己是一个主节点,且有被指定槽,或者存储了数据,就不能降级成从节点
        // 不过自己可以切换自己的主节点为另一个
        if (nodeIsMaster(myself) &&
            (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
            addReplyError(c,
                "To set a master the node must be empty and "
                "without assigned slots.");
            return;
        }
        // 设置新的主节点,自己成为它的从节点
        clusterSetMaster(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } // ...
    // ...
}

从上可知关键设置目标节点成为节点node的从节点,在于函数clusterSetMaster,它主要做的事情如下所示:

void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);
    serverAssert(myself->numslots == 0);
    if (nodeIsMaster(myself)) {
        // 若自己原来是主节点,则标记自己是CLUSTER_NODE_SLAVE,并清除importing和migrating状态
        // 这里可以大胆做,因为节点是没有数据的
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
        myself->flags |= CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        // 若自己原来是从节点,则从原来主节点的从节点表中删除自己
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    // 标记自己的主节点是节点node
    myself->slaveof = n;
    // 将自己加入到节点node的从节点表
    clusterNodeAddSlave(n,myself);
    // 初始化同步的状态,供replicationCron函数启动数据同步
    replicationSetMaster(n->ip, n->port);
    // 重置手动故障转移
    resetManualFailover();
}

replicationSetMaster就是初始化数据同步的状态,最主要的是设置server.repl_state = REPL_STATE_CONNECT),在replicationCron中若观察到这个状态,就会开始进行和主节点的数据同步(而数据同步的部分可参考这里):

void replicationSetMaster(char *ip, int port) {
    int was_master = server.masterhost == NULL;

    sdsfree(server.masterhost);
    // 设置新主节点的地址信息
    server.masterhost = sdsnew(ip);
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    disconnectAllBlockedClients(); // 关闭阻塞的客户端

    disconnectSlaves(); // 关闭下属从节点的连接
    cancelReplicationHandshake(); // 取消握手
    if (was_master) replicationCacheMasterUsingMyself();
    // 初始化数据同步握手状态,当replicationCron观察到这个状态时开始进行数据同步
    server.repl_state = REPL_STATE_CONNECT;
}

之后的内容之前已经讲过了,不多叙述。而这里,集群的状态只更新到了一个节点上,不够根据Gossip消息传播,这个变更会被同步到其它节点上。

3. 故障检测

集群中的节点会定期向其它部分节点发送PING消息,以判断节点是否出现了下线问题。下线状态和哨兵检测类似,包含:

  • 主观下线
  • 客观下线

3.1. 主观下线检测

和哨兵系统类似,Redis Cluster中的节点会定期向其它节点发送PING,响应PONG。这是Gossip协议的一部分。这类信息不仅可以传播节点槽信息,还可以传播主从状态、节点故障信息等(自己和部分其它节点,参考之前的Redis Gossip部分)。因此故障检测也是基于此。

a) 定期向最有可能故障的节点发送PING

Redis Cluster节点会每隔1s,随机取出5个节点,向1个最有可能发生故障的节点发送PING,这一部分在clusterCron函数中执行:

void clusterCron(void) {
    // ...
    // clusterCron每秒10次,因此这部分每秒1次
    if (!(iteration % 10)) {
        int j;
        // 随机选择5个节点
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            // 忽略断连的,已经发送PING但没收到PONG的
            // (当PING发出后,没收到PONG,之前分析过,ping_sent不为0,而收到PONG后会置0)
            if (this->link == NULL || this->ping_sent != 0) continue;
            // 忽略自己和还没握手完成的
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            // 挑选收到PONG时间戳最小的,说明现在离收到PONG最久远,也最有可能发生故障
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        // 向最有可能发生故障的节点发送PING
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }
    // ...
}

b) 判断和处理主观下线

当节点发送了PING(即ping_sent不为0),但迟迟没有收到PONG时,对方节点就很有可能下线了。因此当满足这个条件时,节点可以判断其主观下线,也即“疑似下线”,给该节点标记CLUSTER_NODE_PFAIL

这部分代码也在clusterCron里:

void clusterCron(void) {
	// ...
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    // 遍历所有已知节点
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;
		// 跳过自己,没地址的和没完成集群握手的节点
        if (node->flags &
            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
                continue;

        // 若自己是从节点,对应节点是主节点且没下线
        // 判断孤立主节点,统计个数,之后会为孤立主节点迁移一个新的工作的从节点
        // 孤立主节点是:
        // 1. 没下线且被分配了槽的节点,它下属配置有从节点,但从节点全部不可用
        // 2. 它被标记为导出状态
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);
            if (okslaves == 0 && node->numslots > 0 &&
                node->flags & CLUSTER_NODE_MIGRATE_TO) {
                orphaned_masters++;
            }
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }
        // 若节点有连接,连接不是重连的,且发送了PING但没收到PONG,时间超过上限的一半
        // 则先释放连接,可能之后会重连(怀疑网络原因,但节点还是存活的)
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout && /* was not already reconnected */
            node->ping_sent && /* we already sent a ping */
            node->pong_received < node->ping_sent && /* still waiting pong */
            /* and we are waiting for the pong more than timeout/2 */
            now - node->ping_sent > server.cluster_node_timeout/2) {
            freeClusterLink(node->link);
        }
        // 若节点收到了PONG,但是收到的时间距离现在也有超时时限的一半,且之后没发过PING
        // 再发送一次PING,这里一般是握手刚结束后,第一次发送PING
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2) {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }
        // 若自己是主节点,有一个从节点发起故障转移,那么也向该发起请求的节点发送PING
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link) {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        // 到这里,若检测到收到了PONG,跳过
        if (node->ping_sent == 0) continue;

		// 到这里,我们向节点发送了PING,但没收到PONG
        // 计算结果了多少时间
        delay = now - node->ping_sent;
		// 若超时,则标记为CLUSTER_NODE_PFAIL
        if (delay > server.cluster_node_timeout) {
            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                node->flags |= CLUSTER_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);
    // ...
}

当然这里也做了其它事情,如:

  • 统计孤立主节点个数,之后会从其它主节点的从节点中,为孤立主节点迁移一个从节点
  • 释放回复PONG过慢的连接,这个连接可能会重新建立,使得连接更健壮
  • 节点第一次加入集群后,触发第一次PING(因为MEET完,会收到PONG,但没发过PING
  • 向请求故障转移的从节点发送PING
  • 最后故障检测,将疑似下线的节点标记为CLUSTER_NODE_PFAIL,默认超时15s

3.2. 客观下线检测

和哨兵一样,主观下线不代表一定下线,需要得到集群其它节点的同意/共识,才能让其客观下线。

客观下线检测依旧依赖PING/PONG机制,不过这里利用了Gossip协议的特性,即消息中不仅有自己的状态,也带有其它节点的状态。

在之前的文章里已经提及了Redis的Gossip协议(见此文的3.5.节)。Redis Cluster节点会选择自己已知的1/10的节点,且数量最小是3,将这些信息包在Gossip消息体里,随者PING/PONG发送给对方。而接收方,会在函数clusterProcessGossipSection中,把Gossip消息体的消息抽出来,而在这里,也会做客观下线的检测工作:

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    // 获取PING/PONG消息的Gossip部分
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    // 获取发送方节点
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
    // 遍历Gossip部分消息,每个消息都是集群中某一个节点的状态信息
    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;
        // ...
        // 获取Gossip消息中对应的节点标识
        node = clusterLookupNode(g->nodename);
        if (node) {
            // 若节点是自己知道的
            // 若节点是主节点,且不是自己,则:
            // 1. 若节点被发送方标记为FAIL/PFAIL,创建和更新错误报告
            // 2. 判断节点是否客观下线
            // 3. 若不满足1,则删除该节点的错误报告,认为其上线
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            // 若节点是正常且没有错误报告,且该节点没有等待由PING发出后的PONG
            // 则更新该节点的pong_received时间戳
            if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                node->ping_sent == 0 &&
                clusterNodeFailureReportsCount(node) == 0) {
                mstime_t pongtime = ntohl(g->pong_received);
                pongtime *= 1000;
                if (pongtime <= (server.mstime+500) &&
                    pongtime > node->pong_received) {
                    node->pong_received = pongtime;
                }
            }

            // 若自己知道该节点,但该节点不可用,且看到该节点地址变化
            // 则更改该节点的地址,释放其连接,等待下一次连接(到新地址)的创建
            if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
                (strcasecmp(node->ip,g->ip) ||
                 node->port != ntohs(g->port) ||
                 node->cport != ntohs(g->cport))) {
                // 释放连接
                if (node->link) freeClusterLink(node->link);
                // 更改地址,并消除CLUSTER_NODE_NOADDR标识
                memcpy(node->ip,g->ip,NET_IP_STR_LEN);
                node->port = ntohs(g->port);
                node->cport = ntohs(g->cport);
                node->flags &= ~CLUSTER_NODE_NOADDR;
            }
        } else {
            // 若该节点自己不知道(即不在本视角集群内),且有地址,不在黑名单内
            // 则将其添加到本视角下的集群中,并准备握手(设置握手前的状态)
            // 握手和黑名单在之前已经说明,这里略
            if (sender &&
                !(flags & CLUSTER_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename)) {
                clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
            }
        }
        g++;  // 遍历到下一条Gossip消息
    }
}

Gossip消息处理这块还是相对简单的,它将PING/PONG携带的Gossip消息解析并遍历,对于每条Gossip消息:

  • 消息中,节点是我知道的

    • 假如节点是主节点,且不是自己
      • 若消息中,节点被发送者打上PFAIL/FAIL(即下线标识),创建或更新错误报告,判断并标记客观下线
      • 否则,删除该节点的错误报告
    • 假如节点是正常的,没有错误报告
      • 若节点此时没有因为发出PING而等待PONG,则更新PONG的接收时间
    • 假如节点之前被标记了PFAIL/FAIL,而消息中节点是上线的,而且地址变化
      • 释放和清理旧连接,等待下一轮事件循环的重连
      • 更新地址信息
      • 消除NOADDR标识
  • 消息中,节点是我不知道的

    • 添加节点到集群(但此时并没有真正加入,只是打了HANDSHAKE标识)
    • 设置并准备开始握手

客观下线的判断,首先需要收集错误报告,当节点被报告为FAIL/PFAIL时,会调用clusterNodeAddFailureReport函数创建一个错误报告到链表中,链表的个数就是集群中认为该节点下线的节点个数

int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
    list *l = failing->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
	// 首先遍历错误报告链表,若错误报告中已有发送方的报告,更新它即可
    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        if (fr->node == sender) {
            fr->time = mstime();
            return 0;
        }
    }
    // 否则创建一个新的错误报告,附上发送方和时间戳,然后添加到链表fail_reports里
    fr = zmalloc(sizeof(*fr));
    fr->node = sender;
    fr->time = mstime();
    listAddNodeTail(l,fr);
    return 1;
}

而客观下线这块判断,很明显就在函数markNodeAsFailingIfNeeded,易知:

  • 节点客观下线,需要集群中半数以上同意
  • 节点标记为客观下线后,若自己是主节点,需要将其广播到其它节点上,以强制其它节点将该节点标记为FAIL(客观下线)
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    // 客观下线需要的票数,即半数以上
    int needed_quorum = (server.cluster->size / 2) + 1;
    if (!nodeTimedOut(node)) return; // 若可达则直接返回
    if (nodeFailed(node)) return; // 已标记为FAIL,直接返回
	// 获取错误报告的个数,即集群中认为该节点下线的节点数
    failures = clusterNodeFailureReportsCount(node);
    if (nodeIsMaster(myself)) failures++; // 若自己是主节点还得将计数+1
    // 若没超过半数,直接返回,不能判断其客观下线
    if (failures < needed_quorum) return; 
    // 到这里,票数超过一半
    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);
    // 标记节点为FAIL,即客观下线
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();
    // 若自己是主节点,将客观下线的消息广播到集群其它节点,以强制
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

最后是广播下线消息,这部分比较简单,看下代码就行。代码里也标记了下线消息的格式,其实就是一个节点名字。广播之后,其它节点就知道该节点客观下线了:

typedef struct {
    char nodename[CLUSTER_NAMELEN]; // 客观下线的节点名
} clusterMsgDataFail;

void clusterSendFail(char *nodename) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    // 构建FAIL的消息包包头
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    // 设置下线节点的名字
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    // 发送给所有集群中的节点
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}

4. 故障转移

当节点收到clusterMsgDataFail消息后,会把消息内对应节点设置成客观下线(CLUSTER_NODE_FAIL),并设置下线时间。

而到了下一轮周期时,在clusterCron中,执行故障转移:

void clusterCron(void) {
    // ...
    // 若自己是从节点,则去检查故障并执行故障转移
    if (nodeIsSlave(myself)) {
        // 设置手动故障转移的状态
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            // 执行从节点的自动或手动故障转移,从节点获取其主节点的哈希槽,并传播新配置
            clusterHandleSlaveFailover();
        // 若存在孤立节点,且集群中某个主节点有超过1个从节点,且这个主节点就是自己的主节点
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            // 则给这个孤立主节点分配(通过迁移)一个从节点
            clusterHandleSlaveMigration(max_slaves);
    }
    // ...
}

当自己是从节点时,需要检查自己的主节点是否客观下线,并和其它条件一起,判断自己是否需要执行故障转移。上面的代码主要有3步:

  • 设置手动故障转移的状态
  • 必要时,执行故障转移
  • 为孤立主节点迁移从节点

这里最关键的是第2步,即函数clusterHandleSlaveFailover,它是做故障转移的,它主要可以分为下面几大步:

  • 检查是否能执行故障转移
  • 发起选举,尝试被其它主节点提升为主节点
  • 执行故障转移,并告知其它节点

4.1. 故障转移选举前的检查

由于故障转移需要选出一个新的主节点,这需要选举投票,因此本节点是否有资格执行故障转移,首先需要检查本节点是否有资格发起选举,这部分代码如下:

void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    // 计算上次选举过去的时间
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    // 自己成为主节点所需要的票数,需要半数以上**主节点**同意
    int needed_quorum = (server.cluster->size / 2) + 1;
    // 手动故障转移的标识
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    // 投票超时时间: 2*cluster_node_timeout (至少2s)
    // 投票重试超时时间: 2*auth_timeout (至少4s)
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;

    // 自己执行故障转移函数的条件:
    // 1. 自己是从节点
    // 2. 自己的主节点被标记为客观下线,或者被设置了手动故障转移
    // 3. 自己的主节点有负责槽位
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        // 不满足条件,则置失败原因为CLUSTER_CANT_FAILOVER_NONE
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return; // 直接返回
    }
    
    // 计算自己和主节点连接断开/没有交互的时间,这里需要减去超时时间
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    // 判断这个从节点是不是比较新,若不是很新,则不能执行故障转移
    // 手动故障转移除外
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }
    // ...
}

若自己要发起选举,首先需要满足下面的条件:

  • 自己必须是从节点
  • 自己的主节点必须客观下线/手动设置了故障转移
  • 自己的主节点原先被分配了槽位
  • 若配置了cluster-slave-validity-factor,则还需要自己的数据足够新

4.2. 发起选举的准备

若通过4.1.,那么本节点就有资格发起选举。而选举是基于Raft算法,下面的代码可以看到很多Raft算法的影子。

这时候需要更新选举开始时间,当到达这个时间后,才能执行后续的选举流程。

void clusterHandleSlaveFailover(void) {
    // ...
    // 若上次尝试选举超时,且重试时间已过
    // 则设置下一次选举的时间
    if (auth_age > auth_retry_time) {
        // 下一次选举时间设置(类似Raft的超时时间)
        server.cluster->failover_auth_time = mstime() +
            500 + 
            random() % 500; // Raft算法中的timeout/delay是随机的
        server.cluster->failover_auth_count = 0; // 收到的票数
        server.cluster->failover_auth_sent = 0; // 是否发起投票
        server.cluster->failover_auth_rank = clusterGetSlaveRank(); // 本节点的rank
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        // 这里是手动故障转移
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
	    	clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        // ...
        // 将PONG发给其它从节点,带有自己的复制偏移量
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    // 若没有开始请求故障转移的投票
    // 则获取本节点的最新rank,因为其它心跳包携带的复制偏移量会影响本节点的rank
    // 若新rank靠后,那么还得延迟故障转移开始的时间,并更新到server.cluster->failover_auth_rank
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            serverLog(LL_WARNING,
                "Replica rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }
    // 如果还没有到故障转移选举的时间,直接返回
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }
    // 如果距离本轮发起故障转移投票的时间过了太久,那么也不执行故障转移,直接返回
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }
    // ...
}

这里总结一下:

  • auth_age > auth_retry_time(超时且重试时间已过),那么表示这一次故障转移超时,那么:
    • 设置下一轮新的故障转移开始时间(随机延迟,这是Raft规定的),并初始化其它字段,准备请求新一轮的投票
    • 向其它从节点发送PONG,并携带自己的复制偏移,用于给其它从节点计算rank
    • 返回,等待下一轮事件循环以发起投票
  • 更新并处理rank,若rank靠后了,则需要延后下一次请求投票的时间(因为Raft中,延后发起投票将有更小几率成为主节点)
  • 若当前没到发起选举的时间,直接返回
  • 若发起投票后超时,也直接返回,放弃故障转移(即不成为主节点,成为从节点)

总体而言出现了很多Raft的影子。

4.3. 发起投票选举

4.3.1. 发起投票请求

一轮一轮事件循环后,执行到这里,说明已经到了发起投票选举的时候,这时候本节点会发起一轮选举(即成为candidate,发起投票,等同于Raft的requestVote):

void clusterHandleSlaveFailover(void) {
    // ...
    // 这时候,自己成为candidate,发起新一轮投票
    if (server.cluster->failover_auth_sent == 0) {
        // 根据Raft, epoch要自增
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        // 向所有节点发送FAILOVE_AUTH_REQUEST消息,类似Raft论文中的requestVote RPC
        clusterRequestFailoverAuth();
        // 标记本节点已经发起了投票请求
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        // 这里直接返回,下一轮事件循环是不会进入这里,因为failover_auth_sent == 1
        return; 
    }
    // 下面是处理投票响应...
    // ...
}

而发起投票会发给其它所有的从节点,其消息会包含CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST标识(clusterMsg->type标识)。

void clusterRequestFailoverAuth(void) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;
    // 构建消息头,打上CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST标识
    // 类似表示一个Raft的requestVote请求
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    // 若是手动故障转移,则还需要设置CLUSTERMSG_FLAG0_FORCEACK
    // 表示即使主节点在线,也要认证故障转移
    if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    // 广播投票请求给所有节点
    clusterBroadcastMessage(buf,totlen);
}

注意,发起投票请求后,函数就直接返回了。因此等待投票响应需要等待下一轮的事件循环,这部分可看上面代码注释的解释。

4.3.2. 响应投票

其它从节点收到投票请求(即Raft的requestVote)后,需要判断是否把票投给它,把投票的结果返回给对方。

这里处理的函数是clusterSentFailoverAuthIfNeeded,代码如下:

void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;
    // 若自己是从节点,或者自己负责的槽数为0,则无权投票
    if (nodeIsSlave(myself) || myself->numslots == 0) return;
    // 请求epoch小于自己维护的epoch,不投票给对方,直接返回
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }
    // 若本轮epoch我已经投过票了,也拒绝给对方投票,直接返回
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }
    // 拒绝给下面这些情况的投票:
    // 1. 请求的节点是主节点
    // 2. 请求的主节点未知
    // 3. 请求的主节点没客观下线,且没设置手动故障转移
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack)) {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }
    // 在cluster_node_timeout * 2时间内,拒绝再次投票,这不影响正确性
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) {
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }
	// 检查请求从节点负责的槽的config epoch
    // 若请求的config epoch小于本节点视角下槽的config epoch
    // 则也拒绝投票,理由和之前判断epoch一样
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        serverLog(LL_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    // 到这里,节点可以给这个从节点投票
    // 更新lastVoteEpoch, voted_time
    // 并响应FAILOVER_AUTH_ACK消息,把票给这个请求的从节点
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

总结一下:

  • 若要投票权,则需要:
    • 自己是主节点
    • 需要负责一定数量的槽
  • 是否要投票给请求的节点,投票会带有FAILOVER_AUTH_ACK标记:
    • 请求的epoch要大于本视角下集群的epoch(小于:过期;等于:已投过票)
    • 请求的节点不能是:
      • 主节点
      • 其主节点未知
      • 没设置手动故障转移下,其主节点没有客观下线
    • 本次请求时间距离上一次投票在2s以外
    • 请求的configEpoch不小于其主节点负责槽的configEpoch(小于:过期)

可见,上面的投票规则,尽管有一定的不同,但实现思想和Raft基本类似。

Raft也是通过epochlogTermlogIndexvoteFor决定是否投票:

  • Raft的epoch对应上面的currentEpoch
  • Raft的voteFor对应上面currentEpoch相等的情况(这里投票方不需要知道选出来的主节点到底是谁,所以省略了voteFor字段)
  • Raft的logTermlogIndex对应上面槽的configEpoch

标准的Raft论文实现,可参考这里的第6节。

4.3.3. 处理投票响应

发起投票后,需要处理对方投票的响应。

由4.2.可知,若对方投票给自己,则会收到响应,否则就收不到响应。所以,当收到投票响应时,只要给当前epoch的投票计数+1即可。这部分仍然在clusterProcessPacket函数执行:

int clusterProcessPacket(clusterLink *link) {
    // ...
    else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  // 忽略未知发送方
        // 若投票方是主节点,且该节点负责一定数量的槽,投票响应的epoch不小于自己的epoch
        // 则自己认可这次投票,直接给failover_auth_count计数加1,用于统计票数
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
            server.cluster->failover_auth_count++;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    } 
    // ...
}

4.4. 执行故障转移/主节点替换

当投票请求发出,并接收到足够多的投票后,就可以判断自己是否可以成为主节点,从而执行故障转移。

这部分代码就在4.1.代码的后面,在发起投票请求的后几轮事件循环执行:

  • 若本轮检查,得到的票数超过集群主节点的半数,则
    • 更新configEpoch
    • 执行故障转移,接管旧主节点的槽
  • 若检查,票数不够,则不能执行故障转移,需要等待下一轮的票数检查
void clusterHandleSlaveFailover(void) {
 	// ... 上面是发起投票 ...
    // ... 下面是处理投票响应 ...
    // 若成为新主节点,则需要得到集群其它半数以上主节点的统一
    if (server.cluster->failover_auth_count >= needed_quorum) {
        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");
        // 更新configEpoch(若更大)
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }
        // 接管旧主节点的槽,即执行故障转移
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

从上面可见,当票数过半时,就可以执行故障转移了。故障转移的逻辑就在函数clusterFailoverReplaceYourMaster中,主要的动作是:

  • 取消和原主节点的复制,设置自己为主节点
  • 接管原主节点的槽
  • 更新集群状态,并写入文件
  • 广播PONG,告知所有节点主节点已经切换
  • 重置手动故障转移
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    // 1. 将自己设置为主节点,并取消和旧主节点的复制和连接
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();
    // 2. 将旧主节点的槽指定到自己身上
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }
    // 3. 更新集群状态,并将状态写入集群配置文件
    clusterUpdateState();
    clusterSaveConfigOrDie(1);
    // 4. 向其它所有的节点发送PONG,告知主节点已被切换
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
    // 5. 重置手动故障转移的状态
    resetManualFailover();
}

4.5. 广播故障转移/主节点替换结果

当新主节点接管了旧主节点的槽后,会广播PONG给集群的所有节点,以告知主节点已经被切换。而其它节点接收到消息后,将变更应用到本视角的集群状态上,即:

  • 将发送方(新主节点)提升为主节点
  • 移除旧主节点和发送方的主从关系
void clusterProcessPacket(clusterLink *link) {
    // ...
    	if (sender) {
            // 消息体中,发送方说明自己是主节点
            // 而本节点视角下,发送方是从节点
            // 那么就移除本视角下旧主节和发送方的主从关系,并把发送方提升为主节点
            if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                clusterSetNodeAsMaster(sender);
            } else {
                // ...
            }
        }
    // ...
}

这样,通过广播,再经过Gossip传播,主从节点切换的变更会被同步到所有节点上。

而对于其它从节点,它们需要和新主节点同步。代码紧跟在上面代码的下方,这里是为了检查槽配置是否和消息中一致,很明显这里是不一致的,因此要更新槽配置:

void clusterProcessPacket(clusterLink *link) {
 	// ...
    	clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            // 这里sender是新提升的主节点
            // 但是本视角下并没有为它分配槽,而消息中是有分配槽
            // 所以不一致,dirty_slots置1
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        // 因此,后面会进入clusterUpdateSlotsConfigWith函数,更新槽配置关系
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
    // ...
}

而更新槽配置的代码如下,可以发现这里处理了故障转移,并初始化了数据同步的状态。它也处理了旧主节点重新上线的情形。下面的注释解释了一些if条件出现的情形:

void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    // curmaster: 旧主节点
    // newmaster: 新主节点,非空的条件:故障转移发生,旧主节点被替换成新主节点,且旧主节点是自己的旧主节的(即本节点和新主节点原本是旧主节点的从节点)
    clusterNode *curmaster, *newmaster = NULL;
    uint16_t dirty_slots[CLUSTER_SLOTS];
    int dirty_slots_count = 0;
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
        return;
    }
	// 更新槽信息,将对应槽设置为sender
    // 这里消息的configEpoch肯定更大,因为经过新一轮的故障转移
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            if (server.cluster->slots[j] == sender) continue;
            if (server.cluster->importing_slots_from[j]) continue;
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                // ...
                if (server.cluster->slots[j] == curmaster)
                    // 故障转移时,本节点视角下该槽属于旧主节点,且旧主节点是自己原来的主节点
                    // 则标记newmaster为sender,即本节点需要把sender设为自己的新主节点
                    newmaster = sender;
                // 更新槽信息
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }
	// ...
    // 故障转移情况下,若自己和新主节点(sender)原本是旧主节点的从节点
    // 则: newmaster不为空,而curmaster的槽数被减为0
    // 因此这里会调用clusterSetMaster函数,设置新主节点同时,初始化数据同步
    // 而对于其它主节点,newmaster为空,不会进入该代码块
    // 而对于其它主节点的从节点,也是这样
    if (newmaster && curmaster->numslots == 0) {
        serverLog(LL_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}

void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);
    serverAssert(myself->numslots == 0);
    if (nodeIsMaster(myself)) {
        // 自己是主节点时,则把自己降级为从节点
        // 这用于处理旧主节点重新上线的情况
        myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
        myself->flags |= CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        // 自己是从节点时,把自己旧主节点的联系移除
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    myself->slaveof = n;
    // 让自己作为sender的从节点
    clusterNodeAddSlave(n,myself);
    // 初始化数据同步的状态,待replicationCron中开始执行数据同步
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}

从上面的代码可以看到,在更新槽的同时,也更新了主从关系(之前只是删除旧主节点和自己的主从关系):

  • 若本节点和新主节点sender,原本是旧主节点的从节点:
    • 设自己的主节点为sender
    • 初始化数据同步
  • 若本节点是旧的主节点,且刚刚上线:
    • 将自己降级,设自己的主节点为sender
    • 初始化数据同步
  • 其它主节点和下属的从节点:什么都不做

之后,等到数据同步完成,则新的主从关系就正式确立。

至此,故障转移就全部完成了。

4.6. 故障转移的总结

故障转移主要还是分为下面几步:

  • 选举新的主节点:基于Raft算法,需要得到其它主节点半数以上的同意
  • 新主节点接管旧主节的的槽
  • 新主节点广播主从变更
  • 其它节点处理变更并更新集群信息:
    • 更新本视角下集群中的主从信息和关系
    • 更新本视角下的槽位分配信息
    • 若本节点和新主节点原本属于旧主节的从节点,或者旧主节点重新上线:建立新的主从联系并初始数据同步(以建立新的主从复制关系)

此外,可以看到,如果同时删掉一个主节点和它所有的从节点,那么这部分已分配的槽将不可用,这会导致Redis集群不可用,执行的命令会响应CLUSTERDOWN