Redis源码阅读(二十) 服务器

1. 服务器定义

Redis服务段结构是其最核心的功能结构,负责维护与客户端键的网络连接,维护数据库状态,执行客户端的命令请求,统计一些运行数据等等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
struct redisServer {
/* General */
// 程序运行主进程pid
pid_t pid; /* Main process pid. */
// 配置文件绝对路径
char *configfile; /* Absolute config file path, or NULL */
// 可执行文件绝对路径
char *executable; /* Absolute executable file path. */
// 执行executable时的命令行参数
char **exec_argv; /* Executable argv vector (copy). */
// serverCron()调用频率
int hz; /* serverCron() calls frequency in hertz */

// RedisDB对象,长度16,每个都存储了Redis数据库对象
redisDb *db;

// 命令表
dict *commands; /* Command table */
// rename之前的命令表
dict *orig_commands; /* Command table before command renaming. */

// 事件循环
aeEventLoop *el;

// 服务器LRU时钟
unsigned int lruclock; /* Clock for LRU eviction */
// 立即关闭服务器
int shutdown_asap; /* SHUTDOWN needed ASAP */
// 在执行serverCron()期间进行rehash
int activerehashing; /* Incremental rehash in serverCron() */
// 主动进行碎片整理
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
// 是否设置了密码
char *requirepass; /* Pass for AUTH command, or NULL */
// PID文件路径
char *pidfile; /* PID file path */
// 架构类型,32位或64位
int arch_bits; /* 32 or 64 depending on sizeof(long) */
// serverCron()运行次数计数
int cronloops; /* Number of times the cron function run */
// 服务器运行ID,每次重启都会分配新的id
char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */
// 服务器是否运行在Sentinel模式
int sentinel_mode; /* True if this instance is a Sentinel. */
// 服务器初始化后使用内存的大小
size_t initial_memory_usage; /* Bytes used after initialization. */
// 非标准输出也会记录Redis的logo
int always_show_logo; /* Show logo even for non-stdout logging. */

/* Modules */
// 模块暴露的API
dict *moduleapi; /* Exported APIs dictionary for modules. */
// 服务器启动时需要加载的模块
list *loadmodule_queue; /* List of modules to load at startup. */
// 这个pipe用来唤醒事件循环中的需要module命令处理的阻塞客户端
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */

/* Networking */
// 监听端口号
int port; /* TCP listening port */
// listen()函数的backlog参数
int tcp_backlog; /* TCP listen() backlog */
// 绑定的地址
char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */
// 绑定地址的数量
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
// UNIX套接字
// socket路径
char *unixsocket; /* UNIX socket path */
// socket模式
mode_t unixsocketperm; /* UNIX socket permission */
// TCPsocket的文件描述符
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
// TCP文件描述符数量
int ipfd_count; /* Used slots in ipfd[] */
// UNIX socket文件描述符数量
int sofd; /* Unix socket file descriptor */
// 集群文件描述符
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
// 集群文件描述符数量
int cfd_count; /* Used slots in cfd[] */
// 服务器客户端链表,存储所有的客户端
list *clients; /* List of active clients */
// 即将关闭的客户端链表
list *clients_to_close; /* Clients to close asynchronously */
// 准备写操作或安装写处理程序的客户端链表
list *clients_pending_write; /* There is to write or install handler. */
// 从节点链表,监控客户端链表
list *slaves, *monitors; /* List of slaves and MONITORs */
// 当前客户端,仅用于崩溃报告
client *current_client; /* Current client, only used on crash report */
// 客户端是否处于暂停
int clients_paused; /* True if clients are currently paused */
// 取消暂停状态时间
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
// 网络错误缓冲区
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
// 迁移缓存套接字的字典,键:host:ip,值:TCP套接字结构
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
// 下一个客户端ID
uint64_t next_client_id; /* Next client unique ID. Incremental. */
// 保护模式,不接受外部链接, 仅本地连接
int protected_mode; /* Don't accept external connections. */

/* RDB / AOF loading information */
// 服务器处于加载状态,从RDB/AOF文件恢复中
int loading; /* We are loading data from disk if true */
// 要加载的总字节数
off_t loading_total_bytes;
// 已经加载的字节数
off_t loading_loaded_bytes;
// 加载开始时间
time_t loading_start_time;
// 加载过程中,读写的最大块字节数
off_t loading_process_events_interval_bytes;

/* Fast pointers to often looked up command */
// Redis命令
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand, *sremCommand, *execCommand, *expireCommand,
*pexpireCommand;

/* Fields used only for stats */
// 服务器开始时间
time_t stat_starttime; /* Server start time */
// 执行命令的数量
long long stat_numcommands; /* Number of processed commands */
// 接收连接的数量
long long stat_numconnections; /* Number of connections received */
// 过期键的数量
long long stat_expiredkeys; /* Number of expired keys */
// 键可能过期的百分比
double stat_expired_stale_perc; /* Percentage of keys probably expired */
// 删除过期键时,如果过期键太多但我们不能一直进行删除,所以设定了时间
// 超过该时间,则退出循环等待下一次删除,该属性记录超过该时间的次数
long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/
// LFU算法中,驱逐key的次数
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
// 成功命中key的次数
long long stat_keyspace_hits; /* Number of successful lookups of keys */
// 没有命中key的次数
long long stat_keyspace_misses; /* Number of failed lookups of keys */
// 整理内存碎片命中次数
long long stat_active_defrag_hits; /* number of allocations moved */
// 整理内存碎片没有命中次数
long long stat_active_defrag_misses; /* number of allocations scanned but not moved */
// 整理内存碎片key被重新分配的次数
long long stat_active_defrag_key_hits; /* number of keys with moved allocations */
// 整理内存碎片扫描到key但没有移动的次数
long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
// 使用的内存峰值
size_t stat_peak_memory; /* Max used memory record */
// 执行fork()的时间
long long stat_fork_time; /* Time needed to perform latest fork() */
// 执行fork()的速率
double stat_fork_rate; /* Fork rate in GB/sec. */
// 因为最大客户端数,拒绝客户端连接的次数
long long stat_rejected_conn; /* Clients rejected because of maxclients */
// 执行从节点全量同步的次数
long long stat_sync_full; /* Number of full resyncs with slaves. */
// 成功接受PSYNC的次数
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
// 错误接受PSYNC的次数
long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */

// 慢查询命令链表
list *slowlog; /* SLOWLOG list of commands */
// 当前慢查询的日志ID
long long slowlog_entry_id; /* SLOWLOG current entry ID */
// 慢查询的评定时间
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
// 慢查询的最大数量
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */

// serverCron() 中采用RSS, 常驻内存大小
size_t resident_set_size; /* RSS sampled in serverCron(). */
// 读取网络字节数
long long stat_net_input_bytes; /* Bytes read from network. */
// 输出网络字节数
long long stat_net_output_bytes; /* Bytes written to network. */
// RDB保存时写时复制的字节数
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
// AOF执行时写时复制的字节数
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */

/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
// 用来追踪瞬时数据,例如每秒操作数,网络流量等
struct {
// 上次采样的时间戳
long long last_sample_time; /* Timestamp of last sample in ms */
// 上次采用的数量
long long last_sample_count;/* Count in last sample */
// 采用数据
long long samples[STATS_METRIC_SAMPLES];
// 下标
int idx;
} inst_metric[STATS_METRIC_COUNT];

/* Configuration */
// 日志级别
int verbosity; /* Loglevel in redis.conf */
// 客户端最大空闲时间,超过会超时
int maxidletime; /* Client timeout in seconds */
// 开启SO_KEEPALIVE选项
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
// 开启过期
int active_expire_enabled; /* Can be disabled for testing purposes. */
// 开启内存整理
int active_defrag_enabled;
// 内存碎片整理的标准,小于这个值会忽略
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
// 内存碎片的最小比例,开启内存整理
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
// 碎片的最大比例
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
// 碎片整理的CPU占用最小比例
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
// 碎片整理的CPU占用最大比例
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
// 客户端查询缓冲区的最大长度
size_t client_max_querybuf_len; /* Limit for client query buffer length */

// 服务器数据库的数量
int dbnum; /* Total number of configured DBs */

// 1表示有监督,0表示没有
int supervised; /* 1 if supervised, 0 otherwise. */
// 监督模式
int supervised_mode; /* See SUPERVISED_* */
// 是否运行在守护模式
int daemonize; /* True if running as a daemon */
// 不同类型的客户端输出缓冲区限制
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];

/* AOF persistence */
// 服务器AOF状态
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
// 服务器AOF的fsync策略
int aof_fsync; /* Kind of fsync() policy */
// 服务器AOF文件名
char *aof_filename; /* Name of the AOF file */
// 如果有aof执行,则不进行fsync
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
// AOF增长比率,默认100
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
// AOF最小字节数
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
// AOF初始字节数
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
// AOF当前字节数
off_t aof_current_size; /* AOF current size. */
// AOF重写提上日程,BGSAVE完成立即执行
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
// AOF子进程pid
pid_t aof_child_pid; /* PID if rewriting process */
// AOF缓冲区链表
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
// AOF缓冲区
sds aof_buf; /* AOF buffer, written before entering the event loop */
// AOF文件描述符
int aof_fd; /* File descriptor of currently selected AOF file */
// AOF选中的db
int aof_selected_db; /* Currently selected DB in AOF */
// 延迟执行flush操作的开始时间
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
// 最后一次fsync的直接
time_t aof_last_fsync; /* UNIX time of last fsync() */
// AOF执行最后时间
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
// AOF执行开始时间
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
// AOF执行的状态
int aof_lastbgrewrite_status; /* C_OK or C_ERR */
// 延迟fsync的次数
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
// 重写时是否开启增量式同步,每次写入AOF_AUTOSYNC_BYTES个字节,就执行一次同步
int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
// 上一次AOF操作状态
int aof_last_write_status; /* C_OK or C_ERR */
// 删一次AOF操作错误
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */

/* AOF pipes used to communicate between parent and child during rewrite. */
// 进程通信文件描述描述符,管道
int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent;
int aof_pipe_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /* If true stop sending accumulated diffs
to child process. */
// 保存子进程AOF时累积数据的sds
sds aof_child_diff; /* AOF diff accumulator child side. */

/* RDB persistence */
// 记录数据库被修改的次数
long long dirty; /* Changes to DB from the last save */
// BGSAVE执行前,需要备份dirty
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
// 执行BGSAVE的子进程pid
pid_t rdb_child_pid; /* PID of RDB saving child */
// RDB执行的参数
struct saveparam *saveparams; /* Save points array for RDB */
// save参数的长度
int saveparamslen; /* Number of saving points */
// rdb文件名称
char *rdb_filename; /* Name of RDB file */
// 是否采用LZF压缩算法压缩RDB文件
int rdb_compression; /* Use compression in RDB? */
// RDB是否使用校验和
int rdb_checksum; /* Use RDB checksum? */
// 上一次save成功的实际
time_t lastsave; /* Unix time of last successful save */
// 上一次尝试bgsave的实际
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
// 上一次save执行的实际
time_t rdb_save_time_last; /* Time used by last RDB save run. */
// RDB保存开始时间
time_t rdb_save_time_start; /* Current RDB save start time. */
// BGSAVE计划
int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */
// rdb执行的类型,写入磁盘还是从节点socket
int rdb_child_type; /* Type of save by active child. */
// 上一次BGSAVE执行的状态
int lastbgsave_status; /* C_OK or C_ERR */
// BGSAVE出错则停止写
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
// rdb管道写端
int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */
// rdb管道读端, 使用无盘同步
int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */

// 子进程信息
struct {
// AOF 或 RDB
int process_type; /* AOF or RDB child? */
// 写时复制的大小
size_t cow_size; /* Copy on write size. */
unsigned long long magic; /* Magic value to make sure data is valid. */
} child_info_data;

/* Propagation of commands in AOF / replication */
// 传递给AOF/replication的一些命令
redisOpArray also_propagate; /* Additional command to propagate. */

/* Logging */
// 日志文件路径
char *logfile; /* Path of log file */
// 是否开启系统日志
int syslog_enabled; /* Is syslog enabled? */
// 系统日志标识
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */

// ...

/* Replication (slave) */
// master结点的验证密码
char *masterauth; /* AUTH with this password with master */
// master结点的地址
char *masterhost; /* Hostname of master */
// master结点的端口号
int masterport; /* Port of master */

// ...

/* Synchronous replication. */
// 等待WAIT命令的客户端链表
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
// 读取从节点ack
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */

/* Limits */
// 同时最多连接的客户端数量
unsigned int maxclients; /* Max number of simultaneous clients */
// 服务器使用内存的最大值
unsigned long long maxmemory; /* Max number of memory bytes to use */
// 键占用内存回收策略
int maxmemory_policy; /* Policy for key eviction */
// 随机采样的个数
int maxmemory_samples; /* Pricision of random sampling */
// LFU 对数计数因子
int lfu_log_factor; /* LFU logarithmic counter factor. */
// LFU 反衰减因子
int lfu_decay_time; /* LFU counter decay factor. */
// 批量协议最大长度
long long proto_max_bulk_len; /* Protocol bulk length maximum size. */

/* Blocked clients */
// 阻塞的客户端数量
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
// 阻塞的客户端链表
list *unblocked_clients; /* list of clients to unblock before next loop */
// BLPOP命令产生的阻塞键列表
list *ready_keys; /* List of readyList structures for BLPOP & co */

/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
// 降序排序
int sort_desc;
// 根据字母排序
int sort_alpha;
// 根据模式排序
int sort_bypattern;
// 根据分数排序
int sort_store;

/* Zip structure config, see redis.conf for more information */
// ziplist最大结点个数
size_t hash_max_ziplist_entries;
// ziplist结点最大内存
size_t hash_max_ziplist_value;
// set整数集合最大结点数
size_t set_max_intset_entries;
// zset压缩列表最大结点数
size_t zset_max_ziplist_entries;
// zset压缩列表结点最大内存
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;

/* List parameters */
// list压缩列表最大长度
int list_max_ziplist_size;
// list压缩列表压缩程度
int list_compress_depth;

/* time cache */
// 循环采用世界
time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */

/* Pubsub */
// 客户端订阅channel的字典
dict *pubsub_channels; /* Map channels to list of subscribed clients */
// pubsub定义模式
list *pubsub_patterns; /* A list of pubsub_patterns */
// 通过Pub/Sub传递事件
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */

/* Cluster */
// ...

/* Scripting */
// ...

// ...
};

2. Redis命令

2.1 命令表 commands

命令表存储了Redis可执行的命令字典,并对每个命令设定了一些执行参数。orig_commands是重命名之后的命令表,通过重命名可以提升Redis的安全性,比如讲KEYS, FLUSHDB这类的命令重命名为别的,这样一般用户在就不知道其真实的命令。

  • 命令结构表示如下
字段名 描述
name 命名的名称,用户执行命令通过查找命令表匹配该字段来寻找相应的命令
function 执行命令实现的方法
arity 命令参数的个数,-N表示大于等于N。命令本身也是一个参数
sflags 字符串形式标识符,用于设置命令的属性
flags sflags标识符的二进制标识,由sflags计算
get_keys_proc 一个可选函数,用于获取命令参数,只有在first_key_index,last_key_index,key_step无法指定哪些是参数时才使用此选项
first_key_index 第一个参数是key
last_key_index 最后一个参数是key
key_step key之间的步长,如MSET步长为2, MSET key value key2 value2 …
microseconds 服务器执行该命令耗费的时间
calls 服务器总共执行了多少次该命令
  • 命令的属性sflag标识
标识 意义
w 写命令
r 读命令
m 该命令会占用大量内存,执行之前需要检查内存情况
a 管理命令,如SAVE,SHUTDOWN等
p 发布订阅模式的命令
f 强制复制命令,无视服务器脏计数
s Lua脚本中不允许的命令
R 随机命令,相同情况下,结果可能不同
S Lua脚本中使用标识则需要对结果进行排序
l 服务器载入情况下可以使用的命令
t 从节点服务器数据过期时允许执行的命令
M 在MONITOR模式下不会自动传播
k 执行一个显示的ASKING,使得在集群模式下,被标志为imporing的槽可以接受该命令
F 快速模式,O(1)或O(log(N))的复杂度

2.2 命令请求执行过程

