源码阅读-Redis单机服务器: 单机数据库存储

Posted by keys961 on October 5, 2019

本节开始主要介绍Redis单机服务器部分,主要介绍的是单机部分的核心功能。

本节主要参考的是《Redis设计与实现》,代码版本是5.0.5。

本文主要说明单机服务器的数据库存储,延续之前的数据结构的主题。

1. 服务器的代码定义

Redis单机服务器的定义在server.hredisServer结构中。

这个结构定义了非常非常多的字段,不过关键的字段大概分为下面几类:

  • 数据库存储
  • 事件处理(文件事件、时间事件)
  • RDB/AOF
  • 统计字段
  • 配置信息
  • 日志
  • 主从复制
  • 订阅发布服务
  • 集群管理
  • 其他组件(如Lua的脚本管理,懒惰释放,延迟监控等等)

上面的功能包含了单机的和集群的。单机的部分基本在前6个,当然多机的功能基于单机的功能之上。

本文主要还是讲第一个功能,当然也会涉及下面的部分功能。下面的功能会在后面的文章中说明。

2. 服务器中的数据库

server.hredisServer结构中,定义了下面的字段,即Redis服务器数据库:

struct redisServer {
    // ...
    redisDb *db; // Redis数据库数组(即维护一组数据库)
    int dbnum; // 数据库的个数
    // ...
}

可知Redis可配置多个数据库。客户端可通过SELECT <db_id>来切换数据库,这个信息保留在结构clientdb字段中:

typedef struct client {
	// ...
    redisDb *db; // 当前选中的字段
    // ...
}

2.1. 数据库定义

这个定义在结构redisDb中,里面包含了非常多的dict字段,很多字段本文不会说明,有些将在后面说明。

typedef struct redisDb {
    dict *dict;    // 该数据库的键空间,有效数据都在这里
    dict *expires;  // 存储过期键和过期信息
    dict *blocking_keys; // 客户端正在阻塞获取值的相关键(如BLPOP操作)
    dict *ready_keys;    // 阻塞的并收到PUSH请求的键
    dict *watched_keys;  // 被监控的键,用于MULTI/EXEC CAS命令
    int id;     // 数据库ID
    long long avg_ttl;  // 统计值:平均TTL
    list *defrag_later; // 键名称的列表,用于一个个进行碎片整理
} redisDb;

这里主要还是dictexpire字段,当然其他字段可能会有涉及。

2.2. 数据库的切换

上面可知,Redis维护一组数据库,因此支持切换。

切换的实现在db.cselectCommand(client *c)函数中实现,实现很简单:

void selectCommand(client *c) {
    long id;
    // ... get id from the client command ...
    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
    // 选择db
    if (selectDb(c,id) == C_ERR) {
        addReplyError(c,"DB index is out of range");
    } else {
        addReply(c,shared.ok);
    }
}

int selectDb(client *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return C_ERR;
    // 这里将client的db字段进行修改
    c->db = &server.db[id];
    return C_OK;
}

要注意,在Cluster模式下,Redis不支持数据库切换。

3. 数据库的键空间

上面redisDb的定义中可知,数据存在dict字段中,dict字段也是某个Redis数据库的键空间。

因此,关于数据的操作,都会作用到这个字典上,字典的键就是Redis的键,字典的值就是对应键代表的值(可以是二进制数据、字符串、列表、集合、有序集合、散列表等等数据结构)。

关于数据结构的部分之前已经说的比较详细了,因此各个命令(如SET, RPUSH, HSET等)的实现也仅仅是对数据结构的操作。

我们以HSET命令为例说明,假如有命令HSET p_key s_key val

void hsetCommand(client *c) {
    int i, created = 0;
    robj *o; // p_key下的值对象,这里是散列表对象

    if ((c->argc % 2) == 1) {
        addReplyError(c,"wrong number of arguments for HMSET");
        return;
    }
	// 1. 先寻找db的dict中,是否有p_key这个键,若没有就创建一项(默认是ziplist)
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    // 2. 判断条件并尝试转换ziplist到dict
    hashTypeTryConversion(o,c->argv,2,c->argc-1);

    for (i = 2; i < c->argc; i += 2)
        // 3. 这边往p_key下的散列表添加键值对,这里是<s_key, val>
        // 这里会尝试将字符串转成数字
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
    // ....
}

其他命令(增删改查)也是类似的,只是对数据结构的修改。

4. 过期数据管理

Redis客户端可以设置键的过期时间,如EXPIRESETEXEXPIREAT等命令;也可通过TTL等命令查看某个键的可生存时间。

很容易发现,过期数据管理需要通过redisDb结构的expires字典字段实现,下面就会较为详细地说明。

4.1. 设置过期时间

这里直接进入EXPIRE命令的实现,在expire.c中:

void expireCommand(client *c) {
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
}

// EXPIRE,PEXPIRE,EXPIREAT,PEXPIREAT都会走到这里
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
	// 获取以毫秒为单位的过期时间戳
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
    // key不存在,返回
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }
    
    if (when <= mstime() && !server.loading && !server.masterhost) {
        // 若时间戳小于当前时间,即过期,需要删除
        // 但加载AOF时或自己是从节点时,不会执行DEL,而是等待主节点的DEL指令
        robj *aux;
		// 懒惰删除:异步删;否则同步删(普通的删除)
        int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
                                                    dbSyncDelete(c->db,key);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;
        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        // 设置expire时间
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;
    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);
    // 这里就是用了expires字段,为该字典添加/修改一个键值对<key, when/expire>
    de = dictAddOrFind(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(de,when);

    int writable_slave = server.masterhost && server.repl_slave_ro == 0;
    if (c && writable_slave && !(c->flags & CLIENT_MASTER))
        rememberSlaveKeyWithExpire(db,key);
}

