前面实现了线程池,如果我们使用线程池就是有一个连接到来就把它丢进工作队列,然后让线程竞争,在这个过程中又会产生一个问题,我们浪费了主进程,它只是干了一件把任务丢进线程池工作队列的操作,然后就啥也没干了,所以我们把IO操作单独拿出来给主线程来干,线程池里的线程就只干解析的事情。
IO多路复用
所谓IO多路复用就是在一个线程上可以对多个对象进行IO操作, 这里主要使用了epoll,epoll可以监视多个文件描述符上的多个事件,没有事件发生的时候就会阻塞在epoll_wait上,当有事件发生时就会返回已经准备好的那些事件,就可以对这些文件描述符进行操作。
epoll有两个触发模式分别为水平触发(LT)和边缘触发(ET),LT就是当某个文件描述符上的事件发生后它会一直通知,ET是当某个文件描述符上是事件发生后它只会通知一次,直到下一次该事件再次发生。
例如有一个文件现在有10个字节可读,LT模式下它会通知你,你读取了一个字节后不读了,下一次循环epoll还会返回该文件描述符,而ET模式下一个循环就不会再通知了,直到有新的数据到该文件
模式选取
connfd
对于connfd,如果用LT模式,但是我一次没有读完数据(比如缓冲区满了),那么下一次还会接着读,但是可能我前一次读的数据给了一个线程,后一次读是数据又给了另一个线程,这样数据就不完整了,所以选择ET模式,由于ET模式只通知一次,所以要一次把数据读完,就在读取外面加一个while,知道读完或连接关闭。
同时还是上面的情况,如果一个现在再处理某个文件描述符的时候这个文件描述符上又有新的数据来了,那么这时候会把这个fd给另一个线程,还是会出问题,所以还要给它加上一个ONESHOT也就是只通知一次,先要下一次通知需要重置该epoll中的该fd
listenfd
对于listenfd,我们用LT就可以了,这样不会有连接丢失(边缘触发时,accept只会执行一次接收一个连接,内核不会再去通知有连接就绪),所以使用水平触发的fd就不存在丢失连接的问题)
描述符的阻塞还是非阻塞
connfd
对于用ET模式的描述符必须设置成非阻塞模式,因为ET模式下我们要用一个循环来读写所有文件,那么当读写完了之后,阻塞模式的描述符会阻塞在这里,而非阻塞模式会返回一个EAGAIN代表当前不可读写,所以为了程序能正常运行下去必须使用非阻塞读写
listenfd
对于listenfd,好像阻塞非阻塞都无所谓,但是有这么一直情况,比如现在来了一个连接,epoll通知了现在listenfd可以读了,那么主线程往下运行,在运行到accept这句话之前连接就断开了,那么现在accept就会阻塞在这里,epoll里其他事件都得不到处理,直到下一个连接到来,这显然不好,所以要把listenfd设置成非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1
具体实现
首先要把IO操作从工作线程中分离出来,那么之前的HttpConn类中就只需要一个读缓冲区和写缓冲区,然后在主线程中进行IO,将读取的数据放到类的写缓冲区让类去处理,然后类处理后生成的响应放到写缓冲区,让主线程来读取并写入sockfd
首先是在主线程中调用的两个非阻塞的读写函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| bool http_conn::HttpConn::readall() { int isread; char c; while(true) { isread=rio.rio_readnb(&riobuffer,&c,1); if(isread>0) readbuffer.append(1,c); else if(isread==-1) { if(errno == EAGAIN || errno == EWOULDBLOCK) break; return false; }else if(isread==0) return false; } return true; }
bool http_conn::HttpConn::write_back() { int need_to_send=writebuffer.length(); while(w_send_idx<need_to_send-1) { int issend=rio.rio_writen(connfd,((char *) writebuffer.c_str())+w_send_idx,need_to_send-w_send_idx); if(issend<0) { if(errno==EAGAIN) { modfd(epollfd,connfd,EPOLLOUT); return true; }else return false; } w_send_idx=issend-1; } return false; }
|
这两个函数还是放在HttpConn内,因为要访问类的成员,但是类的内部不会调用它们,由主线程来调用
这两个函数里有一些操控epoll的函数,放到下面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| void http_conn::addfd(int epollfd,int fd,bool et,bool oneshot) { epoll_event event; event.data.fd=fd; event.events=EPOLLIN |EPOLLRDHUP; if(et) event.events|=EPOLLET; if(oneshot) event.events|=EPOLLONESHOT; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event); }
void http_conn::modfd(int epollfd,int fd,int ev) { epoll_event event; event.data.fd=fd; event.events=ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP; epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event); }
void http_conn::removefd(int epollfd,int fd) { epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0); close(fd); }
|
也就是上面的写函数里在暂时写不了的时候重置监听当前描述符的写事件,等待下一次通知
还有一个设置文件描述符为非阻塞的函数
1 2 3 4 5 6
| void http_conn::setnoblock(int fd) { int flag=fcntl(fd,F_GETFL); assert(flag>0); fcntl(fd,F_SETFL,flag|O_NONBLOCK); }
|
然后是主线程,也就是main.cpp,在这里需要创建线程池,然后进行几乎所有的IO操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| #include "http_server.h" #include "http_conn.h" #include "../lib/thread_pool.cpp" #include "my_socket.h" #define MAX_EVENTNUMBER 10000 void addsig(int sig,void(handler)(int),bool restart = true){ struct sigaction sa; memset(&sa,'\0',sizeof(sa)); sa.sa_handler = handler; if(restart){ sa.sa_flags |= SA_RESTART; } sigfillset(&sa.sa_mask); assert(sigaction(sig,&sa,NULL) != -1); } int main() { addsig(SIGPIPE,SIG_IGN); thp::Thread_Pool<http_conn::HttpConn>tp(4); int listenfd=M_SOCKET::MySocket::openListenfd(8888); int epollfd=epoll_create(256); http_conn::HttpConn::epollfd=epollfd; http_conn::setnoblock(listenfd); http_conn::addfd(epollfd,listenfd,false,false); epoll_event events[MAX_EVENTNUMBER]; std::unordered_map<int,http_conn::HttpConn*> connmap; while(true) { int num=epoll_wait(epollfd,events,MAX_EVENTNUMBER,-1); for(int i=0;i<num;++i) { int sockfd=events[i].data.fd; if(sockfd==listenfd) { sockaddr_in clientaddr; unsigned int clientlen=sizeof(clientaddr); int connfd=accept(listenfd,(sockaddr*)&clientaddr,&clientlen); assert(connfd>=0); http_conn::HttpConn *client=new http_conn::HttpConn(); client->init(connfd); connmap.insert(std::make_pair(connfd,client)); }else if(events[i].events&(EPOLLRDHUP | EPOLLHUP |EPOLLERR)) { delete connmap[sockfd]; connmap.erase(sockfd); }else if(events[i].events&EPOLLIN) { bool res=connmap[sockfd]->readall(); if(res) tp.append(connmap[sockfd]); else { delete connmap[sockfd]; connmap.erase(sockfd); } }else if(events[i].events&EPOLLOUT) { if(!connmap[sockfd]->write_back()) { delete connmap[sockfd]; connmap.erase(sockfd); } } } } return 0; }
|
其中忽略的SIGPIPE信号是在程序尝试写一个已经被客户端关闭的信号时会发生的信号,它会结束当前线程,所谓我们要忽略它。
然后是各个线程的入口函数
1 2 3 4 5 6 7 8 9 10 11 12 13
| void http_conn::HttpConn::run() { requeststatuscode=parse_request(); if(requeststatuscode==NO_REQUEST) { modfd(epollfd,connfd,EPOLLIN); return ; }else if(requeststatuscode!=SUCCESS) { parse_error(); } modfd(epollfd,connfd,EPOLLOUT); }
|
当数据读取是正确的,但是不完全,比如只读到了请求行,没有读到请求头那么会返回一个NO_REQUEST,然后将文件描述符重置,等待接下来的读取,如果都处理完了,响应生成了,那么重置文件描述符监听可写事件。
测试
使用webbench进行测试
从上到下为1线程。2线程,4线程,8线程(线程池内的线程数量),在1000并发量的情况下线程多了吞吐量反而有所下降,不过是在同一台虚拟机上测试的,可能会有一些影响
代码
github