epoll源码学习

源码位置:fs/eventpoll.c

函数使用

函数使用如下。源码分析中也是如此,只保留了关键代码语句。

1
2
3
4
5
6
7
8
9
10
11
12
epoll_create(size);

while (...)
{
/* 等待就绪连接 */
epoll_wait( ... );

/* 如有新连接,构造epoll_event结构体后 */
epoll_ctl( ... EPOLL_CTL_ADD ... );
/* 如有断开连接 */
epoll_ctl( ... EPOLL_CTL_DEL ... );
}

数据结构关系

epoll涉及数据结构关系如下图所示:

epoll文件结构

源码分析

(1) 函数int epoll_crate(int size)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
asmlinkage long sys_epoll_create(int size)
{

int error, fd = -1;
struct eventpoll *ep;
struct inode *inode;
struct file *file;
...
// 为ep分配内存并进行初始化
if (size <= 0 || (error = ep_alloc(&ep)) != 0)
goto error_return;
/*
* 调用anon_inode_getfd新建一个file instance,也就是epoll可以看成一个文件(匿名文件)
* epoll所管理的所有的fd都是放在一个大的结构eventpoll(红黑树)中,
* 将主结构体struce eventpoll *ep放入file->private项中进行保存(sys_epoll_ctl会取用)
*/

error = anon_inode_getfd(&fd, &inode, &file, "[eventpoll]",
&eventpoll_fops, ep);
return fd;

该函数创建一个epoll句柄,size用来告诉内核这个监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
代码结构比较清晰,调用ep_alloc分配一个eventpoll结构,调用anon_inode_getfd创建一个文件节点和文件描述符,并返回文件描述符,这个文件描述符供epoll自己使用。


(2)函数int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create的返回值,第二个参数表示动作,用三个宏来表示:

1
2
3
EPOLL_CTL_ADD;  // 注册新的fd到epfd中
EPOLL_CTL_MOD; // 修改已经注册的fd的监听事件
EPOLL_CTL_DEL; // 从epfd中删除一个fd

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event的结构如下:

1
2
3
4
struct epoll_event{
__uint32_t events; /* epoll events */
epoll_data_t data; /* user data variable */
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
asmlinkage long sys_epoll_ctl(int epfd, int op, int fd,
struct epoll_event __user *event)

{

int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
...
/* 判断参数的合法性,将__user *event复制给epds */
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
file = fget(epfd); // epoll fd 对应的文件对象
tfile = fget(fd); // fd 对应的文件对象
ep = file->private_data;
mutex_lock(&ep->mtx);
/* 防止重复添加(在ep的红黑树中查找是否已存在这个fd) */
epi = ep_find(ep, tfile, fd);
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
// 在ep的红黑树中插入这个fd对应的epitem结构体
error = ep_insert(ep, &epds, tfile, fd);
}
...
}
mutex_unlock(&ep->mtx);
...
}

去除错误检测,剩下的代码也比较清晰,首先取出epoll_create分配的eventpoll结构ep,然后使用ep_find在ep中查找当前操作的文件描述符,接下来有个判断,分不同操作进行,如果是EPOLL_CTL_ADD,则调ep_insert插入文件描述符,如果是EPOLL_CTL_DEL则调用ep_remove删除文件描述符,修改则用ep_modify。

下面我们进入ep_insert()函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{

int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;

// 分配一个epitem结构体来保存每个加入的fd
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
goto error_return;
// 初始化该结构体
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

/* Initialize the poll table using the queue callback */
epq.epi = epi;
// 安装poll回调函数
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数
* ep_ptable_queue_proc (poll_wait中调用)
* 如果fd是套接字, f_op为socket_file_ops,poll函数是sock_poll()。
* 如果是TCP套接字的话,进而会调用tcp_poll()函数。此处调用poll函数查看
* 当前文件描述符的状态,存储在revents中。
* 在poll的处理函数tcp_poll()中, 会调用sock_poll_wait()。在sock_poll_wait中
* 会调用epq.pt.qproc指向的函数,也就是ep_ptable_queue_proc()
*/

revents = tfile->f_op->poll(tfile, &epq.pt);
...
ep_rbtree_insert(ep, epi); // 将该epi插入到ep的红黑树中
// revents & event->events: 刚才fop->poll的返回值中标识的事件有用户event关心的事件发生
// !ep_is_linked(&epi->rdllink): epi的ready队列中有数据.ep_is_linked用于判断队列是否为空。
/*
* 如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的epitem加入到就绪
* 队列中。如果有进程正在等待该文件的状态就绪,则唤醒一个等待的进程。
*/

if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* 如果有进程正在等待文件的状态就绪,也就是调用epoll_wait睡眠的进程正在
* 等待,则唤醒一个等待进程
* waitqueue_active(q) 等待队列q中有等待的进程返回1,否则返回0。
*/

if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
if (waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
/*
* 如果有进程等待eventpoll文件本身的事件就绪,则增加临时变量pwake的值,
* pwake的值不为0时,在释放lock后,会唤醒等待进程
*/

if (waitqueue_active(&ep->poll_wait))
pwake++;
}
...

在插入函数中

1
2
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);

