Redis源码阅读(十四) 有序集合对象

Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员,不同的是每个元素都会关联一个double类型的分数,redis通过分数来为集合中的成员进行从小到大的排序,有序集合的成员是唯一的,但分数(score)却可以重复。集合中最大的成员数为 2^32 - 1 。

1. 编码

有序集合对象的编码可以是ZIPLIST或SKIPLIST。

ZIPLIST编码的有序集合对象使用压缩列表作为底层实现,每个元素使用两个挨在一起的结点保存,第一个保存成员member,第二个保存分数score。

SKIPLIST编码的有序集合对象使用ZSET结构作为底层实现,包括了一个跳跃表,一个字典。zsl跳跃表按照分数值从小到大保存了集合的成员,跳跃表结点object属性保存成员,score属性保存成员的分数值。dict字典保存了成员到分数的映射,每个键值对都是一个集合成员,字典的键保存成员,字典的值保存成员的分数。

1
2
3
4
5
6
typedef struct zset {
// 字典
dict *dict;
// 跳跃表
zskiplist *zsl;
} zset;

使用字典和跳跃表保存有序集合成员是为了更好的优化操作。在获取成员和分数时,使用字典可以有效减少查询时间,在进行RANK等操作时,由于字典的无序,遍历查询效果很低,所以使用跳跃表的方式可以减少查询时间。

2. 编码转换

当有序集合对象满足下面条件时使用ZIPLIST编码:

  1. 有序集合对象存储的所有元素成员长度小于64字节。
  2. 有序集合对象存储的元素个数不超过128.

配置文件中zset-max-ziplist-value和zset-max-ziplist-entries可以修改其上限

3. 命令

命令 作用
ZADD 向有序集合添加一个或多个成员,或者更新已存在成员的分数
ZCARD 获取有序集合的成员数
ZCOUNT 计算在有序集合中指定区间分数的成员数
ZINCRBY 有序集合中对指定成员的分数加上增量 increment
ZINTERSTORE 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
ZLEXCOUNT 在有序集合中计算指定字典区间内成员数量
ZRANGE 通过索引区间返回有序集合成指定区间内的成员
ZRANGEBYLEX 通过字典区间返回有序集合的成员
ZRANGEBYSCORE 通过分数返回有序集合指定区间内的成员
ZRANK 返回有序集合中指定成员的索引
ZREM 移除有序集合中的一个或多个成员
ZREMRANGEBYLEX 移除有序集合中给定的字典区间的所有成员
ZREMRANGEBYRANK 移除有序集合中给定的排名区间的所有成员
ZREMRANGEBYSCORE 移除有序集合中给定的分数区间的所有成员
ZREVRANGE 返回有序集中指定区间内的成员,通过索引,分数从高到底
ZREVRANGEBYSCORE 返回有序集中指定分数区间内的成员,分数从高到低排序
ZREVRANK 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
ZSCORE 返回有序集中,成员的分数值
ZUNIONSTORE 计算给定的一个或多个有序集的并集,并存储在新的 key 中
ZSCAN 迭代有序集合中的元素(包括元素成员和元素分值)

4. 源码剖析

  • 编码转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
sds ele;
double score;

// 不需要转换
if (zobj->encoding == encoding) return;

// ZIPLIST 转 SKIPLIST
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;

if (encoding != OBJ_ENCODING_SKIPLIST)
serverPanic("Unknown target encoding");

// 分配跳跃表空间
zs = zmalloc(sizeof(*zs));
// 创建字典
zs->dict = dictCreate(&zsetDictType,NULL);
// 创建跳跃表
zs->zsl = zslCreate();

// 首地址
eptr = ziplistIndex(zl,0);
serverAssertWithInfo(NULL,zobj,eptr != NULL);
// 分数地址
sptr = ziplistNext(zl,eptr);
serverAssertWithInfo(NULL,zobj,sptr != NULL);

// 遍历
while (eptr != NULL) {
// 获取分数
score = zzlGetScore(sptr);
serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
// 创建分数字符串
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen((char*)vstr,vlen);

// 插入分数结点
node = zslInsert(zs->zsl,score,ele);
// 添加到字典
serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
zzlNext(zl,&eptr,&sptr);
}

// 释放之前的跳跃表
zfree(zobj->ptr);
// 指向新的压缩列表
zobj->ptr = zs;
// 编码修改
zobj->encoding = OBJ_ENCODING_SKIPLIST;
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// SKIPLIST 转 ZIPLIST
unsigned char *zl = ziplistNew();

if (encoding != OBJ_ENCODING_ZIPLIST)
serverPanic("Unknown target encoding");

/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */
// 获取当前压缩列表地址
zs = zobj->ptr;
// 释放字典
dictRelease(zs->dict);
// 跳跃表头结点
node = zs->zsl->header->level[0].forward;
// 释放头结点
zfree(zs->zsl->header);
zfree(zs->zsl);

