源码阅读-Redis单机服务器: 服务器总结

Posted by keys961 on October 31, 2019

1. Overview

之前讲了Redis单机服务器的相关内容,包括:

  • 单机数据的存储
  • RDB/AOF
  • 事件循环
  • 客户端状态维持

这里,综合上面的内容,梳理一下:

  • 服务端执行命令的流程
  • serverCron:唯一的时间事件,维护服务端状态
  • 服务器的启动

2. 命令执行流程

2.1. 连接创建

客户端的连接创建事件被注册到了事件循环上,是一个读事件,回调是acceptTcpHandler。它会accept连接,创建客户端,并根据该连接向事件循环创建读事件,供客户端和服务端之间的交互。

代码略,之前也提及过,下面就是关键的调用链:

acceptTcpHandler
=> acceptCommonHandler
===> createClient
=====> aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)

2.2. 读取客户端请求

从2.1.可知,客户端请求从readQueryFromClient回调中读取。

a) 读取输入缓冲区

服务器从套接字中读取请求,格式参考AOF。之前提及过,请求会被缓冲到客户端client的输入缓冲区中(即querybuf字段里)。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    // ...
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 这里对querybuf进行扩容,并读取请求到缓冲中
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        // ... handle errors and return
    } else if (nread == 0) {
        // ... handle errors and return
    } else if (c->flags & CLIENT_MASTER) {
        // 当客户端是MASTER节点(主备复制),还需要将内容添加到pending_querybuf上
        // 该缓冲记录了待完成的主备复制请求
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
	// ... update metadata
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // 若超过输入缓冲的上限大小,强制释放客户端并返回
    }
    processInputBufferAndReplicate(c);
}

b) 转换得到argvargc

上面的代码中的processInputBufferAndReplicate完成了整个命令执行和复制的功能,当然也包括执行前,从缓冲中解析出argvargc

void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        // 当客户端不是MASTER,之间处理完请求
        processInputBuffer(c);
    } else {
        // 若是MASTER,处理完请求后
        // 还需要将MASTER的复制数据分发自己下属的slaves节点上(多级slave)
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

Redis协议分为PROTO_REQ_MULTIBULKPROTO_REQ_INLINE,这里为了简便选择后者。

而真正处理请求的函数就是processInputBuffer了:

void processInputBuffer(client *c) {
    server.current_client = c;
    // 一直处理,直到缓冲区被消耗干净为止
    while(c->qb_pos < sdslen(c->querybuf)) {
        // ...
		// ...
        // 1. 解析命令
        if (c->reqtype == PROTO_REQ_INLINE) {
            // 这里主要看inline协议
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 2. 处理和执行命令
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    // 事务开启下replication offset不会修改 
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }
                // ...
            }
            // ...
        }
    }
	// ...
}

这里,解析命令分协议,将缓冲区的数据解析成argcargv

  • 对于PROTO_REQ_INLINEprocessInlineBuffer
  • 对于PROTO_REQ_MULTIBULKprocessMultibulkBuffer

协议和AOF比较类似,这里不详细说明。

2.3. 执行命令

执行命令的函数,在2.2.b.中的代码块中已经说明,是函数processCommand。执行命令也得分为很多步,下面挑选一些关键的进行说明。

a) 查找命令实现函数

这在lookupCommand中实现:

int processCommand(client *c) {
	// ...
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 查找命令,第一个就是命令字符串
    // ...
}
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}

void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;
    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;
}

typedef struct redisServer {
    // ...
    dict *commands; // 命令表
    // ...
}

而之前说过,server.command是一个命令字典。Redis就是通过这个字典找到对应的命令实现函数。

b) 执行准备操作

这些准备操作按照顺序有:

  • 检查命令,校验请求参数
  • 检查身份验证
  • 处理错误的重定向
  • 若开启maxmemory选项,需要检查内存占用,必要时执行内存回收
  • 若上一次持久化失败,(对于RDB还需开启stop-writes-on-bgsave-err),拒绝写命令
  • 若配置min-slaves-to-write,若正常slave节点不足,拒绝写命令
  • 若自己是只读slave,只接收客户端是master的写
  • 若客户端使用发布-订阅,只允许执行SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBEPING命令
  • slave-serve-stale-data设置为no,且自己是slave,和master的连接不正常,只允许flagCMD_STALE的命令,如INFOSLAVEOF(不允许从slave中访问out-of-date的数据,所以拒绝命令执行)
  • 若服务器正在载入,那么之内执行带有CMD_LOADINGflag的命令,如INFO, SHUTDOWN, PUBLISH
  • 若服务器执行Lua脚本而超时并阻塞,那么只会执行SHUTDOWN NOSAVESCRIPT KILL命令
  • 若服务器正在执行事务,那么只会执行EXEC, DISCARD, MULTI, WATCH命令,其他命令将被缓存到事务队列中
  • 若服务器打开了监视器功能,执行前还需要将执行了命令信息发给monitor

上述完成之后,就开始真正执行命令。

c) 执行命令

最终执行命令的函数为call

int processCommand(client *c) {
	// ...
    // 这里真正执行命令 
    call(c,CMD_CALL_FULL);
    // ...
}

call中,不仅执行了命令,也执行了一些扫尾的操作,这里最关键的就是下面这一段:

void call(client *c, int flag) {
    // ... prepare ...
    c->cmd->proc(c); // 执行命令
    // ... after works ...
}

不论什么命令,只要是有响应的,最后都会调用 addReply函数写入响应,它会将响应写入响应缓冲区,待写事件就绪时通过套接字刷掉:

void addReply(client *c, robj *obj) {
    // 1. 准备,把客户端添加到server.clients_pending_write的链表头
    if (prepareClientToWrite(c) != C_OK) return; 
    if (sdsEncodedObject(obj)) {
        // 2. 写入响应缓冲
        // 首先尝试写入定长缓冲,若不成功则写入边长缓冲中
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        // 2. 写入响应缓冲
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

// 写入定长的响应缓冲
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    if (listLength(c->reply) > 0) return C_ERR; // 若变长缓冲非空,返回失败,写入变长缓冲
    if (len > available) return C_ERR; // 若空间不足,则返回失败,写入变长缓冲
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

// 写入变长的响应缓冲(内存块链表)
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
    listNode *ln = listLast(c->reply);
    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
    if (tail) {
        size_t avail = tail->size - tail->used;
        size_t copy = avail >= len? len: avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    if (len) {
        size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
        tail = zmalloc(size + sizeof(clientReplyBlock));
        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
        tail->used = len;
        memcpy(tail->buf, s, len);
        listAddNodeTail(c->reply, tail);
        c->reply_bytes += tail->size;
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

d) 执行扫尾工作

扫尾工作包括:

  • 若开启慢查询日志,检查该查询,必要时添加慢查询日志

  • 计算执行时长,更新client的关于时间和计数器等元数据字段

  • 若开启AOF,则将当前执行的命令写入AOF缓冲

  • 若开启主备复制,则将当前执行的命令传播给slave节点

    第三、四步都由propagate函数完成,传播的命令必须对数据库内容有修改(即dirty不为0)。

这些完成后,一条命令就执行完成了。

2.4. 回复响应

命令执行完成后,结果会被保存到响应缓冲区中。

每个事件循环开始前(即beforeSleep函数),Redis会清除输出缓冲区的内容,将数据写出到套接字(若没有清除完全则会注册回调,待套接字写事件就绪后再写入,即放在事件循环内处理文件事件时进行):

void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    // ...
}
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    listRewind(server.clients_pending_write,&li);
    // 遍历server.clients_pending_write链表
    while((ln = listNext(&li))) {
        // 取出一个待输出的客户端
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        // PROTECTED客户端不输出
        if (c->flags & CLIENT_PROTECTED) continue;
        // 尝试清空响应缓冲,输出相应
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
		// 若客户端的缓冲还有剩余,则需要注册一个写文件事件,利用回调将缓冲清空
        if (clientHasPendingReplies(c)) {
            // 注册的是写事件
            int ae_flags = AE_WRITABLE;
            // 若开启AOF,且策略是FSYNC_ALWAYS,那么事件类型是AE_BARRIER(先写后读)
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. AE_BARRIER ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS) {
                ae_flags |= AE_BARRIER;
            }
            // 给fd再注册写文件事件
            // 利用sendReplyToClient回调(实际还是writeToClient函数)清空缓冲
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR) {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

3. serverCron时间事件

serverCron是一个时间事件,它在初始化时,添加到了时间事件链表,并按照上面的时间戳指示的时间执行,默认100ms执行一次。

serverCron主要用于维护服务器状态,这里将会进行说明。

3.1. 调试看门狗

为了调试方便,serverCron会处理开门狗,用于调试。当执行命令耗时过长,会发送SIGALRM信号,这时Redis会响应信息并记录当前堆栈。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    // ...
}

SIGALRM信号处理回调为watchdogSignalHandler,在debug.c中实现,主要打出警告和调用栈信息。

3.2. 更新缓存的时间戳

由于从时钟硬件获取时间的开销很大,Redis会缓存并定期更新时间戳。

typedef struct redisServer {
    // ...
    time_t unixtime; // Unix时间戳
    long long mstime; // 毫秒下的Unix时间戳。
    // ...
}

更新时间戳的函数是updateCachedTime

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    updateCachedTime();
    // ...
}

3.3. 记录流量信息

Redis每隔100ms执行一次流量数据的更新,更新的数据包括:

  • 命令执行的个数
  • 读流量
  • 写流量
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    run_with_period(100) {
        // 命令执行个数
        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        // 读流量
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        // 写流量
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }
    // ...
}

