1. Overview
客户端可以作为某个服务节点监视器,实时输出该节点处理的命令。
客户端可发送MONITOR
命令,成为某个服务节点的监视器。
2. 成为监视器
客户端发送MONITOR
指令即可成为监视器,这部分的实现就是monitorCommand
函数,它让该客户端进入监视器上下文,并将客户端添加到服务器的监视器列表中:
void monitorCommand(client *c) {
// 忽略从节点和已经成为监视器的客户端
if (c->flags & CLIENT_SLAVE) return;
// 客户端进入监视器上下文
c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
// 将客户端添加到监视器列表中
listAddNodeTail(server.monitors,c);
addReply(c,shared.ok);
}
3. 向监视器发送命令
执行命令前,命令会被发送给所有的监视器:
void call(client *c, int flags) {
// ...
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
// 执行命令前,先把命令发送给所有监视器
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
// ...
// 执行命令等操作...
}
而发送命令给监视器的函数就是replicationFeedMonitors
,这个函数很简单,就是遍历所有的监视器,把命令消息发送给它们。消息主要包含:
- 时间戳
- 命令本身
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
// 1. 获取并组装时间戳
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
}
// 2. 获取并组装执行的命令
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == OBJ_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(OBJ_STRING,cmdrepr);
// 3. 遍历监视器列表,将信息拷贝到监视器的输出缓冲区,待事件循环中将信息刷到网络
listRewind(monitors,&li);
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}
4. 总结
监视器功能,和慢查询功能一样,也是很简单的,核心也就是维护一个监视器的列表。