源码阅读-Redis单机服务器: AOF

Posted by keys961 on October 9, 2019

1. Overview

Redis还提供另一种持久化方案:AOF(Append-only file)。可以将其看成类似的数据库日志

AOF不同于WAL,它的追加是在写入命令完成之后进行。

不过AOF很像MySQL的binlog。

2. AOF持久化实现

AOF持久化分为3块:命令追加、文件写入、文件同步。

2.1. 命令追加

Redis在server.ccall(client *c, int flags)函数中执行命令,并追加AOF:

  • 调用c->cmd->proc(c)处理命令
  • 调用propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags),若开启AOF,将变更传播到AOF文件中
    • 调用feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) 将命令追加到AOF中

AOF追加在feedAppendOnlyFile函数实现,代码如下:

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    // 1. 生成一行追加项
    if (dictid != server.aof_selected_db) {
        char seldb[64];
		// SELECT命令
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        // EXPIRE/PEXPIRE/EXPIREAT命令,转换成PEXPIREAT命令
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        // 将SETEX/PSETEX转成SET和PEXPIREAT
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        // 将SET [EX seconds][PX milliseconds]转成SET和PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        for (i = 3; i < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        // 其他命令 
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }
    // 2. 将追加项追加到AOF缓冲中,等待被刷入磁盘
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
    // 3. 若有后台AOF重写,还需要将其添加到AOF重写的缓冲中进行刷入,以显示当前数据库的修改变化
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

可见其主要分为3步:生成追加项、追加到AOF缓冲、(AOF重写子进程运行时)添加到AOF重写缓冲

a) 生成追加项

AOF追加项的格式实际上在catAppendOnlyGenericCommand(sds dst, int argc, robj **argv)函数实现,它的代码很简单,所以略过,但总结一下AOF一条追加项的格式:

*<count>\r\n   // 命令的参数个数(包含命令本身)
$<len_arg_0>\r\n // 第一个参数长度
<content>\r\n  // 参数值
$<len_arg_0>\r\n // 第二个参数长度
<content>\r\n  // 参数值
...

而命令生成也有一定的转换规则:

  • 必要时,首先需要追加SELECT
  • EXPIRE/PEXPIRE/EXPIREAT命令,会转换成PEXPIREAT命令
  • SETEX/PSETEX/SET [EX seconds][PX milliseconds]命令,会转成SETPEXPIREAT2条命令
  • 其他写入命令,不需要转换

而对于设定过期的命令,时间参数还会被转成绝对时间(墙上时钟),并以毫秒为单位。

b) 追加AOF缓冲

Redis为了性能,不会每次将每条AOF记录刷入到磁盘,而是写入到缓冲中,以提高性能。

而Redis提供不同策略,将AOF缓冲刷入磁盘,这部分下面会说明。

struct redisServer {
    // ...
    sds aof_buf; // AOF buffer
    // ...
}

c) 写入AOF重写缓冲

若AOF重写进程正在执行,则还需要将数据追加到AOF重写缓冲。

AOF重写缓冲不是sds,而是一个aofrwblock结构的链表。

aofrwblock结构定义如下所示,它是一个10MB的内存块:

typedef struct aofrwblock {
    unsigned long used, free;
    char buf[AOF_RW_BUF_BLOCK_SIZE]; // 10MB
} aofrwblock;

追加AOF重写缓冲的代码也比较简单,当尾部数据块有空闲空间时,直接追加,没有就创建新的数据块,然后再追加:

void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;
	// 当尾部数据块有空闲空间时,直接追加,没有就创建新的数据块,然后再追加
    while(len) {
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) { 
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }
        if (len) {
            int numblocks;
            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                // 若数据块过多,会打日志提醒
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
    
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

而由于AOF重写是子进程在做,因此需要构建一个进程间通信的机制,传输AOF重写缓冲的数据。

Redis使用进程管道(单工)的方式,建立6条管道以通信:

int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;
	// 创建父进程和子进程之间的管道(fd[0]读端,fd[1]写端)
    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
        strerror(errno));
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;
}

而每次事件循环之后,AOF重写缓冲会被写入到管道中,这在函数aofChildWriteDiffData中可以体现,代码还是很简单的:

void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ...
    while(1) {
        // 一个一个从AOF重写缓冲链表中取头
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        // 若被设置停止发送diff(即AOF重写缓冲)或者链表头为空,则删除管道写入事件,并返回
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        // ...
        if (block->used > 0) {
            // 这里将数据写入aof_pipe_write_data_to_child管道中
            // 子进程可以通过aof_pipe_read_data_from_parent管道收到数据
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return; // 写不下了就返回
            // ...
        }
        // ...
    }
}

至于子进程如何重写AOF,以及如何处理从管道接收的AOF重写缓冲数据,将会在下面几节专门说明。

2.2. 文件写入和同步

之前所说,AOF记录是追加到缓冲,然后根据配置好的策略刷入磁盘。策略有3种:

#define AOF_FSYNC_NO 0 // 由操作系统决定(最快,最不安全)
#define AOF_FSYNC_ALWAYS 1 // 每次事件循环刷入一次(最慢,最安全)
#define AOF_FSYNC_EVERYSEC 2 // 每秒同步一次(平衡,是默认的配置)

执行的入口从事件循环中进入:aeMain -> beforeSleep -> flushAppendOnlyFile

所以着重看函数flushAppendOnlyFile(int force)

// 每次事件循环都会进入,而这是在主进程中进行的,因此不会有另外的请求被处理
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;
    mstime_t latency;

    if (sdslen(server.aof_buf) == 0) {
        // AOF缓冲为空时,依旧要检查,尝试进行fsync
        // 因为在AOF_FSYNC_EVERYSEC下,fsync是后台进行的
        // 因此下一次进入本函数时,之前的fsync根本没有执行
        // 这种情况下需要强制执行一次fsync
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.aof_fsync_offset != server.aof_current_size &&
            server.unixtime > server.aof_last_fsync &&
            !(sync_in_progress = aofFsyncInProgress())) {
            goto try_fsync;
        } else {
            return;
        }
    }

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        // AOF_FSYNC_EVERYSEC下,检查系统是否正在执行fsync
        sync_in_progress = aofFsyncInProgress();

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        // 在AOF_FSYNC_EVERYSEC和非强制下,检查是否在fsync(因为这个模式是后台进行的)
        if (sync_in_progress) {
            // 若检查到之前的fsync没好,则:
            // 若2秒之内没好,就直接推迟write + fsync
            // 若超过2秒还没好,就直接fall-through(即后面还会创建一个write + fsync后台任务)
            if (server.aof_flush_postponed_start == 0) {
                // 记录推迟的开始时间
                server.aof_flush_postponed_start = server.unixtime;
                return; // 首次出现,推迟
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                return; // 2s还没好,推迟
            }
            server.aof_delayed_fsync++;
            // 这里2s后还没好,就直接放过,执行write + 后台fsync
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }
    
    latencyStartMonitor(latency);
    // 这里将用户态的AOF缓冲写入内核缓冲(调用syscall write)
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    // 记录统计信息
    // ...
    server.aof_flush_postponed_start = 0; // 清零推迟开始时间的记录
    // 判断是否刷入内核缓冲时出现错误
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        // 处理写入错误:
        // a) 记录错误
        // b) 打日志,并truncate刚才部分写入的数据(若truncate失败可能AOF加载会有问题,这里会打日志)
        // c) 处理错误: 当为AOF_FSYNC_ALWAYS时,直接退出;当为其他的策略时,截断已写入的AOF缓冲,留下的部分下一次重试
        // 这里就直接返回了
    } else {
        // 这里是成功的场景,若上次write出错,则记录/恢复状态为正常状态
    }
    server.aof_current_size += nwritten;
    // write写好时候,清空AOF缓冲,若缓冲比较小(4000字节以下)就复用,否则重新创建一个新缓冲
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
        sdsclear(server.aof_buf);
    } else {
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    }

try_fsync:
    // 当no-appendfsync-on-rewrite置yes且有RDB/AOF重写后台进程时,不执行fsync
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
            return;
    // 当配置成非AOF_FSYNC_NO时,进行fsync刷入磁盘
    // AOF_FSYNC_NO由操作系统调度
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        // 当配置成AOF_FSYNC_ALWAYS时,说明每次进入该函数,都需要fsync
        // 那么就执行fsync(Linux上执行fdatasync以避免元数据的刷入)
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_fsync_offset = server.aof_current_size;
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        // 若配置成AOF_FSYNC_EVERYSEC,且当前时间与上次fsync时间差>=1s(这里unixtime字段是秒为单位的)
        // 创建一个fsync后台任务执行
        if (!sync_in_progress) {
            aof_background_fsync(server.aof_fd);
            server.aof_fsync_offset = server.aof_current_size;
        }
        server.aof_last_fsync = server.unixtime;
    }
}

