源码阅读-Redis数据结构: 整数集合

Posted by keys961 on September 7, 2019

Redis有多种集合的实现,之前的zskiplist中提及的zset有序集合就是一种实现。

整数集合(intset)也是Redis集合的实现,当集合中只有整数,且数量不多时,采用整数集合进行存储。下面的情形就是一个例子:

> SADD nums 1 2 3 4 5
(integer) 5
> OBJECT ENCODING nums
"intset"

整数集合的定义和实现在intset.hintset.c中。

1. 整数集合定义

定义很简单,即intset结构,字段解释如下所述:

typedef struct intset {
    uint32_t encoding; // 编码方式
    uint32_t length; // 集合内元素数量
    int8_t contents[]; // 元素的数据
} intset;

这里数据内容以int8_t数组表示,实际上,存储的类型由encoding决定

  • encoding == INTSET_ENC_INT16时,访问content时,会被强转为int16_t *,再以数组形式访问
  • encoding == INTSET_ENC_INT32时,访问content时,会被强转为int32_t *,再以数组形式访问
  • encoding == INTSET_ENC_INT64时,访问content时,会被强转为int64_t *,再以数组形式访问
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

并且,每个元素占用的空间一样,即每个元素的类型都是一样的

由于整数集合存储的整数类型可以变动,这就牵扯到整数的“升级”和“降级”,这在下面会说明。

2. 整数集合的创建

创建逻辑在intsetNew(void)函数中,默认encodingINTSET_ENC_INT16

intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16); // encoding默认INTSET_ENC_16
    is->length = 0;
    return is;
}

注意这里有intrev32ifbe(v)的宏,这里是处理大小端的问题,因为一个元素的大小必然超过1个字节,在int8_t *content数组中会占多个数组项。

所以大端机器上的数据需要进行转换才能存储/访问:

#if (BYTE_ORDER == LITTLE_ENDIAN)
#define memrev16ifbe(p) ((void)(0)) // 小端不转换
#define memrev32ifbe(p) ((void)(0))
#define memrev64ifbe(p) ((void)(0))
#define intrev16ifbe(v) (v)
#define intrev32ifbe(v) (v)
#define intrev64ifbe(v) (v)
#else
#define memrev16ifbe(p) memrev16(p) // 大端需要转换(将小端数据转成大端)
#define memrev32ifbe(p) memrev32(p)
#define memrev64ifbe(p) memrev64(p)
#define intrev16ifbe(v) intrev16(v)
#define intrev32ifbe(v) intrev32(v)
#define intrev64ifbe(v) intrev64(v)
#endif

大小端转换的定义和实现在endianconv.hendianconv.c

3. 元素添加与升级

元素添加在intsetAdd(intset *is, int64_t value, uint8_t *success)函数中实现,具体解释看下面代码:

intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    // 1. 判断新元素的类型
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;
	// 2. 判断新元素的类型是否大于集合的encoding
    if (valenc > intrev32ifbe(is->encoding)) {
        // 2.1. 若大于则升级,并执行插入(升级之后说明)
        return intsetUpgradeAndAdd(is,value);
    } else {
       	// 2.2. 否则直接执行插入流程
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

3.1. 不升级的插入

不升级的情况下,插入就是注释2.2.处下面的代码:

// ...
else {
    // 1. 首先找是否元素存在(存在就直接返回)
    // 这里使用二分法查找
    // 当没找到元素时,pos会返回新元素插入的位置
    if (intsetSearch(is,value,&pos)) {
        if (success) *success = 0;
        return is;
    }
    // 2. 调整数组长度,即多开一个元素的空间(调用的是realloc)
    is = intsetResize(is,intrev32ifbe(is->length)+1);
    // 3. 若插入的位置不在尾端
    // 需要将插入位置以及之后的所有内容往后移动一个元素的长度
    if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
// 4. 设置新元素到插入的位置
_intsetSet(is,pos,value);
// 5. 更新长度
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
// ...

总体而言还是很简单的,主要还是:

  • 二分查找元素,并决定插入位置
  • 申请和移动内存
  • 插入元素,更新元数据

不过这样的插入每次都要重新申请内存,并很可能移动内存,引入2次内存拷贝,当元素数量很大的时候性能会很差

假如不考虑拷贝,这里的时间复杂度是$O(log N)$。

3.2. 升级的插入

升级的情况下,插入就是注释2.1.处下面的函数intsetUpgradeAndAdd(intset *is, int64_t value),代码解释如下:

static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    // 1. 判断正负以判断插入头还是尾
    // 只可能插入头/尾,因为升级,新值长度是最长的,只可能比所有元素都小或都大
    int prepend = value < 0 ? 1 : 0;
	// 2. 确定新的encoding,并扩充内存
    is->encoding = intrev32ifbe(newenc);
    is = intsetResize(is,intrev32ifbe(is->length)+1);
	// 3. 从原表尾部将元素一个一个迁移
    // 易知,扩充后的内存必然是原来的2倍以上,所以从尾部开始遍历不会错误修改原数据
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    // 4. 插入新值(头/尾)
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    // 5. 更新长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

总体来说也是很简单,就是:

  • 确定新的encoding
  • 扩充内存
  • 迁移整张表原有数据(因为所有数据都被升级了)
  • 从头/尾插入数据
  • 更新元数据

可以看出,升级带来的时间复杂度是$O( N )$。

3.3. 升级的好处

  • 灵活性提升,可随意添加不同类型的整数到集合,不用担心类型错误
  • 节省内存,升级只会在需要的时候进行

3.4. 整数集合的降级——不支持

Redis的intset不支持降级

一旦升级,编码一直保持升级后的状态

4. 整数集合的查找

由于content存储的整数是有序的,所以采用二分法查找。实现在函数intsetSearch(intset *is, int64_t value, uint32_t *pos)中:

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    // 1. 空集合, 返回0
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        // 2. 检查集合整数的上下界
        // 在带查找整数在范围之外的情况下,将pos置为头或者尾
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }
	// 3. 二分查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }
	// 4. 赋值pos
    if (value == cur) {
        // 找到情况, pos就是对应值的位置
        if (pos) *pos = mid;
        return 1;
    } else {
        // 没找到的情况下,pos赋值尾min
        // 因为content[min]值恰好比value大,需要插入到这个位置
        if (pos) *pos = min;
        return 0;
    }
}

4. 整数集合的删除

删除操作的实现在intsetRemove(intset *is, int64_t value, int *success)函数中,实现方法和之前的操作比较类似:

intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;
	// 1. 查找元素是否存在,注意要检查类型,不能超过集合的encoding
    // 若存在会进入if内,且能拿到对应元素的位置
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);
        if (success) *success = 1;
        // 2. 删除,只需要向前移动内存即可
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
        // 3. 缩容(依旧是realloc)
        is = intsetResize(is,len-1);
        // 4. 更新长度
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

总体也是很简单,逻辑就是:

  • 查找是否存在,若存在则获取元素位置
  • 删除,只需向前移动内存进行覆盖即可
  • 调整content数组大小(问题:为何没有像SDS一样懒惰释放?)
  • 更新元数据