Redis源码阅读(十一) 列表对象

Redis列表对象是简单的字符串列表,按照插入顺序排序,可以添加一个元素到头部或尾部,一个列表键最多可包含2^32-1个元素。

1. 编码

Redis列表键的底层编码是QUICKLIST,而quicklist本质上就是以ziplist为结点的双端链表。

2. 命令

命令 作用
LINDEX 根据索引index获取列表key中的元素
LINSERT 在列表key指定的元素pivot的前/后插入元素
LLEN 获取列表key的长度
LPOP 从列表key中移除第一个元素
LPUSH 在列表key中的头部插入一个或多个元素
LPUSHX 插入一个元素value到已经存在的列表key
LRANGE 获取列表key指定范围start,end的元素
LREM 移除列表元素
LSET 通过索引index设置列表中的元素
LTRIM 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
RPOP 移除并获取列表最后一个元素
RPOPLPUSH 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
RPUSH 在列表中添加一个或多个值
RPUSHX 为已存在的列表添加值

3. 阻塞命令

Redis定义了阻塞状态,而每个客户端都有这样的一个状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
typedef struct blockingState {                                                                          
/* Generic fields. */
// 阻塞的时间
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */

/* BLOCKED_LIST */
// 造成阻塞的键
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
// 保存PUSH进来的元素,用于BRPOPLPUSH命令
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */

/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */

/* BLOCKED_MODULE */
void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only
handled in module.c. */
} blockingState;

typedef struct client {
// ...
// 保存着当前客户端的阻塞状态
blockingState bpop; /* blocking state */
// ...
}

typedef struct redisDb {
// 处于阻塞状态的键
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
// 收到PUSH操作时保存的键,避免重复操作
dict *ready_keys; /* Blocked keys that received a PUSH */
}
  • blocking_keys

    保存处于阻塞状态key的字典,每个key都是一个阻塞的键,而值是一个双端链表,表示被这个key所阻塞的客户端

  • bpop.keys

    bpop是每个客户端保存的阻塞状态结构,其中key表示的是造成这个客户端阻塞的键。

命令 作用
BLPOP 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BRPOP 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BRPOPLPUSH 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

4. 源码剖析

  • 阻塞客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;

// 超时时间,和目标对象
c->bpop.timeout = timeout;
c->bpop.target = target;

// 若target不为空,则增加其对象引用计数
if (target != NULL) incrRefCount(target);

// 遍历给定的keys
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dict ignore it. */

// 将要阻塞的key放入bpop.keys字典中
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;

// 增加当前key的引用计数
incrRefCount(keys[j]);

/* And in the other "side", to map keys -> clients */
// db->blocking_keys, 是一个字典,键是阻塞key,值是一个双端链表,存储着被这个key阻塞的客户端
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;

/* For every key we take a list of clients blocked for it */
// 创建新的链表
l = listCreate();
// 添加字典并存储阻塞key和这个链表
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
// 链表中添加该client
listAddNodeTail(l,c);
}

// 阻塞客户端
blockClient(c,BLOCKED_LIST);
}
  • 解除阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* c->bpop.keys存储了客户端阻塞的键
* c->db->blocking_keys存储了当前DB对象阻塞的键
* 要解除一个客户端的阻塞需要先遍历客户端阻塞的键,
* 然后在DB中找到该键,删除其中存储的阻塞客户端
*/
void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;
list *l;

serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);

// 字典迭代器,指向造成客户端阻塞的键的字段
di = dictGetIterator(c->bpop.keys);
/* The client may wait for multiple keys, so unblock it for every key. */
// 遍历阻塞键字典,因为同一个客户端可能被多个key阻塞
while((de = dictNext(di)) != NULL) {
// 获取这个阻塞key
robj *key = dictGetKey(de);

/* Remove this client from the list of clients waiting for this key. */

// 根据阻塞的key,在 block_key ==> client中查询阻塞客户端链表
l = dictFetchValue(c->db->blocking_keys,key);
serverAssertWithInfo(c,key,l != NULL);

// 查找并删除阻塞客户端链表中的客户端
listDelNode(l,listSearchKey(l,c));

/* If the list is empty we need to remove it to avoid wasting memory */
// 链表为空后释放这个映射
if (listLength(l) == 0)
dictDelete(c->db->blocking_keys,key);
}
// 释放迭代器
dictReleaseIterator(di);

/* Cleanup the client structure */
// 清除该客户端的阻塞key字典
dictEmpty(c->bpop.keys,NULL);
if (c->bpop.target) {
decrRefCount(c->bpop.target);
c->bpop.target = NULL;
}
}
  • PUSH命令底层实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;

// 获取key的对象
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

// 必须是LIST类型
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}

for (j = 2; j < c->argc; j++) {
// 若未找到该key,则创建新的quicklist, 并添加key到db
if (!lobj) {
lobj = createQuicklistObject();
// 设置ziplist最大长度和压缩程度,又配置文件指定
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
// 添加新的key对象和优化过编码的value
dbAdd(c->db,c->argv[1],lobj);
}
// push一个value
listTypePush(lobj,c->argv[j],where);
pushed++;
}
// 回复客户端
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

// 通知数据库有键修改
signalModifiedKey(c->db,c->argv[1]);

// 发送"lpush|rpush"事件通知
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
  • POP命令底层实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void popGenericCommand(client *c, int where) {
// 获取key列表对象
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
// 不存在或类型不匹配直接返回
if (o == NULL || checkType(c,o,OBJ_LIST)) return;

// pop出一个值
robj *value = listTypePop(o,where);
if (value == NULL) {
// 为空直接回复客户端
addReply(c,shared.nullbulk);
} else {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

// 不为空回复客户端pop出的value
addReplyBulk(c,value);
decrRefCount(value);

// 发送"lpop|rpop"事件通知
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);

if (listTypeLength(o) == 0) {
// 判断key列表对象是否要删除
// 删除则发生"del"事件通知
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
// 从db对象中删除该列表对象
dbDelete(c->db,c->argv[1]);
}
// 通知数据库有key修改
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}