Redis源码阅读(三) 字典

字典,又称符号表、关联数组、映射(map)是一种保存K/V队的抽象数据结构。字典中每个key都是唯一的,而且能在常数时间O(1)获取key对应的值。

1. 定义

  • 哈希表结点
    • key 保存键值对中的键
    • v 保存键值对中的值,类型可以为void *, uint64_t等,使用union节省内存
    • next 保存下一个哈希表结点,解决冲突时产生一个链表
1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry {                                                                             
void *key; // 键
union { // 值,可以是void *,uint64_t,int64_t,double,使用union节省内存
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; // 下一个结点的指针,链地址法解决冲突
} dictEntry;
  • 字典类型

    一组操作特定类型键值对的函数,Redis会为用途不同的字典设置不同类型,类似于多态的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct dictType {                                                                              
// 计算哈希值
uint64_t (*hashFunction)(const void *key);
// 复制key
void *(*keyDup)(void *privdata, const void *key);
// 复制val
void *(*valDup)(void *privdata, const void *obj);
// 比较key
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁key
void (*keyDestructor)(void *privdata, void *key);
// 销毁val
void (*valDestructor)(void *privdata, void *obj);
} dictType;
  • 哈希表
    • table 哈希结点数组,类似于桶的概念,每个索引都是一个链表(存在NULL表示没有计算出该索引)
    • size 哈希表能够存储结点的大小容量,包括已使用和未使用
    • sizemask 哈希表大小掩码,永远为size-1,用于计算哈希值的索引位置(存放在table中的下标)
    • used 哈希表已经存储的结点个数
1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct dictht {                                                                             
// 哈希表数组
dictEntry **table;

// 哈希表大小
unsigned long size;

// 哈希表大小掩码,用于计算索引值,sizemask = size - 1
unsigned long sizemask;

// 哈希表已经使用的大小
unsigned long used;
} dictht;
  • Redis字典
    • type 定义了不同字典类型,拥有一组该特定类型的操作函数
    • privdata 保存了需要传递给特定类型函数回调的可选参数
    • ht[2] 每个字典拥有两张哈希表,用于渐进式rehash替换,一般使用ht[0],rehash过程中才使用ht[1]
    • rehashidx 记录目前rehash的进度,-1表示没有进行rehash
    • iterators 目前在运行安全迭代器数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct dict {                                                                               
// 字典类型,定义了特定的操作函数
dictType *type;

// 私有数据,保存了需要传给特定类型函数的可选参数
void *privdata;

// 两张哈希表, 用于渐进式rehash替换,一般使用ht[0],rehash情况下才会使用ht[1]
dictht ht[2];

// 渐进式rehash索引,-1表示没有进行rehash
long rehashidx; /* rehashing not in progress if rehashidx == -1 */

// 目前正在运行的安全迭代器数量
unsigned long iterators; /* number of iterators currently running */
} dict;
  • 字典迭代器

    不安全迭代器在操作前后会生成指纹,比对两次指纹是否相同说明迭代器是否进行非法操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*                                                                                                  
* 字典迭代器
* if safe == 1,那么迭代过程中科院使用dictAdd,dictFind和其他函数,对字典进行修改。
* if safe != 1,那么制定调用dictNext函数,不能对字典进行修改。
*/
typedef struct dictIterator {
// 迭代器的字典
dict *d;

// 迭代器当前哈希结点的索引值
long index;

// table为哈希表索引0或1,safe表示这个迭代器是否安全
int table, safe;

// entry: 当前迭代器指向的结点,
// nextEntry: 当前迭代器结点的下一个结点
// 由于安全迭代器执行时,entry所指向的结点可能会被修改,
// 所以需要一个额外的指针保存下一个结点位置,防止指针丢失
dictEntry *entry, *nextEntry;

/* unsafe iterator fingerprint for misuse detection. */
// 不安全的迭代器会生成指纹,当两次指纹不相同时说明迭代器执行了非法的操作
long long fingerprint;
} dictIterator;

2. 哈希算法

2.1 哈希函数

Jean-Philippe Aumasson和Daniel J. Bernstein能够证明,即使使用随机化种子实施Murmur哈希也容易受到所谓的HashDoS攻击。 通过使用差分密码分析,他们能够生成会导致哈希碰撞的输入。 攻击的作者建议使用他们自己的SipHash代替。

具体有兴趣可以查资料深入研究。

  • MurmurHash

    算法优点在于即使输入键有规律,也能给出一个很好的随机分布,计算速度快

  • SipHash

    为了解决 hash flodding 问题而产生的算法,让输出随机化。

2.2 解决冲突

通过构造性能良好的哈希函数可以减少冲突的产生,但不可能完全避免,常用的冲突解决有以下几种

  • 开放定址法

    该方法也称之为再散列法,当key的哈希值p产生冲突时,继续对p进行哈希产生p1,若哈希值p1又产生冲突,则进行哈希,这样直到没有冲突产生为止。

  • 再哈希法

    存在多个哈希函数,若H1产生冲突,则使用H2进行哈希,这样直到没有冲突

  • 链地址法

    将哈希值相同的元素通过链表的形式关联起来。

对于开放定址法和再哈希法都需要较大的存储空间,而且会存在相当多的空桶问题,导致内存严重浪费,Redis采用链地址法的方式解决冲突,可以方便的修改结点的指针,成本开销很低。

3. 渐进式rehash

当哈希表中的key越来越多后,可能会导致某些桶内元素过多(负载过高),这样操作的速度就会明显下降;当哈希表中的key越来越少,会导致空桶过多(负载过低),浪费内存。为了保证负载因子在合理范围内,当哈希表中的键过多或过少时,Redis会对哈希表进行扩展或收缩操作。

当 (哈希表使用数量/哈希表大小)dict_force_resize_ratio > 5 时,会触发redis强制进行rehash

Redis工作在单线程模式下,一般用做缓存,若在key的数量相当大的情况下触发rehash,那执行时间可想而知,为了解决这个问题,Redis采用渐进式rehash的方式处理,开启rehash后,每次执行某些操作,就会触发小步rehash,这样就平衡了等待时间,以较低的成本完成了Redis的扩容或收缩。

以下是rehash的步骤:

  1. 为哈希表ht[1]分配内存空间.
    • 执行扩展操作,大小为大于等于ht[0].used*2的第一个2^n。
    • 执行收缩操作,大小为大于等于ht[0].used(不能小于4)的第一个2^n。
  1. 将ht[0]中所有的键值对rehash到ht[1]上。
    • 每次执行rehash会处理n个桶的数据,有些桶没有任何数据会跳过,但若跳过数量太多,也会导致单步时间过长,所以每次空桶访问的个数不能大于n*10。
    • 遍历一个桶中的所有数据,将ht[0]中的数据更新到ht[1]中,重新计算哈希值,计算桶的索引值,放到ht[1]对应的桶中,使用头插入的方法插入到链表
  1. 当ht[0]中所有键值对都迁移到ht[1]之后,释放ht[0]的内存空间,将ht[1]设置为ht[0],ht[1]初始化为空哈希表,为下一次rehash做准备。

渐进式rehash期间,Redis字典会同时使用ht[0]和ht[1],所以对字典的查找、删除、更新等操作会先在ht[0]中进行,再在ht[1]中进行。

4. 字典扫描

字典扫描用于SCAN,HSCAN,SSCAN命令,若Redis的哈希表不做修改,那么遍历元素就比较简单,多次调用即可,但由于Redis可能正在进行rehash,导致遍历可能存在重复或纰漏。

Redis使用了一种算法,由Pieter Noordhuis设计,我们可以叫他反向二进制进位迭代法,该方法主要思想是利用模相关按照反向二进制进位的顺序迭代哈希表中的每个桶,算法保证了不会漏掉元素的同时尽可能少的重复。

例如:

1
2
3
4
5
6
7
8
9
哈希表长度分别为8和16的迭代顺序如下:
8: 0 --> 4 --> 2 --> 6 --> 1 --> 5 --> 3 --> 7
16: 0 --> 8 --> 4 --> 12 --> 2 --> 10 --> 6 --> 14 --> 1
--> 9 --> 5 --> 13 --> 3 --> 11 --> 7 --> 15

二进制bit表示如下:
8: 000 --> 100 --> 010 --> 110 --> 001 --> 101 --> 011 --> 111 --> 000
16: 0000 --> 1000 --> 0100 --> 1100 --> 0010 --> 1010 --> 0110 --> 1110 --> 0001
--> 1001 --> 0101 --> 1101 --> 0011 --> 1011 --> 0111 --> 1111 --> 0000

假设某结点哈希值为11(0b1011):

​ 哈希表长度为 8,sizemask = 7(0b111),结点计算出索引值为11 & 7 = 3(0b011)

​ 哈希表长度为16,sizemask = 15(0b1111),结点计算出索引值为 11 & 15 = 11(0b1011)

由于哈希值不变,所以当哈希表长度变化时,索引值的后几比特位也不会变化,利用该性质可以得出以下结论

  • 扩展情况:

    哈希表长度8,当迭代完1号桶(001)时,发生扩容(长度16),新cursor为1(0001),此时还未迭代的桶分别是5(101)、3(011)、7(111),这些桶内结点产生的新索引为5(0101)、13(1101)、3(0011)、11(1011)、7(0111)、15(1111),从上表可知新的索引都在cursor(0001)之后,保证所有元素都会被迭代到,但存在5号桶的重复迭代。

  • 收缩情况:

    哈希表长度16,当迭代完cursor(0001)之后,发生收缩(长度8),新cursor为(001),此时还未迭代的桶分别是9(1001)、5(0101)、13(1101)、3(0011)、11(1011)、7(0111)、15(1111),这些桶内结点产生的新索引为1(001)、5(101)、3(011)、7(111),从上表可知新的索引都在cursor(001)之后,这样保证所有元素都会被迭代到,但存在1号桶部分元素存放迭代。

5. 源码剖析

  • N步rehash
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 执行N步渐进式rehash, 返回1表示rehash未完成,0表示已完成
*
* rehash每一步都操作的是一个哈希索引(桶), 每个桶中可能会有多个结点
* 被rehash的桶中的所有结点会移动到新的哈希表.
*
* 由于哈希表中存在多个空桶(即没有元素),为了防止该函数执行时间过长,
* 每次rehash最多能够遇到n*10的空桶
*/
int dictRehash(dict *d, int n) {
// 最大空桶访问数
int empty_visits = n*10; /* Max number of empty buckets to visit. */
// 不能在非rehash时调用
if (!dictIsRehashing(d)) return 0;

// n步迁移
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 确保rehashidx不能超过哈希表的大小
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// 访问空桶数量不能过多
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 当前需要rehash的桶
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 移动该桶中所有结点到新的哈希表中(ht[1])
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
// 计算结点新的hash值
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 将新结点插入到链表头部
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;

// 哈希表计数
d->ht[0].used--;
d->ht[1].used++;

// 下一个处理结点
de = nextde;
}
// 迁移完成的哈希表索引指针为空
d->ht[0].table[d->rehashidx] = NULL;
// 更新rehash索引
d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
// 释放ht[0]哈希表空间,只释放指针
zfree(d->ht[0].table);
// 替换ht[0] 为 ht[1]
d->ht[0] = d->ht[1];
// 重置ht[1]属性
_dictReset(&d->ht[1]);
// rehash索引初始化
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}
  • 字典扫描
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
unsigned long dictScan(dict *d,
unsigned long v,
dictScanFunction *fn,
dictScanBucketFunction* bucketfn,
void *privdata)
{
dictht *t0, *t1;
const dictEntry *de, *next;
unsigned long m0, m1;

// 空字典不用scan
if (dictSize(d) == 0) return 0;

// 没有进行rehash
if (!dictIsRehashing(d)) {

// 获取哈希表和sizemask
t0 = &(d->ht[0]);
m0 = t0->sizemask;

/* Emit entries at cursor */
// 桶回调函数
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);

// 哈希表某个桶
de = t0->table[v & m0];

// 遍历该桶的链表, 每个元素进行回调
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}

/* Set unmasked bits so incrementing the reversed cursor
* operates on the masked bits */
v |= ~m0; // 保留v的低n位数,其余置为1

// 二进制反转相加
/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);

} else { // rehash状态

// 两张哈希表
t0 = &d->ht[0];
t1 = &d->ht[1];

/* Make sure t0 is the smaller and t1 is the bigger table */
// 确保t0长度小于t1
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}

// 长度掩码
m0 = t0->sizemask;
m1 = t1->sizemask;

/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}

/* Iterate over indices in larger table that are the expansion
* of the index pointed to by the cursor in the smaller table */
do {
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
de = t1->table[v & m1];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}

/* Increment the reverse cursor not covered by the smaller mask.*/
v |= ~m1;
v = rev(v);
v++;
v = rev(v);

/* Continue while bits covered by mask difference is non-zero */
// 直到v的低m1-m0位到m1位之前全部为0
} while (v & (m0 ^ m1));
}

return v;
}
  • 迭代器查找
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 返回迭代器当前结点,并保存下一结点位置
*/
dictEntry *dictNext(dictIterator *iter)
{
while (1) {
// 迭代器索引结点为NULL(第一次迭代)
if (iter->entry == NULL) {

// 获取字典哈希表
dictht *ht = &iter->d->ht[iter->table];

// 第一次迭代并且哈希表为ht[0]
if (iter->index == -1 && iter->table == 0) {

// 迭代器安全,则字典安全迭代器数量+1
if (iter->safe)
iter->d->iterators++;
// 迭代器不安全,则计算指纹
else
iter->fingerprint = dictFingerprint(iter->d);
}

// 迭代器索引+1
iter->index++;

// 迭代器索引>=哈希表大小,表示哈希表迭代完毕
if (iter->index >= (long) ht->size) {
// 字典处于rehash状态,需要对ht[1]进行迭代
if (dictIsRehashing(iter->d) && iter->table == 0) {
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
break;
}
}

// 更新迭代器指向的结点
iter->entry = ht->table[iter->index];
} else {

// 迭代器结点指针指向下一个结点
iter->entry = iter->nextEntry;
}

// 迭代器结点不为空,记录该结点的next
if (iter->entry) {
/* We need to save the 'next' here, the iterator user
* may delete the entry we are returning. */
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}