Redis源码阅读(二十一) 复制

Redis为了解决单点故障的问题,会将数据复制到多个从节点服务器中,通过复制,实现Redis的高可用。

1. 介绍

复制使得slave服务器能够精确复制master服务器的数据,每当slave和master连接断开时,slave会自动重连到master,并且无论这期间master做了什么操作,slave都将尝试让自身成为master的精确副本。

  • 当一个master和slave连接正常时,master会将所有的修改命令发送给slave。
  • 当master和slave之间因为某些原因断开连接后,slave会重新连接到master并尝试进行部分重同步。
  • 当无法进行部分同步时,slave会请求全量同步。

Redis默认使用异步复制,即低延迟高性能,是绝大多Redis的自然复制模式。

配置Redis复制功能非常简单,只需加入下面内容到slave的配置文件即可,或在客户端中执行该命令。

1
2
# 命令 主机ip地址 主机端口号
slaveof 192.168.1.2 6379

注:通常完全重同步需要在磁盘上创建RDB文件,然后slave获取此RDB文件进行复制。但如果磁盘性能很低会对master造成很大压力,所以可以通过socket直接发送给slave而不存储到磁盘中。

2. 工作原理

master使用复制ID和偏移量来标识当前复制的进度,复制ID用于标记数据集,偏移量用于标识此次复制中,master将多少字节已经发送给slave。而master自己则用于一块复制缓冲区域,用于缓存master产生的复制流。

当slave连接到master时,使用PSYNC命令发送旧的复制ID和已经处理的偏移量。如果复制ID与master匹配,且偏移量在复制积压缓冲区以内,则进行部分重同步;但否则的话,需要进行完全重同步。

复制ID标记数据集的历史记录,每次实例作为master重新启动或slave提升为master都会为其生成新的复制ID。

Redis实例拥有两个复制ID是因为到故障发生时,slave提升为master,slave需要记住过期的复制ID,该复制ID是之前的master,这样当其他slave与新master连接时,它们可以使用旧的复制ID尝试部分重同步。

2.1 部分重同步

主节点在复制过程中,会将命令传递给从节点的同时保存在一个叫做复制积压缓冲区backlog的区域内,当从节点重新连接到主节点,会发送自己的复制ID和偏移量,如果偏移量在主节点的缓冲区范围内,则可以进行部分重同步,只是少量的数据,而不需要全部的RDB文件。

复制积压缓冲区backlog是一个1M大小的循环队列,用于存储主节点传递的修改数据库的命令,也是部分重同步的重要部分,从节点通过偏移量和复制积压缓冲区就可以计算出与主节点的差异,如果在缓冲区中能找到则只需进行部分重同步,而无需完整的拷贝所有RDB文件

2.2 完全重同步

由于网络或其他原因,主从节点断线,当从节点重新连接到主节点时,主节点的复制积压缓冲区已经超过了从节点的偏移,此时无法进行部分重同步。

2.3 心跳检测

为了保持master与slave之间的连接,它们之间会不时的进行通信以保持对方的在线状态:

  • master节点每隔10s(默认)向slave节点发送PING命令,判断从节点连接状态
  • slave节点每秒发送REPLCONF ACK <offset>命令,给主节点报告自己当前复制偏移量

2.4 复制积压缓冲区backlog

复制积压缓冲区backlog是一个1M大小的循环队列,用于存储主节点传递的修改数据库的命令,也是部分重同步的重要部分,从节点通过偏移量和复制积压缓冲区就可以计算出与主节点的差异,如果在缓冲区中能找到则只需进行部分重同步,而无需完整的拷贝所有RDB文件

3. 实现

  • 步骤1:设置主服务器的地址和端口

    执行SLAVEOF <master_ip> <master_port>

  • 步骤2:建立socket连接

  • 步骤3:发送PING命令

  • 步骤4:身份验证

  • 步骤5:发送端口信息

  • 步骤6:同步

  • 步骤7:命令传递

4. 源码剖析

  • 复制周期性任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/* Replication cron function, called 1 time per second. */
// 复制周期函数,每秒调用一次
void replicationCron(void) {
static long long replication_cron_loops = 0;

/* Non blocking connection timeout? */
// 非阻塞连接已经超时,记录日志,并取消握手状态
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}

/* Bulk transfer I/O timeout? */
// 接收RDB文件超时,记录日志,并取消握手状态
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake();
}

/* Timed out master when we are an already connected slave? */
// 主节点每10s向从节点发送PING,从节点每1秒向主节点发送PING
// 主从节点已连接,但很久未接收到PING,则超时
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
// 释放主节点
freeClient(server.master);
}

/* Check if we should connect to a MASTER */
// 尝试连接master状态
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
// 连接master
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}

/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
// 发送ACK给主节点
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();

/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
// 如果服务器有从节点,定期发送PING
listIter li;
listNode *ln;
robj *ping_argv[1];

/* First, send PING according to ping_slave_period. */
// 首先, 根据配置的server.repl_ping_slave_period判断是否发送ACK,默认10秒主节点PING从节点
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}

/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
// 其次,发送一个换行符'\n'给所有准备同步:等待master创建RDB文件的从节点
// 发送该换行符是为了确保slave知道他们的master在线,这样对偏移量也没有影响
// 从节点会忽略该换行符,但会刷新最后一次交互时间,这种情况下,我们忽略ping周期并每秒刷新
// 一次连接, 因为某些超时设置为几秒


// 遍历所有从节点
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

// 从节点的复制状态处于等待主节点创建RDB文件状态, 则发送一个'\n'用于ping
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

if (is_presync) {
if (write(slave->fd, "\n", 1) == -1) {
/* Don't worry about socket errors, it's just a ping. */
}
}
}

/* Disconnect timedout slaves. */
// 断开超时的从节点
if (listLength(server.slaves)) {
listIter li;
listNode *ln;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

// 跳过未完成RDB文件传输的节点
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->flags & CLIENT_PRE_PSYNC) continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}

/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
// 如果该主节点已经没有从节点,并且复制积压缓冲区存在,为了回收内存,
// 我们可以在一段时间(可配置)之后释放它。
// 注意:从节点无法执行该操作:没有附加从节点的从节点仍将数据累积下来,以便
// 故障转移后将其转换为master时回复PSYNC
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
// 计算没有从节点开始有多久了
time_t idle = server.unixtime - server.repl_no_slaves_since;

// 超过设定的限制,则释放积压缓冲区backlong
if (idle > server.repl_backlog_time_limit) {
/* When we free the backlog, we always use a new
* replication ID and clear the ID2. This is needed
* because when there is no backlog, the master_repl_offset
* is not updated, but we would still retain our replication
* ID, leading to the following problem:
*
* 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connecto to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */
changeReplicationId();
clearReplicationId2();
freeReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog freed after %d seconds "
"without connected slaves.",
(int) server.repl_backlog_time_limit);
}
}

/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all. */
// 如果AOF关闭,且没有从节点,我们就释放复制脚本缓存,因为没有传递EVALSHA的必要
if (listLength(server.slaves) == 0 &&
server.aof_state == AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}

/* Start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
* number of seconds (according to configuration) so that other slaves
* have the time to arrive before we start streaming. */
// 如果从节点处于WAIT_BGSAVE_START状态,则为了复制操作开始执行一个BGSAVE
// 无盘复制情况下,等待指定秒数(可配置),以确保其他从节点有时间在我们开始流式传输之前到达
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
listNode *ln;
listIter li;

// 遍历所有从节点
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
// 如果从节点复制状态为等待BGSAVE开始
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
// 计算距离上一次交互时间差
idle = server.unixtime - slave->lastinteraction;
// 记录最大空余时间
if (idle > max_idle) max_idle = idle;
// 记录等待BGSAVE的节点数量
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}

// 判断是否需要执行BGSAVE,BGSAVE会根据mincapa将RDB文件写到磁盘或socket
if (slaves_waiting &&
(!server.repl_diskless_sync ||
max_idle > server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
}
}

/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
// 更新延迟lag小于 min-slaves-max-lag的从节点数量
refreshGoodSlavesCount();
// 更新cron函数执行次数
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
  • SYNC和PSYNC命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/**
* SYNC和PSYNC命令实现
*/
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
// 从节点忽略同步命令
if (c->flags & CLIENT_SLAVE) return;

/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
// 从节点未与主节点连接,则发送错误回复
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}

/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
// 如果指定的客户端回复缓冲区中海油数据,则不能执行同步
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}

serverLog(LL_NOTICE,"Slave %s asks for synchronization",
replicationGetSlaveName(c));

/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
// 尝试执行部分同步命令,回复"+FULLRESYNC <replid> <offset>", 失败则执行全量同步,
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 主节点尝试执行部分重同步
if (masterTryPartialResynchronization(c) == C_OK) {
// 可执行PSYNC命令,计数+1
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */

// 不能执行PSYNC,则全量同步
} else {
char *master_replid = c->argv[1]->ptr;

/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
// 记录psync失败次数
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive replicationSendAck ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}

/* Full resynchronization. */
// 全量同步计数
server.stat_sync_full++;

/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
// 客户端状态为从节点服务器等待BGSAVE开始
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
// 执行SYNC命令后,是否关闭TCP_NODELAY
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

/* Create the replication backlog if needed. */
// 根据需要创建一个backlog缓冲区
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}

/* CASE 1: BGSAVE is in progress, with disk target. */
// 情况1:BGSAVE正在执行,同步到磁盘
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;

// 遍历从节点链表
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
// 从节点等待BGSAVE结束
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}

/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
// 对与该节点,检测它是否具有触发当前BGSAVE操作的能力
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
// 将从节点输出缓冲区内容拷贝到客户端输出缓冲区
copyClientOutputBuffer(c,slave);
// 设置全量同步状态,和部分重同步偏移量
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences. */
serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}

/* CASE 2: BGSAVE is in progress, with socket target. */
// 情况2: BGSAVE正在执行,同步到socket
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

/* CASE 3: There is no BGSAVE is progress. */
// 情况3:BGSAVE没有执行
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
// 无盘同步的子进程在replicationCron()中创建
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
// 没有进行BGSAVE并且没有AOF,则开始为复制执行BGSAVE
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}