1. Overview
RDB持久化是Redis的一个功能,可以看成是Redis的dump,将数据持久化到磁盘上,即使Redis进程退出,重启后也能将数据恢复。
RDB功能在rdb.h
和rdb.c
中定义和实现。
2. 导入/导出RDB
2.1. 导出RDB
RDB导出可通过SAVE
或者BGSAVE
命令触发,前者同步阻塞,后者创建子进程后台处理。
下面的是SAVE
命令的处理:
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) {
// 若后台有RDB进程,就拒绝生成本次的RDB
addReplyError(c,"Background save already in progress");
return;
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
// 这边rdbSave是保存RDB的主函数
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
}
}
而这个是BGSAVE
命令的处理:
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
// ...
// 不允许在已创建RDB/AOF子线程存在下进行后台RDB导出
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
// ...
// 创建子进程进行RDB导出
if ((childpid = fork()) == 0) {
int retval;
// 子进程下
// 关闭socket监听(子进程不需要监听)
// 而父进程依旧可以写入,fork的COW保证父子进程的数据进程安全
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi); // 执行RDB导出
// ...
exitFromChild((retval == C_OK) ? 0 : 1); // 退出子进程,这里调用_exit (int __status)函数(unistd.h定义)
} else {
// 父进程下
// ...
if (childpid == -1) {
// 无法创建子进程,记录并返回错误
// ...
return C_ERR;
}
// 记录和更新后台RDB导出的信息和相关状态
// 如更新dict resize policy(这种情况下,需要增加rehash的阈值)
// ...
return C_OK;
}
return C_OK; /* unreached */
}
最后核心的RDB导出函数如下:
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
// 先保存在临时文件里,文件名为temp-<pid>.rdb
fp = fopen(tmpfile,"w");
if (!fp) {
// 临时文件句柄无法创建,返回错误
// ...
return C_ERR;
}
// 初始化内容
rioInitWithFile(&rdb,fp);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
// 开始保存RDB文件内容(具体内容后面会说明)
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
// 清空用户空间缓冲、内核缓冲,并关闭临时文件,保证内容完全写入磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
// 重命名文件
if (rename(tmpfile,filename) == -1) {
// 出错,打日志,删除文件(使用unlink)
// ...
return C_ERR;
}
// ...
return C_OK;
werr:
// 出错,打日志,关闭并删除文件(使用unlink)
// ...
return C_ERR;
}
具体如何保存RDB文件,在函数rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi)
中,后面会具体说明,因为它涉及保存数据的格式。
2.2. 导入RDB
RDB导入只会在Redis启动时执行,只要有RDB文件存在,就会载入。
Redis还有AOF功能,会影响Redis是否执行RDB导入:
- 当AOF开启时,优先选择AOF文件还原状态(因为AOF更新频率高)
- 当AOF关闭时,才使用RDB文件还原状态
RDB导入在下面的rdbLoad(char *filename, rdbSaveInfo *rsi)
函数实现:
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;
// 打开文件
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
// 初始化
startLoading(fp);
rioInitWithFile(&rdb,fp);
// 真正的加载恢复
retval = rdbLoadRio(&rdb,rsi,0);
// 关闭文件等处理后续事情
fclose(fp);
stopLoading();
return retval;
}
具体的加载恢复在函数rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof)
实现,后面会具体说明,因为它同样涉及数据的格式。
3. 自动间隔保存
Redis配置文件上会有下面的几项:
1
2
3
save 900 1
save 300 10
save 60 10000
这里save <a1> <a2>
表示,服务器在a1
秒之内,至少修改数据库a2
次,BGSAVE
就会被执行。
因此上面的配置意思是,满足下面3个条件之一的,触发一次
BGSAVE
:
- 900秒内,至少修改1次
- 300秒内,至少修改10次
- 60秒内,至少修改10000次
3.1. 自动保存的配置信息
上述配置信息,会保存到结构saveparam
里:
struct saveparam {
time_t seconds; // 时间间隔(秒)
int changes; // 数据库修改次数
};
saveparam
信息保存在redisServer
里:
struct redisServer {
// ...
struct saveparam *saveparams; // 一组自动保存的配置信息
int saveparamslen; // 上面配置信息的个数
char *rdb_filename; // RDB文件名
// ...
};
3.2. 自动保存的条件检查
Redis内部有一个定时任务函数serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)
,执行一些定时的任务,间隔是每秒执行server.hz
次(默认100ms)。
关于定时任务的实现,之后会详细说明。
其中有一项就是检查自动保存的条件,它遍历整个saveparams
数组,若一条满足就执行BGSAVE
(前提是没有RDB, AOF Rewrite等子进程,并且还要考虑上次BGSAVE
失败的情形)。
代码如下,检查的条件已经在注释中说明:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) {
// 当有RDB/AOF Rewrite/其他子进程时
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
// 非阻塞等待子进程返回,若有返回的子进程,进行后续处理
// 如RDB/AOF Rewrite子进程的后续处理
// 并关闭子进程和父进程的管道,更新dict resize policy(rehash阈值减小)等
// ...
}
} else {
// 当没有RDB/AOF Rewrite等其他子进程时,检查自动保存配置,即saveparam
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
// 检查并满足下面3条:
// 1. 修改次数(大于changes)
// 2. 与上一次RDB Save的时间间隔(大于seconds)
// 3. 若上次RDB Save失败,还要检查与上次失败的RDB Save的时间间隔(大于CONFIG_BGSAVE_RETRY_DELAY,默认5秒)
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK)) {
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
// 满足条件,直接执行BGSAVE
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
// ...Trigger an AOF rewrite if needed.
}
// ...
}
4. RDB文件数据格式
4.1. 整个RDB文件的组成
第2节中的2个函数(内部真正执行RDB导出和导入),如下所示,和RDB文件数据格式有关,本节就是说明这点:
rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi)
rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof)
这里以第一个函数为例(因为第二个函数只是第一个函数的逆操作)。
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
// 1. 首先写入“REDIS0009”,即“REDIS” + rdb_version(当前版本为9),共9字节
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
// 2. 然后写入一些相关字段,如:
// Redis版本号、服务器的位数、当前时间、已使用的内存大小等
// 另外还有rsi提供的数据(可选),如当前选择的db号等等
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
// 3. 然后遍历db,写入数据
for (j = 0; j < server.dbnum; j++) {
// 对于每个db
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue; // 无键值对则跳过
di = dictGetSafeIterator(d);
// 3.1. 首先写入一个常量RDB_OPCODE_SELECTDB(254,1字节)
// 表示下面是db的ID号
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
// 3.2. 然后写入当前db的ID
if (rdbSaveLen(rdb,j) == -1) goto werr;
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
// 3.3. 接着写入常量RDB_OPCODE_RESIZEDB(251,1字节)
// 表示下面要写入该db存储的键值数量,供重建用
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
// 3.4. 写入dict键值对数量和expires设置过期的键值对数量
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
// 3.5. 遍历整个dict
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
// 写入键值对到RDB文件,这块下面会讲
// (这里我没有发现过滤过期数据,但加载的时候若是master就被过滤掉了)
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
// 处理RDB和AOF混合持久的情况,在AOF后台重写的时候会执行
// 这里接收父进程传来的AOF重写缓冲数据
// 然后将其生成新的AOF文件,配合前面生成的RDB一起持久数据
// 这会在下一篇讲
// ...
}
dictReleaseIterator(di);
di = NULL;
}
// 4. 保存lua脚本的数据
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL;
}
// 5. 写入EOF常量(255,1字节)
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
// 6. 最后写入校验和,8字节,即64位CRC校验
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
这里可以看到,整个RDB文件可以分为:
REDIS
常量头- RDB版本号
- 辅助字段,包括:
- Redis版本
- Redis服务器位数
- 当前时间
- 已使用内存大小
rdbSaveInfo
:repl-stream-db
,repl-id
,repl-offset
aof-preamble
(是否开启混合持久)
- 每个数据库的信息,对于每个数据库:
RDB_OPCODE_SELECTDB
常量- 当前数据库ID
RDB_OPCODE_RESIZEDB
常量- 当前数据库的键值对数量
- 当前数据库设置过期的键值对数量
- 各个键值对
- Lua脚本信息
RDB_OPCODE_EOF
常量- 64位CRC校验和
4.2. 键值对的保存格式
见下面的代码:
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
// 1. 若设置了过期时间
if (expiretime != -1) {
// 1.1. 先写入RDB_OPCODE_EXPIRETIME_MS(252)常量
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
// 1.2. 然后写入过期时间
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
// 2. 根据LRU/LFU统计策略,写入统计信息
// LRU: RDB_OPCODE_IDLE(248)常量+LRU空闲时间
// LFU: RDB_OPCODE_FREQ(249)常量+访问频率
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
// 3. 写入值的类型
if (rdbSaveObjectType(rdb,val) == -1) return -1;
// 4. 写入键
if (rdbSaveStringObject(rdb,key) == -1) return -1;
// 5. 写入值
if (rdbSaveObject(rdb,val,key) == -1) return -1;
return 1;
}
这里可以看到,一个键值对的格式是:
- (可选) 若设置过期时间,写入该信息:
RDB_OPCODE_EXPIRETIME_MS
常量- 过期时间
- LRU/LFU统计信息:
- LRU:
RDB_OPCODE_IDLE
常量+LRU空闲时间 - LFU:
RDB_OPCODE_FREQ
常量+访问频率
- LRU:
- 值的类型
- 键
- 值
另外,这里没有对过期数据进行过滤,似乎之前版本是有的,不过这里(5.0.5)没发现。所以《Redis设计与实现》9.7.节可能需要修改。
a) 值的类型
值类型定义如下,有些值是为了兼容而用的:
#define RDB_TYPE_STRING 0 // OBJ_STRING
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2 // OBJ_SET (散列表实现)
#define RDB_TYPE_ZSET 3
#define RDB_TYPE_HASH 4 // OBJ_HASH
#define RDB_TYPE_ZSET_2 5 // OBJ_ZSET (跳表+散列表实现)
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 // OBJ_MODULE
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11 // OBJ_SET
#define RDB_TYPE_ZSET_ZIPLIST 12 // OBJ_ZSET
#define RDB_TYPE_HASH_ZIPLIST 13 // OBJ_HASH
#define RDB_TYPE_LIST_QUICKLIST 14 // OBJ_LIST
#define RDB_TYPE_STREAM_LISTPACKS 15 // OBJ_STREAM
保存的时候,主要取上面带注释的。
它们都用1个字节保存。
b) 值保存格式
由于类型众多,所以挑选常见的类型格式,如下图,From https://juejin.im/post/5d24a1b4f265da1baf7d126e:
最上面的是字符串,左边是字符串,右边是8位以内的整数(超过8位的以字符串形式存储)
注意,
OBJ_STRING
以及RDB_TYPE_*
的字段是不写入的,仅作标识用。
c) 键的写入
键的写入按照上一节字符串的写入方式执行。
5. 写入RDB的接口——rio
RDB的写入通过一个结构rio
操作(定义和实现在rio.h
和rio.c
中):
struct _rio {
// 数据的读
size_t (*read)(struct _rio *, void *buf, size_t len);
// 数据的写
size_t (*write)(struct _rio *, const void *buf, size_t len);
// 获取当前读写偏移
off_t (*tell)(struct _rio *);
// 刷新I/O
int (*flush)(struct _rio *);
// 写入时,调用改函数更新校验和
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
// 当前校验和
uint64_t cksum;
// 当前已读/写的字节数
size_t processed_bytes;
// 最大单次读/写的大小
size_t max_processing_chunk;
// io变量,可以视作句柄/描述符
union {
// 内存buffer
struct {
sds ptr; // 内容的起始地址
off_t pos; // 偏移量
} buffer;
// 文件
struct {
FILE *fp; // 文件句柄/描述符
off_t buffered; // 上次调用fsync后,写入的数据量
off_t autosync; // 若打开autosync,那么当写入量超过这个字段的值后,需要调用fsync将其刷入磁盘
} file;
// 多I/O描述符(如写入到多个socket).
struct {
int *fds; // 一组I/O描述符
int *state; // 对应的状态
int numfds; // 描述符数量
off_t pos; // 缓冲区当前偏移
sds buf; // 缓冲区起始地址
} fdset;
} io;
};
typedef struct _rio rio;
可见rio
是对I/O做了一个接口,它默认有2个实现:
- Buffer I/O
- File I/O
// Buffer I/O
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
// 初始化
void rioInitWithBuffer(rio *r, sds s) {
*r = rioBufferIO;
r->io.buffer.ptr = s;
r->io.buffer.pos = 0;
}
// File I/O
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
// 初始化
void rioInitWithFile(rio *r, FILE *fp) {
*r = rioFileIO;
r->io.file.fp = fp;
r->io.file.buffered = 0;
r->io.file.autosync = 0;
}
而在RDB实现中,它使用的就是File I/O,并调用rioInitWithFile(rio *r, FILE *fp)
初始化。
对于rio
的操作实现,我不想多写。这里分析几个和RDB相关的操作——文件的读、文件的写。
对于文件的读,实现如下,很简单,就是调用fread
函数:
/* Returns 1 or 0 for success/failure. */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
// 单纯fread读取数据到buf缓冲中
return fread(buf,len,1,r->io.file.fp);
}
对于文件的写,需要考虑autosync
自动刷新的功能,所以略微复杂,不过核心还是很简单的:
/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;
// 1. 调用fwrite写入数据
retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len; // 更新buffer字段值
// 2. autosync功能,刷入磁盘
// 若autosync字段不为0,且buffer >= autosync,需要将缓冲区的数据强刷进磁盘
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
// 2.1. 刷用户缓冲到内核缓冲
fflush(r->io.file.fp);
// 2.2. 刷内核缓冲到文件
redis_fsync(fileno(r->io.file.fp));
// 2.3. 重置buffer为0
r->io.file.buffered = 0;
}
return retval;
}
写入的数据流向:
fwrite
$\rightarrow$ 用户态缓冲 $\rightarrow$fflush
$\rightarrow$ 内核缓冲 $\rightarrow$fsync/fdatasync
$\rightarrow$ 磁盘。对于
fsync/fdatasync
,它们都阻塞写到文件后再返回,但前者可能修改文件属性,后者只修改数据部分。
总之rio
对内存、文件、多描述符的I/O操作进行了简单抽象,实现了一个统一的接口。