源码阅读-Redis数据结构: 字典

Posted by keys961 on September 5, 2019

Redis自己实现了字典,即散列表。它的结构和Java的HashMap类似,都是用分离链表法处理散列冲突。

字典的定义和实现在文件dict.hdict.c中。

1. 字典定义

字典的主要构成是散列表,它的定义为dictht,如下所示:

typedef struct dictht {
    dictEntry **table; // 散列表数组
    unsigned long size; // 散列表数组大小
    unsigned long sizemask; // 散列表数组大小掩码, =size-1
    unsigned long used; // 已有的散列节点项个数
} dictht;

散列表中主要是散列项,定义为dictEntry,从代码可知:

  • 散列表使用分离链表解决冲突
  • 值可以是要么是64位数字(整数/浮点),要么是其它任意值(用void *表示)
typedef struct dictEntry {
    void *key; // 键
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v; // 值,可以是union中4种值的1种
    struct dictEntry *next; // 下一个散列项,用于处理冲突
} dictEntry;

此外散列表定义了一组操作,这些操作是可以被自定义实现的(多态),它们被包含在dictType中:

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key); // 散列函数
    void *(*keyDup)(void *privdata, const void *key); // 复制指定键
    void *(*valDup)(void *privdata, const void *obj); // 复制指定值
    int (*keyCompare)(void *privdata, const void *key1, const void *key2); // 比较给定的2个键
    void (*keyDestructor)(void *privdata, void *key); // 释放指定键
    void (*valDestructor)(void *privdata, void *obj); // 释放指定值
} dictType;

以上内容组合在一起,构成字典,定义位dict

typedef struct dict {
    dictType *type; // 对应类型字典的操作
    void *privdata; // 私有数据
    dictht ht[2]; // 散列表,正常情况下使用[0],rehash时使用[1]
    long rehashidx; // 记录rehash的进度,默认-1
    unsigned long iterators; // 记录该字典有多少safe迭代器运行
} dict;

当然它也定义了迭代器dictIter

typedef struct dictIterator {
    dict *d; // 对应字典
    long index; // 所在散列表中的数组的第几项
    int table, safe; // 所在的那个表(一般是[0]); 是否安全(0:只能在迭代时调用dictNext();1:其它操作都可以调用)
    dictEntry *entry, *nextEntry; // 当前散列项,下一个散列项
    long long fingerprint; // 用于unsafe迭代器的校验,避免误操作
} dictIterator;

2. 散列算法

Redis字典的散列算法调用是:

hash = d->type->hashFunction(key);
index = hash & d->ht[x]->sizemask; // x = 0 or 1

第二步看出,Redis的散列表数组长度是$2^n$,和Java的HashMap是一样的。

默认的散列函数是SipHash,定义和实现在spihash.c中。

3. 键值插入和冲突处理

易知,Redis字典使用分离链表来处理散列冲突。链表是单向链表。

这里以添加键值对函数dictAdd(dict *d, void *key, void *val)为例:

int dictAdd(dict *d, void *key, void *val)
{
    // 1.创建新的散列项
    dictEntry *entry = dictAddRaw(d,key,NULL);
    if (!entry) return DICT_ERR;
    // 2.往散列项赋值(默认调用拷贝,若未指定拷贝则赋值指针)
    dictSetVal(d, entry, val);
    return DICT_OK;
}

主要看第一步,不论冲突与否,都插入列表头部:

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
	// 1.检测是否在rehash,若需要则执行一步rehash
    // Redis是一步一步rehash的(即一条一条链表渐进式)
    // 每执行完一步就自增rehashidx
    if (dictIsRehashing(d)) _dictRehashStep(d);
    // 2.根据散列确定index(即第几条链表),若key已存在则返回-1,插入会失败
    // 若要替换原值,调用dictReplace函数,它不会修改任何链表结构
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
	// 3. 若在rehash,插入[1],否则插入[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 4. 分配空间
    entry = zmalloc(sizeof(*entry));
    // 5. 将散列项插入头部
    entry->next = ht->table[index];
    ht->table[index] = entry;
    // 6. 变更计数
    ht->used++;
    // 7. 设置键的值(若拷贝函数指定,则拷贝,否则赋值指针)
    dictSetKey(d, entry, key);
    return entry;
}

这里看到,它没有像Java 8以上的HashMap一样,引入红黑树。

4. Rehash

dict中定义了ht[2]字段,它用于rehash,将新表先存在ht[1]上,完成后赋值到ht[0]上。

4.1. 扩容

Rehash有2个条件:

  • 当没有执行BGSAVE/BGREWRITEAOF时,负载因子大于等于1
  • 当正在执行BGSAVE/BGREWRITEAOF时,负载因子大于等于5

对于后者,由于执行这些命令需要创建子进程,而创建子进程大多是“写时复制”。若rehash,则会带来更多的内存开销。因此扩大了负载因子。