这两个函数将ep_ptable_queue_proc注册到epq.pt中的qproc。

1
2
3
4
typedef struct poll_table_struct {
poll_queue_proc qproc;
unsigned long key;
}poll_table;

执行f_op->poll(tfile, &epq.pt)时,XXX_poll(tfile, &epq.pt)函数会执行poll_wait(),poll_wait()会调用epq.pt.qproc函数,即ep_ptable_queue_proc。
ep_ptable_queue_proc函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{

struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

在ep_ptable_queue_proc函数中,引入了另外一个非常重要的数据结构eppoll_entry。eppoll_entry主要完成epitem和epitem事件发生时的callback(ep_poll_callback)函数之间的关联。首先将eppoll_entry的whead指向fd的设备等待队列(同select中的wait_address),然后初始化eppoll_entry的base变量指向epitem,最后通过add_wait_queue将epoll_entry挂载到fd的设备等待队列上。完成这个动作后,epoll_entry已经被挂载到fd的设备等待队列。

由于ep_ptable_queue_proc函数设置了等待队列的ep_poll_callback回调函数。所以在设备硬件数据到来时,硬件中断处理函数中会唤醒该等待队列上等待的进程时,会调用唤醒函数ep_poll_callback。

1
2
3
4
5
6
7
8
9
10
11
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{

int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
...
//***关键***,将该fd加入到epoll监听的就绪链表中
list_add_tail(&epi->rdllink, &ep->rdllist);
...
}

所以ep_poll_callback函数主要的功能是将被监视文件的等待事件就绪时,将文件对应的epitem实例添加到就绪队列中.


(3) 函数 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
等待事件的产生,类似于select()函数调用。参数events用来从内核得到事件的集合,maxevents告诉内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间。该函数返回需要处理的事件数目,如果返回0表示超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout)
{

int error;
struct file *file;
struct eventpoll *ep;
...
file = fget(epfd);
ep = file->private_data;

/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
...
}

该函数很简单,主要通过调用ep_poll获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)

{

int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
...
/* 没有事件,所以需要睡眠。当有事件到来时,睡眠会被ep_poll_callback函数唤醒 */
if (list_empty(&ep->rdllist)) {
/* 将current进程放在wait这个等待队列中 */
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
/* 将当前进程加入到eventpoll的等待队列中,等待文件
* 状态就绪或直到超时,或被信号中断
*/

__add_wait_queue(&ep->wq, &wait);
for (;;) {
/*
* 执行ep_poll_callback()唤醒时应当需要将当前进程唤醒,
* 所以当前进程状态应该为"可唤醒":TASK_INTERRUPTIBLE
*/

set_current_state(TASK_INTERRUPTIBLE);
/* 如果就绪队列不为空,也就是说已经有文件的状态就绪或者超时,则退出循环 */
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
...
if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;

ep_send_events函数向用户空间发送就绪事件。
ep_send_events()函数将用户传入的内存简单封装到ep_send_events_data结构中,然后调用ep_scan_ready_list()将就绪队列中的事件传入用户空间的内存。用户空间访问这个结果,进行处理。


总结

同过上面的代码浏览,大致清楚了epoll的逻辑:
(1) 通过epoll_create构建了一个文件结构,后续的所有操作都是在这个文件基础上。因此也就没有select中来回在用户空间和内核空间之间拷贝。
(2) epoll_ctl在插入事件时,也为该事件添加了回调函数,当该事件发生时,会被插入就绪队列中。因此也就避免了select的全部遍历事件。
(3) epoll_wait只是返回就绪队列中的事件。

django访问静态文件遇到的问题

在用django做管理后台时,遇到了下面的问题:

1
2
3
4
[16/Sep/2015 15:58:38] "GET /video/static/admin/css/base.css HTTP/1.1" 404 105
[16/Sep/2015 15:58:38] "GET /video/static/admin/css/changelists.css HTTP/1.1" 404 112
[16/Sep/2015 15:58:38] "GET /video/static/admin/js/core.js HTTP/1.1" 404 103
[16/Sep/2015 15:58:38] "GET /video/static/common/change_list.css?v=20150331 HTTP/1.1" 404 109

原因:这些文件是django库自带的一些文件,并没有在我们服务的目录下面,因此找不到。
下面会对python访问静态文件做一些说明。
需要配置一些文件
(1) django_urls.py

1
2
3
4
5
urlpatterns = patterns('',
url(r'^video/admin/', include(admin.site.urls)),
url(r'^video/(?P<path>.*)$', 'pyutil.django.views.static.serve',
{'document_root': '.'}),

document_root的值就是静态文件所在的目录,这里设置为当前目录,既工程所在的目录,在我的程序中就是video。
例如,访问:host/video/static/html/index.html,就会返回video/static/html文件夹下的index.html。
(2) settings.py

1
2
3
4
5
6
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
STATIC_URL = '/video/static/'
STATIC_ROOT = 'static/'
STATICFILES_DIRS = (
os.path.join(BASE_DIR, "webroot/static"),
)

其中,STATIC_URL是我们访问静态文件的路由。这里需要说明下STATIC_ROOTSTATICFILES_DIRS的关系。
STATICFILES_DIRS是存放app对应的静态文件目录。们的服务是其中的一个app,这个目录也就是存放我的服务的静态文件。
配置中,还有其他的app,如下。每个app的静态文件都存放在相应的STATICFILES_DIRS目录中。

1
2
3
4
5
6
7
8
9
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'video.djangosite.service',
)

当 settiing.py中DEBUG = True 时,Django会自动到各个app中找到放在里面的静态文件
如果执行 python manage.py collectstatic 。django就会收集各个app的STATICFILES_DIRS目录下的文件放到STATIC_ROOT中。

注:STATIC_ROOT不能STATICFILES_DIRS相同,或者是其的子目录。否则就会报错:”ImproperlyConfigured: The STATICFILES_DIRS setting should not contain the STATIC_ROOT setting”

针对最初的问题,我们可以在DEBUG模式下运行程序,或者收集静态文件到当前工程的目录下。

select源码学习

函数原型

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* maxfdp1: 指定待检测的描述符的个数
* readset: 读描述符集合
* writeset: 写描述符集合
* execeptset: 异常描述符集合
* timeout: 超时时间(一直、指定时间、不等待)
*
* 返回值:
* 0: 超时
* -1: 错误
* >0: 可进行读、写、异常操作描述符的大小。
*/

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

上面的几个参数都是输入/输出参数,既传递输入值,存储返回结果。


源码分析

源码位置:fs/select.c

入口函数:sys_select

我们在程序中调用select函数时,系统调用该函数。该函数只是处理了下超时时间参数,然后调用core_sys_select函数,最后计算剩余时间值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct timeval __user *tvp)
{

s64 timeout = -1;
struct timeval tv;
int ret;

if (tvp) {
/* 将超时时间拷贝到内核空间 */
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;

/* 非法时间 */
if (tv.tv_sec < 0 || tv.tv_usec < 0)
return -EINVAL;

/* Cast to u64 to make GCC stop complaining */
if ((u64)tv.tv_sec >= (u64)MAX_INT64_SECONDS)
timeout = -1; /* infinite */
else {
/* 计算出超时时间(转换为始终周期数) */
timeout = DIV_ROUND_UP(tv.tv_usec, USEC_PER_SEC/HZ);
timeout += tv.tv_sec * HZ;
}
}

ret = core_sys_select(n, inp, outp, exp, &timeout);
if (tvp) {
struct timeval rtv;

if (current->personality & STICKY_TIMEOUTS)
goto sticky;
/* rtv 是剩余值*/
rtv.tv_usec = jiffies_to_usecs(do_div((*(u64*)&timeout), HZ));
rtv.tv_sec = timeout;
if (timeval_compare(&rtv, &tv) >= 0)
rtv =smlinkage long sys_select() tv;
if (copy_to_user(tvp, &rtv, sizeof(rtv))) {
sticky:
/*
* If an application puts its timeval in read-only
* memory, we don't want the Linux-specific update to
* the timeval to cause a fault after the select has
* completed successfully. However, because we're not
* updating the timeval, we can't restart the system
* call.
*/

if (ret == -ERESTARTNOHAND)
ret = -EINTR;
}
}

return ret;
}

函数core_sys_select

该函数主要做了以下事情:

  1. fix传入的n值,使其不会超过文件描述符的最大值。这就是select能同时处理的连接受限的原因。
  2. 为描述符分配空间,将描述符从用户空间拷贝到内核空间。
  3. 调用do_select执行真正的IO复用。
  4. 将3的结果从用户内核空间拷贝的用户空间,返回给程序。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    static int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, s64 *timeout)
    {

    fd_set_bits fds; /* 参考后面的数据结构 */
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /* Allocate small arguments on the stack to save memory and be faster */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    ret = -EINVAL;
    if (n < 0)
    goto out_nofds;

    /* max_fds can increase, so grab it once to avoid race */
    rcu_read_lock();
    /* 获取当前进程的文件描述符表 */
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    /* 修正用户传入的第一个参数:fd_set中文件描述符的最大值 */
    if (n > max_fds)
    n = max_fds;

    /*
    * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
    * since we used fdset we need to allocate memory in units of
    * long-words.
    */

    size = FDS_BYTES(n); //n个bit位需要的字节数
    bits = stack_fds;
    /* 读、写、异常,输入&输出,共需6倍大小的描述符空间 */
    if (size > sizeof(stack_fds) / 6) {
    /* Not enough space in on-stack array; must use kmalloc */
    ret = -ENOMEM;
    bits = kmalloc(6 * size, GFP_KERNEL);
    if (!bits)
    goto out_nofds;
    }
    fds.in = bits;
    fds.out = bits + size;
    fds.ex = bits + 2*size;
    fds.res_in = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex = bits + 5*size;

    /* get_fd_set仅仅调用copy_from_user从用户空间拷贝了fd_set */
    if ((ret = get_fd_set(n, inp, fds.in)) ||
    (ret = get_fd_set(n, outp, fds.out)) ||
    (ret = get_fd_set(n, exp, fds.ex)))
    goto out;
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    ret = do_select(n, &fds, timeout);

    if (ret < 0)
    goto out;
    if (!ret) {
    ret = -ERESTARTNOHAND;
    if (signal_pending(current))
    goto out;
    ret = 0;
    }

    /*
    * 把结果集拷贝到用户空间
    */

    if (set_fd_set(n, inp, fds.res_in) ||
    set_fd_set(n, outp, fds.res_out) ||
    set_fd_set(n, exp, fds.res_ex))
    ret = -EFAULT;

    out:
    if (bits != stack_fds)
    kfree(bits);
    out_nofds:
    return ret;
    }
函数do_select

该函数会遍历所有的描述符,返回有事件发生的描述符。其中涉及到poll_wait相关的进程休眠、唤醒相关的代码没有去看,但这不影响我们对select的理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{

struct poll_wqueues table;
poll_table *wait;
int retval, i;

rcu_read_lock();
/*
* 根据已经打开fd的位图检查用户打开的fd,要求对应的fd必须打开,
* 并且返回最大的fd
*/

retval = max_select_fd(n, fds);
rcu_read_unlock();

if (retval < 0)
return retval;
n = retval;

/*将当前进程放入自已的等待队列table, 并将该等待队列加入到该测试表wait*/
poll_initwait(&table);
wait = &table.pt;
if (!*timeout)
wait = NULL;
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
long __timeout;

set_current_state(TASK_INTERRUPTIBLE);

inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;

in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
/* 由于数据描述符表是用的long,所以每次i要加上long所占的bit位数 */
if (all_bits == 0) {
i += __NFDBITS;
continue;
/* 有事件发生,遍历每一个bit */
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed);
if (file) {
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll)
mask = (*f_op->poll)(file, retval ? NULL : wait);
fput_light(file, fput_needed);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
cond_resched();
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
}
wait = NULL;
if (retval || !*timeout || signal_pending(current))
break;
if(table.error) {
retval = table.error;
break;
}
if (*timeout < 0) {
/* Wait indefinitely */
__timeout = MAX_SCHEDULE_TIMEOUT;
} else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT - 1)) {
/* Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in a loop */
__timeout = MAX_SCHEDULE_TIMEOUT - 1;
*timeout -= __timeout;
} else {
__timeout = *timeout;
*timeout = 0;
}
__timeout = schedule_timeout(__timeout);
if (*timeout >= 0)
*timeout += __timeout;
}
__set_current_state(TASK_RUNNING);

poll_freewait(&table);

return retval;
}

总结

select主要做了3件事情:

  1. 将需要监测的描述符从用户空间拷贝到内核空间;
  2. 遍历描述符,返回有事件发生的描述符;
  3. 将发生的描述符从内核空间拷贝到用户空间。

由于需要将全部描述符在内核空间和用户空间的来回拷贝、以及遍历,造成了效率的不高;而且描述符的个数也受系统最大文件描述符的限制。


一些数据数据结构
fd_set_bits

1
2
3
4
5
6
7
8
9
10
11
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

/*
* How many longwords for "nr" bits?
*/

#define FDS_BITPERLONG (8*sizeof(long))
#define FDS_LONGS(nr) ((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)
#define FDS_BYTES(nr) (FDS_LONG(nr)*sizeof(long))

fd_set_bits,标识出可读、可写、异常描述符表的输入和输出。
下面三个宏分别表示:long类型的bit数、nr个bit位需要几个long类型存放、nr个bt位存放需要的字节数。

1
2
3
4
typedef stuct poll_table_stuct{
poll_queue_proc qproc;
unsigned long key;
} poll_table;

poll_table:对每个文件进行poll操作时,判读是否能够非阻塞的进行key值(poll事件组成)标识的I/O操作;如果不能,则调用回调函数qproc将进程添加到文件的poll等待队列中。

