源码阅读-Redis独立功能: 监视器

Posted by keys961 on December 11, 2019

1. Overview

客户端可以作为某个服务节点监视器,实时输出该节点处理的命令。

客户端可发送MONITOR命令,成为某个服务节点的监视器。

2. 成为监视器

客户端发送MONITOR指令即可成为监视器,这部分的实现就是monitorCommand函数,它让该客户端进入监视器上下文,并将客户端添加到服务器的监视器列表中:

void monitorCommand(client *c) {
    // 忽略从节点和已经成为监视器的客户端
    if (c->flags & CLIENT_SLAVE) return;
    // 客户端进入监视器上下文
    c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
    // 将客户端添加到监视器列表中
    listAddNodeTail(server.monitors,c);
    addReply(c,shared.ok);
}

3. 向监视器发送命令

执行命令,命令会被发送给所有的监视器:

void call(client *c, int flags) {
 	// ...
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
        // 执行命令前,先把命令发送给所有监视器
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
    // ...
    // 执行命令等操作...
}

而发送命令给监视器的函数就是replicationFeedMonitors,这个函数很简单,就是遍历所有的监视器,把命令消息发送给它们。消息主要包含:

  • 时间戳
  • 命令本身
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j;
    sds cmdrepr = sdsnew("+");
    robj *cmdobj;
    struct timeval tv;
	// 1. 获取并组装时间戳
    gettimeofday(&tv,NULL);
    cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
    if (c->flags & CLIENT_LUA) {
        cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
    } else if (c->flags & CLIENT_UNIX_SOCKET) {
        cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
    } else {
        cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
    }
    // 2. 获取并组装执行的命令
    for (j = 0; j < argc; j++) {
        if (argv[j]->encoding == OBJ_ENCODING_INT) {
            cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
        } else {
            cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
                        sdslen(argv[j]->ptr));
        }
        if (j != argc-1)
            cmdrepr = sdscatlen(cmdrepr," ",1);
    }
    cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
    cmdobj = createObject(OBJ_STRING,cmdrepr);
	// 3. 遍历监视器列表,将信息拷贝到监视器的输出缓冲区,待事件循环中将信息刷到网络
    listRewind(monitors,&li);
    while((ln = listNext(&li))) {
        client *monitor = ln->value;
        addReply(monitor,cmdobj);
    }
    decrRefCount(cmdobj);
}

4. 总结

监视器功能,和慢查询功能一样,也是很简单的,核心也就是维护一个监视器的列表。