Redis有一套非常完整的数据结构库,但是它并没有直接使用这些数据结构构建数据库,而是构建了一个对象系统,并通过这个对象系统构建数据库。
这个对象系统包含下面几种对象:
#define OBJ_STRING 0 /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */
#define OBJ_MODULE 5 /* Module object. */
#define OBJ_STREAM 6 /* Stream object. */
除此之外,这个对象系统实现了基于引用计数的GC和内存共享机制,提高内存使用效率。
最后,Redis对象记录对象的访问时间,可用于计算空转时长,以此来计算其被删除的优先级。
1. Redis对象定义
Redis对象的定义在redisObject
中:
typedef struct redisObject {
unsigned type:4; // 4-bit类型
unsigned encoding:4; // 4-bit编码
unsigned lru:LRU_BITS; // 24-bit,替换策略:LRU-time或者是LFU-data
int refcount; // 引用计数
void *ptr; // 指向真正数据结构的指针
} robj;
这里进行字段的说明:
-
type
:4-bit,表明这个对象的类型,即开头提及的7个类型 -
encoding
:4-bit,表明对象底层真正使用的数据结构,可用的数据结构有10种,如下所示:#define OBJ_ENCODING_RAW 0 /* 简单SDS */ #define OBJ_ENCODING_INT 1 /* 整数 */ #define OBJ_ENCODING_HT 2 /* 散列表dict */ #define OBJ_ENCODING_ZIPMAP 3 /* zipmap */ #define OBJ_ENCODING_LINKEDLIST 4 /* (弃用)链表 */ #define OBJ_ENCODING_ZIPLIST 5 /* ziplist */ #define OBJ_ENCODING_INTSET 6 /* 整数集合 */ #define OBJ_ENCODING_SKIPLIST 7 /* 跳表 */ #define OBJ_ENCODING_EMBSTR 8 /* 使用embstr编码的SDS */ #define OBJ_ENCODING_QUICKLIST 9 /* quicklist */ #define OBJ_ENCODING_STREAM 10 /* 流对象 */
类型和编码的联系如下:
OBJ_STRING
$\rightarrow$OBJ_ENCODING_RAW/OBJ_ENCODING_INT/OBJ_ENCODING_EMBSTR
OBJ_LIST
$\rightarrow$OBJ_ENCODING_QUICKLIST
OBJ_SET
$\rightarrow$OBJ_ENCODING_INTSET/OBJ_ENCODING_HT
OBJ_ZSET
$\rightarrow$OBJ_ENCODING_SKIPLIST/OBJ_ENCODING_ZIPLIST
OBJ_HASH
$\rightarrow$OBJ_ENCODING_HT/OBJ_ENCODING_ZIPLIST
OBJ_STREAM
$\rightarrow$OBJ_ENCODING_STREAM
这种复杂的对应关系主要用于节省内存。
-
lru
:24-bit,用于数据项的替换和回收,有2种模式- LRU-time:24位均用于记录对象最后一次访问时间(相对于全局的LRU时钟)
- LFU-data:后8位记录访问频率,前16位记录访问时间区间长度
-
refcount
:用于共享和内存回收 -
ptr
:指向底层数据结构的指针
下面对常用类型(开头的前5个)的对象进行说明。
1.1. 字符串
在t_string.c
中,可指定,创建字符串的时通常调用函数在tryObjectEncoding(robj *o)
(该函数在object.c
中)中:
/* Try to encode a string object in order to save space */
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;
// 1. 验证数据类型
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
// 2. 若数据结构不是raw(普通字符串)或者embstr,直接返回
if (!sdsEncodedObject(o)) return o;
// 3. 若引用计数大于1,不需要创建新串,直接返回
if (o->refcount > 1) return o;
len = sdslen(s);
// 4. 若可以转成整数(64-bit整数长度不超过20)
if (len <= 20 && string2l(s,len,&value)) {
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
// 这里使用资源池,[0,10000)的整数直接返回池中对象,节省内存
// 只需池中资源计数+1,而传入的robj引用计数-1
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
// 其他情况下,整数值已经在value中
// 所以释放原字符串,将整数value赋给robj,并置编码为OBJ_ENCODING_INT
if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr);
o->encoding = OBJ_ENCODING_INT;
o->ptr = (void*) value;
return o;
}
}
// 5. 若字符串长度不超过44,那么使用embstr编码
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;
if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
emb = createEmbeddedStringObject(s,sdslen(s));
decrRefCount(o);
return emb;
}
// 6. 以上尝试均不成功,则尝试对原SDS串进行trim
// 即空闲空间大于字符串长度的1/10时,执行resize,清除空闲空间
trimStringObjectIfNeeded(o);
// 7. 返回原对象
return o;
}
可以看出,对于字符串的存储:
- 首先尝试将其转成整数,并尽量使用池化的整数
- 然后尝试将其转成
embstr
(对于长度不超过44的字符串) - 最后尝试消除空闲空间
一切都是为了节省空间。
对于
embstr
,它的结构如下,是一个连续的robj
结构(而普通的字符串robj
是分成2块的):而之所以限制长度44,是因为使用了jemalloc,将分配的大小限制在64字节以下:
robj
本身最多16字节sdshdr8
本身最多3字节- 字符串长度最多44字节
- 加上
\0
,1字节以上正好64字节。
见第4步中的代码,提及到了资源池,它实现了资源共享,节省了内存。
Redis会预先创建一堆字符串,默认是0~10000(左闭右开)的整数字符串,当需要使用这些串时,直接取用即可。
由于是共享,所以使用时引用计数会+1,防止被回收。
(资源池和JDK的字符串池很类似,而引用计数Netty
ByteBuf
也有类似的一套。)
1.2. 列表
由于现在都使用quicklist
作为列表实现,所以创建它时(t_list.c
中的创建,如函数pushGenericCommand(client *c, int where)
),最后调用的只有函数createQuicklistObject(void)
,它仅仅简单创建了一个quicklist
:
robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate(); // 仅仅创建一个quicklist
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}
1.3. 散列表
散列表的数据结构可以是ziplist
或者dict
,相关代码在t_hash.c
中。
默认情况下,创建的散列表是ziplist
:
robj *createHashObject(void) {
unsigned char *zl = ziplistNew(); // 默认ziplist作为散列表
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
不过两种数据结构怎么转换,还是以命令HSET
为例,下面是该命令的实现:
void hsetCommand(client *c) {
int i, created = 0;
robj *o;
// ... error handling
// 1. 从数据库中查找或创建一个散列表对象
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
// 2. 尝试转换编码
hashTypeTryConversion(o,c->argv,2,c->argc-1);
// 3. 添加键值对,并统计新创建了多少键值对(散列表中的)
for (i = 2; i < c->argc; i += 2)
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
/* HMSET (deprecated) and HSET return value is different. */
char *cmdname = c->argv[0]->ptr;
if (cmdname[1] == 's' || cmdname[1] == 'S') {
// HSET
// 4. 通知客户端修改/创建了多少个键值对
addReplyLongLong(c, created);
} else {
// ...
}
// 5. 通知数据变更,并推送变更的订阅消息
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
server.dirty++;
}
这里主要看第2步和第3步。
对于第2步,它尝试转换编码,且会所有键值长度进行检查,若长度超标,则转换成dict
:
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
// 1. 若已经不是ziplist,直接返回
if (o->encoding != OBJ_ENCODING_ZIPLIST) return;
for (i = start; i <= end; i++) {
// 2. 对所有键值长度进行检查
// 若长度超过server.hash_max_ziplist_value,就转化成dict
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
// 这里的转换主要还是遍历ziplist得到键值,然后将其转成dict
// ziplist存储键值对是以"键-值-键-值"形式线性存储
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}
对于第3步,它往这个散列表添加键值对,若键值对个数超过一定限制,也会转成dict
:
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
// 若为ziplist
unsigned char *zl, *fptr, *vptr;
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 先寻找有没有已存在的键值对
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
// 若存在,则在对应位置上替换成新的值
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;
zl = ziplistDelete(zl, &vptr);
zl = ziplistInsert(zl, vptr, (unsigned char*)value,
sdslen(value));
}
}
if (!update) {
// 若不存在,则往ziplist尾部添加键和值
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl;
// 最后检查,若ziplist中键值对个数超过hash_max_ziplist_entries时,转成dict
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
// 这里是dict类型
// 这里就将键和值都编码成sds,然后保存即可
dictEntry *de = dictFind(o->ptr,field);
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
}
/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}
可以看出,散列表的数据结构会变化,当在下面条件时,使用ziplist
:
- 键和值长度都不超过
server.hash_max_ziplist_value
(默认64) - 键值对个数不超过
server.hash_max_ziplist_entries
(默认512)
其他情况下都使用dict
保存散列表。
1.4. 集合
散列表的数据结构可以是intset
或者dict
,相关代码在t_set.c
中。
这里以SADD
命令实现解释集合数据结构的转换:
void saddCommand(client *c) {
robj *set;
int j, added = 0;
// 查找有没有集合存在
set = lookupKeyWrite(c->db,c->argv[1]);
if (set == NULL) {
// 不存在则创建一个,并确定了类型
set = setTypeCreate(c->argv[2]->ptr);
// 将新键值对插入数据库中
dbAdd(c->db,c->argv[1],set);
} else {
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}
// 将所有元素加入到集合中
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
// 通知事件
if (added) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
// 响应客户端
addReplyLongLong(c,added);
}
这里先看创建集合的函数setTypeCreate(sds value)
,它判断若插入的值可以编码成整数,则创建intset
,否则创建一个dict
:
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject(); // 若value可编码成整数,创建intset
return createSetObject(); // 否则为dict
}
然后是集合元素添加,即setTypeAdd(robj *subject, sds value)
函数,插入后也会做检查,可能出现类型转换:
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
// 若类型是dict,类似于Java HashSet操作
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
// 若是intset
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
// 数据可转成整数,先往intset加入数据
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
// 若集合长度大于server.set_max_intset_entries,则转换成dict
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
// 数据不可转整数,直接转换成dict
setTypeConvert(subject,OBJ_ENCODING_HT);
// 添加集合数据(这里不可能存在重复数据)
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
和散列表类似,当intset
元素个数超过server.set_max_intset_entries
时(默认512),也会转换成dict
。当然往intset
插入非数字字符串也会产生一次转换。
1.5. 有序集合
散列表的数据结构可以是ziplist
或者dict + skiplist
,相关代码在t_zset.c
中。
这里以ZADD
命令实现解释集合数据结构的转换:
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
int added = 0;
int updated = 0;
int processed = 0;
scoreidx = 2;
// ... adjust scoreidx
// ... options to check vars
elements = c->argc-scoreidx;
// ... handle errors
elements /= 2; // score-element对的个数
// ... handler & reply errors
// 解析scores,保存成一个数组
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}
// 查找是否有zset存在
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
// 若不存在,检查参数
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
// 若zset_max_ziplist_entries为0
// 或者元素长度大于zset_max_ziplist_value
// 底层数据结构就是skiplist + dict
zobj = createZsetObject();
} else {
// 否则就是ziplist
zobj = createZsetZiplistObject();
}
// 添加新的键值对到数据库
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
// 插入新值(和score)/修改score
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = flags;
ele = c->argv[scoreidx+1+j*2]->ptr;
// 这里是具体的修改操作
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);
// 响应客户端
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
// ...
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
// 后续清理
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
而具体修改集合的函数是zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore)
:
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
// ...
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
// 若为ziplist
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
// 且元素项找到了
// ...
// 移除旧项,并插入新项
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
// 新插入一项
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
// 检查:若ziplist长度大于server.zset_max_ziplist_entries
// 或插入的元素长度大于server.zset_max_ziplist_value
// 直接转成dict + skiplist
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// 若为skiplist+dict,则用dict判重,skiplist存真实数据
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
de = dictFind(zs->dict,ele);
if (de != NULL) {
// update...
return 1;
} else if (!xx) {
// insert/add...
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
可知,当满足下面条件时,创建ziplist
,否则是/被转换成dict + skiplist
:
- 元素个数不超过
server.zset_max_ziplist_entries
(默认64) - 元素编码后的长度不超过
server.zset_max_ziplist_value
(默认128)
而dict + skiplist
的数据结构实现见下代码:
typedef struct zset {
dict *dict; // value - score pair
zskiplist *zsl; // value - score ordered list
} zset;
这种数据结构在执行ZSCORE
时,利用dict
,可以以$O(1)$时间完成(而ziplist
要$O(N)$);而范围查找,如ZRANK
, ZRANGE
,在skiplist
的帮助下速度也很快。
2. 引用计数内存管理
C语言不支持GC,所以Redis使用引用计数管理内存,见robj
的refCount
字段。
新对象的创建默认引用计数是1。
引用计数的增加很简单,就是计数+1:
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}
而引用计数的减可能涉及内存释放,当减到0时,需要释放内存:
void decrRefCount(robj *o) {
if (o->refcount == 1) {
// 当计数从1 -> 0, 则释放内存(数据结构和robj本身都被释放)
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
// 否则直接计数-1
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}
这种内存管理其他存储系统/库也有,如Netty ByteBuf
。
3. 空转时长
robj
的lru
字段记录了该对象最后一次被访问的时间(对于LFU-data还记录了访问的频率)。
这个字段可通过OBJECT subcommand [args]
命令获取(这里subcommand
可以是idletime
、freq
),注意它不会更新lru
字段:
void objectCommand(client *c) {
robj *o;
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
// ... reply help
} else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
// ... reply refcount
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
// ... reply
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
// 若maxmemory_policy不是LRU-time,返回错误
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
// ... reply error
return;
}
// 返回空闲时间(以秒计)
addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
} else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
// 若maxmemory_policy不是LFU-data,返回错误
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
return;
}
// 返回访问频次
addReplyLongLong(c,LFUDecrAndReturn(o));
} else {
addReplySubcommandSyntaxError(c);
}
}
而更新这个lru
字段时,需要在server.c
的lookupKey(redisDb *db, robj *key, int flags)
中更新。这个函数最后基本都会被操作数据的命令调用:
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
// AOF/RDB时不更新lru,避免内存过多的写入
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
// LFU-data策略时更新频率到lru字段
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
// LRU-time策略时,更新当前全局LRU时钟到lru字段
val->lru = LRU_CLOCK();
}
}
return val;
} else {
return NULL;
}
}