源码阅读-Redis集群: 哨兵

Posted by keys961 on November 21, 2019

1. Overview

哨兵(Sentinel)是Redis HA的一个解决方案。

它由一个或多个节点构成,组成哨兵系统,主要作用是监控故障转移

  • 监视多个主节点及其下属从节点的状态
  • 当主节点下线后,选取一个从节点,将其升级为主节点
  • 当主节点重新上线后,将其降级为从节点

本文主要分析哨兵是如何进行节点监控和故障转移的。

2. 哨兵的启动

2.1. 初始化服务器配置

哨兵本质上是Redis服务器的特殊模式,因此第一步还是需要初始化一个普通的Redis服务器,这在之前的文章中已经说明,如:

  • 初始化一些基本配置(如环境变量等),这一步它会标记服务器为“哨兵”(checkForSentinelMode函数)
  • 初始化服务器配置默认值(initServerConfig函数)
  • 初始化模块

这些和普通Redis服务器启动并无大差别。

2.2. 初始化哨兵状态

下面的代码初始化哨兵的配置(并没有加载配置文件的配置):

// In main (server.c)
if (server.sentinel_mode) {
    initSentinelConfig();
    initSentinel();
}

其中initSentinelConfig函数,它只是初始化哨兵的配置:

  • 设置端口为REDIS_SENTINEL_PORT(26379)
  • 关闭保护模式

initSentinel()函数初始化了哨兵的状态,包括:

  • 只能用于哨兵的命令
  • 其它数据结构,如主节点列表等

哨兵可用的命令被记录在sentinelcmds[]数组里,如下所示:

struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
};

而哨兵状态存储在struct sentinelState结构中:

struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; // 哨兵ID
    uint64_t current_epoch; // 当前epoch号,用于故障转移
    dict *masters; // 监视的主节点字典,键是实例名,值是sentinelRedisInstance指针
    int tilt;  // 是否进入TILT模式(后面会说)
    int running_scripts; // 正在执行的脚本数量
    mstime_t tilt_start_time; // 开启TILT模式的时间
    mstime_t previous_time; // 上一次执行时间处理器的时间
    list *scripts_queue; // 脚本队列
    char *announce_ip; // 哨兵间通信的IP地址,空则是自动发现
    int announce_port; // 哨兵间通信的端口,0则是自动发现   
    unsigned long simfailure_flags; // 故障模拟标志
    int deny_scripts_reconfig; // 是否允许哨兵在运行时修改脚本位置
} sentinel;

2.3. 加载哨兵配置

之后,和普通Redis服务器一样,从配置文件中加载配置。

对于哨兵,它的配置加载主要调用sentinelHandleConfiguration函数,它主要做的事情是:

  • 加载并设置本哨兵的配置,如announce_ip,announce_port,deny_scripts_reconfig
  • 初始化监视的主节点字典masters

这里最主要的是第二步。它根据配置,创建sentinelRedisInstance实例,并添加到masters字典中。

char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;
    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        // monitor <name> <host> <port> <quorum> 
        // 监视某个MASTER节点
        // 其中quorum代表: 该节点**客观宕机**判断需要至少quorum个哨兵同意
        int quorum = atoi(argv[4]);
        // ...
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL) {
            // ...
        }
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        // down-after-milliseconds <name> <milliseconds>
        // 哨兵定期向节点发送PING判断状态,若无正常响应则认为不可达
        // 当不可达时间超过milliseconds,则认为该节点**主观宕机**
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->down_after_period = atoi(argv[2]);
        // ...
        // 把该配置传播到该MASTER下属的SLAVE以及连接到它的哨兵
        sentinelPropagateDownAfterPeriod(ri);
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        // failover-timeout <name> <milliseconds>
        // 配置name节点的故障转移超时时间
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->failover_timeout = atoi(argv[2]);
        // ...
   } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
        // parallel-syncs <name> <nums>
        // 当新MASTER出来后,SLAVE会对MASTER发起数据同步
        // 这需要限制同步的数量,由nums决定
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->parallel_syncs = atoi(argv[2]);
   } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
        // notification-script <name> <path> 
        // 配置name节点的通知脚本,当出现故障时,脚本被触发,通知外部
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->notification_script = sdsnew(argv[2]);
   } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
        // client-reconfig-script <name> <path>
        // 配置故障转移成功的通知脚本,通知客户端
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->client_reconfig_script = sdsnew(argv[2]);
   } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
        // auth-pass <name> <password> 
        // 配置监控主节点的验证信息
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->auth_pass = sdsnew(argv[2]);
    } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
        // current-epoch <epoch> 
        // 配置当前哨兵的epoch(可认为是版本,用于故障转移)
        unsigned long long current_epoch = strtoull(argv[1],NULL,10);
        if (current_epoch > sentinel.current_epoch)
            sentinel.current_epoch = current_epoch;
    } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
        // 解析哨兵的ID
        // ...
        memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
    } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
        // config-epoch <name> <epoch> 
        // 配置监控节点的epoch
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->config_epoch = strtoull(argv[2],NULL,10);
        if (ri->config_epoch > sentinel.current_epoch)
            sentinel.current_epoch = ri->config_epoch;
    } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
        // leader-epoch <name> <epoch> 
        // Leader的epoch,这里Leader是哨兵集群的Leader
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        ri->leader_epoch = strtoull(argv[2],NULL,10);
    } else if ((!strcasecmp(argv[0],"known-slave") ||
                !strcasecmp(argv[0],"known-replica")) && argc == 4) {
        sentinelRedisInstance *slave;
        // known-replica <name> <ip> <port> 
        // 显式添加SLAVE节点的信息到名字为name的MASTER节点实例上
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL) {
            // ...
        }
    } else if (!strcasecmp(argv[0],"known-sentinel") &&
               (argc == 4 || argc == 5)) {
        sentinelRedisInstance *si;
        if (argc == 5) {
            // known-sentinel <name> <ip> <port> [runid]
            // 添加其它哨兵信息到名字为name的MASTER节点实例上
            ri = sentinelGetMasterByName(argv[1]);
            // ...
            if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
                        atoi(argv[3]), ri->quorum, ri)) == NULL) {
               // ...
            }
            si->runid = sdsnew(argv[4]);
            // 尝试与该哨兵节点共享监控对象(instanceLink)
            sentinelTryConnectionSharing(si);
        }
    } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
        // rename-command <name> <command> <renamed-command> 
        // 配置名字为name的主节点的命令别名
        ri = sentinelGetMasterByName(argv[1]);
        // ...
        sds oldcmd = sdsnew(argv[2]);
        sds newcmd = sdsnew(argv[3]);
        if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
            // ...
        }
    } 
    // ... 配置哨兵的其它参数,如哨兵自己的IP和端口等等
    return NULL;
}

这边再解析一下创建sentinelRedisInstance的过程,熟悉一下它有哪些字段:

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;
    // ...
    // 检查地址
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;
	// 若实例是SLAVE,那么名字就是ip:port
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }
	// 确定需要添加的字典表
    // 若是MASTER,则是sentinel.masters
    // 若是SLAVE,则是参数中给的master->slaves,即绑定在master上
    // 若是SENTINEL,则是参数中给的master->sentinals,也是绑定在master上
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        // 判重
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    // 创建新的sentinelRedisInstance
    ri = zmalloc(sizeof(*ri));
    ri->flags = flags; // 给的状态,MASTER/SLAVE/SENTINEL
    ri->name = sdsname; // 实例name
    ri->runid = NULL; // 实例runid
    ri->config_epoch = 0; // 实例的epoch
    ri->addr = addr; // 实例地址
    ri->link = createInstanceLink(); // 创建一个disconnected的连接对象,维护一个初始连接状态
    ri->last_pub_time = mstime(); // 最后一次通过PUB/SUB发送hello的时间
    ri->last_hello_time = mstime(); // 最后一次从PUB/SUB收到hello的时间,当SENTINEL才使用
    ri->last_master_down_reply_time = mstime(); // 最后一次主节点回复下线的时间
    ri->s_down_since_time = 0; // 主观下线时间
    ri->o_down_since_time = 0; // 客观下线时间
    ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DEFAULT_DOWN_AFTER; // 实例没有响应多少毫秒后,被判断为**主观**宕机
    ri->info_refresh = 0; // 获取INFO信息的时间
    ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL); // 命令别名字典
    
    /* Master Specific */
    ri->auth_pass = NULL; // 认证密码
    ri->sentinels = dictCreate(&instancesDictType,NULL); // 连接该实例的哨兵
    ri->slaves = dictCreate(&instancesDictType,NULL); // 连接该实例的从节点
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS; // 故障转移时,可同时对新主节点进行同步的从节点个数
    ri->quorum = quorum; // 判断主节点**客观**宕机的
    /* Slave Specific */
    ri->master_link_down_time = 0; // 从节点复制操作断开时间
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; // 从节点优先权
    ri->slave_reconf_sent_time = 0; // 故障转移时从节点发送SLAVEOF命令的时间
    ri->slave_master_host = NULL; // 从节点对应主节点的地址
    ri->slave_master_port = 0; // 从节点对应主节点的端口
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN; // 从节点连接主节点的状态,默认没有连接
    ri->slave_repl_offset = 0; // 从节点复制的偏移量
    ri->master = master; // 从节点对应的主节点实例

    /* Failover state. */
    ri->leader = NULL; // 若本实例是主节点,那么该值就是执行故障转移的哨兵runid;
    				   // 若本实例是哨兵,那么该值是哨兵集群选出来的Leader的runid
    ri->leader_epoch = 0; // 选举Leader的epoch
    ri->failover_epoch = 0; // 执行故障转移的epoch
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; // 故障转移状态
    ri->failover_state_change_time = 0; // 故障转移状态改变的时间
    ri->failover_start_time = 0; // 故障转移开始的时间
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT; // 故障转移超时时间
    ri->failover_delay_logged = 0; // 故障转移延迟时间
    ri->promoted_slave = NULL; // 晋升为新主节点的从节点实例
    
    /* Script */
    ri->notification_script = NULL; // 警告触发脚本
    ri->client_reconfig_script = NULL; // 故障转移成功脚本
    ri->info = NULL; // 缓存INFO命令输出

    /* Role */
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE); // 实例的角色(这里只维护MASTER或SLAVE)
    ri->role_reported_time = mstime(); // 角色变更时间
    ri->slave_conf_change_time = mstime(); // 最后一次从节点地址变更时间

    // 把信息添加到对应的字典表中
    dictAdd(table, ri->name, ri);
    return ri;
}

2.4. 初始化整个服务器

这里和普通Redis服务器一样,首先配置/启动监视和守护(后台),然后调用initServer函数初始化整个服务器,比如:

  • 创建事件循环
  • 监听端口
  • 初始化数据库及其内部信息
  • 初始化统计值
  • 注册时间事件(serverCron)和文件事件(Socket、模块等回调)

这部分和普通Redis服务器差不多。不过哨兵有些东西不需要执行,如AOF、集群初始化等等。

2.5. 启动哨兵

哨兵和普通Redis服务器一样,都以启动事件循环代表服务启动。不过在启动事件循环前,它们之间有一些不同:

  • 普通Redis服务器:加载数据和模块
  • 哨兵:调用sentinelIsRunning函数使哨兵就绪

这里sentinelIsRunning函数让哨兵节点准备就绪,具体如下:

void sentinelIsRunning(void) {
    int j;
    // ...
    // 若sentinal配置文件中没有添加ID,则随机生成一个并保存到配置文件里
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
        if (sentinel.myid[j] != 0) break;
    if (j == CONFIG_RUN_ID_SIZE) {
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }
    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
    // 启动时,生成一个+monitor事件,它会把该事件通过PUB/SUB发布,并调用警告脚本进行通知
    sentinelGenerateInitialMonitorEvents();
}

至此,哨兵节点就启动了。

3. 哨兵事件

哨兵启动后,哨兵的任务是由serverCron发起的,注意下面的代码:

// in serverCron()
if (server.sentinel_mode) sentinelTimer(); // 执行哨兵任务

可见执行哨兵任务的函数是sentinelTimer,它大致的任务如下:

void sentinelTimer(void) {
    // 1. 检查TILT模式,更新最近一次执行哨兵任务的事件
    sentinelCheckTiltCondition();
    // 2. 对哨兵监控的所有主节点递归进行检测(这一步包含连接建立)
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    // 3. 运行队列中的脚本
    sentinelRunPendingScripts();
    // 4. 将已执行完的脚本移出队列,重新执行出错的脚本
    sentinelCollectTerminatedScripts();
    // 5. 取消执行超时的脚本,这些脚本会在下一个事件循环中于第4步再次执行
    sentinelKillTimedoutScripts();

	// 这里更改server.hz,调整执行哨兵任务的频率
    // 这是为了选Leader服务的,它可以降低选主选不到的概率
    // 这和Raft的思想一致, Raft选Leader也是采用随机超时的机制
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

3.1. 检查TITL

a) TITL模式

TITL是哨兵的保护模式。

由于哨兵严重依赖于系统时间,当系统时间被调整,或因为某种原因哨兵任务产生阻塞(如高负载、I/O阻塞、进程停止等),哨兵行为就不可预知。

这时候哨兵会进入保护模式(TITL),它:

  • 只定期发送命令,收集信息
  • 不做其它实质性动作(如故障转移)
  • 向哨兵询问节点是否宕机(SENTINEL is-master-down-by-addr)会返回负值,告诉外部判断不准确

默认情况下,进入TITL模式30s后,哨兵任务恢复了正常,那么会退出TITL模式。

b) TITL检查

这里是sentinelCheckTiltCondition函数,总体很简单,根据a)中的规则判断是否进入TITL模式:

void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    // 计算现在和上次执行sentinel任务的时间差
    mstime_t delta = now - sentinel.previous_time;
	// 若: 
    // 1. delta<0,说明时间被调整了
    // 2. delta>SENTINEL_TITL_TRIGGER(2000ms),说明上一次sentinel任务执行慢/阻塞
    // 这些情况下需要进入TITL保护模式
    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1; // 启动保护模式
        sentinel.tilt_start_time = mstime(); // 记录启动保护模式时间
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); // 触发+titl事件
    }
    // 设置当前这次sentinel任务的时间
    sentinel.previous_time = mstime();
}

3.2. 对所有主节点递归地监控和检查

这里调用sentinelHandleDictOfRedisInstances,对所有主节点及其下属从节点和哨兵节点进行检查,且是递归的:

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;
    di = dictGetIterator(instances);
    // 遍历所有监控对象实例(根调用是所有的主节点)
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
		// 这里对监控对象实例执行监控和检查
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            // 对于主节点,还得递归处理下属的从节点和连接的哨兵节点
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            // 若当前实例处于完成故障转移的状态,且所有从节点完成了新主节点的同步
            // 则设置switch_to_promited标识
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        // 若设置switch_to_promoted标识,
        // 则用新晋升的主节点替代原来的主节点,旧主节点成为新主节点的从节点
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

而执行节点实例的监控操作在函数sentinelHandleRedisInstance中,代码如下,它主要做的是:

  • 所有实例
    • 建立通信
    • 发送PINGINFOPUBLISH命令
    • 检测节点实例是否“主观”宕机
  • 主节点实例
    • 检查是否“客观”下线
    • 判断节点是否需要进行故障转移,若需要强制向其它哨兵获取该主节点实例的状态
    • 必要时执行故障转移
    • 若节点没客观下线,则也向其它哨兵获取该节点实例的状态
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
	// 1. 建立连接
    sentinelReconnectInstance(ri);
    // 2. 定期发送命令
    sentinelSendPeriodicCommands(ri);
    if (sentinel.tilt) {
        // 若在TITL模式下,后面的故障检测和转移都不做
        // 若当前时间距离TITL模式开启有30s,则退出TITL模式
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }
    // 3. 检测节点实例是否“主观”宕机
    sentinelCheckSubjectivelyDown(ri);
    // ...
    if (ri->flags & SRI_MASTER) {
        // 4. 对于主节点实例,检查它是否“客观”宕机(根据投票)
        sentinelCheckObjectivelyDown(ri); 
        // 5. 检查是否需要故障转移
        if (sentinelStartFailoverIfNeeded(ri))
            // 若需要则强制向其它哨兵获取该主节点的状态,收集投票
            // 这里发送SENTINEL is-master-down-by-addr命令
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        // 6. 必要时执行故障转移
        sentinelFailoverStateMachine(ri);
        // 7. 若节点没客观下线,也要向其它哨兵获取该节点实例的状态
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

下面对这些步骤一一说明。

a) 建立连接

这里调用sentinelReconnectInstance函数,哨兵和监控的节点建立连接。

当启动后,连接并没有建立,这里一开始调用该函数,可以让本哨兵和需要监控的节点建立连接。

此外,在下文的b)中,可能会有更多的节点实例加入进来(通过发送命令和订阅消息感知),与它们建立连接也是在这里进行(在下一轮事件循环中)

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    // 1. 若已连接,则直接返回
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; 
    instanceLink *link = ri->link;
    mstime_t now = mstime();
	// 2. 如果最近一次连接的时间距离现在太短(默认1s),返回,等待下一次重连
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now; // 重新设置连接时间
    // 3. 对于任何待监控的实例,需要建立命令连接(command connection)
    if (link->cc == NULL) {
        // 绑定地址并异步创建连接(下面操作几乎是异步的)
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->cc->err) {
            // 失败则关闭连接
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            // 成功
            // 重置连接属性
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            // 将服务器的事件循环关联到连接的上下文中
            redisAeAttach(server.el,link->cc);
            // 设置连接建立的回调函数
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            // 设置连接断开的回调函数
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            // 对于cc,这时候立即:
            // 1.发送AUTH认证(响应直接丢弃)
            // 2.发送客户端名字(响应直接丢弃)
            // 3.PING这个实例(响应回调是sentinelPingReplyCallback,用于更新时间等)
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");
            sentinelSendPing(ri);
        }
    }
    // 4. 对于主/从节点,还必须创建PUB/SUB连接(pub connection)
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        // 绑定地址并异步创建连接
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->pc->err) {
            // 错误则关闭连接
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            // 成功
            // 重置连接属性
            int retval;
            link->pc_conn_time = mstime();
            link->pc->data = link;
            // 将服务器的事件循环关联到连接的上下文中
            redisAeAttach(server.el,link->pc);
            // 设置连接建立的回调函数
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            // 设置连接断开的回调函数
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            // 对于pc,这时候立即:
            // 1.发送AUTH认证(响应直接丢弃)
            // 2.发送客户端名字(响应直接丢弃)
            // 3.发送SUBSCRIBE命令,让哨兵订阅该节点实例上名为__sentinel__:hello的频道
            // 哨兵订阅该频道的消息,消息会被回调sentinelReceiveHelloMessages处理
            // 该回调会从响应中获取该实例下属的从节点和其它哨兵消息等
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                // 订阅失败则关闭连接,返回,等待重试
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    // 5. 若连接均已成功建立,则清除disconnected标记
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}

这里的代码使用了hiredis的异步库。由于连接是异步的,操作会立刻返回,而响应由对应的回调处理。

从上面代码中,哨兵会向节点(主/从)建立2条连接:

  • 命令连接:哨兵向被监控节点发送命令,然后处理其响应
  • 订阅连接:哨兵向被监控节点订阅频道__sentinel__:hello,并定期向频道里发布消息(主要是哨兵自己和对应的被监控节点),其它哨兵也会收到这条消息,并更新监控状态信息

b) 定期发送命令

这部分是在函数sentinelSendPeriodicCommands中,它向对应的被监控节点发送3条指令,分别是INFOPINGPUBLISH

  • INFO:收集节点和集群信息,用于刷新集群的信息,感知集群其它成员,处理成员角色变化等
  • PING:判断网络连通性
  • PUBLISH:往频道发布哨兵自己和对应节点的信息,其它哨兵订阅后可以收到消息,感知节点其它的哨兵
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;
	// 实例未连接,返回
    if (ri->link->disconnected) return;
    // 实例未回复命令超过一定个数,返回
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    // 如果实例目前是从节点,且他的主节点处于故障状态的状态或者从节点和主节点断开复制了
    // 就提高INFO监控的频率为1s,其它情况下为10s
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }
    // 如果最后一次接受到PONG的时候间隔比down-after-milliseconds更长
    // 并且如果down-after-milliseconds设置大于1秒,则直接按照1秒的频率PING
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
	// 若时间到了,向主节点和从节点发送INFO
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO")); // 异步发INFO
        if (retval == C_OK) ri->link->pending_commands++; // pending_commands自增
    }
    // 若时间到了,向所有类型的节点发送PING
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri); // 异步发PING,pending_commands自增且更新last_ping_time和act_ping_time
    }
	// 若时间到了,向所有类型的节点发送PUBLISH指令
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        // 往__sentinel__:hello频道发送HELLO信息,pending_commands自增
        sentinelSendHello(ri);
    }
}

而哨兵对这3类命令的响应处理也有一定的不同。

首先是INFO命令的响应回调sentinelInfoReplyCallback,回调主要用于更新实例信息,包括:

  • 获取并更新对应实例的基本信息(如run_id,role等),并自动发现集群网络其它活跃节点信息(INFO接收方为主节点,会额外返回从节点信息;若为从节点,则额外返回主节点信息)。
  • 处理角色变化:
    • 实例为主节点,INFO返回从节点:什么都不做
    • 实例为从节点,INFO返回主节点:
      • 实例是被晋升的从节点,且它的主节点正等待从节点晋升:仅更新相关数据
      • 实例是被晋升的从节点,但它的主节点在故障转移时间内重新上线:将晋升的从节点降级,并向其发送SLAVEOF命令
    • 实例为从节点,INFO也返回从节点:
      • 若主节点地址变化:向该实例发送SLAVEOF命令进行重绑定
      • 若该实例以接收slaveof命令或正在执行slaveof的同步:将其标记为下一步的状态(SRI_RECONF_SENT -> SRI_RECONF_INPROG, SRI_RECONF_INPROG -> SRI_RECONF_DONE
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    // ...
    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;
    if (r->type == REDIS_REPLY_STRING)
        sentinelRefreshInstanceInfo(ri,r->str); // 主要是这一步,更新实例信息,代码有点长,这里略了
}

然后是处理PING命令的响应回调sentinelPingReplyCallback,它会:

  • 更新最近一次收到PING响应的时间
  • 根据响应处理
    • PONGLOADINGMASTERDOWN:实例可达,更新最近的可用时间
    • BUSY:实例因执行脚本而正忙,发送SCRIPT KILL终止其脚本执行
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    // ...
    link->pending_commands--;
    r = reply;
    if (r->type == REDIS_REPLY_STATUS ||
        r->type == REDIS_REPLY_ERROR) {
        // PONG, LOADING, MASTERDOWN表示实例和哨兵网络可达
        if (strncmp(r->str,"PONG",4) == 0 ||
            strncmp(r->str,"LOADING",7) == 0 ||
            strncmp(r->str,"MASTERDOWN",10) == 0) {
            link->last_avail_time = mstime(); // 更新最新可用时间
            link->act_ping_time = 0; // 标记为0表示可用
        } else {
            // BUSY,说明实例正忙,这是因为它在执行脚本而表现为下线状态
            // 所以发送SCRIPT KILL终止脚本运行
            if (strncmp(r->str,"BUSY",4) == 0 &&
                (ri->flags & SRI_S_DOWN) &&
                !(ri->flags & SRI_SCRIPT_KILL_SENT)) {
                if (redisAsyncCommand(ri->link->cc,
                        sentinelDiscardReplyCallback, ri,
                        "%s KILL",
                        sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK) {
                    ri->link->pending_commands++;
                }
                ri->flags |= SRI_SCRIPT_KILL_SENT;
            }
        }
    }
    link->last_pong_time = mstime(); // 更新last_pong_time
}

最后是处理PUBLISH命令的响应回调,由于它是一个PUB/SUB,因此相关消息是通过订阅得到的。从a)中可知,这个回调是sentinelReceiveHelloMessages,除了更新一些时间信息,这个回调的核心是函数sentinelProcessHelloMessage,它主要处理频道里的HELLO消息,如:

  • 感知对应主节点的其它哨兵节点,包括哨兵添加、哨兵信息更改等
  • 更新主节点的信息

HELLO消息主要包含哨兵以及其绑定的主节点信息

void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;

    if (numtokens == 8) {
        // 获取根据master名字获取主节点的实例
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup;  // 若找不到对应主节点实例则直接返回,这的感知放在INFO里做
		// 获取对应哨兵的端口
        port = atoi(token[1]);
        // 获取对应master的端口
        master_port = atoi(token[6]);
        // 通过地址和runid,查找这个哨兵的实例是否已存在
        si = getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels,token[0],port,token[2]);
        // 获取对应哨兵的epoch
        current_epoch = strtoull(token[3],NULL,10);
        // 获取对应master配置的epoch
        master_config_epoch = strtoull(token[7],NULL,10);

        if (!si) {
            // 若该哨兵目前不存在
            // 首先移除所有相同runid的哨兵节点
            // 因为可能一种情况是改变哨兵地址,这需要重绑定
            removed = removeMatchingSentinelFromMaster(master,token[2]);
            if (removed) {
                // 删除成功,则触发事件通知,说明需要将已有哨兵的地址更改
                sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
                    "%@ ip %s port %d for %s", token[0],port,token[2]);
            } else {
                // 没有删除,说明没有哨兵地址需要更改,检查是否有另一个哨兵具有与消息中相同的地址
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0],port,NULL);
                if (other) {
                    // 若有,将其端口置0表示无效
                    sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
                    other->addr->port = 0;
                    // 并将这个改动应用到所有已有的主节点实例上,并断开该哨兵的连接
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
            }
            
            // 创建新的哨兵实例,并添加到对应主节点的sentinel字典中
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
                            token[0],port,master->quorum,master);

            if (si) {
                if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
             	// 创建成功,设置runid
                si->runid = sdsnew(token[2]);
                // 尝试与其它哨兵节点共享连接对象
                sentinelTryConnectionSharing(si);
                // 若之前删除过哨兵,则需要将之前处理的哨兵地址更新
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
                // 刷新配置
                sentinelFlushConfig();
            }
        }
        // 然后更新哨兵epoch(当传来的更大时)
        if (current_epoch > sentinel.current_epoch) {
            sentinel.current_epoch = current_epoch;
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
                (unsigned long long) sentinel.current_epoch);
        }
        // 更新对应master的配置epoch更大,即配置更加新
        // 则需要更新对应主节点信息
        if (si && master->config_epoch < master_config_epoch) {
            // 更新master的config epoch
            master->config_epoch = master_config_epoch;
            // 若地址改变了,还得更新地址并触发相关事件
            if (master_port != master->addr->port ||
                strcmp(master->addr->ip, token[5])) {
                sentinelAddr *old_addr;
                sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
                sentinelEvent(LL_WARNING,"+switch-master",
                    master,"%s %s %d %s %d",
                    master->name,
                    master->addr->ip, master->addr->port,
                    token[5], master_port);
                old_addr = dupSentinelAddr(master->addr);
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                sentinelCallClientReconfScript(master,
                    SENTINEL_OBSERVER,"start",
                    old_addr,master->addr);
                releaseSentinelAddr(old_addr);
            }
        }
        // 更新时间last_hello_time
        if (si) si->last_hello_time = mstime();
    }
cleanup:
    sdsfreesplitres(token,numtokens);
}

c) 检测主观下线

哨兵会对所有节点实例进行主观下线检查。

所谓主观下线,就是哨兵监控的节点在down-after-milliseconds内,没有检测到正常的响应,就会被判定位主观下线。它不需要其它哨兵进行投票,因此并不是真正的下线(即客观下线)。

不过即使是投票确认客观下线,也并不是真正客观的。分布式没有什么东西是可靠的。

检测主观下线的函数是sentinelCheckSubjectivelyDown,它:

  • 检查实例最后响应到现在的间隔,必要时标记主观下线
  • 检查命令和发布订阅连接是否处于低活跃度,必要时关闭连接
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
	// 1. 获取当前时间与实例最后响应的间隔
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;
    // 2. 检查命令连接(PING/INFO),当处于低活跃度时,关闭连接(之后重新建立)
    // 条件是: 连接建立有一段时间,但有未响应的PING,且延迟超过了down_after_period/2
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }
    // 3. 检查发布订阅连接(PUBLISH),当处于低活跃度时,关闭连接(之后重新建立)
    //  条件是: 连接建立有一段时间,且距离上次活动超过SENTINEL_PUBLISH_PERIOD*3的时间
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >r
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }
    // 4. 判断主观下线,条件为:
    // a) 响应间隔超过down_after_period
    // b) 实例是主节点,但多次INFO报告认为是从节点,且持续了一段时间
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2))) {
        // 标记主观下线SRI_S_DOWN
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        // 若之前标记了主观下线,这里检查条件不成立,则需要清除SRI_S_DOWN标记
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

d) 检测客观下线

客观下线和主观下线不一样:在主观下线的基础上,还需要征得半数以上哨兵的投票,才能判断为客观下线。

客观下线的判断仅适用于主节点。

首先是判断是否客观下线,即统计票数,函数是sentinelCheckObjectivelyDown

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;
	// 在主观下线的基础上,判断客观下线
    if (master->flags & SRI_S_DOWN) {
        quorum = 1; // 初始票数1
        // 遍历该主节点的其它哨兵实例,统计票数
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
			// ri是其它相关哨兵,若标记为SRI_MASTER_DOWN,票数+1
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // 超过半数则为客观下线
        if (quorum >= master->quorum) odown = 1;
    }
    if (odown) {
        // 若达到半数,标记客观下线
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        // 若没达到半数,清除客观下线标记
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

注意ri->flags & SRI_MASTER_DOWN这个投票标记,这是怎么来的?

不是INFO也不是PUBLISH,而是通过函数sentinelAskMasterStateToOtherSentinels,它会向其它哨兵发送SENTINEL is-master-down-by-addr指令,并通过回调sentinelReceiveIsMasterDownReply更新状态:

这个函数会在事件循环中被调用,从而跟踪其它哨兵对该主节点的上下线判断

#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;
    di = dictGetIterator(master->sentinels);
    // 遍历该主节点其它所有的哨兵
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        // 该哨兵实例最近一个回复SENTINEL is-master-down-by-addr所过去的时间
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;
        // 若太旧,则清除标记
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }
        // 该主节点没有主观下线,跳过
        if ((master->flags & SRI_S_DOWN) == 0) continue; 
        // 对应哨兵没有连接,跳过
        if (ri->link->disconnected) continue;
        // 非强制且距离上次发送该命令时间过近,也跳过
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;
		// 这里异步发送SENTINEL is-master-down-by-addr治理
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, // 回调
                    ri, "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    // ...
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER) {
        ri->last_master_down_reply_time = mstime();
        // 根据响应,判断该哨兵是否认为该主节点是否(主观)下线
        // 哨兵收到SENTINEL is-master-down-by-addr,会根据该主节点是否主观下线返回整数
        // 1代表主观下线,0代表上线
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        // ... 更新leader, epoch等其它信息
    }
}

e) 检查并准备故障转移

当主节点被标记为客观下线,那么就需要准备故障转移了。

但是故障转移需要一定的条件:

  • 主节点必须客观下线
  • 节点不能正在执行故障转移
  • 距离上次故障转移时间太短

若满足条件,就会执行故障转移的初始化,该实例的状态会被标记为SRI_FAILOVER_IN_PROGRESS,且故障转移状态会被标记为SENTINEL_FAILOVER_STATE_WAIT_START,故障转移epoch也会递增。

而只有标记为上述状态的实例才能执行接下来的故障转移。

上述代码在函数sentinelStartFailoverIfNeeded中实现:

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    // 1. 检查
    // a. 节点必须客观下线
    if (!(master->flags & SRI_O_DOWN)) return 0;
	// b. 节点不能正在故障转移
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
	// c. 上一次故障转移不能太近
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2) {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            // ...
        }
        return 0;
    }
	// 2. 检查通过,设置状态,准备故障转移
    sentinelStartFailover(master);
    return 1;
}

// 准备故障祝阿姨
void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);
	// 主要是下面3行代码
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

f) 执行故障转移

一切检查通过,且就绪完毕后,就可以执行故障转移了。

这里调用函数sentinelFailoverStateMachine,从代码里看出,故障转移功能的实现基于状态机,这里可以大致看下该函数所呈现的状态,第4节将详细说明故障转移状态机的实现:

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
    switch(ri->failover_state) {
        // 1. 初始状态,故障转移开始
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        // 2. 正在选择要晋升的从节点
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        // 3. 准备发送SLAVE no one将该节点晋升
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        // 4. 等待该节点的晋升
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        // 5. 准备给所有从节点配置新主节点(广播SLAVEOF命令)
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

3.3. 执行和处理脚本

监控检查做完后,就是执行和处理脚本的工作了,主要是下面3行代码。脚本这块不是重点,这里就省略不讲了。

void sentinelTimer(void) {
    // ... TITL, monitor & failover
	// 3. 运行队列中的脚本
    sentinelRunPendingScripts();
    // 4. 将已执行完的脚本移出队列,重新执行出错的脚本
    sentinelCollectTerminatedScripts();
    // 5. 取消执行超时的脚本,这些脚本会在下一个事件循环中于第4步再次执行
    sentinelKillTimedoutScripts();
    // ...
}

4. 故障转移

这里回到3.2.f的故障转移状态机实现(函数sentinelFailoverStateMachine),下面根据状态机的执行顺序来说明故障转移的流程。

4.1. 选举Leader

执行故障转移需要由一个Leader哨兵控制,因此需要在哨兵集群中选主。

选主使用的是Raft算法:

只有Leader哨兵才能执行故障转移,非Leader会中断转移流程。

这部分在函数sentinelFailoverWaitStart执行,若自己是Leader,那么故障转移状态会变成SENTINEL_FAILOVER_STATE_SELECT_SLAVE,进入下一步(下一轮事件循环进入,下同)。

4.2. 选出晋升的从节点

进入SENTINEL_FAILOVER_STATE_SELECT_SLAVE状态后,就需要选出一个晋升的从节点。

这部分由函数sentinelFailoverSelectSlave处理,若选出从节点,更新相关信息,并标记状态为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE,从而进入下一步:

void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    // 选出待晋升的从节点
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);
    if (slave == NULL) {
        // 没有从节点,则终止故障转移
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        // 标记待晋升的从节点
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        // 更改状态为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
        // 从而进入发送SLAVEOF noone的状态
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}

而具体选出从节点在函数sentinelSelectSlave执行,这边不分析源码,只是稍微总结一下:

  • 从节点的条件:
    • 不能主观下线、客观下线或者未建立连接
    • 最近一次响应PING距现在不能太长(5倍SENTINEL_PING_PERIOD
    • 最近一次响应INFO距现在不能太长(3倍/5倍SENTINEL_PING_PERIOD
    • 优先级不能为0
    • 主从节点之间断开的时间不能太长(10倍down-after-period
  • 选取从节点优先级(优先从高往低排):

    可从函数compareSlavesForPromotion

    • 优先选择高优先级(priority
    • 优先选择更大的复制偏移(slave_repl_offset
    • 优先选择更小的runid
    • 优先选择处理更多命令的从节点

4.3. 晋升从节点

进入SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE状态后,待晋升的从节点就会选出,因此需要将其变成主节点。

正如这个状态名所述,哨兵会向待晋升的从节点发送SLAVEOF no one,解绑其对旧主节的绑定,然后状态变成SENTINEL_FAILOVER_STATE_WAIT_PROMOTION等待其晋升结束。

这部分在函数sentinelFailoverSendSlaveOfNoOne执行:

void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;
	// ...
    // 发送SLAVEOF no one命令
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    // 变更状态为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION,等待晋升完毕
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

注意这里发送SLAVEOF no one的响应回调是“直接丢弃响应”,Redis哨兵对该响应并不关心。

4.4. 等待从节点晋升

进入状态SENTINEL_FAILOVER_STATE_WAIT_PROMOTION,则需要等待其晋升完毕。

说是等待晋升,实际上只是检查故障转移是否超时而已,见函数sentinelFailoverWaitPromotion

void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    // 检查故障转移超时而已
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

而哨兵如何知道从节点是否晋升完毕,是在INFO指令的响应回调中知道的,具体是在函数sentinelRefreshInstanceInfo

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
	// ...   
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        // 这里显示报告的role是主节点,而哨兵维护的类型是从节点
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) {
            // 若该从节点是被晋升的,且正在执行故障转移,故障转移的状态为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
            // 则说明该从节点已经执行完SLAVEOF no one命令,成为主节点了
            ri->master->config_epoch = ri->master->failover_epoch;
            // 那么就置状态为SENTINEL_FAILOVER_STATE_RECONF_SLAVES
            // 准备让其它从节点重新绑定新的主节点(即晋升的从节点)
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            // ... Update metadata of this instance
            // 这里把这个从节点信息发布到PUB/SUB,告知其它哨兵该节点已经被晋升
            sentinelForceHelloUpdateForMaster(ri->master);
        } 
    } else {
        // ...
    }
    // ...
}

4.5. 从节点同步新的主节点

INFO响应回调将故障转移状态置为SENTINEL_FAILOVER_STATE_RECONF_SLAVES后,就可以将其它从节点绑定到新的主节点上,即发送SLAVEOF命令,sentinelFailoverReconfNextSlave就是这么做的:

void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;
    di = dictGetIterator(master->slaves);
    // 统计已发送slaveof命令或者正在同步的从节点数目
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);
    di = dictGetIterator(master->slaves);
    // 若正在同步的节点数量小于配置的parrallel-sync
    // 则遍历slaves字典,尝试去给其它没同步的从节点发送SLAVEOF
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;
        // 跳过晋升的从节点
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        // 若从节点已经发送SLAVEOF命令,但是
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }
        // 跳过断连的和正在同步的从节点
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        if (slave->link->disconnected) continue;
        // 发送SLAVEOF命令给从节点,这里配置的是新主节点
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        if (retval == C_OK) {
            slave->flags |= SRI_RECONF_SENT; // 设置标识SRI_RECONF_SENT
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);
	// 检查这些从节点有没有配置完毕,并处理超时
    sentinelFailoverDetectEnd(master);
}

这里最后会检查故障转移结束的情况,即函数sentinelFailoverDetectEnd。这里,结束的情况有2种:

  • 超时而被动结束
  • 从节点已完成同步
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    // 得到自上次更新故障转移状态的时间差
    mstime_t elapsed = mstime() - master->failover_state_change_time;
    // 若晋升主节点不可达了,则得返回,不能认为故障转移结束,直接返回
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;
    // 统计所有未完成的、正在同步的从节点数量
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);
    // 若超时,则
    // 1. 强制结束本次故障转移
    // 2. 并在下面重新给没同步完的从节点发送SLAVEOF命令,重新重配置
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }
	// 这里所有的从节点都配置完成
    // 则更改故障转移状态未SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
 	// 这个状态一旦被设置,哨兵就可以把旧主节点的信息移除,设置新的主节点,并更新其它从节点的主从信息绑定
    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }
    // 这里根据上面的超时,若超时则重新给没同步完成的从节点发送SLAVEOF命令
    if (timeout) {
        dictIterator *di;
        dictEntry *de;
        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;
            if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;
            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}

而判断从节点是否同步完成,也是从INFO命令回调里得到的,由于消息异步传输,因此需要多次轮询(即多次事件循环)触发回调才能得到“同步完成”状态:

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    // ...
    // 这里应对的是已经发送SLAVEOF的需要重配置的从节点
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))) {
        // 若当前从节点实例所属主节点的地址已经和新主节点地址相同
        // 则说明它接收到了SLAVEOF命令,正在同步
        // 因此设置为SRI_RECONF_INPROG
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                    ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port) {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }

        // 根据INFO消息,若当前从节点实例和新主节点已经建立了连接,说明同步完成
        // 则设置状态为SRI_RECONF_DONE
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}

4.6. 切换哨兵内维护的主从节点配置

我们回到3.2开头的那段代码:

void sentinelHandleDictOfRedisInstances(dict *instances) {
    // ...
    while((de = dictNext(di)) != NULL) {
        // ...
		// 这里对监控对象实例执行监控和检查
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
			// ... Recursive invoke this function on slaves and sentinels ...
    		// 若故障转移状态是SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
            // 设置switch_to_promote为该实例(即旧主节点,它记录了晋升的新主节点实例)
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        // 若设置switch_to_promoted标识
        // 则用新晋升的主节点替代原来的主节点,旧主节点成为新主节点的从节点
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

这里,若最后故障转移的状态为SENTINEL_FAILOVER_STATE_UPDATE_CONFIG,那么故障转移已经结束,需要更新哨兵维护的对应主从实例的信息,即切换主从实例。这里会调用函数sentinelFailoverSwitchToPromotedSlave进行新旧主节点实例的切换。

4.7. 故障转移的总结

虽说故障转移的步骤是按照上述的流程来的,但是实际上这些流程都是非阻塞的,它们的完成需要经过非常多次的事件循环和异步回调,且有些顺序并不完全是按照代码所写的顺序执行(因为不在同一轮事件循环)。

不过根据状态机的设计思维,这块功能的逻辑还是比较清晰的,不过还是需要注意一些细节:

  • 故障转移的一些命令发送,如SLAVEOF,都是以事务执行的,即失败会回滚,这极大简化了代码复杂度;

  • 网络通信是异步的,通过触发回调的方式处理响应,因此流程非阻塞,但完成任务需要多次事件循环;

  • 故障转移的状态基本由INFO命令和SENTINEL命令的响应获得,务必注意它的回调函数,会更新状态;

    因此当故障转移流程执行时,获取节点状态不必阻塞等待请求响应,只需读维护的字段值即可,因为

    • 这些字段会在一轮一轮的事件循环中最终会得到更新
    • 若没有更新,等到下一轮事件循环再处理,这不会影响正确性
  • 由于使用单线程模型,不必考虑网络延迟带来的锁同步问题,因此状态机模型是一个很好的设计模式,只要一遍一遍事件循环,根据某个状态执行对应代码即可,避免了代码逻辑的混乱(若没有想要的状态,可以等到下一轮事件循环再处理,不影响正确性)。