代码还是比较长的,但是主要逻辑还是简单的,大多说明到了注释上。这里大致整理一下:

  • 每次事件循环都会尝试刷新AOF缓冲到磁盘(即进入上面这个函数),而事件循环是主进程执行,因此函数内AOF的数据不会变多。
  • 刷新的流程是:
    • write:将AOF缓冲刷入内核缓冲(三种策略都会做)
    • fsync:将内核缓冲刷入磁盘
      • AOF_FSYNC_NO:不会调用fsync,由操作系统同步
      • AOF_FSYNC_EVERYSEC:创建fsync后台任务,后台执行
      • AOF_FSYNC_ALWAYS:每次进入该函数同步地进行fsync
  • 失败处理:
    • write失败时,若是AOF_FSYNC_ALWAYS,就会直接报错退出,其他条件下就会恢复状态等待下一次事件循环的重试
    • 没看到fsync失败的处理
  • AOF_FSYNC_EVERYSEC下的后台任务:
    • 若后台任务fsync任务迟迟不执行,且AOF缓冲为空下,会手动再添加一个fsync后台任务
    • 当之前的后台fsync在执行但没执行完毕,本次的write可能会推迟,时限为2s,若超过时限会fall-through,执行write + fsync,这会拖慢系统速度

3. AOF载入

和数据库的日志一样,Redis只要从头到尾执行一遍AOF即可恢复数据。

原理很简单,Redis创建一个伪客户端,读取AOF命令,然后一条一条执行(注意事务的回滚)。

实现在loadAppendOnlyFile(char *filename)函数中:

int loadAppendOnlyFile(char *filename) {
    struct client *fakeClient;
    // 1. 打开文件
    FILE *fp = fopen(filename,"r");
    // ...

    if (fp == NULL) {
        // 打不开就退出进程
    }

    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        // 处理AOF文件为空的情况
        // ...
        return C_ERR;
    }

    // ...
	// 2. 创建伪客户端
    fakeClient = createFakeClient();
    startLoading(fp);

    char sig[5]; /* "REDIS" */
    if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
        /* No RDB preamble, seek back at 0 offset. */
        if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
    } else {
        // 3. 若读到RDB文件,直接按RDB处理
        rioInitWithFile(&rdb,fp);
        // ...
    }

    // 4. 一行一行读AOF,构造命令,执行
    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;
        // 4.1. 获取命令
        // ...

        // 4.2. 查找命令
        cmd = lookupCommand(argv[0]->ptr);
        // ...
        // 4.3. 执行命令,分为普通的和MULTI事务命令
        fakeClient->cmd = cmd;
        if (fakeClient->flags & CLIENT_MULTI &&
            fakeClient->cmd->proc != execCommand)
        {
            queueMultiCommand(fakeClient);
        } else {
            cmd->proc(fakeClient);
        }
        // 4.4. 清理一些内容
        // ...
    }
    // 5. 处理未提交的事务,执行回滚
    if (fakeClient->flags & CLIENT_MULTI) {
        serverLog(LL_WARNING,
            "Revert incomplete MULTI/EXEC transaction in AOF file");
        // 从valid_before_multi回滚,执行事务的时候会记录
        valid_up_to = valid_before_multi;
        goto uxeof;
    }

// ...
}

4. AOF重写

AOF重写会生成一个新的AOF文件,以替代旧文件。但AOF重写不会访问旧AOF文件,而是获取服务器当前全量状态,构造一个AOF文件

可以认为是对旧AOF文件的合并操作。

Redis的AOF重写是放在子进程中的,通过管道和父进程通信,当当前存在AOF重写/RDB子进程时,是不会再触发AOF重写的。

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // 1. 创建并初始化通信管道
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    // 2. 创建子进程
    if ((childpid = fork()) == 0) {
        char tmpfile[256];
		// a) 对于子进程,关闭socket(子进程不需要监听),并重写AOF
        // 而父进程依旧可以写入,fork的COW保证父子进程的数据进程安全(和BG-RDB一样)
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        // b) 重写AOF
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            // ...
            // c) 成功写完后,向父进程通知,并退出
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        // 父进程更新一些统计操作,并更新dict resize policy(提高rehash阈值)
        // ... 
        return C_OK;
    }
    return C_OK; /* unreached */
}

