跳表是一个有序列表,每个节点会维护多个指针指向其它节点,以加速访问。
它查找效率为:平均$O(logN)$,最坏$O(N)$。由于实现比较简单,它常用来替换平衡树(如红黑树)
Redis里,跳表被用于有序集合。
关于跳表的定义和API实现在文件server.h
和t_zset.c
中。
1. 跳表定义
首先是跳表的节点,定义为zskiplistNode
,有下面几个重要信息:
level
数组:代表层,层数是由随机数指定(1~32层)- 每一层指向含有对应层的下一个节点(
forward
指针) - 此外还有跨度(
span
),代表该层中,自己和对应下一节点的距离
- 每一层指向含有对应层的下一个节点(
backward
后驱节点,即第一层的前一个节点score
:排序依据,节点按该字段从小到大排序ele
:节点元素保存的值,它是SDS字符串(当score
一样时,会按ele
字典序排序)
typedef struct zskiplistNode {
sds ele; // 元素的值
double score; // 分值,从小到大排列
struct zskiplistNode *backward; // 后驱节点
struct zskiplistLevel {
struct zskiplistNode *forward; // 对应层的后驱节点
unsigned long span; // 跨度,记录自己和后驱节点的距离
} level[]; // 跳表层数组
} zskiplistNode;
然后是跳跃表的定义zskiplist
,主要包含:
- 跳表的头和尾
- 跳表长度
- 层数最大的节点的层数
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
图示可参考下面:
2. 跳表创建
主要函数是zslCreate()
(在t_zset.c
中实现),这里注意,它创建了一个dummy header节点:
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 1. 开辟空间
zsl = zmalloc(sizeof(*zsl));
// 2. 初始化参数
zsl->level = 1;
zsl->length = 0;
// 3. 创建一个dummy head node
// 它的层数是64,score是0, ele为空
// ZSKIPLIST_MAXLEVEL = 64
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 4. 把dummy header每一层以及其它字段清空
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
3. 跳表元素插入
主要实现是zslInsert(zskiplist *zsl, double score, sds ele)
函数,逻辑如下面代码和注释所述:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update数组存储新节点每层的前驱节点,x就是新节点
// (实际上只会用max(zsl->level, level)个元素)
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// rank数组存储新节点每次的前驱节点的位置(相对于header)
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
// 1. x从header开始遍历
x = zsl->header;
// 2. 先从最上层
for (i = zsl->level-1; i >= 0; i--) {
// 最上层初始为0,之后根据上一层的位置初始化
// 因为每往下走一层,初始位置(前后的)是上一层遍历最后位置
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 3. 该层上
// 遍历到的当前节点的后继节点score比自己小,则前进
// 否则就停止遍历,当前遍历的位置就是该层上新节点的前驱节点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
// 记录(该层前驱节点)的位置
rank[i] += x->level[i].span;
// 向后遍历
x = x->level[i].forward;
}
// 这里x就是newNode->level[i]->backward(虽然没有backword字段)
update[i] = x;
}
// 4. 随机算出一个新结点的层数
level = zslRandomLevel();
// 5. 当新节点的层数比其它节点的层数都大时
// 则需要初始化[zsl->level, level)的rank和update
// 并更新zsl->level
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
// rank是0,因为前驱是header
rank[i] = 0;
update[i] = zsl->header;
// 这里就是header->level[i].span
// 值后面会被更新,先初始化为为原表长度(指向队尾)
update[i]->level[i].span = zsl->length;
}
// 更新zsl->level
zsl->level = level;
}
// 6. 创建新节点
x = zslCreateNode(level,score,ele);
// 7. 连接每一层的后驱节点
for (i = 0; i < level; i++) {
// 连接每一层新节点的后驱节点
x->level[i].forward = update[i]->level[i].forward;
// 修改每一层新节点前驱节点的后驱节点
update[i]->level[i].forward = x;
// 修改每一层新节点的span
// 说明: 插入新节点后update[i]->level[i].span+1是新节点该层的前驱节点和该层的后驱节点的距离
// 然后需要减去新节点和前驱节点的距离,即rank[0]-rank[i]+1(后面会说)
// 这样最后就是下面的计算公式了
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// 修改每一层新节点前驱节点的span
// 说明: 最底层的前驱节点位置和自己的位置做差,再加上1,易知,新节点和最底层前驱节点的距离就是1
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 8,若level < zsl->level时
// 上面必有几层指针不用动
// 所以需要该层前驱的span + 1,因为(以第一层看)中间加了节点,距离变远了
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 9. 连接新节点的前驱节点,若前驱是header,需要设为NULL
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 10. 连接新节点后驱节点的前驱节点,必要时更新zsl->tail字段
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
// 11. 改变长度
zsl->length++;
return x;
}
总体来说还是略微复杂,总体来说:
- 从上层到下层遍历,需要记住新节点每层的前驱节点(即前驱的
score
恰好要小于新节点的score
),需要遍历到最底层 - 创建新节点,层数随机
- 更新每层的后驱节点指针(新节点和它每层的前驱节点)和底层的前驱指针(新节点和底层的后驱节点)
- 更新其它元数据
层数随机由zslRandomLevel()
函数决定,层数每+1层的概率是25%,最大64层:
// ZSKIPLIST_P = 0.25
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
4. 跳表元素查询
查询的流程就和插入大同小异了,特别是遍历的方式:从最上层往下遍历,一层一层逼近目标节点,需要遍历到最后一层。
以zslFirstInRange(zskiplist *zsl, zrangespec *range)
函数为例,下面是代码和说明注释:
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
// 1. 检查整张表有没有节点全不在range内
if (!zslIsInRange(zsl,range)) return NULL;
// 2. 从header开始,从顶层遍历到底层
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 3. 每一层时,当后继节点的score比range下限小,则往前遍历
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
// 4. 此时x的后继节点(肯定在最底层)score必定大于/不小于range下限
// 后继节点不可能为null,因为第1步检查过了
// 这样x就再往前1步,就是我们要的结果
x = x->level[0].forward;
serverAssert(x != NULL);
// 5. 最后检查x的score是否超过range上限
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}
5. 跳表元素删除
删除依旧和插入类似,在zslDelete(zskiplist *zsl, double score, zskiplistNode **node)
中,实现很像,尤其是遍历定位的操作。
具体看下面代码和注释解释:
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
// 1. 依旧和插入一样,找到每层的(可能)待删除节点的前驱节点
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
// 2. 只有它可能被删除,因为只有它可能满足:
// a) 它的score >= 给定的score且最小
// b) 它的ele字典序 >= 给定的ele字典序且最小
x = x->level[0].forward;
// 3. 判断非空, score和ele相等,若相等则删除
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
// a) 这里将节点移出列表
zslDeleteNode(zsl, x, update);
if (!node)
// b) 释放内存
zslFreeNode(x);
else
// 这里不会进入, Redis唯一用到的地方node == NULL
*node = x;
return 1;
}
return 0; /* not found */
}
而移出节点的操作在zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update)
函数中实现:
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
// 这里update记录了待删除节点每层的前驱节点
int i;
// 1. 更新每层前驱节点的span和后驱节点
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
// 这里i在待删除节点的层数范围内
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
// 这里i在待删除节点的层数范围外
update[i]->level[i].span -= 1;
}
}
// 2. 更新待删除节点后驱节点的前驱节点,必要时更新tail字段
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
// 3. 更新zsl->level和zsl->length值
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
总体来说,删除节点的思路主要是:
- 从上层到下层遍历,需要记住新节点每层的前驱节点(即前驱的
score
恰好要小于新节点的score
),需要遍历到最底层,定位到唯一可能的待删除的节点 - 检查移除待删除节点,将其移除表外
- 更新每层的后驱节点指针(待删除节点每层的前驱节点)
- 更新底层的前驱指针(待删除节点底层的后驱节点)
- 更新元数据
- 释放内存