为什么要线程池
之前实现的那个简陋的服务器只能接受一个连接,这显然是不行的,所以我们可以用多线程来解决这个问题
但是如果来一个连接就开一个线程那么连接很多的时候就会有开很多的线程,还是会影响性能,所以需要线程池,即固定最大可以开的线程,然后将线程管理起来,来一个连接就把它加入工作队列,然后呢让空闲的线程去竞争这个任务。
线程池实现
类定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| #ifndef __THREAD_POOL__ #define __THREAD_POOL__ #include "http_server.h" namespace thp { template <class T> class Thread_Pool { private: int threadnum; std::vector<std::thread>_thread; std::queue<T*>task_queue; std::mutex _mtx; std::condition_variable cv; std::atomic_bool is_runing; void work(); public: void stop(); void init(); void append(T*task); explicit Thread_Pool(int num):threadnum(num) { init(); } ~Thread_Pool() { stop(); } Thread_Pool(const Thread_Pool&)=delete; Thread_Pool& operator =(const Thread_Pool& t)=delete; }; } #endif
|
其中is_runing是原子变量,互斥锁和条件变量是来保证工作队列的访问的安全,同时该类是模板类,但是工作队列中的元素要保证是一个类,并且实现了一个run函数
初始化
1 2 3 4 5 6 7 8 9 10 11 12
| template<class T> void thp::Thread_Pool<T>::init() { if(is_runing==false) { is_runing=true; for(int i=0;i<threadnum;i++) { _thread.emplace_back(std::thread(&thp::Thread_Pool<T>::work,this)); } } }
|
创建threadnum个线程,线程的入口是本类的work函数
添加任务
1 2 3 4 5 6 7 8 9 10 11 12
| template<class T> void thp::Thread_Pool<T>::append(T*task) { if(is_runing) { { std::unique_lock<std::mutex>lk(_mtx); task_queue.push(task); } cv.notify_one(); } }
|
队列不为空就唤醒一个线程
线程的入口函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| template<class T> void thp::Thread_Pool<T>::work() { while(is_runing) { T* task=nullptr; { std::unique_lock<std::mutex> lk(_mtx); if(!task_queue.empty()) { task=task_queue.front(); task_queue.pop(); }else cv.wait(lk,[this]()->bool{return !is_runing||!task_queue.empty();}); } if(task!=nullptr) { task->run(); delete task; } } return ; }
|
关闭线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| template<class T> void thp::Thread_Pool<T>::stop() { is_runing=false; cv.notify_all(); for(auto &t:_thread) { if(t.joinable()) t.join(); } while(!task_queue.empty()) { delete task_queue.front(); task_queue.pop(); } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #include "../lib/thread_pool.cpp" class thp_test { private: std::string s; public: thp_test():s("hello thread "){}; void run() { std::cout<<s<<pthread_self()<<std::endl; } }; int main() { thp::Thread_Pool<thp_test>tp(10); sleep(1); for(int i=0;i<10;i++) { tp.append(new thp_test()); sleep(1); } tp.stop(); return 0; }
|