5

Linux epoll & Node.js Event Loop & I / O复用

 3 years ago
source link: https://zhuanlan.zhihu.com/p/35958023
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Linux epoll & Node.js Event Loop & I / O复用

selectpollepoll都是I / O复用的机制,在《UNIX网络编程》里重点讲了selectpoll的机制,但selectpoll并不是现代高性能服务器的最佳选择。包括现在的Node.js中的事件循环机制(event loop)也是基于epoll实现的。

select和poll的缺点

按照《UNIX网络编程》中所述,pollselect类似,没有解决以下的问题:

  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  • select支持的文件描述符数量太小了,默认是1024

epoll对于上述缺点的改进

epoll既然是对selectpoll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epollselectpoll的调用接口上的不同,selectpoll都只提供了一个函数——select或者poll函数。而epoll提供了三个函数,epoll_create,epoll_ctlepoll_waitepoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。
  对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝一次。
  对于第二个缺点,epoll的解决方案不像selectpoll一样每次都把current轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
  对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,一般来说这个数目和系统内存关系很大。

epoll接口

epoll操作过程需要三个接口,分别如下:

#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll_create方法

#include <sys/epoll.h>
int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

#include <sys/epoll.h>
#define FDSIZE 1024
// ...
int main(int argc,char *argv[])
{
  int epollfd = epoll_create(FDSIZE); // 这里并不是指最大文件描述符数量为1024,而是给内核初始化数据结构的一个建议。
  return 0;
}

epoll_ctl方法

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl方法是epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

  • epfd:是epoll_create()的返回值。
  • op:表示对对应的fd文件描述符的操作,一般情况下表示想要监听事件、删除事件和修改事件处理函数,用三个宏来表示:
    • EPOLL_CTL_ADD,表示对于对应的fd文件描述符添加一组事件监听;
    • EPOLL_CTL_DEL,表示对于对应的fd文件描述符删除该组事件监听;
    • EPOLL_CTL_MOD,表示对于对应的fd文件描述符修改该组事件监听为新的events
  • fd:表示需要监听的fd(文件描述符)
  • event:是告诉内核需要监听的事件集合,传入一个指针,指向事件集合的第一项,struct epoll_event的结构如下:
struct epoll_event {
  __uint32_t events;  // 表示一类epoll事件
  epoll_data_t data;  // 用户传递的数据
}

因为events表示一类epoll事件,它可以是以下几个宏的集合:

  • EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  • EPOLLOUT:表示对应的文件描述符可以写;
  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里;

例如,如果想让epoll对于对应的文件描述符fd添加一组事件,监听对应的文件描述符可读的情况:

static void add_event_epoll_in(int epollfd, int fd)
{
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

epoll_wait方法

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

epoll_wait方法等待事件的产生,类似于select()调用,返回需要处理的事件数目。

  • epfd:是epoll_create()的返回值;
  • events:用来从内核得到事件的集合,我们一般把需要处理的事件对应的文件描述符fd放到events结构体下data参数内,这样我们可以在事件处理函数中取到对应的文件描述符fd,执行对应操作(例如对于TCP套接字,我们调用readwrite等);
  • maxevents:告之内核这个events的数量,这个maxevents的值不能大于创建epoll_create()时的size,否则会造成溢出的风险;
  • timeout:超时时间,以毫秒为单位,如果设置为-1,表示一直等待,设置为0表示不等待;

举例:应用程序一般阻塞与epoll_wait调用,一旦events中任意一事件触发,epoll_wait执行,等待指定的timeout超时时间,如果I / O完成则立即返回需要处理的事件数目。

static void do_epoll(int listenfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //创建一个描述符
    epollfd = epoll_create(FDSIZE);
    //添加监听描述符事件
    add_event_epoll_in(epollfd, listenfd);
    for ( ; ; )
    {
        // 获取已经准备好的描述符事件数目
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd, events, ret, listenfd, buf);
    }
    close(epollfd);
}

static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        // 在这里取到需要处理的文件描述符
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}

使用epoll重构服务器回射程序

// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

//函数声明
//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);
//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);
//读处理
static void do_read(int epollfd,int fd,char *buf);
//写处理
static void do_write(int epollfd,int fd,char *buf);
//添加事件
static void add_event(int epollfd,int fd,int state);
//修改事件
static void modify_event(int epollfd,int fd,int state);
//删除事件
static void delete_event(int epollfd,int fd,int state);

int main(int argc,char *argv[])
{
    int  listenfd;
    listenfd = socket_bind(IPADDRESS,PORT);
    listen(listenfd,LISTENQ);
    do_epoll(listenfd);
    return 0;
}

static int socket_bind(const char* ip,int port)
{
    int  listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if (listenfd == -1)
    {
        perror("socket error:");
        exit(1);
    }
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET,ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error: ");
        exit(1);
    }
    return listenfd;
}

static void do_epoll(int listenfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //创建一个描述符
    epollfd = epoll_create(FDSIZE);
    //添加监听描述符事件
    add_event(epollfd,listenfd,EPOLLIN);
    for ( ; ; )
    {
        //获取已经准备好的描述符事件
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,listenfd,buf);
    }
    close(epollfd);
}

