1. Overview
Redis是典型的一对多的服务器顺序,每个客户端可与服务器进行交互。
交互事件的触发和处理源于基于I/O多路复用的文件事件及其处理器,这在上一篇已经说明了。
而交互功能的实现,是有状态的(比如事务),因此Redis需要维护客户端的信息。
在server.h
中,Redis定义了结构client
存储一个客户端的状态。而多个客户端,以一个链表保存活跃的客户端:
typedef struct redisServer {
// ...
list *clients; // 活跃客户端列表
// ...
}
2. 客户端属性
客户端client
结构有很多字段下面分别进行描述。
2.1. 文件描述符
typedef struct client {
// ...
int fd; // File descriptor
// ...
}
fd
可以取-1
或者自然数:
-1
:代表伪客户端,用于处理AOF恢复/Lua脚本。由于它不来源于网络,所以不需要套接字描述符- 自然数:代表普通客户端,其
fd
代表和服务端直接的连接描述符
2.2. 客户端名字
可通过CLIENT SETNAME
命令给客户端设置一个名字,而客户端的名字记录在name
字段中:
typedef struct client {
// ...
robj *name; // 客户端名字,是一个OBJ_STRING
// ...
}
2.3. 标志
客户端的flag
字段记录了客户端的角色和状态,状态可以是复合的,通过|
进行添加:
typedef struct client {
// ...
int flags; // 客户端标志,记录角色和状态
// ...
}
下面是几个标志的宏定义:
/* Client flags */
#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
#define CLIENT_MASTER (1<<1) /* This client is a master server */
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define CLIENT_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define CLIENT_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define CLIENT_UNBLOCKED (1<<7) /* This client was unblocked and is stored in server.unblocked_clients */
#define CLIENT_LUA (1<<8) /* This is a non connected client used by Lua */
#define CLIENT_ASKING (1<<9) /* Client issued the ASKING command */
#define CLIENT_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define CLIENT_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define CLIENT_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
#define CLIENT_MASTER_FORCE_REPLY (1<<13) /* Queue replies even if is master */
#define CLIENT_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define CLIENT_FORCE_REPL (1<<15) /* Force replication of current cmd. */
#define CLIENT_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_AOF_PROP (1<<19) /* Don't propagate to AOF. */
#define CLIENT_PREVENT_REPL_PROP (1<<20) /* Don't propagate to slaves. */
#define CLIENT_PREVENT_PROP (CLIENT_PREVENT_AOF_PROP|CLIENT_PREVENT_REPL_PROP)
#define CLIENT_PENDING_WRITE (1<<21) /* Client has output to send but a write handler is yet not installed. */
#define CLIENT_REPLY_OFF (1<<22) /* Don't send replies to client. */
#define CLIENT_REPLY_SKIP_NEXT (1<<23) /* Set CLIENT_REPLY_SKIP for next cmd */
#define CLIENT_REPLY_SKIP (1<<24) /* Don't send just this reply. */
#define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
PUBSUB
和SCRIPT LOAD
命令的特殊性它们都没有对数据库进行写入,但是也要写入到AOF中(需要有
CLIENT_FORCE_AOF
标志),因为:
PUBSUB
发布消息时,带有副作用;SCRIPT LOAD
没直接修改数据库数据,但改变了服务器状态,也带有副作用。且它需要让从服务器执行脚本加载,还需要CLIENT_FORCE_REPL
标志。
2.4. 输入缓冲区
它用于缓存客户端发送的命令,命令缓存后,通过事件循环取出并执行:
typedef struct client {
// ...
sds querybuf; // 缓存客户端的请求
// ...
}
命令可以是多个,格式和AOF类似。但是它的大小不能超过1GB,否则客户端会被强制关闭。
2.5. 命令及其参数
将命令缓存后,通过事件循环,服务端取出缓存数据,解析成命令和命令参数,保存在argv
字段中:
typedef struct client {
// ...
int argc; // argv的个数
robj **argv; // 命令及其参数,第一个元素是命令,后面是参数
// ...
}
2.6. 命令实现函数
服务端解析完argv
和argc
后,就要查找对应命令的实现函数,然后将其赋值到cmd
字段(函数指针)中:
typedef struct client {
// ...
struct redisCommand *cmd, *lastcmd; // 当前和上一次执行的命令实现函数
// ...
}
命令查找是从一个命令字典中查找的:
typedef struct redisServer {
// ...
dict *commands; // 存储命令表,用于查找,键是sds,值是redisCommand
// ...
}
命令字典是从命令表中加载的,它定义在server.c
的redisCommandTable
数组中:
struct redisCommand {
char *name; // 命令名
redisCommandProc *proc; // 命令执行函数
int arity; // 参数个数
char *sflags; // 命令的字符串标志
int flags; // 从sflags计算的整数的标志
redisGetKeysProc *getkeys_proc; // 可选,用于获得命令的key参数
int firstkey; // 第一个key的参数下标
int lastkey; // 最后一个key的参数下标
int keystep; // 每个key之间的间隔
long long microseconds, calls; // 记录命令执行总时间和函数调用次数
};
struct redisCommand redisCommandTable[] = {
...
}
2.7. 响应缓冲区
之前事件循环说过,客户端的响应是先存储到响应缓冲区中,然后待写事件就绪后将缓冲区的内容写出去。
缓冲区有2个。
一个是固定大小的缓冲区,用于存储长度较小的响应:
typedef struct client {
// ...
// fixed resp buffer
int bufpos; // 已使用缓存的尾部位置
char buf[PROTO_REPLY_CHUNK_BYTES]; // 响应缓冲区,默认16KB
}
另一个是可变大小的缓冲区,当固定大小缓冲区空间不足时使用,它是一个clientReplyBlock
链表:
typedef struct client {
// ...
list *reply; //
// ...
}
typedef struct clientReplyBlock {
size_t size, used;
char buf[];
} clientReplyBlock;
2.8. 时间
时间主要有3个字段,见下面注释的解释:
typedef struct client {
// ...
time_t ctime; // 创建时间
time_t lastinteraction; // 上次交互的时间
time_t obuf_soft_limit_reached_time; // 响应缓冲区第一次到达软性限制的时间
// ...
}
Redis服务端对响应缓冲区的大小有一定的限制:
- 硬性限制:若缓冲区大小超过该限制,客户端立刻关闭
- 软性限制:若缓冲区大小超过该限制,则会记录时间(
obuf_soft_limit_reached_time
),服务端持续监控
- 若一段时间响应缓冲区依旧超过限制,客户端就会强制关闭
- 否则客户端保留,
obuf_soft_limit_reached_time
置0响应缓冲区的大小限制是可以配置的,遵循下面的格式:
1 2 3 4 5 6 client-output-buffer-limit <class> <hard-limit> <soft-limit> <soft-seconds> # Examples client-output-buffer-limit normal 0 0 0 # 普通客户端没有任何硬性和软性的限制 client-output-buffer-limit slave 128mb 64mb 60 # 从服务器作为客户端时,硬性128MB,软性64MB,限制时长60s client-output-buffer-limit pubsub 16mb 8mb 60 # 执行发布订阅的客户端,硬性16MB,软性8MB,限制时长60s
2.9. 其他字段
其他字段主要包含有下面几个方面:
- 客户端当前访问的数据库
- 客户端身份验证信息
- 主从复制的信息
- 事务执行的状态
BRPOP/BLPOP
等阻塞命令执行的状态- 发布-订阅的状态
3. 客户端的创建与关闭
3.1. 普通客户端的创建
之前事件循环里,服务端在启动服务端时会注册acceptTcpHandler
读事件回调,客户端就在这个回调里创建,最后进入下面这个函数:
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 1. 创建客户端
if ((c = createClient(fd)) == NULL) {
// ...
close(fd);
return;
}
// 2. 若已连接客户端数量超过上限,则拒绝连接,释放之前创建的客户端
if (listLength(server.clients) > server.maxclients) {
char *err = "...";
if (write(c->fd,err,strlen(err)) == -1) {
// Do nothing when err
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
// 3. 保护模式(默认)下,没密码,没有指定绑定地址,则拒绝非回环地址的连接
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
char *err = "..."
if (write(c->fd,err,strlen(err)) == -1) {
// Do nothing
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
}
server.stat_numconnections++;
c->flags |= flags;
}
创建客户端本身在下面这个函数,它除了创建client
本身之外,还会注册readQueryFromClient
读回调到多路复用器上,以在读事件就绪时读到客户端的请求。上面的代码第1步会调用这个函数:
client *createClient(int fd) {
// 创建客户端
client *c = zmalloc(sizeof(client));
if (fd != -1) {
// 当fd != -1时,说明客户端来自网络
// 设置非阻塞,关闭Nagle算法,必要时设置keep alive参数
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 最重要的是将readQueryFromClient读回调注册到多路复用器上
// fd读事件就绪时,该回调会读取客户端发来的请求
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
selectDb(c,0); // 默认选择db[0]
// ... 初始化其他参数...
if (fd != -1) linkClient(c); // 将客户端连接到链表上
initClientMultiState(c); // 初始化事务状态为空
return c;
}
3.2. 普通客户端的关闭
客户端的关闭可能有很多原因:
- 客户端进程退出,连接断开
- 客户端发送不符合协议的命令请求
- 客户端被
CLIENT KILL
timeout
被配置,空转超过这个时间限制- 客户端发送请求的大小超过了输入缓冲区的限制(默认1GB)
- 客户端的响应缓冲区占用大小超过限制
- …
关闭客户端的函数实现如下,除了处理主从复制的代码,以及非立即(异步)释放的代码外,其他都是释放内存的操作:
void freeClient(client *c) {
listNode *ln;
// 保护模式下的客户端不会立即释放,先放到异步队列中等待下面几个事件循环处理
if (c->flags & CLIENT_PROTECTED) {
// 置flag |= CLIENT_CLOSE_ASAP
// 并将client节点加入clients_to_close链表
freeClientAsync(c);
return;
}
// 若客户端是主服务器
// 那么必要时需要先保存这个客户端
// 因为可能是意外断连,缓存这个客户端可以实现重新同步
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED)))
{
replicationCacheMaster(c);
return;
}
}
// 当客户端是从服务器,先打日志
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c));
}
// 释放输入缓冲
sdsfree(c->querybuf);
sdsfree(c->pending_querybuf);
c->querybuf = NULL;
// 释放阻塞操作所需要的数据和操作
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
dictRelease(c->bpop.keys);
// UNWATCH所有的Key
unwatchAllKeys(c);
listRelease(c->watched_keys);
// 取消订阅PUB/SUB的通道
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
// 释放响应缓冲
listRelease(c->reply);
// 释放客户端当前执行的命令和参数
freeClientArgv(c);
// 关闭client的连接,并将client移出clients链表
unlinkClient(c);
// 处理主从复制的场景
if (c->flags & CLIENT_SLAVE) {
// a) 若客户端是从服务器
if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
// 若客户端是monitor,则从monitors链表中移除
// 否则从slaves链表中移除
list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
serverAssert(ln != NULL);
listDelNode(l,ln);
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
}
// b) 客户端是主服务器,处理主的连接丢失问题
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
// 若客户端已经被调度,需要被删除(见第一步),就从clients_to_close链表中删除这个client节点
if (c->flags & CLIENT_CLOSE_ASAP) {
ln = listSearchKey(server.clients_to_close,c);
serverAssert(ln != NULL);
listDelNode(server.clients_to_close,ln);
}
// 释放相关内存,执行清理
if (c->name) decrRefCount(c->name);
zfree(c->argv);
freeClientMultiState(c);
sdsfree(c->peerid);
zfree(c);
}
3.3. 伪客户端的创建和关闭
伪客户端主要有:
- AOF加载伪客户端
- Lua脚本伪客户端
对于AOF加载伪客户端,它在加载AOF时创建(调用aof.c
定义的createFakeClient(void)
函数),加载完成后释放客户端(调用aof.c
定义的freeFakeClient(struct client *c)
函数)
而对于Lua脚本伪客户端,它存在于整个生命周期,只有服务器被关闭时,这个客户端才会被关闭。它的创建在scripting.c
中的scriptingInit(int setup)
函数中创建。(调用栈initServer -> scriptingInit -> createClient(-1)
)
typedef struct redisServer {
// ...
client *lua_client; /* The "fake client" to query Redis from Lua */
// ...
}