// 遍历跳跃表
while (node) {
// 插入新结点到压缩列表
zl = zzlInsertAt(zl,NULL,node->ele,node->score);
// 下个结点
next = node->level[0].forward;
// 释放跳跃表结点
zslFreeNode(node);
node = next;
}

// 释放跳跃表
zfree(zs);
// 有序接对象指向压缩列表
zobj->ptr = zl;
// 编码
zobj->encoding = OBJ_ENCODING_ZIPLIST;
} else {
serverPanic("Unknown sorted set encoding");
}
}
  • 添加成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;

/* NaN as input is an error regardless of all the other parameters. */
// 判断分数释放合法
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}

/* Update the sorted set according to its encoding. */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;

// 压缩列表中查找结点
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
// NX 表不存在则添加
if (nx) {
// 存在则直接返回
*flags |= ZADD_NOP;
return 1;
}

/* Prepare the score for the increment if needed. */
// 递增
if (incr) {
// 计算新分数
score += curscore;
// 判断分数合法
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
// 记录新分数
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changed. */
// 分数要更新
if (score != curscore) {
// 删除原有成员和分数
zobj->ptr = zzlDelete(zobj->ptr,eptr);
// 添加新的成员分数
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
// 标识更新
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
// 压缩列表中直接插入
zobj->ptr = zzlInsert(zobj->ptr,ele,score);

// 根据有序集合长度,判断是否需要转换编码
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// 获取跳跃表
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;

// 查找跳跃表中的ele
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists. */
// 不存在才操作,存在则直接返回
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
// 获取当前分数值
curscore = *(double*)dictGetVal(de);

/* Prepare the score for the increment if needed. */
// 递增
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changes. */
// 判断分数需要修改
if (score != curscore) {
zskiplistNode *node;
// 删除原有结点
serverAssert(zslDelete(zs->zsl,curscore,ele,&node));
// 插入新的分数结点
znode = zslInsert(zs->zsl,score,node->ele);
/* We reused the node->ele SDS string, free the node now
* since zslInsert created a new one. */
// 释放之前的结点
node->ele = NULL;
zslFreeNode(node);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
// 更新结点的分数
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
// 复制成员字符串
ele = sdsdup(ele);
// 添加新结点
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
  • 删除成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int zsetDel(robj *zobj, sds ele) {
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;

// 寻找结点,并删除
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// 跳跃表
zset *zs = zobj->ptr;
dictEntry *de;
double score;

// 软删除结点
de = dictUnlink(zs->dict,ele);
if (de != NULL) {
/* Get the score in order to delete from the skiplist later. */
// 获取分数
score = *(double*)dictGetVal(de);

/* Delete from the hash table and later from the skiplist.
* Note that the order is important: deleting from the skiplist
* actually releases the SDS string representing the element,
* which is shared between the skiplist and the hash table, so
* we need to delete from the skiplist as the final step. */
dictFreeUnlinkedEntry(zs->dict,de);

/* Delete from skiplist. */
int retval = zslDelete(zs->zsl,score,ele,NULL);
serverAssert(retval);

if (htNeedsResize(zs->dict)) dictResize(zs->dict);
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* No such element found. */
}
  • ZADD命令底层实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */

/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
// 解析命令参数选项
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}

/* Turn options into simple to check vars. */
// 设置flag对应的选项
int incr = (flags & ZADD_INCR) != 0;
int nx = (flags & ZADD_NX) != 0;
int xx = (flags & ZADD_XX) != 0;
int ch = (flags & ZADD_CH) != 0;

/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. */
// 计算元素个数
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
// 回复客户端错误
addReply(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */

/* Check for incompatible options. */
// 不合法的选项参数,回复客户端错误信息
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}

if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}

/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
// 存储所有成员分数
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}

/* Lookup the key and create the sorted set if does not exist. */
// db中查询有序集合对象
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
// xx标志(存在则继续),直接回复客户端
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */

// 根据配置参数判断有序集合底层编码
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
// SKIPLIST
zobj = createZsetObject();
} else {
// ZIPLIST
zobj = createZsetZiplistObject();
}
// db中添加有序集合对象
dbAdd(c->db,key,zobj);
} else {
// 找到类型错误,则回复客户端
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}


// 遍历所有成员
for (j = 0; j < elements; j++) {
double newscore;
// 获取分数
score = scores[j];
int retflags = flags;

// 获取成员字符串
ele = c->argv[scoreidx+1+j*2]->ptr;
// 添加成员和分数到有序集合
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
// 根据不同标识,计算不同操作的数量
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);

// 根据不同命令直接回复客户端
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}

// 清除分数数组
cleanup:
zfree(scores);
// 有添加或更新的则发送数据库有键发生修改
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}