客户端发送命令到服务端执行返回有一下几个步骤:

  1. 客户端与服务端建立连接,按照一定格式封装命令

    1
    2
    # SET msg hello 命令转换后如下
    *3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$5\r\nhello\r\n
  2. 服务器接收到来自网络客户端的请求数据,按照协议格式解析命令

    1. 读取套接字中的命令请求,存储到对应的客户端输入缓冲区中
    2. 读取缓冲区中数据,解析命令,提取命令请求和参数
  3. 获得解析命令后查找服务器的命令表,找到对应的命令及其执行方法

    1. 查找服务器命令表匹配redisCommand结构中的name字段
    2. 查询到命令后获取其执行方法,属性等,将命令执行方法保存在客户端cmd字段中
  4. 服务器根据执行命令,并返回处理后数据

  5. 客户端接收到服务器处理回复

命令执行时会根据其sflag标志和服务器设置有不同的处理操作,具体情况可看源码

3. Redis周期性任务

服务器为了维护自身资源,默认每100毫秒执行一次周期性任务serverCron(),主要负责删除过期键、服务器状态监控、更新统计信息、渐进式rehash、触发BGSAVE/AOF重写并处理子进程中断、从节点复制重连等等。

3.1 更新服务器时间缓存

Redis服务器很多功能需要获取系统当前时间,该系统调用需要消耗一定的CPU时间,对某些时间精度要求不高的功能我们可以在serverCron()中获取系统时间后缓存下来,可以有效的减少系统调用次数,服务器状态中unixtime和mstime属性用于存储时间缓存。

对于日志打印、更新服务器LRU时钟等任务可以使用服务器缓冲时间;对于过期键的检测、慢查询日志等需要高精度时间任务来说,需要重新系统调用获取时间来计算。

3.2 处理SIGTERM信号

启动服务器时,Redis会设置SIGTERM信号处理函数,当服务器接收到SIGTERM信号时,会开启shutdown_asap标识。serverCron()函数中,每次都会检测该标识,如果设置该标识,服务器会安全关闭,并记录日志。

3.3 客户端资源管理clientsCron()

serverCron()函数每次执行都会调用clientsCron函数,因为函数每秒调用server.hz(默认10),为了确保每个客户端至少执行一秒,所以迭代次数至少为numclients/server.hz,该函数执行以下检查:

  • 如果客户端与服务器连接超时,那么释放该客户端
  • 如果客户端输入缓冲区超过一定限制,则重新分配缓存区的内存空间,确保没有浪费

3.4 管理数据库资源databaseCron()

serverCron()函数每次执行会调用databaseCron()函数,该函数会对数据库进行检查,删除过期键,resize,rehash等操作。

  • 主节点则开启过期键自动删除功能,从节点直接删除过期键
  • 内存碎片整理
  • 如果服务器为进行AOF或RDB,则进行Rehash和Resize

3.5 持久化记录

如果服务器没有RDB或AOF持久正在进行,那么开启后台的AOF重写任务;如果有RDB或AOF,则等待子进程信号,如果接收到信号说明持久化已完成,否则表示未完成。

即使AOF重写错误,也需要刷新AOF缓冲区

3.6 更新信息并记录日志

  • 更新服务器每秒执行命令次数
  • 更新LRU时钟
  • 更新内存使用峰值
  • 记录非空数据库日志
  • 非哨兵模式服务器,记录客户端连接日志

3.7 其他一些任务

  • 异步关闭需要关闭的客户端

  • 如果需要解除客户端暂停状态

  • 周期性复制任务

  • 集群模式下,集群周期性任务

  • 哨兵模式下任务

  • 清理sockets连接

4. 源码剖析

  • 周期性任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/**
* 服务器周期任务, 每秒调用server.hz次
* 需要执行一些异步任务:
* - 主动删除过期键集合(也可以在读操作时懒删除)
* - 软件监控,🐶看门狗🐶
* - 更新统计信息
* - 渐进式rehash
* - 触发BGSAVE/AOF重写, 处理子进程中断
* - 不同类型客户端的超时时间
* - 复制重连
*
* 这里调用会每秒执行server.hz次,所以为了简便,我们使用一个宏run_with_period
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);

/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
// 定期发送一个 SIGALRM 信号
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

/* Update the time cache. */
// 更新服务器时间缓存
updateCachedTime();

// 更新统计数据
run_with_period(100) {
// 命令执行次数
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
// 网络读取字节数
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
// 网络输出字节数
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}

