Redis源码阅读(二十三) 集群

Redis集群是Redis的分布式实现,具有高性能和线性可扩展性,最多可拥有1000个节点;具有可接受的写入安全程度,系统尽力保留与大多主节点连接的客户端的所有写入;同时保持可用性,集群能够在大多数主节点可用区中存活。集群通过分片的方式进行数据共享,并提供复制和故障转移功能。

1. 集群节点

Redis集群通常包含多个节点,但最开始时,每个节点都是独立的集群,要将它们连接在一起需要执行CLUSTER MEET <ip> <port>命令,让指定的节点与之握手,一旦握手成功,就会将该节点加入当前节点所在集群中。

1.1 集群数据结构

clusterNode结构保存节点当前状态,每个节点都会使用该结构为自己创建一个状态,并且也会为集群中其他节点创建相应的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
typedef struct clusterNode {
// 节点创建时间
mstime_t ctime; /* Node object creation time. */
// 节点名称
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
// 状态标识
int flags; /* CLUSTER_NODE_... */
// 最新的配置纪元
uint64_t configEpoch; /* Last configEpoch observed for this node */
// 节点的槽位
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
// 当前节点复制槽的数量
int numslots; /* Number of slots handled by this node */
// 从节点数量
int numslaves; /* Number of slave nodes, if this is a master */
// 从节点指针数组
struct clusterNode **slaves; /* pointers to slave nodes */
// 指向主节点,可能为NULL
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
// 最新发送ping的时间
mstime_t ping_sent; /* Unix time we sent latest ping */
// 最新接收pong的时间
mstime_t pong_received; /* Unix time we received the pong */
// 设置为FAIL的时间
mstime_t fail_time; /* Unix time when FAIL flag was set */
// 最新投票的时间
mstime_t voted_time; /* Last time we voted for a slave of this master */
// 复制偏移量的时间
mstime_t repl_offset_time; /* Unix time we received offset for this node */
// 孤立的主节点迁移的时间
mstime_t orphaned_time; /* Starting time of orphaned master condition */
// 该节点已知的复制偏移量
long long repl_offset; /* Last known repl offset for this node. */
// 该节点已知的最新IP地址
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
// 最新的客户端端口
int port; /* Latest known clients port of this node */
// 最新的集群端口
int cport; /* Latest known cluster port of this node. */
// 与该节点关联的TCP/IP连接
clusterLink *link; /* TCP/IP link with this node */
// 保存下线报告的链表
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;

clusterLink结构是存储连接该节点所需的有关信息状态

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct clusterLink {
// 连接创建的时间
mstime_t ctime; /* Link creation time */
// TCP socket连接的文件描述符
int fd; /* TCP socket file descriptor */
// 输出缓冲区
sds sndbuf; /* Packet send buffer */
// 输入缓冲区
sds rcvbuf; /* Packet reception buffer */
// 关联该连接的节点
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;

redisClient和clusterLink都有自己的socket、文件描述符和输入输出缓冲区,不同的是redisClient用于保存客户端的连接,而clusterLink用于保存节点间的连接

每个节点都有一个clusterState结构,该结构记录了当前纪元集群的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
typedef struct clusterState {
// 节点自身
clusterNode *myself; /* This node */
// 节点当前纪元
uint64_t currentEpoch;
// 集群状态
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
// 集群中至少负责一个槽的主节点数
int size; /* Num of master nodes with at least one slot */
// 集群节点字典,键是节点名称,值是节点结构
dict *nodes; /* Hash table of name -> clusterNode structures */
// 防止重复添加节点的黑名单
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
uint64_t slots_keys_count[CLUSTER_SLOTS];
rax *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
// 之前或下次选举时间
mstime_t failover_auth_time; /* Time of previous or next election. */
// 节点获得投票数
int failover_auth_count; /* Number of votes received so far. */
// 节点已经发送投票请求
int failover_auth_sent; /* True if we already asked for votes. */
// 节点当前排名
int failover_auth_rank; /* This slave rank for current auth request. */
// 选举的当前纪元
uint64_t failover_auth_epoch; /* Epoch of the current election. */
// 从节点不能执行故障转移的原因
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
// 手动故障转移的时间限制,为0表示没有正在执行的手动故障转移
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
// 手动故障转移的状态
// 执行手动故障转移的从节点
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
// 从节点记录手动故障转移的主节点偏移量
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 非零表示可以开始手动故障转移
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The followign fields are used by masters to take state on elections. */
// 集群最后一次投票的纪元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
} clusterState;

1.2 CLUSTER MEET命令

收到命令的节点A将于指定地址端口的节点B进行握手:

  1. 节点A为节点B创建clusterNode结构,保存B的状态信息,并添加到自己的clusterState.nodes
  2. 节点A向节点B发送MEET消息
  3. 节点B接收到A的MEET消息,会为A创建clusterNode结构,同样保存到自己的clusterState.nodes
  4. 节点B返回PONG消息给节点A
  5. 节点A接收到PONG消息,表示与B节点MEET成功
  6. 节点A发送PING给节点B
  7. 节点B收到节点A的PING消息,表示节点A已经收到了自己的PONG

2. 散列槽指派

Redis集群通过分片的方式保存键值对,集群中整个数据库被分为16384个slot,通过HASH_SLOT = CRC16(key) mod 16384的方式确定某个键存储在哪个散列槽中。

1
2
3
4
5
6
7
8
typedef struct clusterNode {
// ...
// 节点的槽位
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
// 当前节点复制槽的数量
int numslots; /* Number of slots handled by this node */
// ...
} clusterNode;

slots是一个二进制位数组,长度为16384/8=2048个字节,包含16384个二进制位,刚好对应每个散列槽。如果节点的slots数组的第i位为1,表示该节点负责这个散列槽数据的处理。numslots表示当前节点处理散列槽的数量。

每个节点除了负责字节要处理的散列槽之外,还需要将自己slots数组告知集群的其他节点,当节点接收到来自其他节点的信息后,会更新自己保存的状态数据。

1
2
3
4
5
typedef struct clusterState {
// ...
clusterNode *slots[CLUSTER_SLOTS];
// ...
} clusterState;

clusterState结构中slots记录了集群所有散列槽的指派信息,slots数组包含16384个项,每个都是指向集群节点的指针。如果slots[i]为NULL,表示该散列槽尚未指派给任何节点,否则指向处理给散列槽的节点。

3. 重定向和重新分片

3.1 MOVED错误

在对数据的16384个散列槽都进行了指派后,集群就会进入上线状态,这时Redis客户端可以自由的向集群中每个节点发送查询,当命令与数据库键有关时,节点会计算该键属于哪个散列槽,并检查该散列槽是否指派给自己,如果该散列槽并未指派给自己,那么节点会返回一个MOVED错误,告知客户端重定向到正确的节点。

MOVED错误的格式为MOVED <slot> <ip>:<port>,当客户端接收到节点返回的MOVED错误信息,会根据其提供的IP和端口号访问该节点,重新执行之前的命令。

3.2 重新分片

Redis集群的重新分片可以修改散列槽的指派,并且对应槽的键也会修改到相应节点。该操作可以在线进行,重新分片过程中,集群不需要下线,而且还可以继续处理请求。

Redis集群重新分片由redis-trib负责执行,它对集群某单个散列槽重新分片步骤如下:

  1. redis-trib向目标节点发送CLUSTER SETSLOT <slot> IMPORTING <source_id>命令,让目标节点准备好从源节点导入指定散列槽的键值对。
  2. redis-trib向源节点发送CLUSTER SETSLOT <slot> MIGRATING <target_id>命令,让源节点准备好将指定散列槽的键值对迁移至目标节点
  3. redis-trib向源节点发送CLUSTER GETKEYSINLSOT <slot> <count>命令,获得最多count个属于slot的键值对的键
  4. 对于3获得的每个key,redis-trib向源节点发送MIGRATE <target_ip> <target_port> <key_name> 0 <timeout>,将键原子的从源节点迁移到目标节点
  5. 重复执行步骤3、4,直到指定散列槽所有键值对迁移完成
  6. redis-trib向集群中任一节点发送CLUSTER SETSLOT <slot> NODE <target_id>,将散列槽slot指派给目标节点,这一指派信息会发送给整个集群

3.3 ASK错误

节点接收到有关key的命令,若为在自己数据库内查到该key,则检查自己的clusterState.migrating_slots_to[i],如果该key正在进行迁移,那么节点回复客户端一个ASK错误,指引客户端在正确位置寻找key。

3.4 ASKING

客户端请求集群中节点关于键key的命令,若该节点并未指派key所在的散列槽,但节点的clusterState.importing_slots_from[i]表示正在导入散列槽i,并且发送命令客户端带有REDIS_ASKING标识,那么节点将执行关于该散列槽的操作一次。

当客户端接收到ASK错误并转向正在导入散列槽的节点时,先向节点发送ASKING命令,然后重新发送要执行的命令,因为不发生ASKING命令而直接发送执行命令,会被节点拒绝,并返回MOVED错误。

3.5 MOVED错误和ASK错误的区别

  • MOVED错误表示散列槽的负责权已经从一个节点转移到另一个节点,当客户端遇到MOVED错误,只需将目标节点换成MOVED重定向的节点
  • ASK错误是两个节点散列槽迁移过程中的临时措施,客户端收到ASK错误后,会在接下来的命令请求换成ASK错误中指定的节点,但之后并不会改变,客户端仍然请求源节点