11

学习libevent的select模型

 4 years ago
source link: https://blogread.cn/it/article/4381?f=hot1
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

学习libevent的select模型

浏览:3393次  出处信息

    最近几天都在看libevent的源代码,什么是libevent呢?他是一个由C写的基于事件驱动的网络库,使用方便,效率也还不错,利用这个库能很大程度上提高开发效率,而且libevent跨平台支持的也非常好。像memcache就是一个非常典型的例子,既然memcache这么有威望的产品都选择用他,说明这个东西还是值得大家研究的。

    因为我是windows,所以关于epool,kqueue什么的都没去看,也没看iocp,花了一天时间看完了select模型的源代码,代码不多也很容易懂,总共就400来行吧。这里捡重点说下,首先在套接字管理上他声明了一个和fd_set一模一样的结构体,这里面我们称他为套接字容器(就是用来存放所有客户端请求的套接字),他不用fd_set的原因是因为select型的套接字个数是有限制的,默认是64,也就是说如果你用这样的方法来做应用,那么就只能有63个人同时在线(服务端监听要占用1个),虽然这个数字可以改,但随着套接字的数量增加,导致在处理添加和删除套接字时效率相当底,所以他要自己定义一个结构体,能动态的扩展套接字容器大小,同时他使用红黑树存放了每个套接字位于套接字容器的位置,所以在添加和删除套接字的时候效率得到大大的提升。其次就是主循环,刚刚我们说到他使用了红黑树来存放套接字对应的位置,在这个红黑树里面他还存放了对应的事件,因为每个套接字都有对应的响应事件,这样才能在有可读数据的时候做出响应,他使用红黑树的目的是能以最快的速度使用套接字找到相对应的事件,并且做出响应;在主循环中,他的处理方式是当一轮询问完毕后,才开始响应事件的。

    对于上面使用红黑树来存放套接字的位置和事件对像,我个人感觉在这点上他做得不够好,大家都知道红黑树的查询效率极高,算法的复杂度也是O(lg N),虽然这个数字非常小,但毕竟大于1,也就是说如果有M次读写操作,那么就得花费掉M * lg(N)的时间来处理事件定位,这是个不理想的数字,所以还有改进的空间。在libevent的linux版里面的方法是就是分配一大块连续内存,然后把事件的指针存在里面,以下标的方式来访问,这样计算出来的算法复杂度为O(1),也就是说如果有M次的读写操作,那么只需要花费M的时间来处理事件定位,这个数字应该很理想了。按照这种方法我把select这一块重写了一遍,代码如下:

    socket.h

    #ifndef __EV_SOCKET

     #define __EV_SOCKET

    #include

     #include

     #include “winsock2.h”

    #define EV_READ 1

     #define EV_WRITE 2

    struct win_fd_set {

     u_int fd_count;

     SOCKET fd_array[1];

    struct evsocket

     SOCKET fd;

     sockaddr_in addr;

     short events;

     void (*ev_callback)(SOCKET fd, void *arg);

     void *ev_arg;

    struct evqueue

     struct evsocket *ev;

     struct evqueue *pPrev;

     struct evqueue *pNext;

    struct evbase

     struct evsocket **ev_read_array; //存放socket对应的event(read)

     struct evsocket **ev_write_array;//存放socket对应的event(write)

     size_t *ev_read_pos;//fd在fd_set对像中的位置(read)

     size_t *ev_write_pos;//fd在fd_Set对旬中的位置(write)

    struct evqueue *ev_active_head;//消息队头

     struct evqueue *ev_active_tail;//消息队尾

     int ev_active_count;//总消息数

    win_fd_set *readset_in;

     win_fd_set *readset_out;

     win_fd_set *writeset_in;

     win_fd_set *writeset_out;

    size_t ev_fd_sets;//设置可管理多少个socket

     SOCKET ev_fdz;//存下最大的一个SOCKET值,用于分配连续的evsocket指针

    int running;

    int event_init(void);

     int event_add(struct evsocket *ev, short events,

     void (*callback)(SOCKET fd, void *arg), void *arg);

     int event_del(struct evsocket *ev);

     int event_dispatch(void);

     int event_free(void);

     int event_stop(void);

    #endif

    socket.c

    #include “socket.h”

    static struct evbase *base;

    #define MEMORY_ALGIN(n) (n + sizeof(long) - (n & (sizeof(long) - 1)))

     #define FD_SET_ALLOC_SIZE(n) ((sizeof(struct win_fd_set) + ((n)-1)*sizeof(SOCKET)))

     #define FD_DEFAULT_SIZE 64

    #define XFREE(ptr) do{\\

     if(ptr != NULL)\\

     free(ptr);\\

     }while(0);

    #define X_FD_SET(fd, set, events) do{\\

     size_t pos = ((win_fd_set *)set)->fd_count;\\

     if(((win_fd_set *)set)->fd_count >= base->ev_fd_sets){\\

     base->ev_fd_sets*= 2;\\

     event_fd_reset(base->ev_fd_sets);\\

     if(events == EV_READ){\\

     base->ev_read_pos[fd] = pos;\\

     }else if(events == EV_WRITE){\\

     base->ev_write_pos[fd] = pos;\\

     ((win_fd_set *)set)->fd_array[pos] = fd;\\

     ((win_fd_set *)set)->fd_count++;\\

     }while(0);

    #define X_FD_CLR(fd, set, events) do{\\

     size_t count = ((win_fd_set *)set)->fd_count;\\

     size_t pos = ((events == EV_READ) ? base->ev_read_pos[fd] : base->ev_write_pos[fd]);\\

     SOCKET last_fd = ((win_fd_set *)set)->fd_array[--count];\\

     ((win_fd_set *)set)->fd_array[pos] = last_fd;\\

     if(events == EV_READ){\\

     base->ev_read_pos[last_fd] = pos;\\

     }else if(events == EV_WRITE){\\

     base->ev_write_pos[last_fd] = pos;\\

     ((win_fd_set *)set)->fd_count = count;\\

     }while(0);

    //将要处理的消息放进队列,等select一轮询问完毕后就处理这些消息

     static int event_active_push(struct evsocket *ev)

     struct evqueue *evl = (struct evqueue *)malloc(sizeof(struct evqueue));

     if(evl == NULL){

     return NULL;

    evl->ev = ev;

     evl->pPrev = base->ev_active_tail;

     evl->pNext = NULL;

     if(base->ev_active_tail){

     base->ev_active_tail->pNext = evl;

     if(base->ev_active_head == NULL){

     base->ev_active_head = evl;

     base->ev_active_tail = evl;

     base->ev_active_count++;

    return 1;

    //从消息队列头上取出一条消息,处理之

     static struct evqueue *event_active_pop(void)

     struct evqueue *head = base->ev_active_head;

     if(head){

     base->ev_active_head = head->pNext;

     base->ev_active_count-;

     if(head->pNext){

     head->pNext->pPrev = NULL;

     }else{

     base->ev_active_tail = NULL;

     return head;

    //开始循环处理消息,直到队列空为止

     //其实就是回调函数

     static void event_active_process(void)

     struct evqueue *p = NULL;

     while(p = event_active_pop()){

     p->ev->ev_callback(p->ev->fd, p->ev->ev_arg);

     free(p);

    //重新分配管理套接字容器的大小

     static int event_fd_reset(size_t n)

     size_t size = FD_SET_ALLOC_SIZE(n);

     if(!(base->readset_in = (win_fd_set *)realloc(base->readset_in, size)))

     goto err;

     if(!(base->readset_out = (win_fd_set *)realloc(base->readset_out, size)))

     goto err;

     if(!(base->writeset_in = (win_fd_set *)realloc(base->writeset_in, size)))

     goto err;

     if(!(base->writeset_out = (win_fd_set *)realloc(base->writeset_out, size)))

     goto err;

    return 1;

     XFREE(base->readset_in);

     XFREE(base->readset_out);

     XFREE(base->writeset_in);

     XFREE(base->writeset_out);

     printf(“memory out\\n”);

     return 0;

    //把in里面的套接字复制到out,并且更新out的fd_cout

     static int event_fd_copy(struct win_fd_set *out, struct win_fd_set *in)

     out->fd_count = in->fd_count;

     memcpy(out->fd_array, in->fd_array, in->fd_count * sizeof(SOCKET));

     return 1;

    //重新分配套接字偏移容器和事件容器,我的改进方法就在这,

     //其实这也是学习他linux的select,不知道为啥在win下他不用同样的方法

     //这样做的好处是,直接使用socket以下标的形式就可以访问到对应的事件

     //不过这样的方法比较浪费内存,有好几千个指针变量没用到,但是空间换时间还是值得

     static void event_reset(int fdsz)

     size_t size = MEMORY_ALGIN(fdsz) * sizeof(struct evsocket*);

    if(!(base->ev_read_array = (struct evsocket **)realloc(base->ev_read_array, size)))

     goto err;

     if(!(base->ev_read_pos = (size_t *)realloc(base->ev_read_pos, size)))

     goto err;

     if(!(base->ev_write_array = (struct evsocket **)realloc(base->ev_write_array, size)))

     goto err;

     if(!(base->ev_write_pos = (size_t *)realloc(base->ev_write_pos, size)))

     goto err;

    return ;

     XFREE(base->ev_read_array);

     XFREE(base->ev_write_array);

     printf(“memory out\\n”);

     return ;

    //事件初始化,分配套接字容器大小,默认64,这个在宏FD_DEFAULT_SIZE里面可以改

     int event_init(void)

     size_t size = FD_SET_ALLOC_SIZE(FD_DEFAULT_SIZE);

     if(!(base = (struct evbase *)malloc(sizeof(struct evbase))))

     goto err;

    base->ev_fdz = 0;

     base->ev_fd_sets = FD_DEFAULT_SIZE;

    base->ev_read_array = NULL;

     base->ev_read_pos = NULL;

     base->ev_write_array = NULL;

     base->ev_write_pos = NULL;

    if(!(base->readset_in = (win_fd_set *)malloc(size)))

     goto err;

     if(!(base->readset_out = (win_fd_set *)malloc(size)))

     goto err;

     if(!(base->writeset_in = (win_fd_set *)malloc(size)))

     goto err;

     if(!(base->writeset_out = (win_fd_set *)malloc(size)))

     goto err;

    base->readset_in->fd_count = base->readset_out->fd_count = 0;

     base->writeset_in->fd_count = base->writeset_out->fd_count = 0;

    base->ev_active_head = NULL;

     base->ev_active_tail = NULL;

     base->ev_active_count = 0;

    return 1;

     XFREE(base);

     XFREE(base->readset_in);

     XFREE(base->readset_out);

     XFREE(base->writeset_in);

     XFREE(base->writeset_out);

     printf(“memory out\\n”);

     return 0;

    //添加一个事件

     int event_add(struct evsocket *ev, short events,

     void (*callback)(SOCKET, void *), void *arg)

     ev->events = events;

     ev->ev_callback = callback;

     ev->ev_arg = arg;

    if(ev->fd > base->ev_fdz){

     event_reset(ev->fd);

     base->ev_fdz = ev->fd;

    if(events & EV_READ){

     X_FD_SET(ev->fd, base->readset_in, EV_READ);

     base->ev_read_array[ev->fd] = ev;

    if(events & EV_WRITE){

     X_FD_SET(ev->fd, base->writeset_in, EV_WRITE);

     base->ev_write_array[ev->fd] = ev;

    return 1;

    //删除一个事件

     int event_del(struct evsocket *ev)

     if(ev->events & EV_READ){

     X_FD_CLR(ev->fd, base->readset_in, EV_READ);

     base->ev_read_array[ev->fd] = NULL;

    if(ev->events & EV_WRITE){

     X_FD_CLR(ev->fd, base->writeset_in, EV_WRITE);

     base->ev_write_array[ev->fd] = NULL;

    return 1;

    //这是主循环了,所有的消息都在这里入队列

     int event_dispatch(void)

     size_t i;

     SOCKET fd;

     size_t fd_count;

    base->running = 1;

     while(base->running)

     fd_count = (base->readset_in->fd_count > base->writeset_in->fd_count) ?

     base->readset_in->fd_count : base->writeset_in->fd_count;

    event_fd_copy(base->readset_out, base->readset_in);

     event_fd_copy(base->writeset_out, base->writeset_in);

    if(select(fd_count,

     (struct fd_set *)base->readset_out,

     (struct fd_set *)base->writeset_out,

     NULL, NULL) <= 0)

     continue;

    for(i = 0; i < base->readset_out->fd_count; i++){

     fd = base->readset_out->fd_array[i];

     if(base->ev_read_array[fd]){

     event_active_push(base->ev_read_array[fd]);

    for(i = 0; i < base->writeset_out->fd_count; i++){

     fd = base->writeset_out->fd_array[i];

     if(base->ev_write_array[fd]){

     event_active_push(base->ev_write_array[fd]);

    if(base->ev_active_count > 0){

     event_active_process();

    return 1;

    //释放内存

     int event_free(void)

     struct evqueue *p = NULL;

     while(p = event_active_pop()){

     free(p);

    free(base->ev_read_array);

     free(base->ev_read_pos);

     free(base->ev_write_array);

     free(base->ev_write_pos);

     free(base->readset_in);

     free(base->readset_out);

     free(base->writeset_in);

     free(base->writeset_out);

    return 1;

    //退出循环

     int event_stop(void)

     base->running = 0;

     return 1;

    event.c

    #include

     #include “socket.h”

    #pragma comment(lib, “ws2_32.lib”)

    typedef unsigned int fd_mask;

    #define BUFFER_SIZE 8192

    void OnRead(SOCKET fd, void *arg)

     struct evsocket *ev = (struct evsocket *)arg;

     char buffer[BUFFER_SIZE] = {0};

     int recvSize = recv(fd, buffer, BUFFER_SIZE, 0);

    if(recvSize > 0){

     printf(“recv data: %s\\n”, buffer);

     }else{

     event_del(ev);

     closesocket(fd);

     free(ev);

     printf(“client close\\n”);

    void OnAccept(SOCKET fd, void *arg)

     unsigned long nonblocking = 1;

     struct evsocket *ev = (struct evsocket *)malloc(sizeof(struct evsocket));

     int len = sizeof(ev->addr);

    ev->fd = accept(fd, (sockaddr *)&ev->addr, &len);

     if(ev->fd == SOCKET_ERROR){

     printf(“error\\n”);

     free(ev);

     return ;

     ioctlsocket(ev->fd, FIONBIO, (unsigned long*) &nonblocking);

     printf(“%d)new client accept, %u\\n”, fd, ev->fd);

     event_add(ev, EV_READ, OnRead, ev);

    int main(int argc, char *argv[])

     WSADATA wsd;

     unsigned long nonblocking = 1;

     sockaddr_in adr;

     struct evsocket *ev = (struct evsocket *)malloc(sizeof(struct evsocket));

    /* initialize DLL */

     if(WSAStartup(0×0202, &wsd) != 0){

     printf(“WSAStartup failed\\n”);

     return 1;

     printf(“initialize DLL success\\n”);

    /* initialize Socket */

     if((ev->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET){

     printf(“socket failed\\n”);

     return 1;

     printf(“create socket success\\n”);

    /* set socket options */

     ioctlsocket(ev->fd, FIONBIO, (unsigned long*) &nonblocking);

    /* bind socket */

     adr.sin_family = AF_INET;

     adr.sin_addr.S_un.S_addr = inet_addr(“127.0.0.1″);

     adr.sin_port = htons(7000);

     if(SOCKET_ERROR == bind(ev->fd, (struct sockaddr *)&adr, sizeof(adr)))

     printf(“bind socket failed…\\n”);

     return 1;

     printf(“bind socket success\\n”);

    /* listen socket */

     if(SOCKET_ERROR == listen(ev->fd, 128))

     printf(“listen socket failed…\\n”);

     return 1;

     printf(“listen socket success\\n”);

    event_init();

     event_add(ev, EV_READ, OnAccept, ev);

     printf(“started services success\\n”);

    printf(“waiting client accept…\\n”);

     event_dispatch();

     event_free();

    return 0;

    代码贴完,收工。

觉得文章有用?立即:

和朋友一起 共学习 共进步!

建议继续学习:

QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK