源码阅读-Redis数据结构: 对象系统

Posted by keys961 on September 16, 2019

Redis有一套非常完整的数据结构库,但是它并没有直接使用这些数据结构构建数据库,而是构建了一个对象系统,并通过这个对象系统构建数据库。

这个对象系统包含下面几种对象:

#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1      /* List object. */
#define OBJ_SET 2       /* Set object. */
#define OBJ_ZSET 3      /* Sorted set object. */
#define OBJ_HASH 4      /* Hash object. */
#define OBJ_MODULE 5    /* Module object. */
#define OBJ_STREAM 6    /* Stream object. */

除此之外,这个对象系统实现了基于引用计数的GC和内存共享机制,提高内存使用效率。

最后,Redis对象记录对象的访问时间,可用于计算空转时长,以此来计算其被删除的优先级。

1. Redis对象定义

Redis对象的定义在redisObject中:

typedef struct redisObject {
    unsigned type:4; // 4-bit类型
    unsigned encoding:4; // 4-bit编码
    unsigned lru:LRU_BITS; // 24-bit,替换策略:LRU-time或者是LFU-data
    int refcount; // 引用计数
    void *ptr; // 指向真正数据结构的指针
} robj;

这里进行字段的说明:

  • type:4-bit,表明这个对象的类型,即开头提及的7个类型

  • encoding:4-bit,表明对象底层真正使用的数据结构,可用的数据结构有10种,如下所示:

    #define OBJ_ENCODING_RAW 0     /* 简单SDS */
    #define OBJ_ENCODING_INT 1     /* 整数 */
    #define OBJ_ENCODING_HT 2      /* 散列表dict */
    #define OBJ_ENCODING_ZIPMAP 3  /* zipmap */
    #define OBJ_ENCODING_LINKEDLIST 4 /* (弃用)链表 */
    #define OBJ_ENCODING_ZIPLIST 5 /* ziplist */
    #define OBJ_ENCODING_INTSET 6  /* 整数集合 */
    #define OBJ_ENCODING_SKIPLIST 7  /* 跳表 */
    #define OBJ_ENCODING_EMBSTR 8  /* 使用embstr编码的SDS */
    #define OBJ_ENCODING_QUICKLIST 9 /* quicklist */
    #define OBJ_ENCODING_STREAM 10 /* 流对象 */
    

    类型和编码的联系如下:

    • OBJ_STRING$\rightarrow$OBJ_ENCODING_RAW/OBJ_ENCODING_INT/OBJ_ENCODING_EMBSTR
    • OBJ_LIST$\rightarrow$OBJ_ENCODING_QUICKLIST
    • OBJ_SET$\rightarrow$OBJ_ENCODING_INTSET/OBJ_ENCODING_HT
    • OBJ_ZSET$\rightarrow$OBJ_ENCODING_SKIPLIST/OBJ_ENCODING_ZIPLIST
    • OBJ_HASH$\rightarrow$OBJ_ENCODING_HT/OBJ_ENCODING_ZIPLIST
    • OBJ_STREAM$\rightarrow$OBJ_ENCODING_STREAM

    这种复杂的对应关系主要用于节省内存

  • lru:24-bit,用于数据项的替换和回收,有2种模式

    • LRU-time:24位均用于记录对象最后一次访问时间(相对于全局的LRU时钟)
    • LFU-data:后8位记录访问频率,前16位记录访问时间区间长度
  • refcount:用于共享和内存回收

  • ptr:指向底层数据结构的指针

下面对常用类型(开头的前5个)的对象进行说明。

1.1. 字符串

t_string.c中,可指定,创建字符串的时通常调用函数在tryObjectEncoding(robj *o)(该函数在object.c中)中:

/* Try to encode a string object in order to save space */
robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;
    // 1. 验证数据类型
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
	// 2. 若数据结构不是raw(普通字符串)或者embstr,直接返回
    if (!sdsEncodedObject(o)) return o;
	// 3. 若引用计数大于1,不需要创建新串,直接返回
    if (o->refcount > 1) return o;

    len = sdslen(s);
    // 4. 若可以转成整数(64-bit整数长度不超过20)
    if (len <= 20 && string2l(s,len,&value)) {
        if ((server.maxmemory == 0 ||
            !(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            // 这里使用资源池,[0,10000)的整数直接返回池中对象,节省内存
        	// 只需池中资源计数+1,而传入的robj引用计数-1
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            // 其他情况下,整数值已经在value中
            // 所以释放原字符串,将整数value赋给robj,并置编码为OBJ_ENCODING_INT
            if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr);
            o->encoding = OBJ_ENCODING_INT;
            o->ptr = (void*) value;
            return o;
        }
    }
	// 5. 若字符串长度不超过44,那么使用embstr编码
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }
	// 6. 以上尝试均不成功,则尝试对原SDS串进行trim
    // 即空闲空间大于字符串长度的1/10时,执行resize,清除空闲空间
    trimStringObjectIfNeeded(o);
	// 7. 返回原对象
    return o;
}

可以看出,对于字符串的存储:

  • 首先尝试将其转成整数,并尽量使用池化的整数
  • 然后尝试将其转成embstr(对于长度不超过44的字符串)
  • 最后尝试消除空闲空间

一切都是为了节省空间。

对于embstr,它的结构如下,是一个连续robj结构(而普通的字符串robj是分成2块的):

embstr

而之所以限制长度44,是因为使用了jemalloc,将分配的大小限制在64字节以下:

  • robj本身最多16字节
  • sdshdr8本身最多3字节
  • 字符串长度最多44字节
  • 加上\0,1字节

以上正好64字节。

见第4步中的代码,提及到了资源池,它实现了资源共享,节省了内存。

Redis会预先创建一堆字符串,默认是0~10000(左闭右开)的整数字符串,当需要使用这些串时,直接取用即可。

由于是共享,所以使用时引用计数会+1,防止被回收。

(资源池和JDK的字符串池很类似,而引用计数Netty ByteBuf也有类似的一套。)

1.2. 列表

由于现在都使用quicklist作为列表实现,所以创建它时(t_list.c中的创建,如函数pushGenericCommand(client *c, int where)),最后调用的只有函数createQuicklistObject(void),它仅仅简单创建了一个quicklist

robj *createQuicklistObject(void) {
    quicklist *l = quicklistCreate(); // 仅仅创建一个quicklist
    robj *o = createObject(OBJ_LIST,l);
    o->encoding = OBJ_ENCODING_QUICKLIST;
    return o;
}

1.3. 散列表

散列表的数据结构可以是ziplist或者dict,相关代码在t_hash.c中。

默认情况下,创建的散列表是ziplist

robj *createHashObject(void) {
    unsigned char *zl = ziplistNew(); // 默认ziplist作为散列表
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

不过两种数据结构怎么转换,还是以命令HSET为例,下面是该命令的实现:

void hsetCommand(client *c) {
    int i, created = 0;
    robj *o;
    // ... error handling
    // 1. 从数据库中查找或创建一个散列表对象
    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    // 2. 尝试转换编码
    hashTypeTryConversion(o,c->argv,2,c->argc-1);
    // 3. 添加键值对,并统计新创建了多少键值对(散列表中的)
    for (i = 2; i < c->argc; i += 2)
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
    
    /* HMSET (deprecated) and HSET return value is different. */
    char *cmdname = c->argv[0]->ptr;
    if (cmdname[1] == 's' || cmdname[1] == 'S') {
        // HSET
        // 4. 通知客户端修改/创建了多少个键值对
        addReplyLongLong(c, created);
    } else {
        // ...
    }
    // 5. 通知数据变更,并推送变更的订阅消息
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty++;
}

这里主要看第2步和第3步。

对于第2步,它尝试转换编码,且会所有键值长度进行检查,若长度超标,则转换成dict

void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
    int i;
    // 1. 若已经不是ziplist,直接返回
    if (o->encoding != OBJ_ENCODING_ZIPLIST) return;
    for (i = start; i <= end; i++) {
        // 2. 对所有键值长度进行检查
        // 若长度超过server.hash_max_ziplist_value,就转化成dict
        if (sdsEncodedObject(argv[i]) &&
            sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
        {
            // 这里的转换主要还是遍历ziplist得到键值,然后将其转成dict
            // ziplist存储键值对是以"键-值-键-值"形式线性存储
            hashTypeConvert(o, OBJ_ENCODING_HT);
            break;
        }
    }
}

对于第3步,它往这个散列表添加键值对,若键值对个数超过一定限制,也会转成dict

#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        // 若为ziplist
        unsigned char *zl, *fptr, *vptr;
        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            // 先寻找有没有已存在的键值对
            fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                // 若存在,则在对应位置上替换成新的值
                vptr = ziplistNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;
                zl = ziplistDelete(zl, &vptr);
                zl = ziplistInsert(zl, vptr, (unsigned char*)value,
                        sdslen(value));
            }
        }
        if (!update) {
            // 若不存在,则往ziplist尾部添加键和值
            zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
                    ZIPLIST_TAIL);
            zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
                    ZIPLIST_TAIL);
        }
        o->ptr = zl;
        // 最后检查,若ziplist中键值对个数超过hash_max_ziplist_entries时,转成dict
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        // 这里是dict类型
        // 这里就将键和值都编码成sds,然后保存即可
        dictEntry *de = dictFind(o->ptr,field);
        if (de) {
            sdsfree(dictGetVal(de));
            if (flags & HASH_SET_TAKE_VALUE) {
                dictGetVal(de) = value;
                value = NULL;
            } else {
                dictGetVal(de) = sdsdup(value);
            }
            update = 1;
        } else {
            sds f,v;
            if (flags & HASH_SET_TAKE_FIELD) {
                f = field;
                field = NULL;
            } else {
                f = sdsdup(field);
            }
            if (flags & HASH_SET_TAKE_VALUE) {
                v = value;
                value = NULL;
            } else {
                v = sdsdup(value);
            }
            dictAdd(o->ptr,f,v);
        }
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free SDS strings we did not referenced elsewhere if the flags
     * want this function to be responsible. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}

