源码阅读-Redis单机服务器: RDB

Posted by keys961 on October 7, 2019

1. Overview

RDB持久化是Redis的一个功能,可以看成是Redis的dump,将数据持久化到磁盘上,即使Redis进程退出,重启后也能将数据恢复。

RDB功能在rdb.hrdb.c中定义和实现。

2. 导入/导出RDB

2.1. 导出RDB

RDB导出可通过SAVE或者BGSAVE命令触发,前者同步阻塞,后者创建子进程后台处理。

下面的是SAVE命令的处理:

void saveCommand(client *c) {
    if (server.rdb_child_pid != -1) {
        // 若后台有RDB进程,就拒绝生成本次的RDB
        addReplyError(c,"Background save already in progress");
        return;
    }
    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    // 这边rdbSave是保存RDB的主函数
    if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}

而这个是BGSAVE命令的处理:

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    // ...
	// 不允许在已创建RDB/AOF子线程存在下进行后台RDB导出
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // ...
    // 创建子进程进行RDB导出
    if ((childpid = fork()) == 0) {
        int retval;
		// 子进程下
        // 关闭socket监听(子进程不需要监听)
        // 而父进程依旧可以写入,fork的COW保证父子进程的数据进程安全
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi); // 执行RDB导出
        // ...
        exitFromChild((retval == C_OK) ? 0 : 1); // 退出子进程,这里调用_exit (int __status)函数(unistd.h定义)
    } else {
        // 父进程下
        // ...
        if (childpid == -1) {
            // 无法创建子进程,记录并返回错误
            // ...
            return C_ERR;
        }
        // 记录和更新后台RDB导出的信息和相关状态
        // 如更新dict resize policy(这种情况下,需要增加rehash的阈值)
        // ...
        return C_OK;
    }
    return C_OK; /* unreached */
}

最后核心的RDB导出函数如下:

int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    // 先保存在临时文件里,文件名为temp-<pid>.rdb
    fp = fopen(tmpfile,"w");
    if (!fp) {
        // 临时文件句柄无法创建,返回错误
        // ...
        return C_ERR;
    }
	// 初始化内容
    rioInitWithFile(&rdb,fp);
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
	// 开始保存RDB文件内容(具体内容后面会说明)
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }
    // 清空用户空间缓冲、内核缓冲,并关闭临时文件,保证内容完全写入磁盘
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    // 重命名文件
    if (rename(tmpfile,filename) == -1) {
        // 出错,打日志,删除文件(使用unlink)
        // ...
        return C_ERR;
    }
    // ...
    return C_OK;

werr:
    // 出错,打日志,关闭并删除文件(使用unlink)
    // ...
    return C_ERR;
}

具体如何保存RDB文件,在函数rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi)中,后面会具体说明,因为它涉及保存数据的格式。

2.2. 导入RDB

RDB导入只会在Redis启动时执行,只要有RDB文件存在,就会载入。

Redis还有AOF功能,会影响Redis是否执行RDB导入:

  • 当AOF开启时,优先选择AOF文件还原状态(因为AOF更新频率高)
  • 当AOF关闭时,才使用RDB文件还原状态

RDB导入在下面的rdbLoad(char *filename, rdbSaveInfo *rsi)函数实现:

int rdbLoad(char *filename, rdbSaveInfo *rsi) {
    FILE *fp;
    rio rdb;
    int retval;
	// 打开文件
    if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
    // 初始化
    startLoading(fp);
    rioInitWithFile(&rdb,fp);
    // 真正的加载恢复
    retval = rdbLoadRio(&rdb,rsi,0);
    // 关闭文件等处理后续事情
    fclose(fp);
    stopLoading();
    return retval;
}

具体的加载恢复在函数rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof)实现,后面会具体说明,因为它同样涉及数据的格式。

3. 自动间隔保存

Redis配置文件上会有下面的几项:

1
2
3
save 900 1 
save 300 10
save 60 10000

这里save <a1> <a2>表示,服务器在a1秒之内,至少修改数据库a2次,BGSAVE就会被执行。