3.4. 更新LRU时钟

Redis的每个对象会有一个LRU时钟字段(lru),记录了对象最后一次访问的时间。

而这个时间可用于计算对象空转时间(通过robj->lru - server->lruclock)。当然这是一个模糊值。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    unsigned long lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);
    // ...
}

3.5. 更新内存使用信息

内存信息包括:

  • 内存使用峰值
  • zmalloc的使用内存信息(如RSS、已使用、已申请、已预留等)
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    // 记录内存峰值
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();
	// 每隔100ms记录zmalloc的metrix(统计值)
    run_with_period(100) {
        server.cron_malloc_stats.process_rss = zmalloc_get_rss();
        server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
        zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
                                   &server.cron_malloc_stats.allocator_active,
                                   &server.cron_malloc_stats.allocator_resident);
        
        if (!server.cron_malloc_stats.allocator_resident) {
            size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
            server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
        }
        if (!server.cron_malloc_stats.allocator_active)
            server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
        if (!server.cron_malloc_stats.allocator_allocated)
            server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
    }
    // ..
}

3.6. 处理SIGTERM信号

当接收到SIGTERM信号时,系统会执行回调sigShutdownHandler,然后将进程的关闭延迟到serverCron执行:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	// ...
    // initserver()中注册了信号处理函数
    // 接受到SIGTERM信号后,会设置server.shutdown_asap为1
    // 上述处理函数是sigShutdownHandler
    // 而对SIGTERM信号的shutdown响应处理推迟到这里执行
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }
    // ...
}

而处理退出函数是prepareForShutdown,它会最后刷新RDB到磁盘,并尽量清空slave输出缓冲。完成后,才能退出进程。

而收到SIGINT,或者是正在加载时收到SIGTERM,进程会立即退出。

3.7. 更新客户端状态

这里是clientCron函数,它主要做下面几件事:

  • 若客户端连接超时/空转时间过长/阻塞时间过长,释放客户端
  • 若输入缓冲区过大,收缩缓冲区以释放内存
  • 记录输入/输出缓冲区内存使用峰值
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
    int numclients = listLength(server.clients);
    int iterations = numclients/server.hz;
    mstime_t now = mstime();
    
    if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
        iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
                     numclients : CLIENTS_CRON_MIN_ITERATIONS;
	// 遍历客户端链表
    while(listLength(server.clients) && iterations--) {
        client *c;
        listNode *head;
        listRotate(server.clients);
        head = listFirst(server.clients);
        c = listNodeValue(head);
        // 1. 若客户端连接超时/空转时间或阻塞时间过长,释放客户端
        if (clientsCronHandleTimeout(c,now)) continue;
        // 2. 若输入缓冲区过大,收缩缓冲区以释放内存
        if (clientsCronResizeQueryBuffer(c)) continue;
        // 3. 记录输入/输出缓冲区内存使用峰值
        if (clientsCronTrackExpansiveClients(c)) continue;
    }
}