所以很容易知道,设置某个key的过期时间,Redis会额外在expires字段添加一个键值对<key, expire>,其中expire是过期时间(绝对的Unix时间戳,以毫秒计)

4.2. 移除过期时间

可使用PERSIST命令移除某个键的过期时间。

它的实现也非常简单,只是移除expires字典字段的对应键值对。

void persistCommand(client *c) {
    if (lookupKeyWrite(c->db,c->argv[1])) {
        // 这边删除db->expires的对应键值对,从而移除过期设置
        if (removeExpire(c->db,c->argv[1])) {
            addReply(c,shared.cone);
            server.dirty++;
        } else {
            addReply(c,shared.czero);
        }
    } else {
        addReply(c,shared.czero);
    }
}

4.3. 计算TTL

TTL的计算非常简单,首先从expires字段获取该键的expire过期时间,然后和当前时间相减即可得到:

ttl = expire-mstime();
if (ttl < 0) ttl = 0;
// ...
addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000)); // 默认以秒为单位输出

4.4. 过期数据的删除

Redis提供2种删除策略:

  • 懒惰删除:仅在访问键的时候进行检查,从而删除过期数据
  • 定期删除:隔一段时间执行过期删除操作

a) 懒惰删除

懒惰删除的实现在函数expireIfNeeded(redisDb *db, robj *key)

int expireIfNeeded(redisDb *db, robj *key) {
    // 若键不存在/没过期,直接返回
    if (!keyIsExpired(db,key)) return 0;
    // 若自己是从节点,也返回,删除的指令会从主节点发过来
    if (server.masterhost != NULL) return 1;
    // 删除键
    server.stat_expiredkeys++;
    // 这里将过期事件加到AOF,并传播给从节点
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    // 这里就是真正删除(根据lazyfree_lazy_expire字段决定)
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

删除分为2类,一个是懒惰释放,即异步释放(不会立即删除):

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    // 首先会删除db->expires字典的键值对
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    // 懒惰删除的情况下,若值的长度太大,则很费时间,会放到后台任务处理
    // 若值小,则直接释放
    // 这里先将键值对从db->dict取出,并断链,但没释放
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        // 获取值的robj对象
        robj *val = dictGetVal(de);
        // 计算释放开销,实际上就是值的长度
        size_t free_effort = lazyfreeGetFreeEffort(val);
		// 若长度大于阈值,且引用计数为1,则将其添加到后台任务里进行释放(放入队列里)
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }
	// 若值小,则直接释放entry(键和值都被释放)
    // 若值大,值会放到后台处理,entry的值为NULL,因此只回收键
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

另一个是普通的同步释放,实现就比较简单了:

int dbSyncDelete(redisDb *db, robj *key) {
    // 这里直接删除和释放了expires和dict的键值对
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

b) 定期删除

expire.cactiveExpireCycle(int type)函数实现,它有2种类型:

  • ACTIVE_EXPIRE_CYCLE_FAST:执行一次快速扫描,清理时长不会超过EXPIRE_FAST_CYCLE_DURATION

  • ACTIVE_EXPIRE_CYCLE_SLOW:标准的清理,时限为REDIS_HS常量的一个百分比,这个百分比由REDIS_EXPIRELOOKUPS_TIME_PERC定义

    默认25,即一般情况下,时限timelimit满足:

    • (server.hz/1000000) * (timelimit) <= 25

因为有规定时间,所以Redis是分多次遍历服务器的各个数据库(用全局变量记录进度,增量清理),然后从数据库的expires字典字段随机检查一部分键的过期信息,并删除之。

由于源码过长,所以源码就略过了。

它是一个后台任务,当active_expire_enabled开启,且节点是主节点时,才会启动这个后台任务。

5. AOF、RDB、复制对过期数据管理的影响

5.1. RDB

生成RDB文件时(如SAVE, BGSAVE命令),过期的键是不会还是会被保存到RDB文件中(加载的时候过滤)。

具体在rioSaveRio(rio *rdb, int *error, int flag, rdbSaveInfo *rsi)中实现。

载入RDB文件的时候,会有几种情况:

  • 若自己是主节点,则会检查过期时间,将过期数据去除
  • 若自己是从节点,则加载全部数据,过期数据的清除由主节点控制

具体在rdbLoad(char *filename, rdbSaveInfo *rsi)中实现

加载通常在节点启动时进行,若开启AOF,优先加载AOF,若没有,则加载RDB。

5.2. AOF

当数据过期,但没被删除的时候,AOF日志不会改变。

当数据过期,且被清除的时候,AOF会追加一条DEL日志,标识数据已删除。

此外AOF重写过程中,也写入过期数据。

具体在rewriteAppendOnlyFile(char *filename)实现,通过REWRITEAOF或者BGREWRITEAOF命令触发(AOF重写只会在后台做了)

5.3. 主从复制

主节点的过期清理和之前所说的一样。

但从节点不一样,正如之前所说的,它由主节点控制,当数据过期时:

  • 主节点删除数据,并向从节点发送删除指令,告诉该数据过期
  • 从节点读取删除指令,删除数据

当读请求落入从节点,数据过期时,从节点不会删除数据,反而会将过期的值返回给客户端(造成了滞后)。

总而言之,从节点的过期数据清理基本由主节点控制,这简化了实现,且易于控制。