因此上面的配置意思是,满足下面3个条件之一的,触发一次BGSAVE

  • 900秒内,至少修改1次
  • 300秒内,至少修改10次
  • 60秒内,至少修改10000次

3.1. 自动保存的配置信息

上述配置信息,会保存到结构saveparam里:

struct saveparam {
    time_t seconds; // 时间间隔(秒)
    int changes; // 数据库修改次数
};

saveparam信息保存在redisServer里:

struct redisServer {
    // ...
    struct saveparam *saveparams; // 一组自动保存的配置信息
    int saveparamslen; // 上面配置信息的个数
    char *rdb_filename;  // RDB文件名
    // ...
};

3.2. 自动保存的条件检查

Redis内部有一个定时任务函数serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData),执行一些定时的任务,间隔是每秒执行server.hz次(默认100ms)。

关于定时任务的实现,之后会详细说明。

其中有一项就是检查自动保存的条件,它遍历整个saveparams数组,若一条满足就执行BGSAVE(前提是没有RDB, AOF Rewrite等子进程,并且还要考虑上次BGSAVE失败的情形)。

代码如下,检查的条件已经在注释中说明:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
	// ...
    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren()) {
        // 当有RDB/AOF Rewrite/其他子进程时
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            // 非阻塞等待子进程返回,若有返回的子进程,进行后续处理
            // 如RDB/AOF Rewrite子进程的后续处理
            // 并关闭子进程和父进程的管道,更新dict resize policy(rehash阈值减小)等
            // ...
        }
    } else {
        // 当没有RDB/AOF Rewrite等其他子进程时,检查自动保存配置,即saveparam
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
			// 检查并满足下面3条:
            // 1. 修改次数(大于changes)
            // 2. 与上一次RDB Save的时间间隔(大于seconds)
            // 3. 若上次RDB Save失败,还要检查与上次失败的RDB Save的时间间隔(大于CONFIG_BGSAVE_RETRY_DELAY,默认5秒)
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK)) {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                // 满足条件,直接执行BGSAVE
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
        // ...Trigger an AOF rewrite if needed.
    }
    // ...
}

4. RDB文件数据格式

4.1. 整个RDB文件的组成

第2节中的2个函数(内部真正执行RDB导出和导入),如下所示,和RDB文件数据格式有关,本节就是说明这点:

  • rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi)
  • rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof)

这里以第一个函数为例(因为第二个函数只是第一个函数的逆操作)。

int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    // 1. 首先写入“REDIS0009”,即“REDIS” + rdb_version(当前版本为9),共9字节
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
	// 2. 然后写入一些相关字段,如:
    // Redis版本号、服务器的位数、当前时间、已使用的内存大小等
    // 另外还有rsi提供的数据(可选),如当前选择的db号等等
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
	// 3. 然后遍历db,写入数据
    for (j = 0; j < server.dbnum; j++) {
        // 对于每个db
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue; // 无键值对则跳过
        di = dictGetSafeIterator(d);
        // 3.1. 首先写入一个常量RDB_OPCODE_SELECTDB(254,1字节)
        // 表示下面是db的ID号
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        // 3.2. 然后写入当前db的ID
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        // 3.3. 接着写入常量RDB_OPCODE_RESIZEDB(251,1字节)
        // 表示下面要写入该db存储的键值数量,供重建用
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        // 3.4. 写入dict键值对数量和expires设置过期的键值对数量
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
        // 3.5. 遍历整个dict
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;
            
            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            // 写入键值对到RDB文件,这块下面会讲
            // (这里我没有发现过滤过期数据,但加载的时候若是master就被过滤掉了)
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
			// 处理RDB和AOF混合持久的情况,在AOF后台重写的时候会执行
            // 这里接收父进程传来的AOF重写缓冲数据
            // 然后将其生成新的AOF文件,配合前面生成的RDB一起持久数据
            // 这会在下一篇讲
            // ...
        }
        dictReleaseIterator(di);
        di = NULL; 
    }
    // 4. 保存lua脚本的数据
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL;
    }
	// 5. 写入EOF常量(255,1字节)
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
	// 6. 最后写入校验和,8字节,即64位CRC校验
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}