上述条件在_dictExpandIfNeeded(dict *d)函数中体现,就是下面的if条件:

  • 首先必须ht->used >= ht->size,即负载因子不小于1

  • 然后判断dict_can_resize是否非0

    • 若非0,则没有执行上面2个命令时
    • 若为0,则正在后台执行上面2个命令之一

    而这个字段由updateDictResizePolicy()函数改变,判断RDB/AOF子进程是否存在,并设置dict_can_resize值:

    void updateDictResizePolicy(void) {
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            dictEnableResize(); // 置1
        else
            dictDisableResize(); // 置0
    }
    
static int _dictExpandIfNeeded(dict *d)
{
    // ...
    // 就是if中的条件
    if (d->ht[0].used >= d->ht[0].size &&
       (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        // 扩容
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

符合条件后,就是扩容,扩容时的大小是原来的2倍(下面会说,必须是$2^n$)。

扩容的函数是dictExpand(dict *d, unsigned long size),代码注释说明了逻辑:

int dictExpand(dict *d, unsigned long size)
{
  	// ...
    // 1. 创建新散列表,且大小必须是2^n(大于size的)
    dictht n; 
    unsigned long realsize = _dictNextPower(size);
	// 2. 初始化新表
    // ...
   	// 3. 将新表(浅)复制给ht[1],并设置rehashidx进度设为0 (进度默认是-1)
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

4.2. 收缩

有扩容也有收缩,收缩入口在dictResize(dict *d)方法,它将散列表大小缩小到不小于ht->used的2的幂次倍。

而收缩成立的条件在server.chtNeedsResize(dict *dict)函数中,它的条件是:

  • 散列表的大小大于4
  • 负载因子小于0.1
// DICT_HT_INITIAL_SIZE = 4
// HASHTABLE_MIN_FILL = 10
int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    // size要大于4,且负载因子小于0.1
    return (size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < HASHTABLE_MIN_FILL));
}

4.3. 渐进式rehash

扩容后,需要重新调整链表。

Redis中,rehash是渐进式的,且分为2种:

  • 根据链表一条一条rehash,直到完成
  • 限制一次渐进rehash时间,一步一步完成

第二种方式最终也会调用第一种方式的相关函数。

a) 根据step渐进rehash

这种方法中,一个step代表处理完原表的一条链表,最关键函数是dictRehash(dict *d, int n),第二个参数就是step:

注意,一个step处理的链表必须非空

int dictRehash(dict *d, int n) {
    // 这里n代表处理n条非空链表
    int empty_visits = n*10; // 限制只能读n * 10条空链表
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        // ...
        while(d->ht[0].table[d->rehashidx] == NULL) {
            // 访问到空链表,继续
            d->rehashidx++;
            // 空链表读到次数过多,就退出
            if (--empty_visits == 0) return 1;
        }
        // 对于非空链表,将旧链表拆除2个新链表赋到新表上
        // ... code omitted
        // 清除旧链表
        d->ht[0].table[d->rehashidx] = NULL;
        // rehashidx进度增加
        d->rehashidx++;
    }
    // rehash完,扫尾并返回0,否则返回1
    if (d->ht[0].used == 0) {
        // ... reset & free
        return 0;
    }
    return 1;
}

上面函数会:

  • 最多处理旧表中的n条非空链表,并在新表中产生最多2*n条非空链表,完成部分的rehash(和Java的HashMap十分类似)
  • 但最多遍历10*n条非空链表

Redis利用这个函数,实现了单步的rehash,函数是_dictRehashStep(dict *d),实际上调用的是dictRehash(d, 1)

b) 限制时间的rehash

这部分关键函数是dictRehashMilliseconds(dict *d, int ms)

int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;
	// 同样使用了dictRehash(dict* d, int n)
    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

它每次都会尝试rehash 100条非空链表,并检查时间,若超时旧退出。

c) Rehash时的读写

对于读,在dictFind(dict *d, const void *key)函数中,可见:

  • 若在rehash,则先单步rehash一次
  • 查找时,2张表都会找
dictEntry *dictFind(dict *d, const void *key)
{
    // 1. 若在rehash,再单步rehash一次
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
       // 2. 会从2个表中查找
       // ...
    }
    // ...
}

对于写,以最后的dictAddRaw(dict *d, void *key, dictEntry **existing)为例,可见:

  • 若在rehash,则先单步rehash一次
  • 查找是否有重复节点时,2张表都会找
  • 插入时,若rehash,插入ht[1],否则插入ht[0]
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    // 1.若在rehash,则先单步rehash一次
    if (dictIsRehashing(d)) _dictRehashStep(d);
    // 2. 查找是否有重复节点时,2张表都会找,代码略
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
	// 3. 插入时,若rehash,插入ht[1],否则插入ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
   	// ...
}

d) 为何要2种渐进式rehash?

对于第一种方案,散列项添加/查询后,如果需要/正在rehash,都会一步一步来,这样就可以把rehash的时间平摊到每个请求中,避免长时间停顿

然而这种方法会导致ht[0]ht[1]都有数据,因此需要加速其rehash。Redis会在空闲时抽出1ms来推进rehash流程,不影响性能且加速rehash,这是第二种方案存在的原因。

// server.c 空闲抽1ms进行rehash
// called by databasesCron()
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1;
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1;
    }
    return 0;
}