文章目录
前言select()和poll() IO多路复用模型select的缺点poll的问题
epoll IO多路复用模型实现机制epoll编程实例
前言
在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等IO多路复用的方法来实现并发服务程序。在大数据、高并发、集群等一些名词唱得火热之年代,select和poll的用武之地越来越有限,风头已经被epoll占尽。
本文便来介绍epoll的实现机制,并附带讲解一下select和poll。通过对比其不同的实现机制,真正理解为何epoll能实现高并发。
select()和poll() IO多路复用模型
select的缺点
单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define __FD_SETSIZE 1024) 内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销; select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件; select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
poll的问题
相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。
拿select模型为例,假设我们的服务器需要支持100万的并发连接,则在__FD_SETSIZE 为1024的情况下,则我们至少需要开辟1k个进程才能实现100万的并发连接。除了进程间上下文切换的时间消耗外,从内核/用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。因此,基于select模型的服务器程序,要达到10万级别的并发访问,是一个很难完成的任务。 因此,该epoll上场了。
epoll IO多路复用模型实现机制
由于epoll的实现机制与select/poll机制完全不同,上面所说的 select的缺点在epoll上不复存在。
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?
在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。
epoll的设计和实现与select完全不同。 epoll通过在Linux内核中申请一个简易的文件系统(文件系统一般用什么数据结构实现?B+树)。 把原先的select/poll调用分成了3个部分:
(1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
(2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
(3)调用epoll_wait收集发生的事件的连接
如此一来,要实现上面说是的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
下面来看看Linux内核具体的epoll机制实现思路。
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。eventpoll结构体如下所示:
struct eventpoll
{
....
struct rb_root rbr
;
struct list_head rdlist
;
....
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:
struct epitem
{
struct rb_node rbn
;
struct list_head rdllink
;
struct epoll_filefd ffd
;
struct eventpoll
*ep
;
struct epoll_event event
;
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。 epoll数据结构示意图如下:
从上面的讲解可知:通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。
OK,讲解完了Epoll的机理,我们便能很容易掌握epoll的用法了。
一句话描述就是:三步曲
第一步:epoll_create()系统调用。此调用返回一个句柄,之后所有的使用都依靠这个句柄来标识。
第二步:epoll_ctl()系统调用。通过此调用向epoll对象中添加、删除、修改感兴趣的事件,返回0标识成功,返回-1表示失败。
第三部:epoll_wait()系统调用。通过此调用收集收集在epoll监控中已经发生的事件。
epoll编程实例
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std
;
#define MAX_EVENTS 500
struct myevent_s
{
int fd
;
void (*call_back
)(int fd
, int events
, void *arg
);
int events
;
void *arg
;
int status
;
char buff
[128];
int len
, s_offset
;
long last_active
;
};
void EventSet(myevent_s
*ev
, int fd
, void (*call_back
)(int, int, void*), void *arg
)
{
ev
->fd
= fd
;
ev
->call_back
= call_back
;
ev
->events
= 0;
ev
->arg
= arg
;
ev
->status
= 0;
bzero(ev
->buff
, sizeof(ev
->buff
));
ev
->s_offset
= 0;
ev
->len
= 0;
ev
->last_active
= time(NULL);
}
void EventAdd(int epollFd
, int events
, myevent_s
*ev
)
{
struct epoll_event epv
= {0, {0}};
int op
;
epv
.data
.ptr
= ev
;
epv
.events
= ev
->events
= events
;
if(ev
->status
== 1){
op
= EPOLL_CTL_MOD
;
}
else{
op
= EPOLL_CTL_ADD
;
ev
->status
= 1;
}
if(epoll_ctl(epollFd
, op
, ev
->fd
, &epv
) < 0)
printf("Event Add failed[fd=%d], evnets[%d]\n", ev
->fd
, events
);
else
printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev
->fd
, op
, events
);
}
void EventDel(int epollFd
, myevent_s
*ev
)
{
struct epoll_event epv
= {0, {0}};
if(ev
->status
!= 1) return;
epv
.data
.ptr
= ev
;
ev
->status
= 0;
epoll_ctl(epollFd
, EPOLL_CTL_DEL
, ev
->fd
, &epv
);
}
int g_epollFd
;
myevent_s g_Events
[MAX_EVENTS
+1];
void RecvData(int fd
, int events
, void *arg
);
void SendData(int fd
, int events
, void *arg
);
void AcceptConn(int fd
, int events
, void *arg
)
{
struct sockaddr_in sin
;
socklen_t len
= sizeof(struct sockaddr_in
);
int nfd
, i
;
if((nfd
= accept(fd
, (struct sockaddr
*)&sin
, &len
)) == -1)
{
if(errno
!= EAGAIN
&& errno
!= EINTR
)
{
}
printf("%s: accept, %d", __func__, errno
);
return;
}
do
{
for(i
= 0; i
< MAX_EVENTS
; i
++)
{
if(g_Events
[i
].status
== 0)
{
break;
}
}
if(i
== MAX_EVENTS
)
{
printf("%s:max connection limit[%d].", __func__, MAX_EVENTS
);
break;
}
int iret
= 0;
if((iret
= fcntl(nfd
, F_SETFL
, O_NONBLOCK
)) < 0)
{
printf("%s: fcntl nonblocking failed:%d", __func__, iret
);
break;
}
EventSet(&g_Events
[i
], nfd
, RecvData
, &g_Events
[i
]);
EventAdd(g_epollFd
, EPOLLIN
, &g_Events
[i
]);
}while(0);
printf("new conn[%s:%d][time:%d], pos[%d]\n", inet_ntoa(sin
.sin_addr
),
ntohs(sin
.sin_port
), g_Events
[i
].last_active
, i
);
}
void RecvData(int fd
, int events
, void *arg
)
{
struct myevent_s
*ev
= (struct myevent_s
*)arg
;
int len
;
len
= recv(fd
, ev
->buff
+ev
->len
, sizeof(ev
->buff
)-1-ev
->len
, 0);
EventDel(g_epollFd
, ev
);
if(len
> 0)
{
ev
->len
+= len
;
ev
->buff
[len
] = '\0';
printf("C[%d]:%s\n", fd
, ev
->buff
);
EventSet(ev
, fd
, SendData
, ev
);
EventAdd(g_epollFd
, EPOLLOUT
, ev
);
}
else if(len
== 0)
{
close(ev
->fd
);
printf("[fd=%d] pos[%d], closed gracefully.\n", fd
, ev
-g_Events
);
}
else
{
close(ev
->fd
);
printf("recv[fd=%d] error[%d]:%s\n", fd
, errno
, strerror(errno
));
}
}
void SendData(int fd
, int events
, void *arg
)
{
struct myevent_s
*ev
= (struct myevent_s
*)arg
;
int len
;
len
= send(fd
, ev
->buff
+ ev
->s_offset
, ev
->len
- ev
->s_offset
, 0);
if(len
> 0)
{
printf("send[fd=%d], [%d<->%d]%s\n", fd
, len
, ev
->len
, ev
->buff
);
ev
->s_offset
+= len
;
if(ev
->s_offset
== ev
->len
)
{
EventDel(g_epollFd
, ev
);
EventSet(ev
, fd
, RecvData
, ev
);
EventAdd(g_epollFd
, EPOLLIN
, ev
);
}
}
else
{
close(ev
->fd
);
EventDel(g_epollFd
, ev
);
printf("send[fd=%d] error[%d]\n", fd
, errno
);
}
}
void InitListenSocket(int epollFd
, short port
)
{
int listenFd
= socket(AF_INET
, SOCK_STREAM
, 0);
fcntl(listenFd
, F_SETFL
, O_NONBLOCK
);
printf("server listen fd=%d\n", listenFd
);
EventSet(&g_Events
[MAX_EVENTS
], listenFd
, AcceptConn
, &g_Events
[MAX_EVENTS
]);
EventAdd(epollFd
, EPOLLIN
, &g_Events
[MAX_EVENTS
]);
sockaddr_in sin
;
bzero(&sin
, sizeof(sin
));
sin
.sin_family
= AF_INET
;
sin
.sin_addr
.s_addr
= INADDR_ANY
;
sin
.sin_port
= htons(port
);
bind(listenFd
, (const sockaddr
*)&sin
, sizeof(sin
));
listen(listenFd
, 5);
}
int main(int argc
, char **argv
)
{
unsigned short port
= 12345;
if(argc
== 2){
port
= atoi(argv
[1]);
}
g_epollFd
= epoll_create(MAX_EVENTS
);
if(g_epollFd
<= 0) printf("create epoll failed.%d\n", g_epollFd
);
InitListenSocket(g_epollFd
, port
);
struct epoll_event events
[MAX_EVENTS
];
printf("server running:port[%d]\n", port
);
int checkPos
= 0;
while(1){
long now
= time(NULL);
for(int i
= 0; i
< 100; i
++, checkPos
++)
{
if(checkPos
== MAX_EVENTS
) checkPos
= 0;
if(g_Events
[checkPos
].status
!= 1) continue;
long duration
= now
- g_Events
[checkPos
].last_active
;
if(duration
>= 60)
{
close(g_Events
[checkPos
].fd
);
printf("[fd=%d] timeout[%d--%d].\n", g_Events
[checkPos
].fd
, g_Events
[checkPos
].last_active
, now
);
EventDel(g_epollFd
, &g_Events
[checkPos
]);
}
}
int fds
= epoll_wait(g_epollFd
, events
, MAX_EVENTS
, 1000);
if(fds
< 0){
printf("epoll_wait error, exit\n");
break;
}
for(int i
= 0; i
< fds
; i
++){
myevent_s
*ev
= (struct myevent_s
*)events
[i
].data
.ptr
;
if((events
[i
].events
&EPOLLIN
)&&(ev
->events
&EPOLLIN
))
{
ev
->call_back(ev
->fd
, events
[i
].events
, ev
->arg
);
}
if((events
[i
].events
&EPOLLOUT
)&&(ev
->events
&EPOLLOUT
))
{
ev
->call_back(ev
->fd
, events
[i
].events
, ev
->arg
);
}
}
}
return 0;
}
原文:https://blog.csdn.net/davidsguo008/article/details/73556811