可以看出,散列表的数据结构会变化,当在下面条件时,使用ziplist

  • 键和值长度都不超过server.hash_max_ziplist_value(默认64)
  • 键值对个数不超过server.hash_max_ziplist_entries(默认512)

其他情况下都使用dict保存散列表。

1.4. 集合

散列表的数据结构可以是intset或者dict,相关代码在t_set.c中。

这里以SADD命令实现解释集合数据结构的转换:

void saddCommand(client *c) {
    robj *set;
    int j, added = 0;
    // 查找有没有集合存在
    set = lookupKeyWrite(c->db,c->argv[1]);
    if (set == NULL) {
        // 不存在则创建一个,并确定了类型
        set = setTypeCreate(c->argv[2]->ptr);
        // 将新键值对插入数据库中
        dbAdd(c->db,c->argv[1],set);
    } else {
        if (set->type != OBJ_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }
	// 将所有元素加入到集合中
    for (j = 2; j < c->argc; j++) {
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    // 通知事件
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    // 响应客户端
    addReplyLongLong(c,added);
}

这里先看创建集合的函数setTypeCreate(sds value),它判断若插入的值可以编码成整数,则创建intset,否则创建一个dict

robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject(); // 若value可编码成整数,创建intset
    return createSetObject(); // 否则为dict
}

然后是集合元素添加,即setTypeAdd(robj *subject, sds value)函数,插入后也会做检查,可能出现类型转换:

int setTypeAdd(robj *subject, sds value) {
    long long llval;
    if (subject->encoding == OBJ_ENCODING_HT) {
        // 若类型是dict,类似于Java HashSet操作
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        // 若是intset
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            // 数据可转成整数,先往intset加入数据
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                // 若集合长度大于server.set_max_intset_entries,则转换成dict
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            // 数据不可转整数,直接转换成dict
            setTypeConvert(subject,OBJ_ENCODING_HT);
            // 添加集合数据(这里不可能存在重复数据)
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        serverPanic("Unknown set encoding");
    }
    return 0;
}

和散列表类似,当intset元素个数超过server.set_max_intset_entries时(默认512),也会转换成dict。当然往intset插入非数字字符串也会产生一次转换。

1.5. 有序集合

散列表的数据结构可以是ziplist或者dict + skiplist,相关代码在t_zset.c中。

这里以ZADD命令实现解释集合数据结构的转换:

void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    int added = 0;      
    int updated = 0;   
    int processed = 0;  
    
    scoreidx = 2;
    // ... adjust scoreidx
    // ... options to check vars
    elements = c->argc-scoreidx;
    // ... handle errors
    elements /= 2; // score-element对的个数
    // ... handler & reply errors
    
    // 解析scores,保存成一个数组
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    // 查找是否有zset存在
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        // 若不存在,检查参数
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            // 若zset_max_ziplist_entries为0
            // 或者元素长度大于zset_max_ziplist_value
            // 底层数据结构就是skiplist + dict
            zobj = createZsetObject();
        } else {
            // 否则就是ziplist
            zobj = createZsetZiplistObject();
        }
        // 添加新的键值对到数据库
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
	// 插入新值(和score)/修改score
    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = flags;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        // 这里是具体的修改操作
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

// 响应客户端
reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        // ...
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }
// 后续清理
cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

而具体修改集合的函数是zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore)

int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
    // ...
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        // 若为ziplist
        unsigned char *eptr;
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
			// 且元素项找到了
			// ...
            // 移除旧项,并插入新项
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            // 新插入一项
            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
            // 检查:若ziplist长度大于server.zset_max_ziplist_entries
            // 或插入的元素长度大于server.zset_max_ziplist_value
            // 直接转成dict + skiplist
            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (sdslen(ele) > server.zset_max_ziplist_value)
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            if (newscore) *newscore = score;
            *flags |= ZADD_ADDED;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        // 若为skiplist+dict,则用dict判重,skiplist存真实数据
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            // update...
            return 1;
        } else if (!xx) {
            // insert/add...
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

可知,当满足下面条件时,创建ziplist,否则是/被转换成dict + skiplist

  • 元素个数不超过server.zset_max_ziplist_entries(默认64)
  • 元素编码后的长度不超过server.zset_max_ziplist_value(默认128)

dict + skiplist的数据结构实现见下代码:

typedef struct zset {
    dict *dict; // value - score pair
    zskiplist *zsl; // value - score ordered list
} zset;

这种数据结构在执行ZSCORE时,利用dict,可以以$O(1)$时间完成(而ziplist要$O(N)$);而范围查找,如ZRANK, ZRANGE,在skiplist的帮助下速度也很快。

2. 引用计数内存管理

C语言不支持GC,所以Redis使用引用计数管理内存,见robjrefCount字段。

新对象的创建默认引用计数是1。

引用计数的增加很简单,就是计数+1:

void incrRefCount(robj *o) {
    if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}

而引用计数的减可能涉及内存释放,当减到0时,需要释放内存:

void decrRefCount(robj *o) {
    if (o->refcount == 1) {
        // 当计数从1 -> 0, 则释放内存(数据结构和robj本身都被释放)
        switch(o->type) {
        case OBJ_STRING: freeStringObject(o); break;
        case OBJ_LIST: freeListObject(o); break;
        case OBJ_SET: freeSetObject(o); break;
        case OBJ_ZSET: freeZsetObject(o); break;
        case OBJ_HASH: freeHashObject(o); break;
        case OBJ_MODULE: freeModuleObject(o); break;
        case OBJ_STREAM: freeStreamObject(o); break;
        default: serverPanic("Unknown object type"); break;
        }
        zfree(o);
    } else {
        // 否则直接计数-1
        if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
        if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
    }
}

这种内存管理其他存储系统/库也有,如Netty ByteBuf

3. 空转时长

robjlru字段记录了该对象最后一次被访问的时间(对于LFU-data还记录了访问的频率)。

这个字段可通过OBJECT subcommand [args]命令获取(这里subcommand可以是idletimefreq),注意它不会更新lru字段

void objectCommand(client *c) {
    robj *o;

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        // ... reply help
    } else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
        // ... reply refcount
    } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
        // ... reply
    } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        // 若maxmemory_policy不是LRU-time,返回错误
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            // ... reply error
            return;
        }
        // 返回空闲时间(以秒计)
        addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
    } else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
                == NULL) return;
        // 若maxmemory_policy不是LFU-data,返回错误
        if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
            
            return;
        }
        // 返回访问频次
        addReplyLongLong(c,LFUDecrAndReturn(o));
    } else {
        addReplySubcommandSyntaxError(c);
    }
}

而更新这个lru字段时,需要在server.clookupKey(redisDb *db, robj *key, int flags)中更新。这个函数最后基本都会被操作数据的命令调用:

robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        // AOF/RDB时不更新lru,避免内存过多的写入
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            // LFU-data策略时更新频率到lru字段
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                // LRU-time策略时,更新当前全局LRU时钟到lru字段
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}