Redis源码阅读(十三) 集合对象

Redis的集合是Sting类型的无序集合,集合成员唯一,集合中最大的成员数为 2^32 - 1

1. 编码

集合对象的编码可以是INTSET或HT。

INTSET编码的集合对象使用整数集合作为底层实现,集合对象的所有元素都存储在整数集合中。

HT编码的集合对象使用字典作为底层实现,每个字符串对象(字典的键)包含了一个集合元素,而字典的值为NULL。

2. 编码转换

当集合对象满足下面条件时使用INTSET编码:

  1. 集合对象存储的所有元素都为整数值。
  2. 集合对象存储的元素个数不超过512.

配置文件中set-max-intset-entries可以修改元素个数的上线

当对INTSET编码的集合对象进行一些操作致使无法使用INTSET编码是,则将集合对象转换成HT编码,INTSET中保存的键值对会移动到新的字典中。

3. 命令

命令 作用
SADD 向集合添加一个或多个成员
SCARD 获取集合的成员数
SDIFF 返回给定所有集合的差集
SDIFFSTORE 返回给定所有集合的差集并存储在 dst 中
SINTER 返回给定所有集合的交集
SINTERSTORE 返回给定所有集合的交集并存储在 dst 中
SISMEMBER 判断 member 元素是否是集合 key 的成员
SMEMBERS 返回集合中的所有成员
SMOVE 将 member 元素从 src 集合移动到 dst 集合
SPOP 移除并返回集合中的一个随机元素
SRANDMEMBER 返回集合中一个或多个随机数
SREM 移除集合中一个或多个成员
SUNION 返回所有给定集合的并集
SUNIONSTORE 所有给定集合的并集存储在 dst 集合中
SSCAN 迭代集合中的元素

4. 源码剖析

  • 编码转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
setobj->encoding == OBJ_ENCODING_INTSET);

if (enc == OBJ_ENCODING_HT) {
int64_t intele;
// 创建字典对象
dict *d = dictCreate(&setDictType,NULL);
sds element;

/* Presize the dict to avoid rehashing */
// 根据集合元素个数扩容
dictExpand(d,intsetLen(setobj->ptr));

/* To add the elements we extract integers and create redis objects */
// 迭代整数集合
si = setTypeInitIterator(setobj);
while (setTypeNext(si,&element,&intele) != -1) {
// 转换值为long long
element = sdsfromlonglong(intele);
// 存储在字典中
serverAssert(dictAdd(d,element,NULL) == DICT_OK);
}
// 释放迭代器
setTypeReleaseIterator(si);

// 设置编码
setobj->encoding = OBJ_ENCODING_HT;
// 释放整数集合内存空间
zfree(setobj->ptr);
// 集合对象新的底层结构为字典
setobj->ptr = d;
} else {
serverPanic("Unsupported set conversion");
}
}
  • 添加成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* SADD key member1 [member2]
* 向集合添加一个或多个成员
*/
void saddCommand(client *c) {
robj *set;
int j, added = 0;

// 查找指定key的集合对象
set = lookupKeyWrite(c->db,c->argv[1]);

if (set == NULL) {
// 集合对象不存在,则创建一个
set = setTypeCreate(c->argv[2]->ptr);
// 添加集合对象到db
dbAdd(c->db,c->argv[1],set);
} else {
// 集合存在但类型不对,回复客户端
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}

// 变量参数列表,添加成员到集合对象中
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}

// 如果有添加的则通知数据库有key发生改变,并发送"sadd"事件通知
if (added) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
addReplyLongLong(c,added);
}

/**
* 添加成员底层处理
*/
int setTypeAdd(robj *subject, sds value) {
long long llval;

// 根据不同编码进行处理
if (subject->encoding == OBJ_ENCODING_HT) { // HT编码

// 获取集合对象哈希表
dict *ht = subject->ptr;
// 添加字典entry
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
// 设置字典的值
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) { // INTSET编码

// 如果要添加的值可以用long long表示
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
// 添加到整数集合intset
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* Failed to get integer from object, convert to regular set. */
// 否则转换为HT编码
setTypeConvert(subject,OBJ_ENCODING_HT);

/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
// 进行value添加
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
  • 删除成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* SREM key member1 [member2]
* 移除集合中一个或多个成员
*/
void sremCommand(client *c) {
robj *set;
int j, deleted = 0, keyremoved = 0;

// 查找指定key的集合对象,并判断类型
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;

// 遍历要移除的参数列表
for (j = 2; j < c->argc; j++) {
// 从集合对象中移除成员
if (setTypeRemove(set,c->argv[j]->ptr)) {
deleted++;
// 若集合对象为空,则从db中删除该集合对象
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
keyremoved = 1;
break;
}
}
}
// 删除成员,需要通知数据库key发生变化,并发送"srem"事件
if (deleted) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);

// 如果删除了集合对象,需要发送"del"事件
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
c->db->id);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}