源码阅读-Redis数据结构: 跳表

Posted by keys961 on September 6, 2019

跳表是一个有序列表,每个节点会维护多个指针指向其它节点,以加速访问。

它查找效率为:平均$O(logN)$,最坏$O(N)$。由于实现比较简单,它常用来替换平衡树(如红黑树)

Redis里,跳表被用于有序集合。

关于跳表的定义和API实现在文件server.ht_zset.c中。

1. 跳表定义

首先是跳表的节点,定义为zskiplistNode,有下面几个重要信息:

  • level数组:代表层,层数是由随机数指定(1~32层)
    • 每一层指向含有对应层的下一个节点(forward指针)
    • 此外还有跨度(span),代表该层中,自己和对应下一节点的距离
  • backward后驱节点,即第一层的前一个节点
  • score:排序依据,节点按该字段从小到大排序
  • ele:节点元素保存的值,它是SDS字符串(当score一样时,会按ele字典序排序)
typedef struct zskiplistNode {
    sds ele; // 元素的值
    double score; // 分值,从小到大排列
    struct zskiplistNode *backward; // 后驱节点
    struct zskiplistLevel {
        struct zskiplistNode *forward; // 对应层的后驱节点
        unsigned long span; // 跨度,记录自己和后驱节点的距离
    } level[]; // 跳表层数组
} zskiplistNode;

然后是跳跃表的定义zskiplist,主要包含:

  • 跳表的头和尾
  • 跳表长度
  • 层数最大的节点的层数
typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

图示可参考下面:

skip_list

2. 跳表创建

主要函数是zslCreate()(在t_zset.c中实现),这里注意,它创建了一个dummy header节点:

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
	// 1. 开辟空间
    zsl = zmalloc(sizeof(*zsl));
    // 2. 初始化参数
    zsl->level = 1;
    zsl->length = 0;
    // 3. 创建一个dummy head node
    // 它的层数是64,score是0, ele为空
    // ZSKIPLIST_MAXLEVEL = 64
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    // 4. 把dummy header每一层以及其它字段清空
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    zsl->header->backward = NULL;
    zsl->tail = NULL;
    return zsl;
}

3. 跳表元素插入

主要实现是zslInsert(zskiplist *zsl, double score, sds ele)函数,逻辑如下面代码和注释所述:

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // update数组存储新节点每层的前驱节点,x就是新节点
    // (实际上只会用max(zsl->level, level)个元素)
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    // rank数组存储新节点每次的前驱节点的位置(相对于header)
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    // 1. x从header开始遍历
    x = zsl->header;
    // 2. 先从最上层
    for (i = zsl->level-1; i >= 0; i--) {
        // 最上层初始为0,之后根据上一层的位置初始化
        // 因为每往下走一层,初始位置(前后的)是上一层遍历最后位置
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        // 3. 该层上
        // 遍历到的当前节点的后继节点score比自己小,则前进
        // 否则就停止遍历,当前遍历的位置就是该层上新节点的前驱节点
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            // 记录(该层前驱节点)的位置
            rank[i] += x->level[i].span;
            // 向后遍历
            x = x->level[i].forward;
        }
        // 这里x就是newNode->level[i]->backward(虽然没有backword字段)
        update[i] = x;
    }
   	
    // 4. 随机算出一个新结点的层数
    level = zslRandomLevel();
    // 5. 当新节点的层数比其它节点的层数都大时
    // 则需要初始化[zsl->level, level)的rank和update
    // 并更新zsl->level
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            // rank是0,因为前驱是header
            rank[i] = 0;
            update[i] = zsl->header;
            // 这里就是header->level[i].span
            // 值后面会被更新,先初始化为为原表长度(指向队尾)
            update[i]->level[i].span = zsl->length;
        }
        // 更新zsl->level
        zsl->level = level;
    }
    // 6. 创建新节点
    x = zslCreateNode(level,score,ele);
    // 7. 连接每一层的后驱节点
    for (i = 0; i < level; i++) {
        // 连接每一层新节点的后驱节点
        x->level[i].forward = update[i]->level[i].forward;
        // 修改每一层新节点前驱节点的后驱节点
        update[i]->level[i].forward = x;
		// 修改每一层新节点的span
        // 说明: 插入新节点后update[i]->level[i].span+1是新节点该层的前驱节点和该层的后驱节点的距离
        // 然后需要减去新节点和前驱节点的距离,即rank[0]-rank[i]+1(后面会说)
        // 这样最后就是下面的计算公式了
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        // 修改每一层新节点前驱节点的span
        // 说明: 最底层的前驱节点位置和自己的位置做差,再加上1,易知,新节点和最底层前驱节点的距离就是1
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 8,若level < zsl->level时
    // 上面必有几层指针不用动
    // 所以需要该层前驱的span + 1,因为(以第一层看)中间加了节点,距离变远了
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
	// 9. 连接新节点的前驱节点,若前驱是header,需要设为NULL
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    // 10. 连接新节点后驱节点的前驱节点,必要时更新zsl->tail字段
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    // 11. 改变长度
    zsl->length++;
    return x;
}