/* We have just LRU_BITS bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock.
*
* Note that even if the counter wraps it's not a big problem,
* everything will still work but some object will appear younger
* to Redis. However for this to happen a given object should never be
* touched for all the time needed to the counter to wrap, which is
* not likely.
*
* Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */
// 服务器LRU时钟
unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);

/* Record the max memory used since the server was started. */
// 记录使用内存峰值
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();

/* Sample the RSS here since this is a relatively slow call. */
// 记录常驻内存大小
server.resident_set_size = zmalloc_get_rss();

/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
// 接收到SIGTERM信号,安全关闭服务器
if (server.shutdown_asap) {
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}

/* Show some info about non-empty databases */
// 记录非空数据库日志
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;

size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}

/* Show information about connected clients */
// 非哨兵模式服务器,记录客户端连接信息到日志中
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}

/* We need to do a few operations on clients asynchronously. */
// 客户端周期性任务
clientsCron();

/* Handle background operations on Redis databases. */
// 数据库周期性任务
databasesCron();

/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
// 如果没有RDB或AOF执行,那么开启后台AOF重写操作
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}

/* Check if a background saving or AOF rewrite in progress terminated. */
// 如果正在进行RDB或AOF重写操作,那么等待子进程信号
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;

if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;

if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}

// 没有正在进行的RDB或AOF,那么检查是否需要执行
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;

/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}

/* Trigger an AOF rewrite if needed. */
// 根据需要触发AOF
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}


/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
// AOF缓冲区冲洗到磁盘中
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* an higher frequency. */
// 即使AOF重写错误,也刷新AOF缓冲区
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}

/* Close clients that need to be closed asynchronous */
// 异步关闭客户端
freeClientsInAsyncFreeQueue();

/* Clear the paused clients flag if needed. */
// 如果需要解除客户端暂停状态
clientsArePaused(); /* Don't check return value, just use the side effect.*/

/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
// 周期性复制任务
run_with_period(1000) replicationCron();

/* Run the Redis Cluster cron. */
// 集群周期性任务
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}

/* Run the Sentinel timer if we are in sentinel mode. */
// 哨兵timer
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}

/* Cleanup expired MIGRATE cached sockets. */
// 清理socket链接
run_with_period(1000) {
migrateCloseTimedoutSockets();
}

/* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress.
*
* Note: this code must be after the replicationCron() call above so
* make sure when refactoring this file to keep this order. This is useful
* because we want to give priority to RDB savings for replication. */
// 如果设置了相应标志,则启动预定的BGSAVE。
// 这是非常有用的,当我们强制延迟BGSAVE,因为AOF重写正在执行
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}

// 周期循环+1
server.cronloops++;
// 默认100ms
return 1000/server.hz;
}
  • 进入事件循环前调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Redis进入事件循环前被调用,准备文件描述符
*/
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);

/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
* later in this function. */
// 集群模式开启beforeSleep, 可能会改变redis集群的状态,
if (server.cluster_enabled) clusterBeforeSleep();

/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
// 主节点开启过期键删除,则以快速模式运行
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

/* Send all the slaves an ACK request if at least one client blocked
* during the previous event loop iteration. */
// 如果在前一个事件循环迭代期间有客户端被阻塞,则向所有从节点发送ACK请求
if (server.get_ack_from_slaves) {
robj *argv[3];

// 构建参数
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */

// 给从节点服务器发送请求
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);

// 释放构建的参数
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);

// 清空标志
server.get_ack_from_slaves = 0;
}

/* Unblock all the clients blocked for synchronous replication
* in WAIT. */
// 解除所有等待WAIT命令而被阻塞的客户端
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();

/* Check if there are clients unblocked by modules that implement
* blocking commands. */
// 检查是否有模块阻塞命令解除阻塞客户端
moduleHandleBlockedClients();

/* Try to process pending commands for clients that were just unblocked. */
// 处理非阻塞客户端的输入缓冲区
if (listLength(server.unblocked_clients))
processUnblockedClients();

/* Write the AOF buffer on disk */
// 将AOF缓冲区冲洗到磁盘中
flushAppendOnlyFile(0);

/* Handle writes with pending output buffers. */
// 处理待写的输出缓冲区
handleClientsWithPendingWrites();

/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
// 在进入sleep之前,让线程释放GIL来访问数据集。
// Redis主线程此时不会触及任何内容
if (moduleCount()) moduleReleaseGIL();
}