Redis源码阅读(四) 跳跃表

跳跃表是一种有序链表,通过维持多个指针指向不同结点用来进行快速访问,它支持平均O(logN),最坏O(N)的时间复杂度,在Redis中它是实现有序集合ZSET的底层实现之一。

1. 定义

Redis中,跳跃表实现如下图

skiplist

这个跳跃表的实现和 William Pugh的《Skip Lists: A Probabilistic Alternative to Balanced Trees》 中描述的基本相同,在下面3个方面做了修改:

​ a) 允许重复的分数

​ b) 比较的时候要考虑score和ele本身

​ c) 每个结点有一个回退指针,方便从尾部向头部遍历

1.1 跳跃表结点

  • ele 结点key,使用sds字符串存储,该值必须唯一。在字典中也有该ele结点(用于ZSETs使用)

  • score 存储结点分数值,可重复(此时通过ele排序),一般用来作为排序的参考,但有特殊情况会使用ele排序

  • backward 回退指针,当前结点的前驱结点,在从尾部遍历时非常方便

  • level

    跳跃表实现的关键,每个元素包含多个level,每个level都可以指向其后方不同的结点,在搜索结点时通过自顶向下的方式搜索,可以大大节约搜索时间。

    每次插入新结点时,通过幂次定律计算其要插入的level,以保证越高level的结点数量越少。

    • forward 当前level指向的后继结点
    • span 当前level后继结点与该结点所跨域结点的数量,计算排序时相当有用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zskiplistNode {
// 结点key,使用sds存储
sds ele;
// 结点分数值,一般使用该字段排序
double score;
// 回退指针,当前结点的前驱结点
struct zskiplistNode *backward;
// 层级level,每个结点都有多层,用于保存跳跃信息
struct zskiplistLevel {
// 该结点当前层级的前进指针,指向同级的下一个结点
struct zskiplistNode *forward;
// 跨度,记录同级下个结点与当前结点的距离
unsigned int span;
} level[];
} zskiplistNode;

1.2 跳跃表

1
2
3
4
5
6
7
8
typedef struct zskiplist {
// 跳跃表的头结点和尾结点
struct zskiplistNode *header, *tail;
// 跳跃表中结点的数量
unsigned long length;
// 跳跃表中结点最大的层数
int level;
} zskiplist;

2. 幂次定律

Redis中,使用返回随机level,该随机算法使用幂次定律。

1
2
3
4
5
6
7
8
9
10
11
12
/**                                                                                                     
* 放回一个跳跃表随机level,用作新结点插入
* 随机算法使用幂次定律,越大的值生成几率越小
*/
int zslRandomLevel(void) {
int level = 1;
// 每次产生的随机数小于 0.25 * 0XFFFF则level加1
// 所以越大的level概率越小
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

跳跃表的作用是为了替换平衡二叉树,跳跃表结构清晰简单、易于实现,利用随机化的性质让越高level的结点越少,其效率是极高的,平衡二叉树的维持平衡则需要进行大量的操作。

3. 源码剖析

跳跃表利用较小的空间代价,仅一些指针,提高了查找的效率。

  • 查找结点的rank

    从最高层开始遍历结点的level,通过先右后下的方式查找对应的结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**                                                                                                 
* 查找score和ele在跳跃表中的排位
*/
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

// 从高到低遍历跳跃表
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 结点的forward结点小于查询结点,则继续前进
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {

// rank值即为span相加,经过几个结点span就为多少
// 所以经过结点数相加就可得到该结点的排名
rank += x->level[i].span;
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
  • 创建一个新的跳跃表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
zskiplist *zslCreate(void) {                                                                        
int j;
zskiplist *zsl;

// 分配内存空间
zsl = zmalloc(sizeof(*zsl));

zsl->level = 1;
zsl->length = 0;

// 跳跃表头结点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);

// 头结点每层前进指针和跨度赋值
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}

// 回退指针
zsl->header->backward = NULL;

// 跳跃表尾结点
zsl->tail = NULL;
return zsl;
}
  • 跳跃表中插入结点

    找到要插入结点的位置,并且记录所用需要修改level指针的结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
// 头结点
x = zsl->header;

// 从最高层开始查找
for (i = zsl->level-1; i >= 0; i--) {

/* store rank that is crossed to reach the insert position */
// rank[i]用来记录第i层达到插入位置所跨域结点数,
// rank[i]初始化为上一层所跨域结点总数
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

// 通过每层前进指针遍历跳跃表
// 比较score和element找到需要更新的结点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
// 记录第i层该插入结点位置所跨域的结点数
rank[i] += x->level[i].span;

// 前进一个结点
x = x->level[i].forward;
}

// 记录需要更新的每层结点
update[i] = x;
}

/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
// 我们假定要插入的元素并不存在,需要调用者自己检查元素是否已经存在

// 随机层数
level = zslRandomLevel();

// 如果新level比所有结点层数都大
if (level > zsl->level) {
// 初始化未使用层,并记录到update数组中
for (i = zsl->level; i < level; i++) {
// 未使用level的rank为0
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
// 跳跃表最大level
zsl->level = level;
}

// 创建新结点
x = zslCreateNode(level,score,ele);

// 将update中记录的指针指向新结点
for (i = 0; i < level; i++) {

// 设置新结点的forward指针
x->level[i].forward = update[i]->level[i].forward;

// update记录中的结点forward指向新结点
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
// 计算新结点每层跨域结点数
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

// 插入新结点后,计算新的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
// 没有访问的level也需要增加跨度
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

// 新结点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
  • 删除结点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**                                                                                                 
* 删除跳跃表一个结点,供内部使用
* 更新跳跃表删除结点与其他结点间的关系, 修改forward,span,backward等等
*/
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 更新每层与删除结点相关的forward和span
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
// 当前层有该结点,计算新的跨度
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
// 当前层没有该结点,跨度减1
update[i]->level[i].span -= 1;
}
}

// 确保删除结点后,回退指针不会出错
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}

// 当删除结点在最高层,且只有一个元素时,level--
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}