3.8. 对数据库存储的维护

这里就是databasesCron函数,它主要做下面几件事:

  • 抽样并删除过期的键值对
  • 对数据库进行resize
  • 对数据库进行rehash(渐进式)
void databasesCron(void) {
    // 抽样删除过期键值对
    if (server.active_expire_enabled) {
        if (server.masterhost == NULL) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }
    if (server.active_defrag_enabled)
        activeDefragCycle();

    // 在RDB/AOF子进程未开启下,进行resize和rehash
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
        // Resize 
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }
        // Rehash(渐进)
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    break;
                } else {
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

3.9. 执行延迟的BGREWRITEAOF

若执行BGSAVE期间,若有BGREWRITEAOF请求,那么它会被延迟到BGSAVE之后执行,此时redisServeraof_rewrite_scheduled会被置为1。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	// ...
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled){
        // 若有延迟的BGREWRITEAOF,当BGSAVE完成后才会执行
        rewriteAppendOnlyFileBackground(); 
    }
    // ...
}


3.10. 检查后台RDB/AOF重写的状态

这里的判断逻辑相对复杂,这里看代码的注释:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    
	if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {   // 若有持久化子进程
        int statloc;
        pid_t pid;
        // 1. 执行一次wait3,检查子进程有无信号发过来
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
            if (pid == -1) {
                // 2.1. 若有信号,且-1,是错误,打日志
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                // 2.2. 若是RDB子进程,说明RDB任务执行完成,处理后续的动作
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                // 2.3. 若是AOF子进程,说明AOF重写完成,处理后续的动作
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            // 3, 后台任务完成,则需要重新设置resize policy并关闭管道
            updateDictResizePolicy();
            closeChildInfoPipe();
        }
    } else {
        // 若没有持久化子进程
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
            // 1. 根据配置文件的save定期RDB配置,必要时创建后台RDB任务
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
        // 2. 若AOF打开,此外还满足
        // 当前没有执行后台持久化操作,AOF增长速度超过上限且AOF大小使用超过了rewrite上限
        // 那么还得执行一次后台AOF重写
        if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }
    // ...
}

3.11. 将AOF缓冲刷入文件

AOF开启时,需要通过这个时间事件,定期将AOF缓冲刷入磁盘:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
	if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    run_with_period(1000) {
        // 若上次错误,每隔1s刷一次盘
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
   	// ...
}

3.12. 关闭待关闭的客户端

这一步需要关闭异步关闭的客户端(标志位CLIENT_CLOSE_ASAP)。从server.clients_to_close链表中遍历并释放客户端)。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
    freeClientsInAsyncFreeQueue();
    // ...
}

3.13. 关于集群相关

包括:

  • 主备复制(每1s)
  • 集群状态(每100ms)
  • 哨兵
  • 清理过期的迁移相关的套接字(每1s)

这些内容之后会提及,这里略过

3.14. 后台执行推迟的RDB(BGSAVE SCHEDULE

rdb_bgsave_scheduled又被设置了1,那么没有后台持久化进程下,还得执行一次后台RDB:

当AOF重写在进行,然后客户端发来BGSAVE SCHEDULE,那么这个标志被置1,即BGSAVE会被推迟执行。

SCHEDULE的加入不会让BGSAVE返回错误。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...
	if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }
    // ...
}

3.15. 更新循环计数

最后就是对循环计数自加,并返回下一次执行的时间(差):

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	// ...
    server.cronloops++; // 循环计数增加
    return 1000/server.hz; // 下一次执行serverCron在未来1000/server.hz毫秒之后
}

4. 服务器的启动

最后来看一下服务器是如何启动的。函数主要是server.cmain函数。

4.1. 基本设置

最开始的时候,Redis会做一些最基本的设置,如环境变量、时区、hash种子。另外还有一个哨兵设置(当参数里有sentinal,服务器就被设置成哨兵)。

int main(int argc, char **argv) {
    struct timeval tv;
    int j;
#ifdef REDIS_TEST
    // ...
#endif
    #ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    setlocale(LC_COLLATE,"");
    tzset(); /* Populates 'timezone' global. */
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    char hashseed[16];
    getRandomHexChars(hashseed,sizeof(hashseed));
    dictSetHashFunctionSeed((uint8_t*)hashseed);
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    // ...
}

4.2. 初始化服务器配置

这里主要调用的是initServerConfig函数,这个函数回对server的各个字段初始化一个默认值(不是配置文件值)。这些字段包括如:时间、文件地址、服务模式、日志级别、复制、命令表、集群配置等等。

下面截取一段代码:

void initServerConfig(void) {
    // ...
    updateCachedTime();
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.timezone = getTimeZone();
    server.configfile = NULL;
    server.executable = NULL;
    server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
    server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
    // ...
}

4.3. 初始化模块

Redis有模块扩展机制。外部代码通过模块的规范,可以注入到Redis服务里。

初始化的函数是moduleInitModulesSystem

void moduleInitModulesSystem(void) {
    moduleUnblockedClients = listCreate();
    server.loadmodule_queue = listCreate();
    modules = dictCreate(&modulesDictType,NULL);
    moduleKeyspaceSubscribers = listCreate();
    moduleFreeContextReusedClient = createClient(-1);
    moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
    moduleCommandFilters = listCreate();
	// 这里注册核心模块,这里的模块是Redis自带的
    moduleRegisterCoreAPI();
    if (pipe(server.module_blocked_pipe) == -1) {
        serverLog(LL_WARNING,
            "Can't create the pipe for module blocking commands: %s",
            strerror(errno));
        exit(1);
    }
    anetNonBlock(NULL,server.module_blocked_pipe[0]);
    anetNonBlock(NULL,server.module_blocked_pipe[1]);
    Timers = raxNew();
    pthread_mutex_lock(&moduleGIL);
}

moduleRegisterCoreAPI主要调用的还是下面的函数/宏,实际上就是把模块的函数对外暴露的指针注册到server.moduleapi字典里:

int moduleRegisterApi(const char *funcname, void *funcptr) {
    return dictAdd(server.moduleapi, (char*)funcname, funcptr);
}

#define REGISTER_API(name) \
    moduleRegisterApi("RedisModule_" #name, (void *)(unsigned long)RM_ ## name)

4.4. 初始化哨兵

若4.1.中设置了哨兵模式,那么这里需要初始化,让自己成为哨兵:

int main(int argc, char **argv) {
	// ...
    if (server.sentinel_mode) {
        initSentinelConfig(); // 初始化哨兵配置
        initSentinel(); // 初始化哨兵数据结构
    }
    // ...
}

4.5. 从配置文件中加载配置

这里代码比较长,大致就是:

  • 先检查启动参数,并记录相关配置信息(记录在option字符串中)。有些启动参数会带来一些额外的效果,如:
    • 检查RDB/AOF文件(redis-check-rdb/aof
    • 打印版本号,处理帮助等
    • 测试内存(--test-memory
  • 从配置文件(记录在configfile为名字的文件中)和上述配置里,加载Redis配置。

加载配置主要是在loadServerConfig函数里:

void loadServerConfig(char *filename, char *options) {
    sds config = sdsempty();
    char buf[CONFIG_MAX_LINE+1];
    /* Load the file content */
    if (filename) {
        // ...
    }
    /* Append the additional options */
    if (options) {
        config = sdscat(config,"\n");
        config = sdscat(config,options);
    }
    // 这里对读到的配置信息统一进行加载
    loadServerConfigFromString(config);
    sdsfree(config);
}

至于具体的加载,也是一段很长的代码,代码中充满了if-elseserver的相关字段根据这些信息被设置。

4.6. daemon化

Redis通常而言是后台运行的,因此需要后台化处理。

Redis的后台化写的很小巧,即创建一个子进程,然后子进程把标准输入流、标准输出流和标准错误流重定向到/dev/null里。

void daemonize(void) {
    int fd;

    if (fork() != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) close(fd);
    }
}

int dup2(int oldfd, int newfd)

oldfd拷贝一个fd出来,它们都指向同一个文件。拷贝的fd值等于newfd。若原有newfd指向的文件被打开,那么被重用前,它会被关闭。

4.7. 初始化整个服务器

这一步算是最重要的一步。它根据配置好的信息,将整个服务完全的初始化。当初始化完成后,只需启动其初始化好的时间循环,服务就正式启动了。

服务器的初始化位于函数initServer,代码非常长,这里大致说明流程:

  • 创建事件循环(aeEventLoop
  • 监听端口(TCP和Unix domain socket/IPC)
  • 初始化数据库和数据库内部的信息
  • 状态、统计数据的初始化
  • 添加时间事件(serverCron
  • 给事件循环/多路复用器注册读回调,用于
    • 接受TCP和Unix domain socket的连接
    • 读取模块发来的信号(若事件循环本身作为一个阻塞客户端监听模块的信号)
  • (若需要)打开AOF文件
  • 初始化其它组件,必要时执行,如:
    • 集群管理
    • 主从复制
    • 脚本管理
    • 慢查询日志
    • 延迟监控
    • 后台I/O服务

4.8. 加载模块与数据

若Redis被设置成了哨兵,那么模块和数据不需要加载,而是启动哨兵服务进程。

// In main
if (!server.sentinel_mode) {
    // ...
} else {
    sentinelIsRunning();
}

若不是哨兵,那么首先会加载模块,函数是moduleLoadFromQueue

void moduleLoadFromQueue(void) {
    listIter li;
    listNode *ln;
	// 遍历模块链表(这里的初始化是在4.5.中初始化的,通过loadmodule配置)
    listRewind(server.loadmodule_queue,&li);
    while((ln = listNext(&li))) {
        struct moduleLoadQueueEntry *loadmod = ln->value;
        // 加载模块
        if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
            == C_ERR){
           // handle error and exit
        }
    }
}

然后加载数据,函数是loadDataFromDisk,优先加载AOF,没有AOF才加载RDB:

对于混合持久化,它是走第一个分支的。

因为混合持久化的文件结构是大RDB+少量AOF,加载AOF时,若读到了RDB头,会先加载RDB部分的数据,然后再加载最后少量的AOF数据。

void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        // 优先加载AOF
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        // 未开启AOF,则加载RDB
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
            if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
                rsi.repl_id_is_set &&
                rsi.repl_offset != -1 &&
                rsi.repl_stream_db != -1)
            {
                memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
                server.master_repl_offset = rsi.repl_offset;
                replicationCacheMasterUsingMyself();
                selectDb(server.cached_master,rsi.repl_stream_db);
            }
        } else if (errno != ENOENT) {
            // 若未找到文件,错误是不打出的,也不会退出进程
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

4.9. 启动服务

启动服务就很简单了,由于一切都已初始化完毕,只需要启动事件循环,服务器就会持续监听外部的请求,处理文件事件(如客户端请求)和时间事件(如serverCron)。

int main(int argc, char **argv) {
    // ... 
    aeSetBeforeSleepProc(server.el,beforeSleep); // 设置beforeSleep,每次事件循环前执行
    aeSetAfterSleepProc(server.el,afterSleep); // 设置afterSleep,每次poll完多路复用器后执行
    aeMain(server.el); // 启动事件循环
    // ...
}

5. 总结

  • 命令处理的过程如下:

    • 服务器监听到读事件,读取命令并保存到输入缓冲区,然后解析
    • 查找命令的函数表,执行命令,将结果保存到响应缓冲区
    • 事件循环开始前(beforeSleep)将响应缓冲区写出,若没写完,则注册写事件,延迟到事件循环内部执行(当写事件准备完毕时)
  • serverCron是一个时间时间,默认100ms执行一次,主要更新和维护服务器的状态,处理一些信号(如SIGTERM),处理一些持久化、集群、复制、监控等维护工作

  • 服务端的启动主要还是:

    • 初始化和载入配置
    • 初始化数据结构并还原数据
    • 启动监听,创建事件循环并注册事件
    • 启动事件循环

    最主要的还是3、4步。