Redis源码阅读(十九) 客户端

Redis是经典C/S架构,一个服务器可以与多个客户端进行通信。Redis服务器工作在单进程单线程模式,但它通过I/O多路复用技术处理客户端请求实现了高并发。

1. 客户端定义

Redis客户端作为与服务器通信的媒介,保存了其必要的一些属性,要操作Redis服务器存储的内容,必须通过客户端与服务端进行通信才能修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Redis客户端结构
* 保存着客户端的状态
*/
typedef struct client {
// 客户端识别id
uint64_t id; /* Client incremental unique ID. */
// 客户端socket文件描述符
int fd; /* Client socket. */

// 指向当前客户端选中的Redis数据库
redisDb *db; /* Pointer to currently SELECTed DB. */
// 客户端名称
robj *name; /* As set by CLIENT SETNAME. */
// 查询输入缓冲区
sds querybuf; /* Buffer we use to accumulate client queries. */
// 如果是主节点,该缓冲区表示我们要复制给从节点的内容
sds pending_querybuf; /* If this is a master, this buffer represents the
yet not applied replication stream that we
are receiving from the master. */
// 输入缓冲区峰值
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
// 客户端当前命令参数数量
int argc; /* Num of arguments of current command. */
// 客户端当前命令参数
robj **argv; /* Arguments of current command. */
// 保存客户端执行命令的记录
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
// 请求协议类型:PROTO_REQ_*, 内联或多条命令
int reqtype; /* Request protocol type: PROTO_REQ_* */
// 参数列表中未读取参数的数量
int multibulklen; /* Number of multi bulk arguments left to read. */
// 命令内容的长度
long bulklen; /* Length of bulk argument in multi bulk request. */
// 回复缓存列表,用于发送大于固定回复缓冲区的回复
list *reply; /* List of reply objects to send to the client. */
// 回复缓存列表对象的总字节数
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
// 已发送的字节数
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
// 客户端创建时间
time_t ctime; /* Client creation time. */
// 客户端与服务器最后一次交互时间
time_t lastinteraction; /* Time of the last interaction, used for timeout */
// 客户端输出缓冲区超过软性限制的时间,记录输出缓冲区第一次到达软性限制的时间
time_t obuf_soft_limit_reached_time;
// 客户端状态标识
int flags; /* Client flags: CLIENT_* macros. */
// 认证标识,0表示已认证,1表示未认证
int authenticated; /* When requirepass is non-NULL. */

// =========> 主从复制
// 从节点复制状态
int replstate; /* Replication state if this is a slave. */
// ACK上设置从节点写处理
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
// 主节点传过来的RDB文件描述符
int repldbfd; /* Replication DB file descriptor. */
// 主节点RDB文件偏移量
off_t repldboff; /* Replication DB file offset. */
// 主节点RDB文件大小
off_t repldbsize; /* Replication DB file size. */
// 主节点RDB文件头
sds replpreamble; /* Replication DB preamble. */
// 主节点中读取的复制偏移量
long long read_reploff; /* Read replication offset if this is a master. */
// 主节点中应用的复制偏移量
long long reploff; /* Applied replication offset if this is a master. */
// 通过ack接收到的复制偏移量
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
// 接收到复制偏移量使用的时间
long long repl_ack_time;/* Replication ack time, if this is a slave. */
// FULLRESYNC回复从节点偏移量
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
// master运行复制id
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
// 从节点端口号
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
// 从节点IP地址
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
// 从节点功能,按位OR
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */

// 事务状态
multiState mstate; /* MULTI/EXEC state */
// 客户端阻塞类型
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
// 客户端当前阻塞状态
blockingState bpop; /* blocking state */
// 上次写入全局复制偏移量
long long woff; /* Last write global replication offset. */
// 客户端监控命令列表, MULTI/EXEC
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
// 客户端订阅的渠道
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
// 客户端订阅的模式
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
// 被缓存的ID
sds peerid; /* Cached peer ID. */

/* Response buffer */
// 回复固定缓冲区的偏移量
int bufpos;
// 回复固定缓冲区
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

1.1 套接字文件描述符 - fd

根据客户端类型的不同,fd的值可以是-1或真实文件描述符。

  • 伪客户端:fd值为-1,这种客户端用于本地执行命令,如AOF文件加载或Lua脚本执行。
  • 普通客户端:fd值为真实描述符值,这种客户端通过网络套接字与服务端通信,用于执行一般的命令。

1.2 客户端名称 - name

默认情况下,连接到服务端的客户端没有名称,我们通过CLIENT SETNAME命令可以设置当前客户端的名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 通过CLIENT list命令获取当前服务器连接的客户端
127.0.0.1:6379> CLIENT list
id=3 addr=127.0.0.1:62087 fd=8 name= age=3 ... cmd=client

# 通过CLIENT SETNAME 命令设置当前客户端名称
127.0.0.1:6379> CLIENT SETNAME xxx
OK
127.0.0.1:6379> CLIENT list
id=3 addr=127.0.0.1:62087 fd=8 name=xxx age=33 ... cmd=client

# 通过CLIENT GETNAME 获取当前客户端名称
127.0.0.1:6379> CLIENT GETNAME
"xxx"
127.0.0.1:6379>

1.3 标志 - flags

客户端标志属性flags记录了客户端的角色,每一位都记录客户端的一种属性或状态。

第N位 标志 描述
0 CLIENT_SLAVE 客户端是从节点
1 CLIENT_MASTER 客户端是主节点
2 CLIENT_MONITOR 客户端是监控从节点
3 CLIENT_MULTI 客户端正在执行事务
4 CLIENT_BLOCKED 客户端正在被BRPOP等命令阻塞
5 CLIENT_DIRTY_CAS WATCH命令监控的键已经改变,EXEC将会失败
6 CLIENT_CLOSE_AFTER_REPLY 回复完成后关闭客户端
7 CLIENT_UNBLOCKED 客户端非阻塞,存储在server.unblocked_clients链表中
8 CLIENT_LUA 指向Lua脚本的客户端
9 CLIENT_ASKING 客户端发出了ASKING命令
10 CLIENT_CLOSE_ASAP 客户端需要尽快关闭
11 CLIENT_UNIX_SOCKET 通过unix domain连接的客户端
12 CLIENT_DIRTY_EXEC 事务命令入队时出错,EXEC将会失败
13 CLIENT_MASTER_FORCE_REPLY 主从服务器命令通信时,从节点要发送REPLICATION ACK给主节点,但主节点必须打开该标识强制回复
14 CLIENT_FORCE_AOF 默认情况下,AOF不会记录PUB/SUB等命令,该标志表示AOF也会记录PUBSUB命令和SCRIPT LOAD
15 CLIENT_FORCE_REPL 强制主节点将当前执行的命令复制给所有从节点。执行PUBSUB命令会打开客户端CLIENT_FORCE_AOF标志,执行SCRIPT LOAD命令会打开CLIENT_FORCE_AOF,CLIENT_FORCE_REPL两个标志
16 CLIENT_PRE_PSYNC 客户端代表的是低于Redis2.8版本的从节点,主节点不能使用PSYNC命令与该从节点同步
17 CLIENT_READONLY 集群客户端处于只读状态
18 CLIENT_PUBSUB 客户端处于PUB/SUB模式
19 CLIENT_PREVENT_AOF_PROP 客户端执行的命令不存储到AOF
20 CLIENT_PREVENT_REPL_PROP 客户端执行的命令不复制给从节点
19+20 CLIENT_PREVENT_PROP 客户端执行的敏力不存储到AOF且不复制给从节点
21 CLIENT_PENDING_WRITE 客户端有要发送的内容,但并未设置写处理程序
22 CLIENT_REPLY_OFF 不要发送回复给客户端
23 CLIENT_REPLY_SKIP_NEXT 跳过下一跳命令的回复
24 CLIENT_REPLY_SKIP 不要发送这条命令的回复
25 CLIENT_LUA_DEBUG debug模式执行Lua脚本
26 CLIENT_LUA_DEBUG_SYNC debug模式执行Lua脚本,但并不fork()子进程
27 CLIENT_MODULE 无连接的模块客户端

1.4 输入缓冲区 - querybuf

客户端的输入缓冲区用于保存客户端发送的命令请求,输入缓冲区的大小根据输入内容动态地扩大或缩小,最大不能超过client_max_querybuf_len(默认1G),可以通过配置文件进行配置。

1
2
3
4
# 输入命令
SET msg hello
# 输入缓冲区querybuf中则保存为
*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n

pending_querybuf缓冲区是存储主节点将接收到的命令复制给从节点的数据

1.5 输出缓冲区 - list & buf

服务端给客户端的回复会保存在客户端的输出缓冲区中,每个客户端有两个输出缓冲区,一个是固定的缓冲区,一个是可变的缓冲区:

  • 固定缓冲区-buf字节数组:保存长度较小的回复信息
  • 可变缓冲区-list链表:保存长度较大的回复

固定缓冲区buf的默认大小为16KB,当该空间用完或回复内容太多无法放进buf中,则会使用list可变缓冲区。

2. 源码剖析

  • TCP连接处理程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// TCP链接处理程序,创建一个客户端的连接状态
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 创建客户端
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}