这里可以看到,整个RDB文件可以分为:

  • REDIS常量头
  • RDB版本号
  • 辅助字段,包括:
    • Redis版本
    • Redis服务器位数
    • 当前时间
    • 已使用内存大小
    • rdbSaveInforepl-stream-db, repl-id, repl-offset
    • aof-preamble(是否开启混合持久)
  • 每个数据库的信息,对于每个数据库:
    • RDB_OPCODE_SELECTDB常量
    • 当前数据库ID
    • RDB_OPCODE_RESIZEDB常量
    • 当前数据库的键值对数量
    • 当前数据库设置过期的键值对数量
    • 各个键值对
  • Lua脚本信息
  • RDB_OPCODE_EOF常量
  • 64位CRC校验和

4.2. 键值对的保存格式

见下面的代码:

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
	// 1. 若设置了过期时间
    if (expiretime != -1) {
        // 1.1. 先写入RDB_OPCODE_EXPIRETIME_MS(252)常量
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        // 1.2. 然后写入过期时间
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    // 2. 根据LRU/LFU统计策略,写入统计信息
    // LRU: RDB_OPCODE_IDLE(248)常量+LRU空闲时间
    // LFU: RDB_OPCODE_FREQ(249)常量+访问频率
    if (savelru) {
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    if (savelfu) {
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }

    // 3. 写入值的类型
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 4. 写入键
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 5. 写入值
    if (rdbSaveObject(rdb,val,key) == -1) return -1;
    return 1;
}

这里可以看到,一个键值对的格式是:

  • (可选) 若设置过期时间,写入该信息:
    • RDB_OPCODE_EXPIRETIME_MS常量
    • 过期时间
  • LRU/LFU统计信息:
    • LRU:RDB_OPCODE_IDLE常量+LRU空闲时间
    • LFU:RDB_OPCODE_FREQ常量+访问频率
  • 值的类型

另外,这里没有对过期数据进行过滤,似乎之前版本是有的,不过这里(5.0.5)没发现。所以《Redis设计与实现》9.7.节可能需要修改。

a) 值的类型

值类型定义如下,有些值是为了兼容而用的:

#define RDB_TYPE_STRING 0 // OBJ_STRING
#define RDB_TYPE_LIST   1
#define RDB_TYPE_SET    2 // OBJ_SET (散列表实现)
#define RDB_TYPE_ZSET   3
#define RDB_TYPE_HASH   4 // OBJ_HASH
#define RDB_TYPE_ZSET_2 5  // OBJ_ZSET (跳表+散列表实现)
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 // OBJ_MODULE
#define RDB_TYPE_HASH_ZIPMAP    9
#define RDB_TYPE_LIST_ZIPLIST  10
#define RDB_TYPE_SET_INTSET    11 // OBJ_SET
#define RDB_TYPE_ZSET_ZIPLIST  12 // OBJ_ZSET
#define RDB_TYPE_HASH_ZIPLIST  13 // OBJ_HASH
#define RDB_TYPE_LIST_QUICKLIST 14 // OBJ_LIST
#define RDB_TYPE_STREAM_LISTPACKS 15 // OBJ_STREAM

保存的时候,主要取上面带注释的。

它们都用1个字节保存。

b) 值保存格式

由于类型众多,所以挑选常见的类型格式,如下图,From https://juejin.im/post/5d24a1b4f265da1baf7d126e:

img

最上面的是字符串,左边是字符串,右边是8位以内的整数(超过8位的以字符串形式存储)

注意,OBJ_STRING以及RDB_TYPE_*的字段是不写入的,仅作标识用。

c) 键的写入

键的写入按照上一节字符串的写入方式执行。

5. 写入RDB的接口——rio

