Redis源码阅读(十八) 事件模型

Redis服务器是一个事件驱动程序,有两类事件需要处理:

  • 文件事件:网络客户端通过socket与服务器进行通信。
  • 时间事件:服务器定时任务的执行。

1. 事件循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
typedef struct aeEventLoop {
// 目前注册的最大文件描述符
int maxfd; /* highest file descriptor currently registered */

// 目前追踪的最大文件描述符
int setsize; /* max number of file descriptors tracked */

// 用于生成时间事件id
long long timeEventNextId;

// 最后一次执行时间事件的时间,用于检测系统时钟偏差
time_t lastTime; /* Used to detect system clock skew */

// 已注册的文件事件
aeFileEvent *events; /* Registered events */

// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */

// 时间事件
aeTimeEvent *timeEventHead;

// 事件循环开关
int stop;

// 多路复用库的私有数据
void *apidata; /* This is used for polling API specific data */

// 事件处理之前要执行的函数
aeBeforeSleepProc *beforesleep;
// 事件处理之后要执行的函数
aeBeforeSleepProc *aftersleep;
} aeEventLoop;

Redis通过事件循环处理,实现了与客户端的交互和定时任务。

2. 文件事件

Redis的文件事件处理基于Reactor模式。

2.1 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct aeFileEvent {
// 文件事件类型
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */

// 读事件处理函数
aeFileProc *rfileProc;

// 写事件处理函数
aeFileProc *wfileProc;

// 多路复用库的私有数据
void *clientData;
} aeFileEvent;

服务器通过多路IO复用库来监听客户端的多个套接字,并根据不同的事件类型关联不同的处理函数,当事件就绪时,调用相应的处理函数。

2.2 I/O多路复用库

Redis中I/O多路复用由evport,kqueue,epoll和select实现,它们的性能从高到低依次排列。

  • evport:Solaris 10 的新增加的特性,高效的事件处理库。
  • kqueue: FreeBSD系列的库。
  • epoll:Linux 2.6增加的特性。
  • select:通用的事件处理库。

3. 时间事件

3.1 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
typedef struct aeTimeEvent {
// 时间事件识别id
long long id; /* time event identifier. */

// 时间事件到达时间
long when_sec; /* seconds */
long when_ms; /* milliseconds */

// 时间事件处理函数
aeTimeProc *timeProc;

// 事件释放函数
aeEventFinalizerProc *finalizerProc;

// 多路复用库的私有数据
void *clientData;

// 上个时间事件
struct aeTimeEvent *prev;
// 下个时间事件
struct aeTimeEvent *next;
} aeTimeEvent;

Redis中,时间事件全部存储在一个无序链表timeEventHead中,每当时间事件执行时,服务器就遍历整个链表,查找所有已到达事件,并进行处理。

3.2 serverCron函数

Redis服务器需要定期对自身进行数据处理和状态检查,从而确保服务稳定运行。要执行的任务有:

  • 定期检查使用情况,进行rehash操作
  • 清理过期键
  • 关闭和清理无效客户端
  • AOF和RDB持久化
  • 集群模式,主从数据同步

4. 源码剖析

  • 初始化事件循环处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

// 创建事件循环状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

// 初始化文件事件数组
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);

// 初始化已就绪文件事件数组
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

// 文件事件数组为空或者就绪事件数组为空返回错误
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;

// 初始化事件循环状态
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;

// 创建一个epoll实例
if (aeApiCreate(eventLoop) == -1) goto err;

/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化监听事件 mask = AE_NONE, 表示为设置事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
// 释放内存
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
  • 事件处理程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* 处理所有已到达的时间事件和已就绪文件事件。
* 如果不传入特殊falgs,那么函数睡眠直到文件事件就绪或时间事件到达。
*
* If flags is 0, the function does nothing and returns.
* flags = 0, 函数直接返回
*
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* flags == AE_ALL_EVENTS, 所有类型事件都会被处理
*
* if flags has AE_FILE_EVENTS set, file events are processed.
* flags == AE_FILE_EVENTS, 所有文件事件会被处理
*
* if flags has AE_TIME_EVENTS set, time events are processed.
* flags == AE_TIME_EVENTS, 所有时间事件会被处理
*
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* flags == AE_DONT_WAIT, 处理完事件后直接返回,不阻塞等待
*
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* flags == AE_CALL_AFTER_SLEEP, 处理会调用回调函数
*
* The function returns the number of events processed. */
/**
* 事件处理程序
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// 注意:我们需要调用select函数, 虽然没有文件事件, 但是我们需要处理时间事件,
// 用来在下一次事件前进行sleep

// 如果没有要处理的文件事件,或者设置了时间事件但未设置阻塞标识
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

// 时间事件,并且不阻塞
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// 获取最近到达的时间事件
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) { // 获取到时间事件
long now_sec, now_ms;

// 获取当前时间
aeGetTime(&now_sec, &now_ms);
tvp = &tv;

/* How many milliseconds we need to wait for the next
* time event to fire? */
// 计算下一个时间时间到达要等待的时间
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;

if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}

// 未获取到时间事件
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */

// tvp时间设置为0,就不会阻塞
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 阻塞
tvp = NULL; /* wait forever */
}
}

/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
// 调用多路复用API,等待事件到达
// 如果tvp为NULL,则阻塞,否则等待tvp设置的阻塞时间
numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback. */
// sleep等待事件后回调
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

// 遍历就绪文件事件
for (j = 0; j < numevents; j++) {
// 获取当前就绪文件事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */

/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* 通常我们首先执行可读事件, 然后执行可写事件。
* 这种方式很好, 因为有时候我们会在执行查询后立即回复
*
* 但是,如果掩码中设置了AE_BARRIER,则是让我们做相反的事:
* 绝对不要在可读之后进行可写事件。
* 这种情况下,我们反转调用。
* 例如,当我们想在beforeSlee()中做一些事情,就像同步文件到磁盘一样
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;

/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

processed++;
}
}
/* Check time events */
// 时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}
  • 执行时间事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);

/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次处理时间事件的时间
eventLoop->lastTime = now;

// 时间事件链表头结点
te = eventLoop->timeEventHead;
// 最大的时间事件id
maxId = eventLoop->timeEventNextId-1;
// 遍历时间事件
while(te) {
long now_sec, now_ms;
long long id;

/* Remove events scheduled for deletion. */
// 如果事件时间已经删除, 释放该事件,并移动指针
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}

/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
// 如果时间事件id大于最大时间事件id,该事件不执行
if (te->id > maxId) {
te = te->next;
continue;
}

// 获取当前时间
aeGetTime(&now_sec, &now_ms);
// 如果当前时间大于或等于事件的执行时间,说明需要执行该事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;

id = te->id;
// 执行事件
retval = te->timeProc(eventLoop, id, te->clientData);
// 时间事件处理次数+1
processed++;

// 该事件是否需要进行执行
if (retval != AE_NOMORE) {
// 需要继续执行,则retval毫秒后进执行
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 标记该事件需要删除
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}