/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking
* mode and we can send an error for free using the Kernel I/O */
// 如果超过服务端规定的最大客户端数,那么写入错误,关闭客户端
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";

/* That's a best effort error message, don't check write errors */
// 写入错误信息
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
// 记录服务器拒绝的客户端数量
server.stat_rejected_conn++;
// 释放客户端
freeClient(c);
return;
}

/* If the server is running in protected mode (the default) and there
* is no password set, nor a specific interface is bound, we don't accept
* requests from non loopback interfaces. Instead we try to explain the
* user what to do to fix it if needed. */
// 如果服务器以保护模式(默认)运行,并且没有设置密码,没有绑定特殊的接口,那么我们
// 就不接收非回环接口的请求(也就是只接收本地请求)
if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
char *err =
"-DENIED Redis is running in protected mode because protected "
"mode is enabled, no bind address was specified, no "
"authentication password is requested to clients. In this mode "
"connections are only accepted from the loopback interface. "
"If you want to connect from external computers to Redis you "
"may adopt one of the following solutions: "
"1) Just disable protected mode sending the command "
"'CONFIG SET protected-mode no' from the loopback interface "
"by connecting to Redis from the same host the server is "
"running, however MAKE SURE Redis is not publicly accessible "
"from internet if you do so. Use CONFIG REWRITE to make this "
"change permanent. "
"2) Alternatively you can just disable the protected mode by "
"editing the Redis configuration file, and setting the protected "
"mode option to 'no', and then restarting the server. "
"3) If you started the server manually just for testing, restart "
"it with the '--protected-mode no' option. "
"4) Setup a bind address or an authentication password. "
"NOTE: You only need to do one of the above things in order for "
"the server to start accepting connections from the outside.\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
}