RDB的写入通过一个结构rio操作(定义和实现在rio.hrio.c中):

struct _rio {
    // 数据的读
    size_t (*read)(struct _rio *, void *buf, size_t len);
    // 数据的写
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    // 获取当前读写偏移
    off_t (*tell)(struct _rio *);
    // 刷新I/O
    int (*flush)(struct _rio *);
    // 写入时,调用改函数更新校验和
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);
	// 当前校验和
    uint64_t cksum;
	// 当前已读/写的字节数
    size_t processed_bytes;
    // 最大单次读/写的大小
    size_t max_processing_chunk;
    // io变量,可以视作句柄/描述符
    union {
        // 内存buffer
        struct {
            sds ptr; // 内容的起始地址
            off_t pos; // 偏移量
        } buffer;
        // 文件
        struct {
            FILE *fp; // 文件句柄/描述符
            off_t buffered; // 上次调用fsync后,写入的数据量
            off_t autosync; // 若打开autosync,那么当写入量超过这个字段的值后,需要调用fsync将其刷入磁盘
        } file;
        // 多I/O描述符(如写入到多个socket). 
        struct {
            int *fds; // 一组I/O描述符
            int *state; // 对应的状态
            int numfds; // 描述符数量
            off_t pos; // 缓冲区当前偏移
            sds buf; // 缓冲区起始地址
        } fdset;
    } io;
};

typedef struct _rio rio;

可见rio是对I/O做了一个接口,它默认有2个实现:

  • Buffer I/O
  • File I/O
// Buffer I/O
static const rio rioBufferIO = {
    rioBufferRead,
    rioBufferWrite,
    rioBufferTell,
    rioBufferFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
// 初始化
void rioInitWithBuffer(rio *r, sds s) {
    *r = rioBufferIO;
    r->io.buffer.ptr = s;
    r->io.buffer.pos = 0;
}

// File I/O
static const rio rioFileIO = {
    rioFileRead,
    rioFileWrite,
    rioFileTell,
    rioFileFlush,
    NULL,           /* update_checksum */
    0,              /* current checksum */
    0,              /* bytes read or written */
    0,              /* read/write chunk size */
    { { NULL, 0 } } /* union for io-specific vars */
};
// 初始化
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;
    r->io.file.fp = fp;
    r->io.file.buffered = 0;
    r->io.file.autosync = 0;
}

在RDB实现中,它使用的就是File I/O,并调用rioInitWithFile(rio *r, FILE *fp)初始化。

对于rio的操作实现,我不想多写。这里分析几个和RDB相关的操作——文件的读、文件的写。

对于文件的读,实现如下,很简单,就是调用fread函数:

/* Returns 1 or 0 for success/failure. */
static size_t rioFileRead(rio *r, void *buf, size_t len) {
    // 单纯fread读取数据到buf缓冲中
    return fread(buf,len,1,r->io.file.fp);
}

对于文件的写,需要考虑autosync自动刷新的功能,所以略微复杂,不过核心还是很简单的:

/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;
	// 1. 调用fwrite写入数据
    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len; // 更新buffer字段值
	// 2. autosync功能,刷入磁盘
    // 若autosync字段不为0,且buffer >= autosync,需要将缓冲区的数据强刷进磁盘
    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        // 2.1. 刷用户缓冲到内核缓冲
        fflush(r->io.file.fp);
        // 2.2. 刷内核缓冲到文件
        redis_fsync(fileno(r->io.file.fp));
        // 2.3. 重置buffer为0
        r->io.file.buffered = 0;
    }
    return retval;
}

写入的数据流向:

fwrite $\rightarrow$ 用户态缓冲 $\rightarrow$ fflush $\rightarrow$ 内核缓冲 $\rightarrow$ fsync/fdatasync $\rightarrow$ 磁盘。

对于fsync/fdatasync,它们都阻塞写到文件后再返回,但前者可能修改文件属性,后者只修改数据部分。

总之rio对内存、文件、多描述符的I/O操作进行了简单抽象,实现了一个统一的接口。