epoll源码学习

源码位置:fs/eventpoll.c

函数使用

函数使用如下。源码分析中也是如此,只保留了关键代码语句。

1
2
3
4
5
6
7
8
9
10
11
12
epoll_create(size);

while (...)
{
/* 等待就绪连接 */
epoll_wait( ... );

/* 如有新连接,构造epoll_event结构体后 */
epoll_ctl( ... EPOLL_CTL_ADD ... );
/* 如有断开连接 */
epoll_ctl( ... EPOLL_CTL_DEL ... );
}

数据结构关系

epoll涉及数据结构关系如下图所示:

epoll文件结构

源码分析

(1) 函数int epoll_crate(int size)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
asmlinkage long sys_epoll_create(int size)
{

int error, fd = -1;
struct eventpoll *ep;
struct inode *inode;
struct file *file;
...
// 为ep分配内存并进行初始化
if (size <= 0 || (error = ep_alloc(&ep)) != 0)
goto error_return;
/*
* 调用anon_inode_getfd新建一个file instance,也就是epoll可以看成一个文件(匿名文件)
* epoll所管理的所有的fd都是放在一个大的结构eventpoll(红黑树)中,
* 将主结构体struce eventpoll *ep放入file->private项中进行保存(sys_epoll_ctl会取用)
*/

error = anon_inode_getfd(&fd, &inode, &file, "[eventpoll]",
&eventpoll_fops, ep);
return fd;

该函数创建一个epoll句柄,size用来告诉内核这个监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
代码结构比较清晰,调用ep_alloc分配一个eventpoll结构,调用anon_inode_getfd创建一个文件节点和文件描述符,并返回文件描述符,这个文件描述符供epoll自己使用。


(2)函数int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create的返回值,第二个参数表示动作,用三个宏来表示:

1
2
3
EPOLL_CTL_ADD;  // 注册新的fd到epfd中
EPOLL_CTL_MOD; // 修改已经注册的fd的监听事件
EPOLL_CTL_DEL; // 从epfd中删除一个fd

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event的结构如下:

1
2
3
4
struct epoll_event{
__uint32_t events; /* epoll events */
epoll_data_t data; /* user data variable */
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd,
struct epoll_event __user *event)

{

int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
...
/* 判断参数的合法性,将__user *event复制给epds */
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
file = fget(epfd); // epoll fd 对应的文件对象
tfile = fget(fd); // fd 对应的文件对象
ep = file->private_data;
mutex_lock(&ep->mtx);
/* 防止重复添加(在ep的红黑树中查找是否已存在这个fd) */
epi = ep_find(ep, tfile, fd);
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
// 在ep的红黑树中插入这个fd对应的epitem结构体
error = ep_insert(ep, &epds, tfile, fd);
}
...
}
mutex_unlock(&ep->mtx);
...
}

去除错误检测,剩下的代码也比较清晰,首先取出epoll_create分配的eventpoll结构ep,然后使用ep_find在ep中查找当前操作的文件描述符,接下来有个判断,分不同操作进行,如果是EPOLL_CTL_ADD,则调ep_insert插入文件描述符,如果是EPOLL_CTL_DEL则调用ep_remove删除文件描述符,修改则用ep_modify。

下面我们进入ep_insert()函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{

int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;

// 分配一个epitem结构体来保存每个加入的fd
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
goto error_return;
// 初始化该结构体
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

/* Initialize the poll table using the queue callback */
epq.epi = epi;
// 安装poll回调函数
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数
* ep_ptable_queue_proc (poll_wait中调用)
* 如果fd是套接字, f_op为socket_file_ops,poll函数是sock_poll()。
* 如果是TCP套接字的话,进而会调用tcp_poll()函数。此处调用poll函数查看
* 当前文件描述符的状态,存储在revents中。
* 在poll的处理函数tcp_poll()中, 会调用sock_poll_wait()。在sock_poll_wait中
* 会调用epq.pt.qproc指向的函数,也就是ep_ptable_queue_proc()
*/

revents = tfile->f_op->poll(tfile, &epq.pt);
...
ep_rbtree_insert(ep, epi); // 将该epi插入到ep的红黑树中
// revents & event->events: 刚才fop->poll的返回值中标识的事件有用户event关心的事件发生
// !ep_is_linked(&epi->rdllink): epi的ready队列中有数据.ep_is_linked用于判断队列是否为空。
/*
* 如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的epitem加入到就绪
* 队列中。如果有进程正在等待该文件的状态就绪,则唤醒一个等待的进程。
*/

if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* 如果有进程正在等待文件的状态就绪,也就是调用epoll_wait睡眠的进程正在
* 等待,则唤醒一个等待进程
* waitqueue_active(q) 等待队列q中有等待的进程返回1,否则返回0。
*/

if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
/*
* 如果有进程等待eventpoll文件本身的事件就绪,则增加临时变量pwake的值,
* pwake的值不为0时,在释放lock后,会唤醒等待进程
*/

if (waitqueue_active(&ep->poll_wait))
pwake++;
}
...

在插入函数中

1
2
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);

这两个函数将ep_ptable_queue_proc注册到epq.pt中的qproc。

1
2
3
4
typedef struct poll_table_struct {
poll_queue_proc qproc;
unsigned long key;
}poll_table;

执行f_op->poll(tfile, &epq.pt)时,XXX_poll(tfile, &epq.pt)函数会执行poll_wait(),poll_wait()会调用epq.pt.qproc函数,即ep_ptable_queue_proc。
ep_ptable_queue_proc函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{

struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

在ep_ptable_queue_proc函数中,引入了另外一个非常重要的数据结构eppoll_entry。eppoll_entry主要完成epitem和epitem事件发生时的callback(ep_poll_callback)函数之间的关联。首先将eppoll_entry的whead指向fd的设备等待队列(同select中的wait_address),然后初始化eppoll_entry的base变量指向epitem,最后通过add_wait_queue将epoll_entry挂载到fd的设备等待队列上。完成这个动作后,epoll_entry已经被挂载到fd的设备等待队列。

由于ep_ptable_queue_proc函数设置了等待队列的ep_poll_callback回调函数。所以在设备硬件数据到来时,硬件中断处理函数中会唤醒该等待队列上等待的进程时,会调用唤醒函数ep_poll_callback。

1
2
3
4
5
6
7
8
9
10
11
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{

int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
...
//***关键***,将该fd加入到epoll监听的就绪链表中
list_add_tail(&epi->rdllink, &ep->rdllist);
...
}

所以ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中.


(3) 函数 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
等待事件的产生,类似于select()函数调用。参数events用来从内核得到事件的集合,maxevents告诉内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间。该函数返回需要处理的事件数目,如果返回0表示超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout)
{

int error;
struct file *file;
struct eventpoll *ep;
...
file = fget(epfd);
ep = file->private_data;

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
...
}

该函数很简单,主要通过调用ep_poll获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)

{

int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
...
/* 没有事件,所以需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒 */
if (list_empty(&ep->rdllist)) {
/* 将current进程放在wait这个等待队列中 */
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
/* 将当前进程加入到eventpoll的等待队列中,等待文件
* 状态就绪或直到超时,或被信号中断
*/

__add_wait_queue(&ep->wq, &wait);
for (;;) {
/*
* 执行ep_poll_callback()唤醒时应当需要将当前进程唤醒,
* 所以当前进程状态应该为"可唤醒":TASK_INTERRUPTIBLE
*/

set_current_state(TASK_INTERRUPTIBLE);
/* 如果就绪队列不为空,也就是说已经有文件的状态就绪或者超时,则退出循环 */
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
...
if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;

ep_send_events函数向用户空间发送就绪事件。
ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。用户空间访问这个结果,进行处理。


总结

同过上面的代码浏览,大致清楚了epoll的逻辑:
(1) 通过epoll_create构建了一个文件结构,后续的所有操作都是在这个文件基础上。因此也就没有select中来回在用户空间和内核空间之间拷贝。
(2) epoll_ctl在插入事件时,也为该事件添加了回调函数,当该事件发生时,会被插入就绪队列中。因此也就避免了select的全部遍历事件。
(3) epoll_wait只是返回就绪队列中的事件。