// 更新服务器连接客户端数量
server.stat_numconnections++;
// 客户端状态标志
c->flags |= flags;
}
  • 创建客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));

/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */

// fd == -1 表示创建的是一个无网络的伪客户端,执行Lua脚本时使用
// fd != -1 表示创建有网络连接客户端
if (fd != -1) {
// 设置fd为非阻塞模式
anetNonBlock(NULL,fd);
// 禁止使用Nagle算法,客户端数据包应立即发送给服务端,TCP_NODELAY
anetEnableTcpNoDelay(NULL,fd);
// 若开启tcpkeepalive, 设置SO_KEEPALIVE
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 创建一个文件事件,监听可读,并接收命令的输入
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}

// 默认0号数据库
selectDb(c,0);
uint64_t client_id;
// 初始化客户端识别ID
atomicGetIncr(server.next_client_id,client_id,1);
// 初始化客户端属性值
// 客户端识别id
c->id = client_id;
// socket文件描述符
c->fd = fd;
// 名称
c->name = NULL;
// 回复缓冲区偏移量
c->bufpos = 0;
// 输入缓冲区
c->querybuf = sdsempty();
// 主节点复制输入缓冲区
c->pending_querybuf = sdsempty();
// 输入缓冲区峰值
c->querybuf_peak = 0;
// 请求协议类型
c->reqtype = 0;
// 参数个数
c->argc = 0;
// 参数
c->argv = NULL;
// 当前命令和上一次命令
c->cmd = c->lastcmd = NULL;
// 多条命令还未读取数量
c->multibulklen = 0;
// 读取命令长度
c->bulklen = -1;
// 发送字节数
c->sentlen = 0;
// 客户端状态
c->flags = 0;
// 创建时间和上一次交互时间
c->ctime = c->lastinteraction = server.unixtime;
// 认证状态
c->authenticated = 0;
// 复制状态
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
// 复制偏移量
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
// 创建回复链表
c->reply = listCreate();
// 回复链表字节数
c->reply_bytes = 0;
// 回复缓冲区软限制内存大小
c->obuf_soft_limit_reached_time = 0;
// 设置回复链表释放函数
listSetFreeMethod(c->reply,freeClientReplyValue);
// 设置回复链表复制函数
listSetDupMethod(c->reply,dupClientReplyValue);
// 阻塞类型
c->btype = BLOCKED_NONE;
// 阻塞状态超时
c->bpop.timeout = 0;
// 造成阻塞的键字典
c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
// 存储解除阻塞的键,保存PUSH的键
c->bpop.target = NULL;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
// 全局复制偏移量
c->woff = 0;
// 监控的键链表
c->watched_keys = listCreate();
// 订阅频道
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
// 订阅模式
c->pubsub_patterns = listCreate();
// 缓存的同辈
c->peerid = NULL;
// 订阅发布模式的释放和比较函数
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
// 客户端存储在服务器客户端链表中
if (fd != -1) listAddNodeTail(server.clients,c);
// 初始化客户端事务状态
initClientMultiState(c);
return c;
}
  • 释放客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
