1. Overview
之前讲了Redis单机服务器的相关内容,包括:
- 单机数据的存储
- RDB/AOF
- 事件循环
- 客户端状态维持
这里,综合上面的内容,梳理一下:
- 服务端执行命令的流程
serverCron
:唯一的时间事件,维护服务端状态- 服务器的启动
2. 命令执行流程
2.1. 连接创建
客户端的连接创建事件被注册到了事件循环上,是一个读事件,回调是acceptTcpHandler
。它会accept
连接,创建客户端,并根据该连接向事件循环创建读事件,供客户端和服务端之间的交互。
代码略,之前也提及过,下面就是关键的调用链:
acceptTcpHandler
=> acceptCommonHandler
===> createClient
=====> aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
2.2. 读取客户端请求
从2.1.可知,客户端请求从readQueryFromClient
回调中读取。
a) 读取输入缓冲区
服务器从套接字中读取请求,格式参考AOF。之前提及过,请求会被缓冲到客户端client
的输入缓冲区中(即querybuf
字段里)。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN;
// ...
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 这里对querybuf进行扩容,并读取请求到缓冲中
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
// ... handle errors and return
} else if (nread == 0) {
// ... handle errors and return
} else if (c->flags & CLIENT_MASTER) {
// 当客户端是MASTER节点(主备复制),还需要将内容添加到pending_querybuf上
// 该缓冲记录了待完成的主备复制请求
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// ... update metadata
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
// 若超过输入缓冲的上限大小,强制释放客户端并返回
}
processInputBufferAndReplicate(c);
}
b) 转换得到argv
和argc
上面的代码中的processInputBufferAndReplicate
完成了整个命令执行和复制的功能,当然也包括执行前,从缓冲中解析出argv
和argc
。
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
// 当客户端不是MASTER,之间处理完请求
processInputBuffer(c);
} else {
// 若是MASTER,处理完请求后
// 还需要将MASTER的复制数据分发自己下属的slaves节点上(多级slave)
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
Redis协议分为PROTO_REQ_MULTIBULK
和PROTO_REQ_INLINE
,这里为了简便选择后者。
而真正处理请求的函数就是processInputBuffer
了:
void processInputBuffer(client *c) {
server.current_client = c;
// 一直处理,直到缓冲区被消耗干净为止
while(c->qb_pos < sdslen(c->querybuf)) {
// ...
// ...
// 1. 解析命令
if (c->reqtype == PROTO_REQ_INLINE) {
// 这里主要看inline协议
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
// 2. 处理和执行命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
// 事务开启下replication offset不会修改
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}
// ...
}
// ...
}
}
// ...
}
这里,解析命令分协议,将缓冲区的数据解析成argc
和argv
:
- 对于
PROTO_REQ_INLINE
:processInlineBuffer
- 对于
PROTO_REQ_MULTIBULK
:processMultibulkBuffer
协议和AOF比较类似,这里不详细说明。
2.3. 执行命令
执行命令的函数,在2.2.b.中的代码块中已经说明,是函数processCommand
。执行命令也得分为很多步,下面挑选一些关键的进行说明。
a) 查找命令实现函数
这在lookupCommand
中实现:
int processCommand(client *c) {
// ...
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 查找命令,第一个就是命令字符串
// ...
}
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}
void *dictFetchValue(dict *d, const void *key) {
dictEntry *he;
he = dictFind(d,key);
return he ? dictGetVal(he) : NULL;
}
typedef struct redisServer {
// ...
dict *commands; // 命令表
// ...
}
而之前说过,server.command
是一个命令字典。Redis就是通过这个字典找到对应的命令实现函数。
b) 执行准备操作
这些准备操作按照顺序有:
- 检查命令,校验请求参数
- 检查身份验证
- 处理错误的重定向
- 若开启
maxmemory
选项,需要检查内存占用,必要时执行内存回收 - 若上一次持久化失败,(对于RDB还需开启
stop-writes-on-bgsave-err
),拒绝写命令 - 若配置
min-slaves-to-write
,若正常slave节点不足,拒绝写命令 - 若自己是只读slave,只接收客户端是master的写
- 若客户端使用发布-订阅,只允许执行
SUBSCRIBE
,PSUBSCRIBE
,UNSUBSCRIBE
,PUNSUBSCRIBE
和PING
命令 - 若
slave-serve-stale-data
设置为no
,且自己是slave,和master的连接不正常,只允许flag
为CMD_STALE
的命令,如INFO
和SLAVEOF
(不允许从slave中访问out-of-date的数据,所以拒绝命令执行) - 若服务器正在载入,那么之内执行带有
CMD_LOADING
的flag
的命令,如INFO
,SHUTDOWN
,PUBLISH
等 - 若服务器执行Lua脚本而超时并阻塞,那么只会执行
SHUTDOWN NOSAVE
和SCRIPT KILL
命令 - 若服务器正在执行事务,那么只会执行
EXEC
,DISCARD
,MULTI
,WATCH
命令,其他命令将被缓存到事务队列中 - 若服务器打开了监视器功能,执行前还需要将执行了命令信息发给monitor
上述完成之后,就开始真正执行命令。
c) 执行命令
最终执行命令的函数为call
:
int processCommand(client *c) {
// ...
// 这里真正执行命令
call(c,CMD_CALL_FULL);
// ...
}
而call
中,不仅执行了命令,也执行了一些扫尾的操作,这里最关键的就是下面这一段:
void call(client *c, int flag) {
// ... prepare ...
c->cmd->proc(c); // 执行命令
// ... after works ...
}
不论什么命令,只要是有响应的,最后都会调用 addReply
函数写入响应,它会将响应写入响应缓冲区,待写事件就绪时通过套接字刷掉:
void addReply(client *c, robj *obj) {
// 1. 准备,把客户端添加到server.clients_pending_write的链表头
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
// 2. 写入响应缓冲
// 首先尝试写入定长缓冲,若不成功则写入边长缓冲中
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
// 2. 写入响应缓冲
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyStringToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
// 写入定长的响应缓冲
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
if (listLength(c->reply) > 0) return C_ERR; // 若变长缓冲非空,返回失败,写入变长缓冲
if (len > available) return C_ERR; // 若空间不足,则返回失败,写入变长缓冲
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
}
// 写入变长的响应缓冲(内存块链表)
void _addReplyStringToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
listNode *ln = listLast(c->reply);
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
if (tail) {
size_t avail = tail->size - tail->used;
size_t copy = avail >= len? len: avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
}
if (len) {
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
tail = zmalloc(size + sizeof(clientReplyBlock));
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
listAddNodeTail(c->reply, tail);
c->reply_bytes += tail->size;
}
asyncCloseClientOnOutputBufferLimitReached(c);
}
d) 执行扫尾工作
扫尾工作包括:
-
若开启慢查询日志,检查该查询,必要时添加慢查询日志
-
计算执行时长,更新
client
的关于时间和计数器等元数据字段 -
若开启AOF,则将当前执行的命令写入AOF缓冲
-
若开启主备复制,则将当前执行的命令传播给slave节点
第三、四步都由
propagate
函数完成,传播的命令必须对数据库内容有修改(即dirty
不为0)。
这些完成后,一条命令就执行完成了。
2.4. 回复响应
命令执行完成后,结果会被保存到响应缓冲区中。
每个事件循环开始前(即beforeSleep
函数),Redis会清除输出缓冲区的内容,将数据写出到套接字(若没有清除完全则会注册回调,待套接字写事件就绪后再写入,即放在事件循环内处理文件事件时进行):
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
// ...
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
// 遍历server.clients_pending_write链表
while((ln = listNext(&li))) {
// 取出一个待输出的客户端
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
// PROTECTED客户端不输出
if (c->flags & CLIENT_PROTECTED) continue;
// 尝试清空响应缓冲,输出相应
if (writeToClient(c->fd,c,0) == C_ERR) continue;
// 若客户端的缓冲还有剩余,则需要注册一个写文件事件,利用回调将缓冲清空
if (clientHasPendingReplies(c)) {
// 注册的是写事件
int ae_flags = AE_WRITABLE;
// 若开启AOF,且策略是FSYNC_ALWAYS,那么事件类型是AE_BARRIER(先写后读)
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS) {
ae_flags |= AE_BARRIER;
}
// 给fd再注册写文件事件
// 利用sendReplyToClient回调(实际还是writeToClient函数)清空缓冲
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}
3. serverCron
时间事件
serverCron
是一个时间事件,它在初始化时,添加到了时间事件链表,并按照上面的时间戳指示的时间执行,默认100ms执行一次。
而serverCron
主要用于维护服务器状态,这里将会进行说明。
3.1. 调试看门狗
为了调试方便,serverCron
会处理开门狗,用于调试。当执行命令耗时过长,会发送SIGALRM
信号,这时Redis会响应信息并记录当前堆栈。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
// ...
}
而SIGALRM
信号处理回调为watchdogSignalHandler
,在debug.c
中实现,主要打出警告和调用栈信息。
3.2. 更新缓存的时间戳
由于从时钟硬件获取时间的开销很大,Redis会缓存并定期更新时间戳。
typedef struct redisServer {
// ...
time_t unixtime; // Unix时间戳
long long mstime; // 毫秒下的Unix时间戳。
// ...
}
更新时间戳的函数是updateCachedTime
:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
updateCachedTime();
// ...
}
3.3. 记录流量信息
Redis每隔100ms执行一次流量数据的更新,更新的数据包括:
- 命令执行的个数
- 读流量
- 写流量
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
run_with_period(100) {
// 命令执行个数
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
// 读流量
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
// 写流量
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}
// ...
}
3.4. 更新LRU时钟
Redis的每个对象会有一个LRU时钟字段(lru
),记录了对象最后一次访问的时间。
而这个时间可用于计算对象空转时间(通过robj->lru - server->lruclock
)。当然这是一个模糊值。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
// ...
}
3.5. 更新内存使用信息
内存信息包括:
- 内存使用峰值
zmalloc
的使用内存信息(如RSS、已使用、已申请、已预留等)
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
// 记录内存峰值
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
// 每隔100ms记录zmalloc的metrix(统计值)
run_with_period(100) {
server.cron_malloc_stats.process_rss = zmalloc_get_rss();
server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
&server.cron_malloc_stats.allocator_active,
&server.cron_malloc_stats.allocator_resident);
if (!server.cron_malloc_stats.allocator_resident) {
size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
}
if (!server.cron_malloc_stats.allocator_active)
server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
if (!server.cron_malloc_stats.allocator_allocated)
server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
}
// ..
}
3.6. 处理SIGTERM
信号
当接收到SIGTERM
信号时,系统会执行回调sigShutdownHandler
,然后将进程的关闭延迟到serverCron
执行:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
// initserver()中注册了信号处理函数
// 接受到SIGTERM信号后,会设置server.shutdown_asap为1
// 上述处理函数是sigShutdownHandler
// 而对SIGTERM信号的shutdown响应处理推迟到这里执行
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
// ...
}
而处理退出函数是prepareForShutdown
,它会最后刷新RDB到磁盘,并尽量清空slave输出缓冲。完成后,才能退出进程。
而收到
SIGINT
,或者是正在加载时收到SIGTERM
,进程会立即退出。
3.7. 更新客户端状态
这里是clientCron
函数,它主要做下面几件事:
- 若客户端连接超时/空转时间过长/阻塞时间过长,释放客户端
- 若输入缓冲区过大,收缩缓冲区以释放内存
- 记录输入/输出缓冲区内存使用峰值
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
int numclients = listLength(server.clients);
int iterations = numclients/server.hz;
mstime_t now = mstime();
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
numclients : CLIENTS_CRON_MIN_ITERATIONS;
// 遍历客户端链表
while(listLength(server.clients) && iterations--) {
client *c;
listNode *head;
listRotate(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
// 1. 若客户端连接超时/空转时间或阻塞时间过长,释放客户端
if (clientsCronHandleTimeout(c,now)) continue;
// 2. 若输入缓冲区过大,收缩缓冲区以释放内存
if (clientsCronResizeQueryBuffer(c)) continue;
// 3. 记录输入/输出缓冲区内存使用峰值
if (clientsCronTrackExpansiveClients(c)) continue;
}
}
3.8. 对数据库存储的维护
这里就是databasesCron
函数,它主要做下面几件事:
- 抽样并删除过期的键值对
- 对数据库进行resize
- 对数据库进行rehash(渐进式)
void databasesCron(void) {
// 抽样删除过期键值对
if (server.active_expire_enabled) {
if (server.masterhost == NULL) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
if (server.active_defrag_enabled)
activeDefragCycle();
// 在RDB/AOF子进程未开启下,进行resize和rehash
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
// Resize
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
// Rehash(渐进)
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
break;
} else {
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
3.9. 执行延迟的BGREWRITEAOF
若执行BGSAVE
期间,若有BGREWRITEAOF
请求,那么它会被延迟到BGSAVE
之后执行,此时redisServer
的aof_rewrite_scheduled
会被置为1。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled){
// 若有延迟的BGREWRITEAOF,当BGSAVE完成后才会执行
rewriteAppendOnlyFileBackground();
}
// ...
}
3.10. 检查后台RDB/AOF重写的状态
这里的判断逻辑相对复杂,这里看代码的注释:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{ // 若有持久化子进程
int statloc;
pid_t pid;
// 1. 执行一次wait3,检查子进程有无信号发过来
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
// 2.1. 若有信号,且-1,是错误,打日志
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
// 2.2. 若是RDB子进程,说明RDB任务执行完成,处理后续的动作
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
// 2.3. 若是AOF子进程,说明AOF重写完成,处理后续的动作
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
// 3, 后台任务完成,则需要重新设置resize policy并关闭管道
updateDictResizePolicy();
closeChildInfoPipe();
}
} else {
// 若没有持久化子进程
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
// 1. 根据配置文件的save定期RDB配置,必要时创建后台RDB任务
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
// 2. 若AOF打开,此外还满足
// 当前没有执行后台持久化操作,AOF增长速度超过上限且AOF大小使用超过了rewrite上限
// 那么还得执行一次后台AOF重写
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
// ...
}
3.11. 将AOF缓冲刷入文件
AOF开启时,需要通过这个时间事件,定期将AOF缓冲刷入磁盘:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
run_with_period(1000) {
// 若上次错误,每隔1s刷一次盘
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
// ...
}
3.12. 关闭待关闭的客户端
这一步需要关闭异步关闭的客户端(标志位CLIENT_CLOSE_ASAP
)。从server.clients_to_close
链表中遍历并释放客户端)。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
freeClientsInAsyncFreeQueue();
// ...
}
3.13. 关于集群相关
包括:
- 主备复制(每1s)
- 集群状态(每100ms)
- 哨兵
- 清理过期的迁移相关的套接字(每1s)
这些内容之后会提及,这里略过
3.14. 后台执行推迟的RDB(BGSAVE SCHEDULE
)
若rdb_bgsave_scheduled
又被设置了1,那么没有后台持久化进程下,还得执行一次后台RDB:
当AOF重写在进行,然后客户端发来
BGSAVE SCHEDULE
,那么这个标志被置1,即BGSAVE
会被推迟执行。
SCHEDULE
的加入不会让BGSAVE
返回错误。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
// ...
}
3.15. 更新循环计数
最后就是对循环计数自加,并返回下一次执行的时间(差):
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
server.cronloops++; // 循环计数增加
return 1000/server.hz; // 下一次执行serverCron在未来1000/server.hz毫秒之后
}
4. 服务器的启动
最后来看一下服务器是如何启动的。函数主要是server.c
的main
函数。
4.1. 基本设置
最开始的时候,Redis会做一些最基本的设置,如环境变量、时区、hash种子。另外还有一个哨兵设置(当参数里有sentinal
,服务器就被设置成哨兵)。
int main(int argc, char **argv) {
struct timeval tv;
int j;
#ifdef REDIS_TEST
// ...
#endif
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
tzset(); /* Populates 'timezone' global. */
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
char hashseed[16];
getRandomHexChars(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed((uint8_t*)hashseed);
server.sentinel_mode = checkForSentinelMode(argc,argv);
// ...
}
4.2. 初始化服务器配置
这里主要调用的是initServerConfig
函数,这个函数回对server
的各个字段初始化一个默认值(不是配置文件值)。这些字段包括如:时间、文件地址、服务模式、日志级别、复制、命令表、集群配置等等。
下面截取一段代码:
void initServerConfig(void) {
// ...
updateCachedTime();
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
clearReplicationId2();
server.timezone = getTimeZone();
server.configfile = NULL;
server.executable = NULL;
server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
// ...
}
4.3. 初始化模块
Redis有模块扩展机制。外部代码通过模块的规范,可以注入到Redis服务里。
初始化的函数是moduleInitModulesSystem
:
void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL);
moduleKeyspaceSubscribers = listCreate();
moduleFreeContextReusedClient = createClient(-1);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
moduleCommandFilters = listCreate();
// 这里注册核心模块,这里的模块是Redis自带的
moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
anetNonBlock(NULL,server.module_blocked_pipe[0]);
anetNonBlock(NULL,server.module_blocked_pipe[1]);
Timers = raxNew();
pthread_mutex_lock(&moduleGIL);
}
而moduleRegisterCoreAPI
主要调用的还是下面的函数/宏,实际上就是把模块的函数对外暴露的指针注册到server.moduleapi
字典里:
int moduleRegisterApi(const char *funcname, void *funcptr) {
return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}
#define REGISTER_API(name) \
moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)
4.4. 初始化哨兵
若4.1.中设置了哨兵模式,那么这里需要初始化,让自己成为哨兵:
int main(int argc, char **argv) {
// ...
if (server.sentinel_mode) {
initSentinelConfig(); // 初始化哨兵配置
initSentinel(); // 初始化哨兵数据结构
}
// ...
}
4.5. 从配置文件中加载配置
这里代码比较长,大致就是:
- 先检查启动参数,并记录相关配置信息(记录在
option
字符串中)。有些启动参数会带来一些额外的效果,如:- 检查RDB/AOF文件(
redis-check-rdb/aof
) - 打印版本号,处理帮助等
- 测试内存(
--test-memory
)
- 检查RDB/AOF文件(
- 从配置文件(记录在
configfile
为名字的文件中)和上述配置里,加载Redis配置。
加载配置主要是在loadServerConfig
函数里:
void loadServerConfig(char *filename, char *options) {
sds config = sdsempty();
char buf[CONFIG_MAX_LINE+1];
/* Load the file content */
if (filename) {
// ...
}
/* Append the additional options */
if (options) {
config = sdscat(config,"\n");
config = sdscat(config,options);
}
// 这里对读到的配置信息统一进行加载
loadServerConfigFromString(config);
sdsfree(config);
}
至于具体的加载,也是一段很长的代码,代码中充满了if-else
。server
的相关字段根据这些信息被设置。
4.6. daemon化
Redis通常而言是后台运行的,因此需要后台化处理。
Redis的后台化写的很小巧,即创建一个子进程,然后子进程把标准输入流、标准输出流和标准错误流重定向到/dev/null
里。
void daemonize(void) {
int fd;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
int dup2(int oldfd, int newfd)
:从
oldfd
拷贝一个fd
出来,它们都指向同一个文件。拷贝的fd
值等于newfd
。若原有newfd
指向的文件被打开,那么被重用前,它会被关闭。
4.7. 初始化整个服务器
这一步算是最重要的一步。它根据配置好的信息,将整个服务完全的初始化。当初始化完成后,只需启动其初始化好的时间循环,服务就正式启动了。
服务器的初始化位于函数initServer
,代码非常长,这里大致说明流程:
- 创建事件循环(
aeEventLoop
) - 监听端口(TCP和Unix domain socket/IPC)
- 初始化数据库和数据库内部的信息
- 状态、统计数据的初始化
- 添加时间事件(
serverCron
) - 给事件循环/多路复用器注册读回调,用于
- 接受TCP和Unix domain socket的连接
- 读取模块发来的信号(若事件循环本身作为一个阻塞客户端监听模块的信号)
- (若需要)打开AOF文件
- 初始化其它组件,必要时执行,如:
- 集群管理
- 主从复制
- 脚本管理
- 慢查询日志
- 延迟监控
- 后台I/O服务
4.8. 加载模块与数据
若Redis被设置成了哨兵,那么模块和数据不需要加载,而是启动哨兵服务进程。
// In main
if (!server.sentinel_mode) {
// ...
} else {
sentinelIsRunning();
}
若不是哨兵,那么首先会加载模块,函数是moduleLoadFromQueue
:
void moduleLoadFromQueue(void) {
listIter li;
listNode *ln;
// 遍历模块链表(这里的初始化是在4.5.中初始化的,通过loadmodule配置)
listRewind(server.loadmodule_queue,&li);
while((ln = listNext(&li))) {
struct moduleLoadQueueEntry *loadmod = ln->value;
// 加载模块
if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
== C_ERR){
// handle error and exit
}
}
}
然后加载数据,函数是loadDataFromDisk
,优先加载AOF,没有AOF才加载RDB:
对于混合持久化,它是走第一个分支的。
因为混合持久化的文件结构是大RDB+少量AOF,加载AOF时,若读到了RDB头,会先加载RDB部分的数据,然后再加载最后少量的AOF数据。
void loadDataFromDisk(void) {
long long start = ustime();
if (server.aof_state == AOF_ON) {
// 优先加载AOF
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
// 未开启AOF,则加载RDB
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
rsi.repl_stream_db != -1)
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
}
} else if (errno != ENOENT) {
// 若未找到文件,错误是不打出的,也不会退出进程
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
}
4.9. 启动服务
启动服务就很简单了,由于一切都已初始化完毕,只需要启动事件循环,服务器就会持续监听外部的请求,处理文件事件(如客户端请求)和时间事件(如serverCron
)。
int main(int argc, char **argv) {
// ...
aeSetBeforeSleepProc(server.el,beforeSleep); // 设置beforeSleep,每次事件循环前执行
aeSetAfterSleepProc(server.el,afterSleep); // 设置afterSleep,每次poll完多路复用器后执行
aeMain(server.el); // 启动事件循环
// ...
}
5. 总结
-
命令处理的过程如下:
- 服务器监听到读事件,读取命令并保存到输入缓冲区,然后解析
- 查找命令的函数表,执行命令,将结果保存到响应缓冲区
- 事件循环开始前(
beforeSleep
)将响应缓冲区写出,若没写完,则注册写事件,延迟到事件循环内部执行(当写事件准备完毕时)
-
serverCron
是一个时间时间,默认100ms执行一次,主要更新和维护服务器的状态,处理一些信号(如SIGTERM
),处理一些持久化、集群、复制、监控等维护工作 -
服务端的启动主要还是:
- 初始化和载入配置
- 初始化数据结构并还原数据
- 启动监听,创建事件循环并注册事件
- 启动事件循环
最主要的还是3、4步。