static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
static void handle_accpet(int epollfd,int listenfd)
{
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t  cliaddrlen;
    clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
    if (clifd == -1)
        perror("accpet error:");
    else
    {
        printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
        //添加一个客户描述符和事件
        add_event(epollfd,clifd,EPOLLIN);
    }
}

static void do_read(int epollfd,int fd,char *buf)
{
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)
    {
        perror("read error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"client close.\n");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else
    {
        printf("read message is : %s",buf);
        //修改描述符对应的事件,由读改为写
        modify_event(epollfd,fd,EPOLLOUT);
    }
}

static void do_write(int epollfd,int fd,char *buf)
{
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLOUT);
    }
    else
        modify_event(epollfd,fd,EPOLLIN);
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}
// client.c
#include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>

#define MAXSIZE     1024
#define IPADDRESS   "127.0.0.1"
#define SERV_PORT   8787
#define FDSIZE        1024
#define EPOLLEVENTS 20

static void handle_connection(int sockfd);
static void
handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_read(int epollfd,int fd,int sockfd,char *buf);
static void do_write(int epollfd,int fd,int sockfd,char *buf);
static void add_event(int epollfd,int fd,int state);
static void delete_event(int epollfd,int fd,int state);
static void modify_event(int epollfd,int fd,int state);

int main(int argc,char *argv[])
{
    int                 sockfd;
    struct sockaddr_in  servaddr;
    sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr);
    connect(sockfd,(struct sockaddr*)&servaddr,sizeof(servaddr));
    //处理连接
    handle_connection(sockfd);
    close(sockfd);
    return 0;
}


static void handle_connection(int sockfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    int ret;
    epollfd = epoll_create(FDSIZE);
    add_event(epollfd,STDIN_FILENO,EPOLLIN);
    for ( ; ; )
    {
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,sockfd,buf);
    }
    close(epollfd);
}

static void
handle_events(int epollfd,struct epoll_event *events,int num,int sockfd,char *buf)
{
    int fd;
    int i;
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,sockfd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,sockfd,buf);
    }
}

static void do_read(int epollfd,int fd,int sockfd,char *buf)
{
    int nread;
    nread = read(fd,buf,MAXSIZE);
        if (nread == -1)
    {
        perror("read error:");
        close(fd);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"server close.\n");
        close(fd);
    }
    else
    {
        if (fd == STDIN_FILENO)
            add_event(epollfd,sockfd,EPOLLOUT);
        else
        {
            delete_event(epollfd,sockfd,EPOLLIN);
            add_event(epollfd,STDOUT_FILENO,EPOLLOUT);
        }
    }
}

static void do_write(int epollfd,int fd,int sockfd,char *buf)
{
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
    }
    else
    {
        if (fd == STDOUT_FILENO)
            delete_event(epollfd,fd,EPOLLOUT);
        else
            modify_event(epollfd,fd,EPOLLIN);
    }
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

运行结果:

Node.js的Event Loop

众所周知,Node.js是单线程的,可用作高性能服务器。显然,若仅仅限定为1024个描述符,对于高并发的请求显然是不支持的。Node.js内部的Event Demultiplexer(事件多路分解器)借助epoll来实现事件循环机制。其基本步骤如下:

  1. 应用程序通过向Event Demultiplexer(事件多路分解器)提交请求来生成新的I / O操作。应用程序还指定一个处理程序,当操作完成时将调用该处理程序。向Event Demultiplexer(事件多路分解器)提交新请求是一种非阻塞调用,它立即将控制权返回给该应用程序。
  2. 当一组I / O操作完成时,事件多路分解器将新的事件推入Event Queue(事件队列)
  3. 此时Event Loop遍历Event Queue的项目。
  4. 对于每个事件,调用关联的处理程序。
  5. 处理程序是应用程序代码的一部分,当它执行完成时将控制权返回给Event Loop。但是,在处理程序执行过程中可能会请求新的异步操作,从而导致新的操作被插入Event Demultiplexer(事件多路分解器)
  6. Event Loop中的所有项目被处理完时,循环将再次阻塞Event Demultiplexer(事件多路分解器),当有新事件可用时,Event Demultiplexer(事件多路分解器)将触发另一个周期。

通过epoll解析上述步骤操作:

  • 事件多路分解器即为通过epoll_create()创建的epoll句柄的抽象,在Node.js启动时,事件多路分解器会阻塞于epoll_wait调用。
  • 对于第一步,注册事件处理程序时,调用epoll_ctl()方法,设定op参数为EPOLL_CTL_ADD,向事件多路分解器添加一组事件。
  • 对于第二步,一旦I / O完成,又调用epoll_ctl()方法,设定op参数为EPOLL_CTL_DEL,删除对应事件,此时把控制权返还给应用程序。
  • 事件循环遍历事件队列,只要没有事件,就阻塞于epoll_wait()
  • 不断重复上述步骤,实现Node.js`的事件循环机制。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK