1. Overview
哨兵(Sentinel)是Redis HA的一个解决方案。
它由一个或多个节点构成,组成哨兵系统,主要作用是监控和故障转移:
- 监视多个主节点及其下属从节点的状态
- 当主节点下线后,选取一个从节点,将其升级为主节点
- 当主节点重新上线后,将其降级为从节点
本文主要分析哨兵是如何进行节点监控和故障转移的。
2. 哨兵的启动
2.1. 初始化服务器配置
哨兵本质上是Redis服务器的特殊模式,因此第一步还是需要初始化一个普通的Redis服务器,这在之前的文章中已经说明,如:
- 初始化一些基本配置(如环境变量等),这一步它会标记服务器为“哨兵”(
checkForSentinelMode
函数) - 初始化服务器配置默认值(
initServerConfig
函数) - 初始化模块
这些和普通Redis服务器启动并无大差别。
2.2. 初始化哨兵状态
下面的代码初始化哨兵的配置(并没有加载配置文件的配置):
// In main (server.c)
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
其中initSentinelConfig
函数,它只是初始化哨兵的配置:
- 设置端口为
REDIS_SENTINEL_PORT
(26379) - 关闭保护模式
而initSentinel()
函数初始化了哨兵的状态,包括:
- 只能用于哨兵的命令
- 其它数据结构,如主节点列表等
哨兵可用的命令被记录在sentinelcmds[]
数组里,如下所示:
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
{"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
};
而哨兵状态存储在struct sentinelState
结构中:
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; // 哨兵ID
uint64_t current_epoch; // 当前epoch号,用于故障转移
dict *masters; // 监视的主节点字典,键是实例名,值是sentinelRedisInstance指针
int tilt; // 是否进入TILT模式(后面会说)
int running_scripts; // 正在执行的脚本数量
mstime_t tilt_start_time; // 开启TILT模式的时间
mstime_t previous_time; // 上一次执行时间处理器的时间
list *scripts_queue; // 脚本队列
char *announce_ip; // 哨兵间通信的IP地址,空则是自动发现
int announce_port; // 哨兵间通信的端口,0则是自动发现
unsigned long simfailure_flags; // 故障模拟标志
int deny_scripts_reconfig; // 是否允许哨兵在运行时修改脚本位置
} sentinel;
2.3. 加载哨兵配置
之后,和普通Redis服务器一样,从配置文件中加载配置。
对于哨兵,它的配置加载主要调用sentinelHandleConfiguration
函数,它主要做的事情是:
- 加载并设置本哨兵的配置,如
announce_ip
,announce_port
,deny_scripts_reconfig
等 - 初始化监视的主节点字典
masters
这里最主要的是第二步。它根据配置,创建sentinelRedisInstance
实例,并添加到masters
字典中。
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
if (!strcasecmp(argv[0],"monitor") && argc == 5) {
// monitor <name> <host> <port> <quorum>
// 监视某个MASTER节点
// 其中quorum代表: 该节点**客观宕机**判断需要至少quorum个哨兵同意
int quorum = atoi(argv[4]);
// ...
if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
atoi(argv[3]),quorum,NULL) == NULL) {
// ...
}
} else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
// down-after-milliseconds <name> <milliseconds>
// 哨兵定期向节点发送PING判断状态,若无正常响应则认为不可达
// 当不可达时间超过milliseconds,则认为该节点**主观宕机**
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->down_after_period = atoi(argv[2]);
// ...
// 把该配置传播到该MASTER下属的SLAVE以及连接到它的哨兵
sentinelPropagateDownAfterPeriod(ri);
} else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
// failover-timeout <name> <milliseconds>
// 配置name节点的故障转移超时时间
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->failover_timeout = atoi(argv[2]);
// ...
} else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
// parallel-syncs <name> <nums>
// 当新MASTER出来后,SLAVE会对MASTER发起数据同步
// 这需要限制同步的数量,由nums决定
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->parallel_syncs = atoi(argv[2]);
} else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
// notification-script <name> <path>
// 配置name节点的通知脚本,当出现故障时,脚本被触发,通知外部
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->notification_script = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
// client-reconfig-script <name> <path>
// 配置故障转移成功的通知脚本,通知客户端
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->client_reconfig_script = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
// auth-pass <name> <password>
// 配置监控主节点的验证信息
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->auth_pass = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
// current-epoch <epoch>
// 配置当前哨兵的epoch(可认为是版本,用于故障转移)
unsigned long long current_epoch = strtoull(argv[1],NULL,10);
if (current_epoch > sentinel.current_epoch)
sentinel.current_epoch = current_epoch;
} else if (!strcasecmp(argv[0],"myid") && argc == 2) {
// 解析哨兵的ID
// ...
memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
} else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
// config-epoch <name> <epoch>
// 配置监控节点的epoch
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->config_epoch = strtoull(argv[2],NULL,10);
if (ri->config_epoch > sentinel.current_epoch)
sentinel.current_epoch = ri->config_epoch;
} else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
// leader-epoch <name> <epoch>
// Leader的epoch,这里Leader是哨兵集群的Leader
ri = sentinelGetMasterByName(argv[1]);
// ...
ri->leader_epoch = strtoull(argv[2],NULL,10);
} else if ((!strcasecmp(argv[0],"known-slave") ||
!strcasecmp(argv[0],"known-replica")) && argc == 4) {
sentinelRedisInstance *slave;
// known-replica <name> <ip> <port>
// 显式添加SLAVE节点的信息到名字为name的MASTER节点实例上
ri = sentinelGetMasterByName(argv[1]);
// ...
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL) {
// ...
}
} else if (!strcasecmp(argv[0],"known-sentinel") &&
(argc == 4 || argc == 5)) {
sentinelRedisInstance *si;
if (argc == 5) {
// known-sentinel <name> <ip> <port> [runid]
// 添加其它哨兵信息到名字为name的MASTER节点实例上
ri = sentinelGetMasterByName(argv[1]);
// ...
if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL) {
// ...
}
si->runid = sdsnew(argv[4]);
// 尝试与该哨兵节点共享监控对象(instanceLink)
sentinelTryConnectionSharing(si);
}
} else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
// rename-command <name> <command> <renamed-command>
// 配置名字为name的主节点的命令别名
ri = sentinelGetMasterByName(argv[1]);
// ...
sds oldcmd = sdsnew(argv[2]);
sds newcmd = sdsnew(argv[3]);
if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
// ...
}
}
// ... 配置哨兵的其它参数,如哨兵自己的IP和端口等等
return NULL;
}
这边再解析一下创建sentinelRedisInstance
的过程,熟悉一下它有哪些字段:
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
dict *table = NULL;
char slavename[NET_PEER_ID_LEN], *sdsname;
// ...
// 检查地址
addr = createSentinelAddr(hostname,port);
if (addr == NULL) return NULL;
// 若实例是SLAVE,那么名字就是ip:port
if (flags & SRI_SLAVE) {
anetFormatAddr(slavename, sizeof(slavename), hostname, port);
name = slavename;
}
// 确定需要添加的字典表
// 若是MASTER,则是sentinel.masters
// 若是SLAVE,则是参数中给的master->slaves,即绑定在master上
// 若是SENTINEL,则是参数中给的master->sentinals,也是绑定在master上
if (flags & SRI_MASTER) table = sentinel.masters;
else if (flags & SRI_SLAVE) table = master->slaves;
else if (flags & SRI_SENTINEL) table = master->sentinels;
sdsname = sdsnew(name);
if (dictFind(table,sdsname)) {
// 判重
releaseSentinelAddr(addr);
sdsfree(sdsname);
errno = EBUSY;
return NULL;
}
// 创建新的sentinelRedisInstance
ri = zmalloc(sizeof(*ri));
ri->flags = flags; // 给的状态,MASTER/SLAVE/SENTINEL
ri->name = sdsname; // 实例name
ri->runid = NULL; // 实例runid
ri->config_epoch = 0; // 实例的epoch
ri->addr = addr; // 实例地址
ri->link = createInstanceLink(); // 创建一个disconnected的连接对象,维护一个初始连接状态
ri->last_pub_time = mstime(); // 最后一次通过PUB/SUB发送hello的时间
ri->last_hello_time = mstime(); // 最后一次从PUB/SUB收到hello的时间,当SENTINEL才使用
ri->last_master_down_reply_time = mstime(); // 最后一次主节点回复下线的时间
ri->s_down_since_time = 0; // 主观下线时间
ri->o_down_since_time = 0; // 客观下线时间
ri->down_after_period = master ? master->down_after_period :
SENTINEL_DEFAULT_DOWN_AFTER; // 实例没有响应多少毫秒后,被判断为**主观**宕机
ri->info_refresh = 0; // 获取INFO信息的时间
ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL); // 命令别名字典
/* Master Specific */
ri->auth_pass = NULL; // 认证密码
ri->sentinels = dictCreate(&instancesDictType,NULL); // 连接该实例的哨兵
ri->slaves = dictCreate(&instancesDictType,NULL); // 连接该实例的从节点
ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS; // 故障转移时,可同时对新主节点进行同步的从节点个数
ri->quorum = quorum; // 判断主节点**客观**宕机的
/* Slave Specific */
ri->master_link_down_time = 0; // 从节点复制操作断开时间
ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; // 从节点优先权
ri->slave_reconf_sent_time = 0; // 故障转移时从节点发送SLAVEOF命令的时间
ri->slave_master_host = NULL; // 从节点对应主节点的地址
ri->slave_master_port = 0; // 从节点对应主节点的端口
ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN; // 从节点连接主节点的状态,默认没有连接
ri->slave_repl_offset = 0; // 从节点复制的偏移量
ri->master = master; // 从节点对应的主节点实例
/* Failover state. */
ri->leader = NULL; // 若本实例是主节点,那么该值就是执行故障转移的哨兵runid;
// 若本实例是哨兵,那么该值是哨兵集群选出来的Leader的runid
ri->leader_epoch = 0; // 选举Leader的epoch
ri->failover_epoch = 0; // 执行故障转移的epoch
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; // 故障转移状态
ri->failover_state_change_time = 0; // 故障转移状态改变的时间
ri->failover_start_time = 0; // 故障转移开始的时间
ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT; // 故障转移超时时间
ri->failover_delay_logged = 0; // 故障转移延迟时间
ri->promoted_slave = NULL; // 晋升为新主节点的从节点实例
/* Script */
ri->notification_script = NULL; // 警告触发脚本
ri->client_reconfig_script = NULL; // 故障转移成功脚本
ri->info = NULL; // 缓存INFO命令输出
/* Role */
ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE); // 实例的角色(这里只维护MASTER或SLAVE)
ri->role_reported_time = mstime(); // 角色变更时间
ri->slave_conf_change_time = mstime(); // 最后一次从节点地址变更时间
// 把信息添加到对应的字典表中
dictAdd(table, ri->name, ri);
return ri;
}
2.4. 初始化整个服务器
这里和普通Redis服务器一样,首先配置/启动监视和守护(后台),然后调用initServer
函数初始化整个服务器,比如:
- 创建事件循环
- 监听端口
- 初始化数据库及其内部信息
- 初始化统计值
- 注册时间事件(
serverCron
)和文件事件(Socket、模块等回调) - …
这部分和普通Redis服务器差不多。不过哨兵有些东西不需要执行,如AOF、集群初始化等等。
2.5. 启动哨兵
哨兵和普通Redis服务器一样,都以启动事件循环代表服务启动。不过在启动事件循环前,它们之间有一些不同:
- 普通Redis服务器:加载数据和模块
- 哨兵:调用
sentinelIsRunning
函数使哨兵就绪
这里sentinelIsRunning
函数让哨兵节点准备就绪,具体如下:
void sentinelIsRunning(void) {
int j;
// ...
// 若sentinal配置文件中没有添加ID,则随机生成一个并保存到配置文件里
for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
if (sentinel.myid[j] != 0) break;
if (j == CONFIG_RUN_ID_SIZE) {
getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
sentinelFlushConfig();
}
serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
// 启动时,生成一个+monitor事件,它会把该事件通过PUB/SUB发布,并调用警告脚本进行通知
sentinelGenerateInitialMonitorEvents();
}
至此,哨兵节点就启动了。
3. 哨兵事件
哨兵启动后,哨兵的任务是由serverCron
发起的,注意下面的代码:
// in serverCron()
if (server.sentinel_mode) sentinelTimer(); // 执行哨兵任务
可见执行哨兵任务的函数是sentinelTimer
,它大致的任务如下:
void sentinelTimer(void) {
// 1. 检查TILT模式,更新最近一次执行哨兵任务的事件
sentinelCheckTiltCondition();
// 2. 对哨兵监控的所有主节点递归进行检测(这一步包含连接建立)
sentinelHandleDictOfRedisInstances(sentinel.masters);
// 3. 运行队列中的脚本
sentinelRunPendingScripts();
// 4. 将已执行完的脚本移出队列,重新执行出错的脚本
sentinelCollectTerminatedScripts();
// 5. 取消执行超时的脚本,这些脚本会在下一个事件循环中于第4步再次执行
sentinelKillTimedoutScripts();
// 这里更改server.hz,调整执行哨兵任务的频率
// 这是为了选Leader服务的,它可以降低选主选不到的概率
// 这和Raft的思想一致, Raft选Leader也是采用随机超时的机制
server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}
3.1. 检查TITL
a) TITL模式
TITL是哨兵的保护模式。
由于哨兵严重依赖于系统时间,当系统时间被调整,或因为某种原因哨兵任务产生阻塞(如高负载、I/O阻塞、进程停止等),哨兵行为就不可预知。
这时候哨兵会进入保护模式(TITL),它:
- 只定期发送命令,收集信息
- 不做其它实质性动作(如故障转移)
- 向哨兵询问节点是否宕机(
SENTINEL is-master-down-by-addr
)会返回负值,告诉外部判断不准确
默认情况下,进入TITL模式30s后,哨兵任务恢复了正常,那么会退出TITL模式。
b) TITL检查
这里是sentinelCheckTiltCondition
函数,总体很简单,根据a)中的规则判断是否进入TITL模式:
void sentinelCheckTiltCondition(void) {
mstime_t now = mstime();
// 计算现在和上次执行sentinel任务的时间差
mstime_t delta = now - sentinel.previous_time;
// 若:
// 1. delta<0,说明时间被调整了
// 2. delta>SENTINEL_TITL_TRIGGER(2000ms),说明上一次sentinel任务执行慢/阻塞
// 这些情况下需要进入TITL保护模式
if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
sentinel.tilt = 1; // 启动保护模式
sentinel.tilt_start_time = mstime(); // 记录启动保护模式时间
sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered"); // 触发+titl事件
}
// 设置当前这次sentinel任务的时间
sentinel.previous_time = mstime();
}
3.2. 对所有主节点递归地监控和检查
这里调用sentinelHandleDictOfRedisInstances
,对所有主节点及其下属从节点和哨兵节点进行检查,且是递归的:
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;
di = dictGetIterator(instances);
// 遍历所有监控对象实例(根调用是所有的主节点)
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 这里对监控对象实例执行监控和检查
sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
// 对于主节点,还得递归处理下属的从节点和连接的哨兵节点
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
// 若当前实例处于完成故障转移的状态,且所有从节点完成了新主节点的同步
// 则设置switch_to_promited标识
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
if (switch_to_promoted)
// 若设置switch_to_promoted标识,
// 则用新晋升的主节点替代原来的主节点,旧主节点成为新主节点的从节点
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
而执行节点实例的监控操作在函数sentinelHandleRedisInstance
中,代码如下,它主要做的是:
- 所有实例
- 建立通信
- 发送
PING
、INFO
、PUBLISH
命令 - 检测节点实例是否“主观”宕机
- 主节点实例
- 检查是否“客观”下线
- 判断节点是否需要进行故障转移,若需要强制向其它哨兵获取该主节点实例的状态
- 必要时执行故障转移
- 若节点没客观下线,则也向其它哨兵获取该节点实例的状态
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
// 1. 建立连接
sentinelReconnectInstance(ri);
// 2. 定期发送命令
sentinelSendPeriodicCommands(ri);
if (sentinel.tilt) {
// 若在TITL模式下,后面的故障检测和转移都不做
// 若当前时间距离TITL模式开启有30s,则退出TITL模式
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
// 3. 检测节点实例是否“主观”宕机
sentinelCheckSubjectivelyDown(ri);
// ...
if (ri->flags & SRI_MASTER) {
// 4. 对于主节点实例,检查它是否“客观”宕机(根据投票)
sentinelCheckObjectivelyDown(ri);
// 5. 检查是否需要故障转移
if (sentinelStartFailoverIfNeeded(ri))
// 若需要则强制向其它哨兵获取该主节点的状态,收集投票
// 这里发送SENTINEL is-master-down-by-addr命令
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
// 6. 必要时执行故障转移
sentinelFailoverStateMachine(ri);
// 7. 若节点没客观下线,也要向其它哨兵获取该节点实例的状态
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
下面对这些步骤一一说明。
a) 建立连接
这里调用sentinelReconnectInstance
函数,哨兵和监控的节点建立连接。
当启动后,连接并没有建立,这里一开始调用该函数,可以让本哨兵和需要监控的节点建立连接。
此外,在下文的b)中,可能会有更多的节点实例加入进来(通过发送命令和订阅消息感知),与它们建立连接也是在这里进行(在下一轮事件循环中)
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
// 1. 若已连接,则直接返回
if (ri->link->disconnected == 0) return;
if (ri->addr->port == 0) return;
instanceLink *link = ri->link;
mstime_t now = mstime();
// 2. 如果最近一次连接的时间距离现在太短(默认1s),返回,等待下一次重连
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now; // 重新设置连接时间
// 3. 对于任何待监控的实例,需要建立命令连接(command connection)
if (link->cc == NULL) {
// 绑定地址并异步创建连接(下面操作几乎是异步的)
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->cc->err) {
// 失败则关闭连接
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else {
// 成功
// 重置连接属性
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
// 将服务器的事件循环关联到连接的上下文中
redisAeAttach(server.el,link->cc);
// 设置连接建立的回调函数
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);
// 设置连接断开的回调函数
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);
// 对于cc,这时候立即:
// 1.发送AUTH认证(响应直接丢弃)
// 2.发送客户端名字(响应直接丢弃)
// 3.PING这个实例(响应回调是sentinelPingReplyCallback,用于更新时间等)
sentinelSendAuthIfNeeded(ri,link->cc);
sentinelSetClientName(ri,link->cc,"cmd");
sentinelSendPing(ri);
}
}
// 4. 对于主/从节点,还必须创建PUB/SUB连接(pub connection)
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
// 绑定地址并异步创建连接
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->pc->err) {
// 错误则关闭连接
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
// 成功
// 重置连接属性
int retval;
link->pc_conn_time = mstime();
link->pc->data = link;
// 将服务器的事件循环关联到连接的上下文中
redisAeAttach(server.el,link->pc);
// 设置连接建立的回调函数
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
// 设置连接断开的回调函数
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
// 对于pc,这时候立即:
// 1.发送AUTH认证(响应直接丢弃)
// 2.发送客户端名字(响应直接丢弃)
// 3.发送SUBSCRIBE命令,让哨兵订阅该节点实例上名为__sentinel__:hello的频道
// 哨兵订阅该频道的消息,消息会被回调sentinelReceiveHelloMessages处理
// 该回调会从响应中获取该实例下属的从节点和其它哨兵消息等
sentinelSendAuthIfNeeded(ri,link->pc);
sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
// 订阅失败则关闭连接,返回,等待重试
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
// 5. 若连接均已成功建立,则清除disconnected标记
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
这里的代码使用了hiredis
的异步库。由于连接是异步的,操作会立刻返回,而响应由对应的回调处理。
从上面代码中,哨兵会向节点(主/从)建立2条连接:
- 命令连接:哨兵向被监控节点发送命令,然后处理其响应
- 订阅连接:哨兵向被监控节点订阅频道
__sentinel__:hello
,并定期向频道里发布消息(主要是哨兵自己和对应的被监控节点),其它哨兵也会收到这条消息,并更新监控状态信息
b) 定期发送命令
这部分是在函数sentinelSendPeriodicCommands
中,它向对应的被监控节点发送3条指令,分别是INFO
、PING
和PUBLISH
:
INFO
:收集节点和集群信息,用于刷新集群的信息,感知集群其它成员,处理成员角色变化等PING
:判断网络连通性PUBLISH
:往频道发布哨兵自己和对应节点的信息,其它哨兵订阅后可以收到消息,感知节点其它的哨兵
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
// 实例未连接,返回
if (ri->link->disconnected) return;
// 实例未回复命令超过一定个数,返回
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
// 如果实例目前是从节点,且他的主节点处于故障状态的状态或者从节点和主节点断开复制了
// 就提高INFO监控的频率为1s,其它情况下为10s
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
// 如果最后一次接受到PONG的时候间隔比down-after-milliseconds更长
// 并且如果down-after-milliseconds设置大于1秒,则直接按照1秒的频率PING
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
// 若时间到了,向主节点和从节点发送INFO
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO")); // 异步发INFO
if (retval == C_OK) ri->link->pending_commands++; // pending_commands自增
}
// 若时间到了,向所有类型的节点发送PING
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri); // 异步发PING,pending_commands自增且更新last_ping_time和act_ping_time
}
// 若时间到了,向所有类型的节点发送PUBLISH指令
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
// 往__sentinel__:hello频道发送HELLO信息,pending_commands自增
sentinelSendHello(ri);
}
}
而哨兵对这3类命令的响应处理也有一定的不同。
首先是INFO
命令的响应回调sentinelInfoReplyCallback
,回调主要用于更新实例信息,包括:
- 获取并更新对应实例的基本信息(如
run_id
,role
等),并自动发现集群网络其它活跃节点信息(INFO
接收方为主节点,会额外返回从节点信息;若为从节点,则额外返回主节点信息)。 - 处理角色变化:
- 实例为主节点,
INFO
返回从节点:什么都不做 - 实例为从节点,
INFO
返回主节点:- 实例是被晋升的从节点,且它的主节点正等待从节点晋升:仅更新相关数据
- 实例是被晋升的从节点,但它的主节点在故障转移时间内重新上线:将晋升的从节点降级,并向其发送
SLAVEOF
命令
- 实例为从节点,
INFO
也返回从节点:- 若主节点地址变化:向该实例发送
SLAVEOF
命令进行重绑定 - 若该实例以接收
slaveof
命令或正在执行slaveof
的同步:将其标记为下一步的状态(SRI_RECONF_SENT -> SRI_RECONF_INPROG
,SRI_RECONF_INPROG -> SRI_RECONF_DONE
)
- 若主节点地址变化:向该实例发送
- 实例为主节点,
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
// ...
if (!reply || !link) return;
link->pending_commands--;
r = reply;
if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str); // 主要是这一步,更新实例信息,代码有点长,这里略了
}
然后是处理PING
命令的响应回调sentinelPingReplyCallback
,它会:
- 更新最近一次收到
PING
响应的时间 - 根据响应处理
PONG
、LOADING
、MASTERDOWN
:实例可达,更新最近的可用时间BUSY
:实例因执行脚本而正忙,发送SCRIPT KILL
终止其脚本执行
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
// ...
link->pending_commands--;
r = reply;
if (r->type == REDIS_REPLY_STATUS ||
r->type == REDIS_REPLY_ERROR) {
// PONG, LOADING, MASTERDOWN表示实例和哨兵网络可达
if (strncmp(r->str,"PONG",4) == 0 ||
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0) {
link->last_avail_time = mstime(); // 更新最新可用时间
link->act_ping_time = 0; // 标记为0表示可用
} else {
// BUSY,说明实例正忙,这是因为它在执行脚本而表现为下线状态
// 所以发送SCRIPT KILL终止脚本运行
if (strncmp(r->str,"BUSY",4) == 0 &&
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT)) {
if (redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri,
"%s KILL",
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK) {
ri->link->pending_commands++;
}
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}
link->last_pong_time = mstime(); // 更新last_pong_time
}
最后是处理PUBLISH
命令的响应回调,由于它是一个PUB/SUB,因此相关消息是通过订阅得到的。从a)中可知,这个回调是sentinelReceiveHelloMessages
,除了更新一些时间信息,这个回调的核心是函数sentinelProcessHelloMessage
,它主要处理频道里的HELLO
消息,如:
- 感知对应主节点的其它哨兵节点,包括哨兵添加、哨兵信息更改等
- 更新主节点的信息
HELLO
消息主要包含哨兵以及其绑定的主节点信息
void sentinelProcessHelloMessage(char *hello, int hello_len) {
/* Format is composed of 8 tokens:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
* 5=master_ip,6=master_port,7=master_config_epoch. */
int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
sentinelRedisInstance *si, *master;
if (numtokens == 8) {
// 获取根据master名字获取主节点的实例
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup; // 若找不到对应主节点实例则直接返回,这的感知放在INFO里做
// 获取对应哨兵的端口
port = atoi(token[1]);
// 获取对应master的端口
master_port = atoi(token[6]);
// 通过地址和runid,查找这个哨兵的实例是否已存在
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels,token[0],port,token[2]);
// 获取对应哨兵的epoch
current_epoch = strtoull(token[3],NULL,10);
// 获取对应master配置的epoch
master_config_epoch = strtoull(token[7],NULL,10);
if (!si) {
// 若该哨兵目前不存在
// 首先移除所有相同runid的哨兵节点
// 因为可能一种情况是改变哨兵地址,这需要重绑定
removed = removeMatchingSentinelFromMaster(master,token[2]);
if (removed) {
// 删除成功,则触发事件通知,说明需要将已有哨兵的地址更改
sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
"%@ ip %s port %d for %s", token[0],port,token[2]);
} else {
// 没有删除,说明没有哨兵地址需要更改,检查是否有另一个哨兵具有与消息中相同的地址
sentinelRedisInstance *other =
getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0],port,NULL);
if (other) {
// 若有,将其端口置0表示无效
sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
other->addr->port = 0;
// 并将这个改动应用到所有已有的主节点实例上,并断开该哨兵的连接
sentinelUpdateSentinelAddressInAllMasters(other);
}
}
// 创建新的哨兵实例,并添加到对应主节点的sentinel字典中
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
if (si) {
if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
// 创建成功,设置runid
si->runid = sdsnew(token[2]);
// 尝试与其它哨兵节点共享连接对象
sentinelTryConnectionSharing(si);
// 若之前删除过哨兵,则需要将之前处理的哨兵地址更新
if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
// 刷新配置
sentinelFlushConfig();
}
}
// 然后更新哨兵epoch(当传来的更大时)
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
// 更新对应master的配置epoch更大,即配置更加新
// 则需要更新对应主节点信息
if (si && master->config_epoch < master_config_epoch) {
// 更新master的config epoch
master->config_epoch = master_config_epoch;
// 若地址改变了,还得更新地址并触发相关事件
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5])) {
sentinelAddr *old_addr;
sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
token[5], master_port);
old_addr = dupSentinelAddr(master->addr);
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
sentinelCallClientReconfScript(master,
SENTINEL_OBSERVER,"start",
old_addr,master->addr);
releaseSentinelAddr(old_addr);
}
}
// 更新时间last_hello_time
if (si) si->last_hello_time = mstime();
}
cleanup:
sdsfreesplitres(token,numtokens);
}
c) 检测主观下线
哨兵会对所有节点实例进行主观下线检查。
所谓主观下线,就是哨兵监控的节点在down-after-milliseconds
内,没有检测到正常的响应,就会被判定位主观下线。它不需要其它哨兵进行投票,因此并不是真正的下线(即客观下线)。
不过即使是投票确认客观下线,也并不是真正客观的。分布式没有什么东西是可靠的。
检测主观下线的函数是sentinelCheckSubjectivelyDown
,它:
- 检查实例最后响应到现在的间隔,必要时标记主观下线
- 检查命令和发布订阅连接是否处于低活跃度,必要时关闭连接
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;
// 1. 获取当前时间与实例最后响应的间隔
if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;
// 2. 检查命令连接(PING/INFO),当处于低活跃度时,关闭连接(之后重新建立)
// 条件是: 连接建立有一段时间,但有未响应的PING,且延迟超过了down_after_period/2
if (ri->link->cc &&
(mstime() - ri->link->cc_conn_time) >
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
ri->link->act_ping_time != 0 &&
(mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
(mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) {
instanceLinkCloseConnection(ri->link,ri->link->cc);
}
// 3. 检查发布订阅连接(PUBLISH),当处于低活跃度时,关闭连接(之后重新建立)
// 条件是: 连接建立有一段时间,且距离上次活动超过SENTINEL_PUBLISH_PERIOD*3的时间
if (ri->link->pc &&
(mstime() - ri->link->pc_conn_time) >r
SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) {
instanceLinkCloseConnection(ri->link,ri->link->pc);
}
// 4. 判断主观下线,条件为:
// a) 响应间隔超过down_after_period
// b) 实例是主节点,但多次INFO报告认为是从节点,且持续了一段时间
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2))) {
// 标记主观下线SRI_S_DOWN
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
// 若之前标记了主观下线,这里检查条件不成立,则需要清除SRI_S_DOWN标记
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}
d) 检测客观下线
客观下线和主观下线不一样:在主观下线的基础上,还需要征得半数以上哨兵的投票,才能判断为客观下线。
客观下线的判断仅适用于主节点。
首先是判断是否客观下线,即统计票数,函数是sentinelCheckObjectivelyDown
:
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;
// 在主观下线的基础上,判断客观下线
if (master->flags & SRI_S_DOWN) {
quorum = 1; // 初始票数1
// 遍历该主节点的其它哨兵实例,统计票数
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// ri是其它相关哨兵,若标记为SRI_MASTER_DOWN,票数+1
if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
// 超过半数则为客观下线
if (quorum >= master->quorum) odown = 1;
}
if (odown) {
// 若达到半数,标记客观下线
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
// 若没达到半数,清除客观下线标记
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}
注意ri->flags & SRI_MASTER_DOWN
这个投票标记,这是怎么来的?
不是INFO
也不是PUBLISH
,而是通过函数sentinelAskMasterStateToOtherSentinels
,它会向其它哨兵发送SENTINEL is-master-down-by-addr
指令,并通过回调sentinelReceiveIsMasterDownReply
更新状态:
这个函数会在事件循环中被调用,从而跟踪其它哨兵对该主节点的上下线判断
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->sentinels);
// 遍历该主节点其它所有的哨兵
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
// 该哨兵实例最近一个回复SENTINEL is-master-down-by-addr所过去的时间
mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
char port[32];
int retval;
// 若太旧,则清除标记
if (elapsed > SENTINEL_ASK_PERIOD*5) {
ri->flags &= ~SRI_MASTER_DOWN;
sdsfree(ri->leader);
ri->leader = NULL;
}
// 该主节点没有主观下线,跳过
if ((master->flags & SRI_S_DOWN) == 0) continue;
// 对应哨兵没有连接,跳过
if (ri->link->disconnected) continue;
// 非强制且距离上次发送该命令时间过近,也跳过
if (!(flags & SENTINEL_ASK_FORCED) &&
mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
continue;
// 这里异步发送SENTINEL is-master-down-by-addr治理
ll2string(port,sizeof(port),master->addr->port);
retval = redisAsyncCommand(ri->link->cc,
sentinelReceiveIsMasterDownReply, // 回调
ri, "%s is-master-down-by-addr %s %s %llu %s",
sentinelInstanceMapCommand(ri,"SENTINEL"),
master->addr->ip, port,
sentinel.current_epoch,
(master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
sentinel.myid : "*");
if (retval == C_OK) ri->link->pending_commands++;
}
dictReleaseIterator(di);
}
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
// ...
if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
r->element[0]->type == REDIS_REPLY_INTEGER &&
r->element[1]->type == REDIS_REPLY_STRING &&
r->element[2]->type == REDIS_REPLY_INTEGER) {
ri->last_master_down_reply_time = mstime();
// 根据响应,判断该哨兵是否认为该主节点是否(主观)下线
// 哨兵收到SENTINEL is-master-down-by-addr,会根据该主节点是否主观下线返回整数
// 1代表主观下线,0代表上线
if (r->element[0]->integer == 1) {
ri->flags |= SRI_MASTER_DOWN;
} else {
ri->flags &= ~SRI_MASTER_DOWN;
}
// ... 更新leader, epoch等其它信息
}
}
e) 检查并准备故障转移
当主节点被标记为客观下线,那么就需要准备故障转移了。
但是故障转移需要一定的条件:
- 主节点必须客观下线
- 节点不能正在执行故障转移
- 距离上次故障转移时间太短
若满足条件,就会执行故障转移的初始化,该实例的状态会被标记为SRI_FAILOVER_IN_PROGRESS
,且故障转移状态会被标记为SENTINEL_FAILOVER_STATE_WAIT_START
,故障转移epoch
也会递增。
而只有标记为上述状态的实例才能执行接下来的故障转移。
上述代码在函数sentinelStartFailoverIfNeeded
中实现:
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
// 1. 检查
// a. 节点必须客观下线
if (!(master->flags & SRI_O_DOWN)) return 0;
// b. 节点不能正在故障转移
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
// c. 上一次故障转移不能太近
if (mstime() - master->failover_start_time <
master->failover_timeout*2) {
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout*2) / 1000;
// ...
}
return 0;
}
// 2. 检查通过,设置状态,准备故障转移
sentinelStartFailover(master);
return 1;
}
// 准备故障祝阿姨
void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER);
// 主要是下面3行代码
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}
f) 执行故障转移
一切检查通过,且就绪完毕后,就可以执行故障转移了。
这里调用函数sentinelFailoverStateMachine
,从代码里看出,故障转移功能的实现基于状态机,这里可以大致看下该函数所呈现的状态,第4节将详细说明故障转移状态机的实现:
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) {
// 1. 初始状态,故障转移开始
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
// 2. 正在选择要晋升的从节点
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
// 3. 准备发送SLAVE no one将该节点晋升
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
// 4. 等待该节点的晋升
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
// 5. 准备给所有从节点配置新主节点(广播SLAVEOF命令)
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}
3.3. 执行和处理脚本
监控检查做完后,就是执行和处理脚本的工作了,主要是下面3行代码。脚本这块不是重点,这里就省略不讲了。
void sentinelTimer(void) {
// ... TITL, monitor & failover
// 3. 运行队列中的脚本
sentinelRunPendingScripts();
// 4. 将已执行完的脚本移出队列,重新执行出错的脚本
sentinelCollectTerminatedScripts();
// 5. 取消执行超时的脚本,这些脚本会在下一个事件循环中于第4步再次执行
sentinelKillTimedoutScripts();
// ...
}
4. 故障转移
这里回到3.2.f的故障转移状态机实现(函数sentinelFailoverStateMachine
),下面根据状态机的执行顺序来说明故障转移的流程。
4.1. 选举Leader
执行故障转移需要由一个Leader哨兵控制,因此需要在哨兵集群中选主。
选主使用的是Raft算法:
- 论文在这:In Search of an Understandable Consensus Algorithm
- Raft算法的整理在这:
只有Leader哨兵才能执行故障转移,非Leader会中断转移流程。
这部分在函数sentinelFailoverWaitStart
执行,若自己是Leader,那么故障转移状态会变成SENTINEL_FAILOVER_STATE_SELECT_SLAVE
,进入下一步(下一轮事件循环进入,下同)。
4.2. 选出晋升的从节点
进入SENTINEL_FAILOVER_STATE_SELECT_SLAVE
状态后,就需要选出一个晋升的从节点。
这部分由函数sentinelFailoverSelectSlave
处理,若选出从节点,更新相关信息,并标记状态为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
,从而进入下一步:
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
// 选出待晋升的从节点
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
if (slave == NULL) {
// 没有从节点,则终止故障转移
sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else {
sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
// 标记待晋升的从节点
slave->flags |= SRI_PROMOTED;
ri->promoted_slave = slave;
// 更改状态为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
// 从而进入发送SLAVEOF noone的状态
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
slave, "%@");
}
}
而具体选出从节点在函数sentinelSelectSlave
执行,这边不分析源码,只是稍微总结一下:
- 从节点的条件:
- 不能主观下线、客观下线或者未建立连接
- 最近一次响应
PING
距现在不能太长(5倍SENTINEL_PING_PERIOD
) - 最近一次响应
INFO
距现在不能太长(3倍/5倍SENTINEL_PING_PERIOD
) - 优先级不能为0
- 主从节点之间断开的时间不能太长(10倍
down-after-period
)
-
选取从节点优先级(优先从高往低排):
可从函数
compareSlavesForPromotion
知- 优先选择高优先级(
priority
) - 优先选择更大的复制偏移(
slave_repl_offset
) - 优先选择更小的
runid
- 优先选择处理更多命令的从节点
- 优先选择高优先级(
4.3. 晋升从节点
进入SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE
状态后,待晋升的从节点就会选出,因此需要将其变成主节点。
正如这个状态名所述,哨兵会向待晋升的从节点发送SLAVEOF no one
,解绑其对旧主节的绑定,然后状态变成SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
等待其晋升结束。
这部分在函数sentinelFailoverSendSlaveOfNoOne
执行:
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;
// ...
// 发送SLAVEOF no one命令
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
if (retval != C_OK) return;
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@");
// 变更状态为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION,等待晋升完毕
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
}
注意这里发送SLAVEOF no one
的响应回调是“直接丢弃响应”,Redis哨兵对该响应并不关心。
4.4. 等待从节点晋升
进入状态SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
,则需要等待其晋升完毕。
说是等待晋升,实际上只是检查故障转移是否超时而已,见函数sentinelFailoverWaitPromotion
:
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
// 检查故障转移超时而已
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}
而哨兵如何知道从节点是否晋升完毕,是在INFO
指令的响应回调中知道的,具体是在函数sentinelRefreshInstanceInfo
:
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
// ...
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
// 这里显示报告的role是主节点,而哨兵维护的类型是从节点
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) {
// 若该从节点是被晋升的,且正在执行故障转移,故障转移的状态为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION
// 则说明该从节点已经执行完SLAVEOF no one命令,成为主节点了
ri->master->config_epoch = ri->master->failover_epoch;
// 那么就置状态为SENTINEL_FAILOVER_STATE_RECONF_SLAVES
// 准备让其它从节点重新绑定新的主节点(即晋升的从节点)
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
// ... Update metadata of this instance
// 这里把这个从节点信息发布到PUB/SUB,告知其它哨兵该节点已经被晋升
sentinelForceHelloUpdateForMaster(ri->master);
}
} else {
// ...
}
// ...
}
4.5. 从节点同步新的主节点
在INFO
响应回调将故障转移状态置为SENTINEL_FAILOVER_STATE_RECONF_SLAVES
后,就可以将其它从节点绑定到新的主节点上,即发送SLAVEOF
命令,sentinelFailoverReconfNextSlave
就是这么做的:
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int in_progress = 0;
di = dictGetIterator(master->slaves);
// 统计已发送slaveof命令或者正在同步的从节点数目
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
in_progress++;
}
dictReleaseIterator(di);
di = dictGetIterator(master->slaves);
// 若正在同步的节点数量小于配置的parrallel-sync
// 则遍历slaves字典,尝试去给其它没同步的从节点发送SLAVEOF
while(in_progress < master->parallel_syncs &&
(de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
// 跳过晋升的从节点
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
// 若从节点已经发送SLAVEOF命令,但是
if ((slave->flags & SRI_RECONF_SENT) &&
(mstime() - slave->slave_reconf_sent_time) >
SENTINEL_SLAVE_RECONF_TIMEOUT)
{
sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
slave->flags &= ~SRI_RECONF_SENT;
slave->flags |= SRI_RECONF_DONE;
}
// 跳过断连的和正在同步的从节点
if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
if (slave->link->disconnected) continue;
// 发送SLAVEOF命令给从节点,这里配置的是新主节点
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
slave->flags |= SRI_RECONF_SENT; // 设置标识SRI_RECONF_SENT
slave->slave_reconf_sent_time = mstime();
sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
dictReleaseIterator(di);
// 检查这些从节点有没有配置完毕,并处理超时
sentinelFailoverDetectEnd(master);
}
这里最后会检查故障转移结束的情况,即函数sentinelFailoverDetectEnd
。这里,结束的情况有2种:
- 超时而被动结束
- 从节点已完成同步
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
int not_reconfigured = 0, timeout = 0;
dictIterator *di;
dictEntry *de;
// 得到自上次更新故障转移状态的时间差
mstime_t elapsed = mstime() - master->failover_state_change_time;
// 若晋升主节点不可达了,则得返回,不能认为故障转移结束,直接返回
if (master->promoted_slave == NULL ||
master->promoted_slave->flags & SRI_S_DOWN) return;
// 统计所有未完成的、正在同步的从节点数量
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
if (slave->flags & SRI_S_DOWN) continue;
not_reconfigured++;
}
dictReleaseIterator(di);
// 若超时,则
// 1. 强制结束本次故障转移
// 2. 并在下面重新给没同步完的从节点发送SLAVEOF命令,重新重配置
if (elapsed > master->failover_timeout) {
not_reconfigured = 0;
timeout = 1;
sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
}
// 这里所有的从节点都配置完成
// 则更改故障转移状态未SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
// 这个状态一旦被设置,哨兵就可以把旧主节点的信息移除,设置新的主节点,并更新其它从节点的主从信息绑定
if (not_reconfigured == 0) {
sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
master->failover_state_change_time = mstime();
}
// 这里根据上面的超时,若超时则重新给没同步完成的从节点发送SLAVEOF命令
if (timeout) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
if (slave->link->disconnected) continue;
retval = sentinelSendSlaveOf(slave,
master->promoted_slave->addr->ip,
master->promoted_slave->addr->port);
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}
而判断从节点是否同步完成,也是从INFO
命令回调里得到的,由于消息异步传输,因此需要多次轮询(即多次事件循环)触发回调才能得到“同步完成”状态:
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
// ...
// 这里应对的是已经发送SLAVEOF的需要重配置的从节点
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
(ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))) {
// 若当前从节点实例所属主节点的地址已经和新主节点地址相同
// 则说明它接收到了SLAVEOF命令,正在同步
// 因此设置为SRI_RECONF_INPROG
if ((ri->flags & SRI_RECONF_SENT) &&
ri->slave_master_host &&
strcmp(ri->slave_master_host,
ri->master->promoted_slave->addr->ip) == 0 &&
ri->slave_master_port == ri->master->promoted_slave->addr->port) {
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
}
// 根据INFO消息,若当前从节点实例和新主节点已经建立了连接,说明同步完成
// 则设置状态为SRI_RECONF_DONE
if ((ri->flags & SRI_RECONF_INPROG) &&
ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) {
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
}
}
}
4.6. 切换哨兵内维护的主从节点配置
我们回到3.2开头的那段代码:
void sentinelHandleDictOfRedisInstances(dict *instances) {
// ...
while((de = dictNext(di)) != NULL) {
// ...
// 这里对监控对象实例执行监控和检查
sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
// ... Recursive invoke this function on slaves and sentinels ...
// 若故障转移状态是SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
// 设置switch_to_promote为该实例(即旧主节点,它记录了晋升的新主节点实例)
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
if (switch_to_promoted)
// 若设置switch_to_promoted标识
// 则用新晋升的主节点替代原来的主节点,旧主节点成为新主节点的从节点
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
这里,若最后故障转移的状态为SENTINEL_FAILOVER_STATE_UPDATE_CONFIG
,那么故障转移已经结束,需要更新哨兵维护的对应主从实例的信息,即切换主从实例。这里会调用函数sentinelFailoverSwitchToPromotedSlave
进行新旧主节点实例的切换。
4.7. 故障转移的总结
虽说故障转移的步骤是按照上述的流程来的,但是实际上这些流程都是非阻塞的,它们的完成需要经过非常多次的事件循环和异步回调,且有些顺序并不完全是按照代码所写的顺序执行(因为不在同一轮事件循环)。
不过根据状态机的设计思维,这块功能的逻辑还是比较清晰的,不过还是需要注意一些细节:
-
故障转移的一些命令发送,如
SLAVEOF
,都是以事务执行的,即失败会回滚,这极大简化了代码复杂度; -
网络通信是异步的,通过触发回调的方式处理响应,因此流程非阻塞,但完成任务需要多次事件循环;
-
故障转移的状态基本由
INFO
命令和SENTINEL
命令的响应获得,务必注意它的回调函数,会更新状态;因此当故障转移流程执行时,获取节点状态不必阻塞等待请求响应,只需读维护的字段值即可,因为
- 这些字段会在一轮一轮的事件循环中最终会得到更新
- 若没有更新,等到下一轮事件循环再处理,这不会影响正确性
-
由于使用单线程模型,不必考虑网络延迟带来的锁同步问题,因此状态机模型是一个很好的设计模式,只要一遍一遍事件循环,根据某个状态执行对应代码即可,避免了代码逻辑的混乱(若没有想要的状态,可以等到下一轮事件循环再处理,不影响正确性)。