1
2
3
4
5
6
struct poll_table_entry {
stuct file *file;
unsigned long key;
wait_queue_t wait;
wait_queue_head_t *wait_address;
}

poll_table_entry: 用于阻塞进程并将进程添加到文件的poll等待队列中,一个文件对应一个poll_table_entry。

1
2
3
4
5
6
7
8
9
stuct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task;
int triggered;
int error;
int inline_index;
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

poll_wqueues: 用于在select/poll时,如果需要阻塞进程,将进程添加到描述表标识的所有文件的poll等待队列中,以便任意一个文件可进行非阻塞I/O操作时唤醒进程。

listen和accept中的sokcet关系

我们学习网络编程时,一个服务端程序通常是:socket()->bind()->listen(),当有客户端程序connect()时,服务端accept()处理。下面是我在学习中遇到的几个问题,以及理解。

问题1:客户端和服务端的socket是怎么通信的?

我们知道,socket有一个四元组:(目的ip, 目的port、源ip,源port)。这个四元组就可以保证客户端和服务端之间可以通信。
比如服务端是个80的web服务,ip是1.1.1.1。服务端启动时会生成一个socket绑定到80端口监听。这时一个ip为 2.2.2.2的客户端要去访问这个服务端时,先要生成一个socket。系统会为这个socket选择一个端口,比如65535.那么客户端通个(1.1.1.1, 80, 2.2.2.2, 65535)通过connect()访问服务端。

问题2:accept中的socket和listen监听的socket的端口相同吗?

客户端通过connect()访问服务端80,通过3次握手后,进入accept队列。服务端调用accept()时,会生成一个新的socket和客户端通信,之前的socket仍然继续监听80端口。那么这个新生成的端口还是80吗?答案是肯定的,否则客户端那边的socket就无法和新生成的socket通信。看accept源码,会发现新生成的socket会拷贝监听socket的信息。因此两者的端口号相同。

问题3:服务端是怎么区分客户端的信息是给listen的socket还是accept的socket的?

如果监听的socket和新生成的socket都是使用80端口,那么客户端发给80端口的信息,怎么区分是给哪个socket?是通过客户端port做路由的。listen使用的socket是没有客户端信息的,它的客户端端口为*;而accept时新生成的socket是有客户端端口号。如下图,第一行是监听的socket,第二行是accet后生成的socket。因此通过四元组中的客户端port就可将客户端信息路由到了正确的socket上。

1
2
3
wangzhilong@in17-164^:/mnt/mfs/$ netstat --tcp -lan | grep 8787
tcp 0 0 0.0.0.0:8787 0.0.0.0:* LISTEN
tcp 0 0 10.4.17.164:8787 10.2.22.31:64285 TIME_WAIT

kafka笔记

最近有使用kafka,顺便学习下kafka,这个博客写的很详细。http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
下面记录几条刚开始没有理解的点,方便以后回忆。

1 topic和particion关系

topic在逻辑上是一个队列,每条消费者必须指定它的topic。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序
topic被发到哪个particion是通过消息中的key进行hash决定的。

2 particion和consumer关系

一个particion最多只能由一个consumer去消费,但一个consumer可以消费多个particion。这也保证了一个particion中消息是顺序消费的,但如果consumer数量大于particion数量,多出来的consumer是浪费的。

3 topic和consumer group(CG)关系

这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
既:一条消息只能一个CG消费一次,但可以被多个CG消费。

4 消费状态怎么记录

Kafka会为每一个consumergroup保留一些metadata信息:当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumergroup只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。