总体来说还是略微复杂,总体来说:

  • 从上层到下层遍历,需要记住新节点每层的前驱节点(即前驱的score恰好要小于新节点的score),需要遍历到最底层
  • 创建新节点,层数随机
  • 更新每层的后驱节点指针(新节点和它每层的前驱节点)和底层的前驱指针(新节点和底层的后驱节点)
  • 更新其它元数据

层数随机由zslRandomLevel()函数决定,层数每+1层的概率是25%,最大64层:

// ZSKIPLIST_P = 0.25
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

4. 跳表元素查询

查询的流程就和插入大同小异了,特别是遍历的方式:从最上层往下遍历,一层一层逼近目标节点,需要遍历到最后一层

zslFirstInRange(zskiplist *zsl, zrangespec *range)函数为例,下面是代码和说明注释:

zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;
    // 1. 检查整张表有没有节点全不在range内
    if (!zslIsInRange(zsl,range)) return NULL;
	// 2. 从header开始,从顶层遍历到底层
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        // 3. 每一层时,当后继节点的score比range下限小,则往前遍历
        while (x->level[i].forward &&
            !zslValueGteMin(x->level[i].forward->score,range))
                x = x->level[i].forward;
    }
	// 4. 此时x的后继节点(肯定在最底层)score必定大于/不小于range下限
    // 后继节点不可能为null,因为第1步检查过了
    // 这样x就再往前1步,就是我们要的结果
    x = x->level[0].forward;
    serverAssert(x != NULL);
	// 5. 最后检查x的score是否超过range上限
    if (!zslValueLteMax(x->score,range)) return NULL;
    return x;
}

5. 跳表元素删除

删除依旧和插入类似,在zslDelete(zskiplist *zsl, double score, zskiplistNode **node)中,实现很像,尤其是遍历定位的操作

具体看下面代码和注释解释:

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;
	// 1. 依旧和插入一样,找到每层的(可能)待删除节点的前驱节点
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    // 2. 只有它可能被删除,因为只有它可能满足:
    // a) 它的score >= 给定的score且最小
    // b) 它的ele字典序 >= 给定的ele字典序且最小
    x = x->level[0].forward;
    // 3. 判断非空, score和ele相等,若相等则删除
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        // a) 这里将节点移出列表
        zslDeleteNode(zsl, x, update);
        if (!node)
            // b) 释放内存
            zslFreeNode(x);
        else
            // 这里不会进入, Redis唯一用到的地方node == NULL
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

而移出节点的操作在zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update)函数中实现:

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    // 这里update记录了待删除节点每层的前驱节点
    int i;
    // 1. 更新每层前驱节点的span和后驱节点
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            // 这里i在待删除节点的层数范围内
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            // 这里i在待删除节点的层数范围外
            update[i]->level[i].span -= 1;
        }
    }
    // 2. 更新待删除节点后驱节点的前驱节点,必要时更新tail字段
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    // 3. 更新zsl->level和zsl->length值
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

总体来说,删除节点的思路主要是:

  • 从上层到下层遍历,需要记住新节点每层的前驱节点(即前驱的score恰好要小于新节点的score),需要遍历到最底层,定位到唯一可能的待删除的节点
  • 检查移除待删除节点,将其移除表外
    • 更新每层的后驱节点指针(待删除节点每层的前驱节点)
    • 更新底层的前驱指针(待删除节点底层的后驱节点)
  • 更新元数据
  • 释放内存