子进程重写AOF的函数在rewriteAppendOnlyFile(char *filename)中:

int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;
	// 1. 创建并初始化rio
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }
    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
	// 2. 持久文件,检查是否开启AOF,RDB混合持久
    if (server.aof_use_rdb_preamble) {
        // 2.1. 若开启,先直接持久RDB
        // 然后再重写父进程传来的diff数据(即AOF重写缓冲),生成新的AOF文件(见前一篇文章)
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        // 2.2. 若没开启,则直接重写AOF
        // 遍历所有键值对,生成写命令,然后追加到新的AOF文件中
        // 这里我也没看到对过期数据的处理
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }
    // 3. 刷文件到磁盘
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    // 4. 读取主进程的AOF重写缓冲
    // 若20ms内没有新数据,则终止读取
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }
    // 5. 告诉主进程不要往通道发送AOF重写缓冲的数据
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    // 6. 读取5来自父进程的ACK(等待5s或ACK内容有误)
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
    // 7. 最后再读一次AOF重写缓冲
    aofReadDiffFromParent();
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    // 8. 把读到的AOF重写缓冲数据追加到新的AOF文件上
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;
    // 9. 再次刷新文件到磁盘
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    // 10. 重命名文件并返回
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}

总体而言,重写的逻辑还是比较清晰的:

  • 首先遍历整个Redis各个DB,持久化数据(不删除过期数据)
    • 混合模式:生成2个文件:RDB(主要的K-V数据)、AOF(来自主进程AOF重写缓冲的数据)
    • 普通AOF:生成1个文件:AOF(主要的K-V数据+来自主进程AOF重写缓冲的数据)
  • 然后通过管道读取AOF重写缓冲的数据
  • 将读到的数据追加到新AOF文件上

特别注意的是父子进程的通信处理(传输AOF重写缓冲),它们之间会发送一些信号:

  1. 父进程往管道写数据

  2. 子进程等待管道的数据,并将管道的数据读取下来

  3. 子进程等待管道超过20ms,仍没数据,则通知父进程停止向管道写数据

  4. 父进程收到通知,停止写数据,并返回子进程一个ACK

    3,4步实际上是父子进程互相发送一个!

    对于子进程:

    // In rewriteAppendOnlyFile
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    

    对于父进程:

    // In aofCreatePipes
    // 为aof_pipe_read_ack_from_child管道注册读事件
    // 回调函数是aofChildPipeReadable
    // 这里是创建一个文件事件,文件事件是注册到多路复用器上的,至于多路复用是上面我也不多说了
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
    server.aof_pipe_read_ack_from_child = fds[2];
    
    void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
     // ...
     // 从为aof_pipe_read_ack_from_child管道读到'!‘
     if (read(fd,&byte,1) == 1 && byte == '!') {
         serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
         server.aof_stop_sending_diff = 1;
         // 发送ACK,值也为'!’给子进程
         if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
             serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                 strerror(errno));
         }
     }
     // ... 从多路复用器上删除读事件,因为这个事件在AOF重写中只触发1次
    }
    
  5. 子进程收到ACK后,读取管道中残留数据,并将所有读取的数据写入AOF文件中

  6. 一切完成后,子进程通知父进程AOF重写完成

这些可以总结为下图:

img

5. 总结

通过读RDB和AOF源码:

  1. AOF和RDB不同点
    • AOF存过程,类比成流
    • RDB存状态,类比成表
  2. AOF追加会有一层缓冲,通过事件循环来将其刷入磁盘(3种不同策略)
  3. AOF加载使用伪客户端执行AOF命令实现,需要注意事务,要记录提交的日志位置,以便于回滚
  4. AOF重写是为了降低AOF大小,本质上是对AOF的合并运算
  5. AOF重写时,读取获取父进程AOF缓冲数据,是通过管道实现的,并通过管道实现父子进程的通知和同步
  6. AOF和RDB混合持久时,重写AOF会生成一份大RDB和一份小AOF
  7. writeflush系统调用只能保证数据写入内核缓冲,而fsyncfdatasync系统调用才能保证数据完全写入磁盘