源码阅读-Redis单机服务器: 客户端状态

Posted by keys961 on October 24, 2019

1. Overview

Redis是典型的一对多的服务器顺序,每个客户端可与服务器进行交互。

交互事件的触发和处理源于基于I/O多路复用的文件事件及其处理器,这在上一篇已经说明了。

而交互功能的实现,是有状态的(比如事务),因此Redis需要维护客户端的信息。

server.h中,Redis定义了结构client存储一个客户端的状态。而多个客户端,以一个链表保存活跃的客户端:

typedef struct redisServer {
    // ...
    list *clients;   // 活跃客户端列表
    // ...
}

2. 客户端属性

客户端client结构有很多字段下面分别进行描述。

2.1. 文件描述符

typedef struct client {
    // ...
    int fd; // File descriptor
    // ...
}

fd可以取-1或者自然数:

  • -1:代表伪客户端,用于处理AOF恢复/Lua脚本。由于它不来源于网络,所以不需要套接字描述符
  • 自然数:代表普通客户端,其fd代表和服务端直接的连接描述符

2.2. 客户端名字

可通过CLIENT SETNAME命令给客户端设置一个名字,而客户端的名字记录在name字段中:

typedef struct client {
    // ...
    robj *name; // 客户端名字,是一个OBJ_STRING
    // ...
}

2.3. 标志

客户端的flag字段记录了客户端的角色和状态,状态可以是复合的,通过|进行添加:

typedef struct client {
    // ...
    int flags; // 客户端标志,记录角色和状态
    // ...
}

下面是几个标志的宏定义:

/* Client flags */
#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
#define CLIENT_MASTER (1<<1) /* This client is a master server */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in server.unblocked_clients */
#define CLIENT_LUA (1<<8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1<<9) /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define CLIENT_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
#define CLIENT_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
#define CLIENT_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define CLIENT_FORCE_REPL (1<<15) /* Force replication of current cmd. */
#define CLIENT_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_AOF_PROP (1<<19) /* Don't propagate to AOF. */
#define CLIENT_PREVENT_REPL_PROP (1<<20) /* Don't propagate to slaves. */
#define CLIENT_PREVENT_PROP (CLIENT_PREVENT_AOF_PROP|CLIENT_PREVENT_REPL_PROP)
#define CLIENT_PENDING_WRITE (1<<21) /* Client has output to send but a write handler is yet not installed. */
#define CLIENT_REPLY_OFF (1<<22) /* Don't send replies to client. */
#define CLIENT_REPLY_SKIP_NEXT (1<<23) /* Set CLIENT_REPLY_SKIP for next cmd */
#define CLIENT_REPLY_SKIP (1<<24) /* Don't send just this reply. */
#define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */

PUBSUBSCRIPT LOAD命令的特殊性

它们都没有对数据库进行写入,但是也要写入到AOF中(需要有CLIENT_FORCE_AOF标志),因为:

  • PUBSUB发布消息时,带有副作用;
  • SCRIPT LOAD没直接修改数据库数据,但改变了服务器状态,也带有副作用。且它需要让从服务器执行脚本加载,还需要CLIENT_FORCE_REPL标志。

2.4. 输入缓冲区

它用于缓存客户端发送的命令,命令缓存后,通过事件循环取出并执行:

typedef struct client {
    // ...
    sds querybuf; // 缓存客户端的请求
    // ...
}

命令可以是多个,格式和AOF类似。但是它的大小不能超过1GB,否则客户端会被强制关闭。

2.5. 命令及其参数

将命令缓存后,通过事件循环,服务端取出缓存数据,解析成命令和命令参数,保存在argv字段中:

typedef struct client {
    // ...
    int argc; // argv的个数
    robj **argv; // 命令及其参数,第一个元素是命令,后面是参数
    // ...
}

2.6. 命令实现函数

服务端解析完argvargc后,就要查找对应命令的实现函数,然后将其赋值到cmd字段(函数指针)中:

typedef struct client {
    // ...
     struct redisCommand *cmd, *lastcmd; // 当前和上一次执行的命令实现函数
    // ...
}

命令查找是从一个命令字典中查找的:

typedef struct redisServer {
    // ...
    dict *commands; // 存储命令表,用于查找,键是sds,值是redisCommand
    // ...
}

命令字典是从命令表中加载的,它定义在server.credisCommandTable数组中:

struct redisCommand {
    char *name; // 命令名
    redisCommandProc *proc; // 命令执行函数
    int arity; // 参数个数
    char *sflags; // 命令的字符串标志
    int flags; // 从sflags计算的整数的标志
    redisGetKeysProc *getkeys_proc; // 可选,用于获得命令的key参数
    int firstkey; // 第一个key的参数下标
    int lastkey; // 最后一个key的参数下标
    int keystep; // 每个key之间的间隔
    long long microseconds, calls; // 记录命令执行总时间和函数调用次数
};

struct redisCommand redisCommandTable[] = {
    ...
}

2.7. 响应缓冲区

之前事件循环说过,客户端的响应是先存储到响应缓冲区中,然后待写事件就绪后将缓冲区的内容写出去。

缓冲区有2个。

一个是固定大小的缓冲区,用于存储长度较小的响应:

typedef struct client {
    // ...
    // fixed resp buffer
    int bufpos; // 已使用缓存的尾部位置
    char buf[PROTO_REPLY_CHUNK_BYTES]; // 响应缓冲区,默认16KB
}

另一个是可变大小的缓冲区,当固定大小缓冲区空间不足时使用,它是一个clientReplyBlock链表:

typedef struct client {
    // ...
    list *reply; // 
    // ...
}

typedef struct clientReplyBlock {
    size_t size, used;
    char buf[];
} clientReplyBlock;

2.8. 时间

时间主要有3个字段,见下面注释的解释:

typedef struct client {
    // ...
    time_t ctime;  // 创建时间
    time_t lastinteraction; // 上次交互的时间
    time_t obuf_soft_limit_reached_time; // 响应缓冲区第一次到达软性限制的时间
    // ...
}

Redis服务端对响应缓冲区的大小有一定的限制:

  • 硬性限制:若缓冲区大小超过该限制,客户端立刻关闭
  • 软性限制:若缓冲区大小超过该限制,则会记录时间(obuf_soft_limit_reached_time),服务端持续监控
    • 若一段时间响应缓冲区依旧超过限制,客户端就会强制关闭
    • 否则客户端保留,obuf_soft_limit_reached_time置0

响应缓冲区的大小限制是可以配置的,遵循下面的格式:

1
2
3
4
5
6
client-output-buffer-limit <class> <hard-limit> <soft-limit> <soft-seconds>

# Examples
client-output-buffer-limit normal 0 0 0 # 普通客户端没有任何硬性和软性的限制
client-output-buffer-limit slave 128mb 64mb 60 # 从服务器作为客户端时,硬性128MB,软性64MB,限制时长60s
client-output-buffer-limit pubsub 16mb 8mb 60 # 执行发布订阅的客户端,硬性16MB,软性8MB,限制时长60s

2.9. 其他字段

其他字段主要包含有下面几个方面:

  • 客户端当前访问的数据库
  • 客户端身份验证信息
  • 主从复制的信息
  • 事务执行的状态
  • BRPOP/BLPOP等阻塞命令执行的状态
  • 发布-订阅的状态

3. 客户端的创建与关闭

3.1. 普通客户端的创建

之前事件循环里,服务端在启动服务端时会注册acceptTcpHandler读事件回调,客户端就在这个回调里创建,最后进入下面这个函数:

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 1. 创建客户端
    if ((c = createClient(fd)) == NULL) {
        // ...
        close(fd);
        return;
    }
    // 2. 若已连接客户端数量超过上限,则拒绝连接,释放之前创建的客户端
    if (listLength(server.clients) > server.maxclients) {
        char *err = "...";
        if (write(c->fd,err,strlen(err)) == -1) {
            // Do nothing when err
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    // 3. 保护模式(默认)下,没密码,没有指定绑定地址,则拒绝非回环地址的连接
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err = "..."
            if (write(c->fd,err,strlen(err)) == -1) {
                // Do nothing
            }
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }
    server.stat_numconnections++;
    c->flags |= flags;
}

创建客户端本身在下面这个函数,它除了创建client本身之外,还会注册readQueryFromClient读回调到多路复用器上,以在读事件就绪时读到客户端的请求。上面的代码第1步会调用这个函数:

client *createClient(int fd) {
    // 创建客户端
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        // 当fd != -1时,说明客户端来自网络
        // 设置非阻塞,关闭Nagle算法,必要时设置keep alive参数
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // 最重要的是将readQueryFromClient读回调注册到多路复用器上
        // fd读事件就绪时,该回调会读取客户端发来的请求
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    selectDb(c,0); // 默认选择db[0]
    // ... 初始化其他参数...
    if (fd != -1) linkClient(c); // 将客户端连接到链表上
    initClientMultiState(c); // 初始化事务状态为空
    return c;
}

3.2. 普通客户端的关闭

客户端的关闭可能有很多原因:

  • 客户端进程退出,连接断开
  • 客户端发送不符合协议的命令请求
  • 客户端被CLIENT KILL
  • timeout被配置,空转超过这个时间限制
  • 客户端发送请求的大小超过了输入缓冲区的限制(默认1GB)
  • 客户端的响应缓冲区占用大小超过限制

关闭客户端的函数实现如下,除了处理主从复制的代码,以及非立即(异步)释放的代码外,其他都是释放内存的操作:

void freeClient(client *c) {
    listNode *ln;
    // 保护模式下的客户端不会立即释放,先放到异步队列中等待下面几个事件循环处理
    if (c->flags & CLIENT_PROTECTED) {
        // 置flag |= CLIENT_CLOSE_ASAP
        // 并将client节点加入clients_to_close链表
        freeClientAsync(c);
        return;
    }
    // 若客户端是主服务器
    // 那么必要时需要先保存这个客户端
    // 因为可能是意外断连,缓存这个客户端可以实现重新同步
    if (server.master && c->flags & CLIENT_MASTER) {
        serverLog(LL_WARNING,"Connection with master lost.");
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
                          CLIENT_CLOSE_ASAP|
                          CLIENT_BLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    // 当客户端是从服务器,先打日志
    if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
        serverLog(LL_WARNING,"Connection with replica %s lost.",
            replicationGetSlaveName(c));
    }
    // 释放输入缓冲
    sdsfree(c->querybuf);
    sdsfree(c->pending_querybuf);
    c->querybuf = NULL;
    // 释放阻塞操作所需要的数据和操作
    if (c->flags & CLIENT_BLOCKED) unblockClient(c);
    dictRelease(c->bpop.keys);
	// UNWATCH所有的Key
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
	// 取消订阅PUB/SUB的通道
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    // 释放响应缓冲
    listRelease(c->reply);
    // 释放客户端当前执行的命令和参数
    freeClientArgv(c);
    // 关闭client的连接,并将client移出clients链表
    unlinkClient(c);
	// 处理主从复制的场景
    if (c->flags & CLIENT_SLAVE) {
        // a) 若客户端是从服务器
        if (c->replstate == SLAVE_STATE_SEND_BULK) {
            if (c->repldbfd != -1) close(c->repldbfd);
            if (c->replpreamble) sdsfree(c->replpreamble);
        }
        // 若客户端是monitor,则从monitors链表中移除
        // 否则从slaves链表中移除
        list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        serverAssert(ln != NULL);
        listDelNode(l,ln);
        if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
            server.repl_no_slaves_since = server.unixtime;
        refreshGoodSlavesCount();
    }
    // b) 客户端是主服务器,处理主的连接丢失问题
    if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
    // 若客户端已经被调度,需要被删除(见第一步),就从clients_to_close链表中删除这个client节点
    if (c->flags & CLIENT_CLOSE_ASAP) {
        ln = listSearchKey(server.clients_to_close,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients_to_close,ln);
    }
    // 释放相关内存,执行清理
    if (c->name) decrRefCount(c->name);
    zfree(c->argv);
    freeClientMultiState(c);
    sdsfree(c->peerid);
    zfree(c);
}

3.3. 伪客户端的创建和关闭

伪客户端主要有:

  • AOF加载伪客户端
  • Lua脚本伪客户端

对于AOF加载伪客户端,它在加载AOF时创建(调用aof.c定义的createFakeClient(void)函数),加载完成后释放客户端(调用aof.c定义的freeFakeClient(struct client *c)函数)

而对于Lua脚本伪客户端,它存在于整个生命周期,只有服务器被关闭时,这个客户端才会被关闭。它的创建在scripting.c中的scriptingInit(int setup)函数中创建。(调用栈initServer -> scriptingInit -> createClient(-1)

typedef struct redisServer {
    // ...
    client *lua_client;   /* The "fake client" to query Redis from Lua */
    // ...
}