本节开始主要介绍Redis单机服务器部分,主要介绍的是单机部分的核心功能。
本节主要参考的是《Redis设计与实现》,代码版本是5.0.5。
本文主要说明单机服务器的数据库存储,延续之前的数据结构的主题。
1. 服务器的代码定义
Redis单机服务器的定义在server.h
的redisServer
结构中。
这个结构定义了非常非常多的字段,不过关键的字段大概分为下面几类:
- 数据库存储
- 事件处理(文件事件、时间事件)
- RDB/AOF
- 统计字段
- 配置信息
- 日志
- 主从复制
- 订阅发布服务
- 集群管理
- 其他组件(如Lua的脚本管理,懒惰释放,延迟监控等等)
上面的功能包含了单机的和集群的。单机的部分基本在前6个,当然多机的功能基于单机的功能之上。
本文主要还是讲第一个功能,当然也会涉及下面的部分功能。下面的功能会在后面的文章中说明。
2. 服务器中的数据库
在server.h
的redisServer
结构中,定义了下面的字段,即Redis服务器数据库:
struct redisServer {
// ...
redisDb *db; // Redis数据库数组(即维护一组数据库)
int dbnum; // 数据库的个数
// ...
}
可知Redis可配置多个数据库。客户端可通过SELECT <db_id>
来切换数据库,这个信息保留在结构client
的db
字段中:
typedef struct client {
// ...
redisDb *db; // 当前选中的字段
// ...
}
2.1. 数据库定义
这个定义在结构redisDb
中,里面包含了非常多的dict
字段,很多字段本文不会说明,有些将在后面说明。
typedef struct redisDb {
dict *dict; // 该数据库的键空间,有效数据都在这里
dict *expires; // 存储过期键和过期信息
dict *blocking_keys; // 客户端正在阻塞获取值的相关键(如BLPOP操作)
dict *ready_keys; // 阻塞的并收到PUSH请求的键
dict *watched_keys; // 被监控的键,用于MULTI/EXEC CAS命令
int id; // 数据库ID
long long avg_ttl; // 统计值:平均TTL
list *defrag_later; // 键名称的列表,用于一个个进行碎片整理
} redisDb;
这里主要还是dict
和expire
字段,当然其他字段可能会有涉及。
2.2. 数据库的切换
上面可知,Redis维护一组数据库,因此支持切换。
切换的实现在db.c
的selectCommand(client *c)
函数中实现,实现很简单:
void selectCommand(client *c) {
long id;
// ... get id from the client command ...
if (server.cluster_enabled && id != 0) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
// 选择db
if (selectDb(c,id) == C_ERR) {
addReplyError(c,"DB index is out of range");
} else {
addReply(c,shared.ok);
}
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
// 这里将client的db字段进行修改
c->db = &server.db[id];
return C_OK;
}
要注意,在Cluster模式下,Redis不支持数据库切换。
3. 数据库的键空间
上面redisDb
的定义中可知,数据存在dict
字段中,dict
字段也是某个Redis数据库的键空间。
因此,关于数据的操作,都会作用到这个字典上,字典的键就是Redis的键,字典的值就是对应键代表的值(可以是二进制数据、字符串、列表、集合、有序集合、散列表等等数据结构)。
关于数据结构的部分之前已经说的比较详细了,因此各个命令(如SET
, RPUSH
, HSET
等)的实现也仅仅是对数据结构的操作。
我们以HSET
命令为例说明,假如有命令HSET p_key s_key val
:
void hsetCommand(client *c) {
int i, created = 0;
robj *o; // p_key下的值对象,这里是散列表对象
if ((c->argc % 2) == 1) {
addReplyError(c,"wrong number of arguments for HMSET");
return;
}
// 1. 先寻找db的dict中,是否有p_key这个键,若没有就创建一项(默认是ziplist)
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
// 2. 判断条件并尝试转换ziplist到dict
hashTypeTryConversion(o,c->argv,2,c->argc-1);
for (i = 2; i < c->argc; i += 2)
// 3. 这边往p_key下的散列表添加键值对,这里是<s_key, val>
// 这里会尝试将字符串转成数字
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
// ....
}
其他命令(增删改查)也是类似的,只是对数据结构的修改。
4. 过期数据管理
Redis客户端可以设置键的过期时间,如EXPIRE
、SETEX
、EXPIREAT
等命令;也可通过TTL
等命令查看某个键的可生存时间。
很容易发现,过期数据管理需要通过redisDb
结构的expires
字典字段实现,下面就会较为详细地说明。
4.1. 设置过期时间
这里直接进入EXPIRE
命令的实现,在expire.c
中:
void expireCommand(client *c) {
expireGenericCommand(c,mstime(),UNIT_SECONDS);
}
// EXPIRE,PEXPIRE,EXPIREAT,PEXPIREAT都会走到这里
void expireGenericCommand(client *c, long long basetime, int unit) {
robj *key = c->argv[1], *param = c->argv[2];
long long when; /* unix time in milliseconds when the key will expire. */
// 获取以毫秒为单位的过期时间戳
if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
return;
if (unit == UNIT_SECONDS) when *= 1000;
when += basetime;
// key不存在,返回
if (lookupKeyWrite(c->db,key) == NULL) {
addReply(c,shared.czero);
return;
}
if (when <= mstime() && !server.loading && !server.masterhost) {
// 若时间戳小于当前时间,即过期,需要删除
// 但加载AOF时或自己是从节点时,不会执行DEL,而是等待主节点的DEL指令
robj *aux;
// 懒惰删除:异步删;否则同步删(普通的删除)
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
dbSyncDelete(c->db,key);
serverAssertWithInfo(c,key,deleted);
server.dirty++;
/* Replicate/AOF this as an explicit DEL or UNLINK. */
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
rewriteClientCommandVector(c,2,aux,key);
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone);
return;
} else {
// 设置expire时间
setExpire(c,c->db,key,when);
addReply(c,shared.cone);
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++;
return;
}
}
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
kde = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
// 这里就是用了expires字段,为该字典添加/修改一个键值对<key, when/expire>
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
所以很容易知道,设置某个key
的过期时间,Redis会额外在expires
字段添加一个键值对<key, expire>
,其中expire
是过期时间(绝对的Unix时间戳,以毫秒计)
4.2. 移除过期时间
可使用PERSIST
命令移除某个键的过期时间。
它的实现也非常简单,只是移除expires
字典字段的对应键值对。
void persistCommand(client *c) {
if (lookupKeyWrite(c->db,c->argv[1])) {
// 这边删除db->expires的对应键值对,从而移除过期设置
if (removeExpire(c->db,c->argv[1])) {
addReply(c,shared.cone);
server.dirty++;
} else {
addReply(c,shared.czero);
}
} else {
addReply(c,shared.czero);
}
}
4.3. 计算TTL
TTL的计算非常简单,首先从expires
字段获取该键的expire
过期时间,然后和当前时间相减即可得到:
ttl = expire-mstime();
if (ttl < 0) ttl = 0;
// ...
addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000)); // 默认以秒为单位输出
4.4. 过期数据的删除
Redis提供2种删除策略:
- 懒惰删除:仅在访问键的时候进行检查,从而删除过期数据
- 定期删除:隔一段时间执行过期删除操作
a) 懒惰删除
懒惰删除的实现在函数expireIfNeeded(redisDb *db, robj *key)
int expireIfNeeded(redisDb *db, robj *key) {
// 若键不存在/没过期,直接返回
if (!keyIsExpired(db,key)) return 0;
// 若自己是从节点,也返回,删除的指令会从主节点发过来
if (server.masterhost != NULL) return 1;
// 删除键
server.stat_expiredkeys++;
// 这里将过期事件加到AOF,并传播给从节点
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
// 这里就是真正删除(根据lazyfree_lazy_expire字段决定)
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
删除分为2类,一个是懒惰释放,即异步释放(不会立即删除):
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
// 首先会删除db->expires字典的键值对
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
// 懒惰删除的情况下,若值的长度太大,则很费时间,会放到后台任务处理
// 若值小,则直接释放
// 这里先将键值对从db->dict取出,并断链,但没释放
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
// 获取值的robj对象
robj *val = dictGetVal(de);
// 计算释放开销,实际上就是值的长度
size_t free_effort = lazyfreeGetFreeEffort(val);
// 若长度大于阈值,且引用计数为1,则将其添加到后台任务里进行释放(放入队列里)
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
// 若值小,则直接释放entry(键和值都被释放)
// 若值大,值会放到后台处理,entry的值为NULL,因此只回收键
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
另一个是普通的同步释放,实现就比较简单了:
int dbSyncDelete(redisDb *db, robj *key) {
// 这里直接删除和释放了expires和dict的键值对
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
b) 定期删除
在expire.c
的activeExpireCycle(int type)
函数实现,它有2种类型:
-
ACTIVE_EXPIRE_CYCLE_FAST
:执行一次快速扫描,清理时长不会超过EXPIRE_FAST_CYCLE_DURATION
-
ACTIVE_EXPIRE_CYCLE_SLOW
:标准的清理,时限为REDIS_HS
常量的一个百分比,这个百分比由REDIS_EXPIRELOOKUPS_TIME_PERC
定义默认25,即一般情况下,时限
timelimit
满足:(server.hz/1000000) * (timelimit) <= 25
因为有规定时间,所以Redis是分多次遍历服务器的各个数据库(用全局变量记录进度,增量清理),然后从数据库的expires
字典字段随机检查一部分键的过期信息,并删除之。
由于源码过长,所以源码就略过了。
它是一个后台任务,当active_expire_enabled
开启,且节点是主节点时,才会启动这个后台任务。
5. AOF、RDB、复制对过期数据管理的影响
5.1. RDB
生成RDB文件时(如SAVE
, BGSAVE
命令),过期的键是不会还是会被保存到RDB文件中(加载的时候过滤)。
具体在
rioSaveRio(rio *rdb, int *error, int flag, rdbSaveInfo *rsi)
中实现。
载入RDB文件的时候,会有几种情况:
- 若自己是主节点,则会检查过期时间,将过期数据去除
- 若自己是从节点,则加载全部数据,过期数据的清除由主节点控制
具体在
rdbLoad(char *filename, rdbSaveInfo *rsi)
中实现加载通常在节点启动时进行,若开启AOF,优先加载AOF,若没有,则加载RDB。
5.2. AOF
当数据过期,但没被删除的时候,AOF日志不会改变。
当数据过期,且被清除的时候,AOF会追加一条DEL
日志,标识数据已删除。
此外AOF重写过程中,也不会写入过期数据。
具体在
rewriteAppendOnlyFile(char *filename)
实现,通过或者REWRITEAOF
BGREWRITEAOF
命令触发(AOF重写只会在后台做了)
5.3. 主从复制
主节点的过期清理和之前所说的一样。
但从节点不一样,正如之前所说的,它由主节点控制,当数据过期时:
- 主节点删除数据,并向从节点发送删除指令,告诉该数据过期
- 从节点读取删除指令,删除数据
而当读请求落入从节点,数据过期时,从节点不会删除数据,反而会将过期的值返回给客户端(造成了滞后)。
总而言之,从节点的过期数据清理基本由主节点控制,这简化了实现,且易于控制。