void freeClient(client *c) {
listNode *ln;

/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */

// 如果是主节点,那么释放客户端需要进行备份,以方便重新启用
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED|
CLIENT_UNBLOCKED)))
{
replicationCacheMaster(c);
return;
}
}

/* Log link disconnection with slave */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
serverLog(LL_WARNING,"Connection with slave %s lost.",
replicationGetSlaveName(c));
}

/* Free the query buffer */
// 清空查询缓存
sdsfree(c->querybuf);
sdsfree(c->pending_querybuf);
c->querybuf = NULL;

/* Deallocate structures used to block on blocking ops. */
// 阻塞客户端需要解除阻塞
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
// 释放阻塞键字典
dictRelease(c->bpop.keys);

/* UNWATCH all the keys */
// 清空监视的键
unwatchAllKeys(c);
listRelease(c->watched_keys);

/* Unsubscribe from all the pubsub channels */
// 退订所有频道
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);

/* Free data structures. */
// 释放回复链表
listRelease(c->reply);
// 释放参数列表
freeClientArgv(c);

/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
* places where active clients may be referenced. */
// 删除客户端,关闭socket,移除事件循环处理
unlinkClient(c);

/* Master/slave cleanup Case 1:
* we lost the connection with a slave. */
// 从节点客户端
if (c->flags & CLIENT_SLAVE) {
if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
serverAssert(ln != NULL);
listDelNode(l,ln);
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
}

/* Master/slave cleanup Case 2:
* we lost the connection with the master. */
// 如果是主节点客户端,处理主从的断开
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
// 如果客户端被设置为异步释放,我们将其从列表中删除
if (c->flags & CLIENT_CLOSE_ASAP) {
ln = listSearchKey(server.clients_to_close,c);
serverAssert(ln != NULL);
listDelNode(server.clients_to_close,ln);
}

/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
// 名字释放
if (c->name) decrRefCount(c->name);
// 释放列表参数
zfree(c->argv);
// 释放事务状态
freeClientMultiState(c);
sdsfree(c->peerid);
zfree(c);
}
  • 输出缓冲区写给客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
sds o;

// 回复缓冲区中有数据
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
// 缓冲区数据写到fd
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;

/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}

// 固定缓冲区发送完成,则发送回复链表中的内容
} else {
// 获取回复链表第一条回复
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o);

// 空对象跳过,并删除
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}

// 写入到fd
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;

/* If we fully sent the object on head go to the next one */
// 发送完成,删除该结点
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objlen;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a slave (otherwise, on high-speed traffic, the replication
* buffer will grow indefinitely) */
// 避免发送字节数大于NET_MAX_WRITES_PER_EVENT
// 但是如果当前服务器的内存数超过maxmemory,则尽快执行写操作
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
// 记录字节数
server.stat_net_output_bytes += totwritten;
// 写入失败
if (nwritten == -1) {
if (errno == EAGAIN) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", strerror(errno));
freeClient(c);
return C_ERR;
}
}
// 写入成功
if (totwritten > 0) {
/* For clients representing masters we don't count sending data
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
}

// 如果客户端回复缓冲区已经没有数据,则发送完成
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
return C_OK;
}