1. Overview
Redis还提供另一种持久化方案:AOF(Append-only file)。可以将其看成类似的数据库日志。
AOF不同于WAL,它的追加是在写入命令完成之后进行。
不过AOF很像MySQL的binlog。
2. AOF持久化实现
AOF持久化分为3块:命令追加、文件写入、文件同步。
2.1. 命令追加
Redis在server.c
的call(client *c, int flags)
函数中执行命令,并追加AOF:
- 调用
c->cmd->proc(c)
处理命令 - …
- 调用
propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags)
,若开启AOF,将变更传播到AOF文件中- 调用
feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc)
将命令追加到AOF中
- 调用
- …
AOF追加在feedAppendOnlyFile
函数实现,代码如下:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
// 1. 生成一行追加项
if (dictid != server.aof_selected_db) {
char seldb[64];
// SELECT命令
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
// EXPIRE/PEXPIRE/EXPIREAT命令,转换成PEXPIREAT命令
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
// 将SETEX/PSETEX转成SET和PEXPIREAT
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
// 将SET [EX seconds][PX milliseconds]转成SET和PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
// 其他命令
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
// 2. 将追加项追加到AOF缓冲中,等待被刷入磁盘
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
// 3. 若有后台AOF重写,还需要将其添加到AOF重写的缓冲中进行刷入,以显示当前数据库的修改变化
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
可见其主要分为3步:生成追加项、追加到AOF缓冲、(AOF重写子进程运行时)添加到AOF重写缓冲
a) 生成追加项
AOF追加项的格式实际上在catAppendOnlyGenericCommand(sds dst, int argc, robj **argv)
函数实现,它的代码很简单,所以略过,但总结一下AOF一条追加项的格式:
*<count>\r\n // 命令的参数个数(包含命令本身)
$<len_arg_0>\r\n // 第一个参数长度
<content>\r\n // 参数值
$<len_arg_0>\r\n // 第二个参数长度
<content>\r\n // 参数值
...
而命令生成也有一定的转换规则:
- 必要时,首先需要追加
SELECT
EXPIRE/PEXPIRE/EXPIREAT
命令,会转换成PEXPIREAT
命令SETEX/PSETEX/SET [EX seconds][PX milliseconds]
命令,会转成SET
和PEXPIREAT
2条命令- 其他写入命令,不需要转换
而对于设定过期的命令,时间参数还会被转成绝对时间(墙上时钟),并以毫秒为单位。
b) 追加AOF缓冲
Redis为了性能,不会每次将每条AOF记录刷入到磁盘,而是写入到缓冲中,以提高性能。
而Redis提供不同策略,将AOF缓冲刷入磁盘,这部分下面会说明。
struct redisServer {
// ...
sds aof_buf; // AOF buffer
// ...
}
c) 写入AOF重写缓冲
若AOF重写进程正在执行,则还需要将数据追加到AOF重写缓冲。
AOF重写缓冲不是sds
,而是一个aofrwblock
结构的链表。
aofrwblock
结构定义如下所示,它是一个10MB的内存块:
typedef struct aofrwblock {
unsigned long used, free;
char buf[AOF_RW_BUF_BLOCK_SIZE]; // 10MB
} aofrwblock;
追加AOF重写缓冲的代码也比较简单,当尾部数据块有空闲空间时,直接追加,没有就创建新的数据块,然后再追加:
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
// 当尾部数据块有空闲空间时,直接追加,没有就创建新的数据块,然后再追加
while(len) {
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) {
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) {
int numblocks;
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
// 若数据块过多,会打日志提醒
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
而由于AOF重写是子进程在做,因此需要构建一个进程间通信的机制,传输AOF重写缓冲的数据。
Redis使用进程管道(单工)的方式,建立6条管道以通信:
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
// 创建父进程和子进程之间的管道(fd[0]读端,fd[1]写端)
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}
而每次事件循环之后,AOF重写缓冲会被写入到管道中,这在函数aofChildWriteDiffData
中可以体现,代码还是很简单的:
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
// ...
while(1) {
// 一个一个从AOF重写缓冲链表中取头
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
// 若被设置停止发送diff(即AOF重写缓冲)或者链表头为空,则删除管道写入事件,并返回
if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
// ...
if (block->used > 0) {
// 这里将数据写入aof_pipe_write_data_to_child管道中
// 子进程可以通过aof_pipe_read_data_from_parent管道收到数据
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return; // 写不下了就返回
// ...
}
// ...
}
}
至于子进程如何重写AOF,以及如何处理从管道接收的AOF重写缓冲数据,将会在下面几节专门说明。
2.2. 文件写入和同步
之前所说,AOF记录是追加到缓冲,然后根据配置好的策略刷入磁盘。策略有3种:
#define AOF_FSYNC_NO 0 // 由操作系统决定(最快,最不安全)
#define AOF_FSYNC_ALWAYS 1 // 每次事件循环刷入一次(最慢,最安全)
#define AOF_FSYNC_EVERYSEC 2 // 每秒同步一次(平衡,是默认的配置)
执行的入口从事件循环中进入:aeMain -> beforeSleep -> flushAppendOnlyFile
所以着重看函数flushAppendOnlyFile(int force)
:
// 每次事件循环都会进入,而这是在主进程中进行的,因此不会有另外的请求被处理
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) {
// AOF缓冲为空时,依旧要检查,尝试进行fsync
// 因为在AOF_FSYNC_EVERYSEC下,fsync是后台进行的
// 因此下一次进入本函数时,之前的fsync根本没有执行
// 这种情况下需要强制执行一次fsync
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// AOF_FSYNC_EVERYSEC下,检查系统是否正在执行fsync
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
// 在AOF_FSYNC_EVERYSEC和非强制下,检查是否在fsync(因为这个模式是后台进行的)
if (sync_in_progress) {
// 若检查到之前的fsync没好,则:
// 若2秒之内没好,就直接推迟write + fsync
// 若超过2秒还没好,就直接fall-through(即后面还会创建一个write + fsync后台任务)
if (server.aof_flush_postponed_start == 0) {
// 记录推迟的开始时间
server.aof_flush_postponed_start = server.unixtime;
return; // 首次出现,推迟
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
return; // 2s还没好,推迟
}
server.aof_delayed_fsync++;
// 这里2s后还没好,就直接放过,执行write + 后台fsync
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
latencyStartMonitor(latency);
// 这里将用户态的AOF缓冲写入内核缓冲(调用syscall write)
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
// 记录统计信息
// ...
server.aof_flush_postponed_start = 0; // 清零推迟开始时间的记录
// 判断是否刷入内核缓冲时出现错误
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
// 处理写入错误:
// a) 记录错误
// b) 打日志,并truncate刚才部分写入的数据(若truncate失败可能AOF加载会有问题,这里会打日志)
// c) 处理错误: 当为AOF_FSYNC_ALWAYS时,直接退出;当为其他的策略时,截断已写入的AOF缓冲,留下的部分下一次重试
// 这里就直接返回了
} else {
// 这里是成功的场景,若上次write出错,则记录/恢复状态为正常状态
}
server.aof_current_size += nwritten;
// write写好时候,清空AOF缓冲,若缓冲比较小(4000字节以下)就复用,否则重新创建一个新缓冲
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
try_fsync:
// 当no-appendfsync-on-rewrite置yes且有RDB/AOF重写后台进程时,不执行fsync
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// 当配置成非AOF_FSYNC_NO时,进行fsync刷入磁盘
// AOF_FSYNC_NO由操作系统调度
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
// 当配置成AOF_FSYNC_ALWAYS时,说明每次进入该函数,都需要fsync
// 那么就执行fsync(Linux上执行fdatasync以避免元数据的刷入)
latencyStartMonitor(latency);
redis_fsync(server.aof_fd);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// 若配置成AOF_FSYNC_EVERYSEC,且当前时间与上次fsync时间差>=1s(这里unixtime字段是秒为单位的)
// 创建一个fsync后台任务执行
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
代码还是比较长的,但是主要逻辑还是简单的,大多说明到了注释上。这里大致整理一下:
- 每次事件循环都会尝试刷新AOF缓冲到磁盘(即进入上面这个函数),而事件循环是主进程执行,因此函数内AOF的数据不会变多。
- 刷新的流程是:
write
:将AOF缓冲刷入内核缓冲(三种策略都会做)fsync
:将内核缓冲刷入磁盘AOF_FSYNC_NO
:不会调用fsync
,由操作系统同步AOF_FSYNC_EVERYSEC
:创建fsync
后台任务,后台执行AOF_FSYNC_ALWAYS
:每次进入该函数同步地进行fsync
- 失败处理:
- 当
write
失败时,若是AOF_FSYNC_ALWAYS
,就会直接报错退出,其他条件下就会恢复状态等待下一次事件循环的重试 - 没看到
fsync
失败的处理
- 当
AOF_FSYNC_EVERYSEC
下的后台任务:- 若后台任务
fsync
任务迟迟不执行,且AOF缓冲为空下,会手动再添加一个fsync
后台任务 - 当之前的后台
fsync
在执行但没执行完毕,本次的write
可能会推迟,时限为2s,若超过时限会fall-through,执行write + fsync
,这会拖慢系统速度
- 若后台任务
3. AOF载入
和数据库的日志一样,Redis只要从头到尾执行一遍AOF即可恢复数据。
原理很简单,Redis创建一个伪客户端,读取AOF命令,然后一条一条执行(注意事务的回滚)。
实现在loadAppendOnlyFile(char *filename)
函数中:
int loadAppendOnlyFile(char *filename) {
struct client *fakeClient;
// 1. 打开文件
FILE *fp = fopen(filename,"r");
// ...
if (fp == NULL) {
// 打不开就退出进程
}
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
// 处理AOF文件为空的情况
// ...
return C_ERR;
}
// ...
// 2. 创建伪客户端
fakeClient = createFakeClient();
startLoading(fp);
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
/* No RDB preamble, seek back at 0 offset. */
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
// 3. 若读到RDB文件,直接按RDB处理
rioInitWithFile(&rdb,fp);
// ...
}
// 4. 一行一行读AOF,构造命令,执行
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;
// 4.1. 获取命令
// ...
// 4.2. 查找命令
cmd = lookupCommand(argv[0]->ptr);
// ...
// 4.3. 执行命令,分为普通的和MULTI事务命令
fakeClient->cmd = cmd;
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
queueMultiCommand(fakeClient);
} else {
cmd->proc(fakeClient);
}
// 4.4. 清理一些内容
// ...
}
// 5. 处理未提交的事务,执行回滚
if (fakeClient->flags & CLIENT_MULTI) {
serverLog(LL_WARNING,
"Revert incomplete MULTI/EXEC transaction in AOF file");
// 从valid_before_multi回滚,执行事务的时候会记录
valid_up_to = valid_before_multi;
goto uxeof;
}
// ...
}
4. AOF重写
AOF重写会生成一个新的AOF文件,以替代旧文件。但AOF重写不会访问旧AOF文件,而是获取服务器当前全量状态,构造一个AOF文件。
可以认为是对旧AOF文件的合并操作。
Redis的AOF重写是放在子进程中的,通过管道和父进程通信,当当前存在AOF重写/RDB子进程时,是不会再触发AOF重写的。
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
// 1. 创建并初始化通信管道
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
// 2. 创建子进程
if ((childpid = fork()) == 0) {
char tmpfile[256];
// a) 对于子进程,关闭socket(子进程不需要监听),并重写AOF
// 而父进程依旧可以写入,fork的COW保证父子进程的数据进程安全(和BG-RDB一样)
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// b) 重写AOF
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
// ...
// c) 成功写完后,向父进程通知,并退出
sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
// 父进程更新一些统计操作,并更新dict resize policy(提高rehash阈值)
// ...
return C_OK;
}
return C_OK; /* unreached */
}
子进程重写AOF的函数在rewriteAppendOnlyFile(char *filename)
中:
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
// 1. 创建并初始化rio
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
// 2. 持久文件,检查是否开启AOF,RDB混合持久
if (server.aof_use_rdb_preamble) {
// 2.1. 若开启,先直接持久RDB
// 然后再重写父进程传来的diff数据(即AOF重写缓冲),生成新的AOF文件(见前一篇文章)
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
// 2.2. 若没开启,则直接重写AOF
// 遍历所有键值对,生成写命令,然后追加到新的AOF文件中
// 这里我也没看到对过期数据的处理
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
// 3. 刷文件到磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
// 4. 读取主进程的AOF重写缓冲
// 若20ms内没有新数据,则终止读取
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
// 5. 告诉主进程不要往通道发送AOF重写缓冲的数据
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
// 6. 读取5来自父进程的ACK(等待5s或ACK内容有误)
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
// 7. 最后再读一次AOF重写缓冲
aofReadDiffFromParent();
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
// 8. 把读到的AOF重写缓冲数据追加到新的AOF文件上
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
// 9. 再次刷新文件到磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
// 10. 重命名文件并返回
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}
总体而言,重写的逻辑还是比较清晰的:
- 首先遍历整个Redis各个DB,持久化数据(不删除过期数据)
- 混合模式:生成2个文件:RDB(主要的K-V数据)、AOF(来自主进程AOF重写缓冲的数据)
- 普通AOF:生成1个文件:AOF(主要的K-V数据+来自主进程AOF重写缓冲的数据)
- 然后通过管道读取AOF重写缓冲的数据
- 将读到的数据追加到新AOF文件上
特别注意的是父子进程的通信处理(传输AOF重写缓冲),它们之间会发送一些信号:
-
父进程往管道写数据
-
子进程等待管道的数据,并将管道的数据读取下来
-
子进程等待管道超过20ms,仍没数据,则通知父进程停止向管道写数据
-
父进程收到通知,停止写数据,并返回子进程一个ACK
3,4步实际上是父子进程互相发送一个
!
。对于子进程:
// In rewriteAppendOnlyFile if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
对于父进程:
// In aofCreatePipes // 为aof_pipe_read_ack_from_child管道注册读事件 // 回调函数是aofChildPipeReadable // 这里是创建一个文件事件,文件事件是注册到多路复用器上的,至于多路复用是上面我也不多说了 if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_read_ack_from_child = fds[2];
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { // ... // 从为aof_pipe_read_ack_from_child管道读到'!‘ if (read(fd,&byte,1) == 1 && byte == '!') { serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs."); server.aof_stop_sending_diff = 1; // 发送ACK,值也为'!’给子进程 if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) { serverLog(LL_WARNING,"Can't send ACK to AOF child: %s", strerror(errno)); } } // ... 从多路复用器上删除读事件,因为这个事件在AOF重写中只触发1次 }
-
子进程收到ACK后,读取管道中残留数据,并将所有读取的数据写入AOF文件中
-
一切完成后,子进程通知父进程AOF重写完成
这些可以总结为下图:
5. 总结
通过读RDB和AOF源码:
- AOF和RDB不同点
- AOF存过程,类比成流
- RDB存状态,类比成表
- AOF追加会有一层缓冲,通过事件循环来将其刷入磁盘(3种不同策略)
- AOF加载使用伪客户端执行AOF命令实现,需要注意事务,要记录提交的日志位置,以便于回滚
- AOF重写是为了降低AOF大小,本质上是对AOF的合并运算
- AOF重写时,读取获取父进程AOF缓冲数据,是通过管道实现的,并通过管道实现父子进程的通知和同步
- AOF和RDB混合持久时,重写AOF会生成一份大RDB和一份小AOF
write
和flush
系统调用只能保证数据写入内核缓冲,而fsync
和fdatasync
系统调用才能保证数据完全写入磁盘