C 11线程池实现中,如何优化基于C++11的线程池性能?

2026-05-06 06:211阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2082个文字,预计阅读时间需要9分钟。

C 11线程池实现中,如何优化基于C++11的线程池性能?

1. 线程基础

1.1 线程池是什么?

线程池是一种线程管理方式,用于管理一组线程,以执行任务。

1.2 为什么使用线程池?线程的创建和销毁需要消耗系统资源,使用线程池可以复用线程,提高系统效率。当线程数量过多,系统开销大,会影响系统的性能和稳定性。

1.线程池 1.1 线程池是什么?

一种线程管理方式。

1.2 为什么用线程池?

线程的创建和销毁都需要消耗系统开销,当线程数量过多,系统开销过大,就会影响缓存局部性和整体性能。而线程池能够在充分利用内核资源的前提下,避免系统资源被过度调用。

1.3 如何设计线程池?

简单来说,在线程池中提前创建好多个线程,使用时从线程池中取出,使用完放回线程池。线程池中的线程调度由线程池中的管理者线程调度。

C 11线程池实现中,如何优化基于C++11的线程池性能?

2.基于C++11的实现

Talk is cheap. Show me the code.

直接看程序,原理、函数在后面再介绍。

2.1 程序

程序主要分为四个文件,分别为:

  • Task.h //任务类
  • ThreadPool.h //线程池类
  • ThreadPool.cpp //线程池类实现
  • main.cpp //测试程序
2.1.2 任务类Task.h

#pragma once using callback = void(*)(void*);//函数指针,定义别名 class Task{ public: callback func;//回调任务函数 void* arg; //函数参数 public: Task() { //无参构造函数 this->func = nullptr; this->arg = nullptr; } Task(callback func, void* arg) {//含参构造函数 this->func = func; this->arg = arg; } ~Task() = default; //析构函数 Task(const Task &t) = default; //拷贝构造函数 Task& operator=(const Task &t); //拷贝赋值操作符 Task(Task &&t) = default; //移动构造函数,注意不能有const Task& operator=(const Task &&t);//移动赋值操作符 }; 2.1.2 线程池类ThreadPool.h

#pragma once #include "Task.h" #include <thread> #include <queue> #include <vector> #include <atomic> #include <mutex> #include <condition_variable> using namespace std; class ThreadPool { public: ThreadPool(int minSize, int maxSize);//构造函数 void AddTask(Task task); //添加新任务 int GetBusyNum(); //获取当前工作中的线程数 int GetAliveNum(); //获取当前活着的线程数 int GetTaskQueueSize(); //获取当前任务队列长度 ~ThreadPool(); ThreadPool(const ThreadPool &t) = default; //拷贝构造函数 ThreadPool& operator=(const ThreadPool &t); //拷贝赋值操作符 ThreadPool(ThreadPool &&t) = default; //移动构造函数 ThreadPool& operator=(const ThreadPool &&t);//移动赋值操作符 private: queue<Task> taskQueue; //任务队列 thread managerID;//管理者线程ID vector<thread> threadIDs;//工作中的线程组ID int minNum;//最小线程数量(如果线程池中线程的数目过少,处理器的一些核可能就无法充分利用,浪费) int maxNum;//最大线程数量(如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。) atomic_int busyNum;//工作中的线程数量(atomic_int保证其赋值,取值操作的原子性) atomic_int liveNum;//活着的线程数量 atomic_int exitNum;//将要被销毁的线程数量 mutex mutexPool;//线程池的锁 condition_variable cond;//条件变量 bool shutDown;//是不是要销毁线程池, 销毁为true, 不销毁为false static void worker(void* arg);//工作的线程任务函数 static void manager(void* arg);//管理者线程任务函数 static const int NUMBER = 2;//管理者线程每次增加/销毁的线程数 }; 2.1.3 线程池类实现ThreadPool.cpp

#include "ThreadPool.h" #include <unistd.h> //pthread_self #include <iostream> using namespace std; ThreadPool::ThreadPool(int minSize, int maxSize) { do{ minNum = minSize; maxNum = maxSize; busyNum = 0; liveNum = minSize; exitNum = 0; shutDown = false; //初始化管理者线程和工作线程组 managerID = thread(manager, this); threadIDs.resize(maxSize); for(int i = 0; i < minSize; ++i) { threadIDs[i] = thread(worker, this); } return; } while(0);//do{...}while(0)结构提高代码健壮性 } ThreadPool::~ThreadPool() { shutDown = true; if(managerID.joinable()) {//阻塞在管理者线程,直到其执行完,再向下进行 managerID.join(); } cond.notify_all();//唤醒所有等待的线程 for(int i = 0; i < maxNum; ++i) {//依次执行工作者的线程 if(threadIDs[i].joinable()) { threadIDs[i].join(); } } } //添加新任务 void ThreadPool::AddTask(Task task) { unique_lock<mutex> poolLock(mutexPool); if(shutDown) { return; } taskQueue.emplace(task); cond.notify_all(); } int ThreadPool::GetBusyNum() { return busyNum; } int ThreadPool::GetAliveNum() { return liveNum; } int ThreadPool::GetTaskQueueSize() { unique_lock<mutex> poolLock(mutexPool); int queueSize = taskQueue.size(); poolLock.unlock(); return queueSize; } //工作者线程 void ThreadPool::worker(void* arg) { ThreadPool* pool = static_cast<ThreadPool*>(arg); while(true) { unique_lock<mutex> poolLock(pool->mutexPool); //若当前任务队列为空且线程池处于开启状态 while(pool->taskQueue.empty() && !pool->shutDown) { pool->cond.wait(poolLock);//阻塞工作线程 //若存在待销毁线程 if(pool->exitNum > 0) { --pool->exitNum; if(pool->liveNum > pool->minNum) {//若活着的线程数大于最小线程数,则可以进行销毁 --pool->liveNum; cout << "threadID: " << pthread_self() << " has exited." << endl; return; } } } //判断线程池是否关闭了 if(pool->shutDown) { cout << "threadID: " << pthread_self() << " has exited." << endl; return; } //从任务队列中取出一个任务 Task task = pool->taskQueue.front(); pool->taskQueue.pop(); ++pool->busyNum; //解锁 poolLock.unlock(); //执行任务 cout << "threadID: " << pthread_self() << " start to work." << endl; task.func(task.arg); task.arg = nullptr; //执行完后,工作线程数-1 cout << "threadID: " << pthread_self() << " stop working." << endl; --pool->busyNum; } } //管理者线程 void ThreadPool::manager(void* arg) { ThreadPool* pool = static_cast<ThreadPool*>(arg); while(!pool->shutDown) { //每隔3秒检测一次 sleep(3); //添加新线程 //若任务个数大于活着的线程数,且活着的线程数小于最大线程数 if(pool->GetTaskQueueSize() > pool->liveNum && pool->liveNum < pool->maxNum) { unique_lock<mutex> poolLock(pool->mutexPool); poolLock.lock(); int count = 0; for(int i = 0; i < pool->maxNum && count < ThreadPool::NUMBER && pool->liveNum < pool->maxNum; ++i) { if(pool->threadIDs[i].get_id() == thread::id()) { cout << "Create a new thread." << endl; pool->threadIDs[i] = thread(worker, pool); ++count; ++pool->liveNum; } } poolLock.unlock(); } //销毁线程 //若忙的线程*2小于存活的线程数,且存活的线程数大于最小的线程数 if(pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) { pool->exitNum = ThreadPool::NUMBER; for(int i = 0; i < ThreadPool::NUMBER; ++i) {//让工作的线程自杀 pool->cond.notify_all(); } } } } 2.2 测试方法:

将上述文件放在Linux下的一个文件夹(我这里是\Share\study_threadPool\myself)

  • 进入该文件夹:cd /share/study_threadPool/myself/
  • 编译:g++ main.cpp ThreadPool.cpp -o ThreadPool.o -pthread
  • 运行:./ThreadPool.o
2.2 C++11相关函数
  1. thread类
  • ThreadPool.cpp第17行:managerID = thread(manager, this);表示创建一个新线程,manager是该线程执行的函数,this是该线程执行函数的参数。
  • ThreadPool.cpp第29行:managerID.joinable() 判断该线程是否可以join
  • ThreadPool.cpp第30行:managerID.join() 阻塞在该线程,直到其执行完
  • ThreadPool.cpp第123行:pool->threadIDs[i].get_id()表示获取该线程的ID
  1. mutex
  • ThreadPool.cpp第42行:unique_lock<mutex> poolLock(mutexPool); 自动加锁与解锁
  • ThreadPool.cpp第61行:poolLock.unlock();解锁
  • ThreadPool.cpp第120行:poolLock.lock();加锁
  1. condition_variable
  • ThreadPool.cpp第32行:nd.notify_all();唤醒所有等待的线程
  1. atomic
  • ThreadPool.h第34行:atomic_int busyNum;本质还是int,只是每次对其操作时,都能保证是原子操作
  1. using
  • Task.h第2行:sing callback = void(*)(void*);函数的别名
3.调试过程中出现的问题及解决方法 3.1 warning:#pragma once in main file


解决方案:g++编译时不要编译头文件

3.2 移动构造函数出错


解决方案:移动构造函数的参数不能加const

4.参考
  • 苏丙榅大佬的线程池工作原理和实现 - C/C++,

  • 基于C++11的线程池

本文共计2082个文字,预计阅读时间需要9分钟。

C 11线程池实现中,如何优化基于C++11的线程池性能?

1. 线程基础

1.1 线程池是什么?

线程池是一种线程管理方式,用于管理一组线程,以执行任务。

1.2 为什么使用线程池?线程的创建和销毁需要消耗系统资源,使用线程池可以复用线程,提高系统效率。当线程数量过多,系统开销大,会影响系统的性能和稳定性。

1.线程池 1.1 线程池是什么?

一种线程管理方式。

1.2 为什么用线程池?

线程的创建和销毁都需要消耗系统开销,当线程数量过多,系统开销过大,就会影响缓存局部性和整体性能。而线程池能够在充分利用内核资源的前提下,避免系统资源被过度调用。

1.3 如何设计线程池?

简单来说,在线程池中提前创建好多个线程,使用时从线程池中取出,使用完放回线程池。线程池中的线程调度由线程池中的管理者线程调度。

C 11线程池实现中,如何优化基于C++11的线程池性能?

2.基于C++11的实现

Talk is cheap. Show me the code.

直接看程序,原理、函数在后面再介绍。

2.1 程序

程序主要分为四个文件,分别为:

  • Task.h //任务类
  • ThreadPool.h //线程池类
  • ThreadPool.cpp //线程池类实现
  • main.cpp //测试程序
2.1.2 任务类Task.h

#pragma once using callback = void(*)(void*);//函数指针,定义别名 class Task{ public: callback func;//回调任务函数 void* arg; //函数参数 public: Task() { //无参构造函数 this->func = nullptr; this->arg = nullptr; } Task(callback func, void* arg) {//含参构造函数 this->func = func; this->arg = arg; } ~Task() = default; //析构函数 Task(const Task &t) = default; //拷贝构造函数 Task& operator=(const Task &t); //拷贝赋值操作符 Task(Task &&t) = default; //移动构造函数,注意不能有const Task& operator=(const Task &&t);//移动赋值操作符 }; 2.1.2 线程池类ThreadPool.h

#pragma once #include "Task.h" #include <thread> #include <queue> #include <vector> #include <atomic> #include <mutex> #include <condition_variable> using namespace std; class ThreadPool { public: ThreadPool(int minSize, int maxSize);//构造函数 void AddTask(Task task); //添加新任务 int GetBusyNum(); //获取当前工作中的线程数 int GetAliveNum(); //获取当前活着的线程数 int GetTaskQueueSize(); //获取当前任务队列长度 ~ThreadPool(); ThreadPool(const ThreadPool &t) = default; //拷贝构造函数 ThreadPool& operator=(const ThreadPool &t); //拷贝赋值操作符 ThreadPool(ThreadPool &&t) = default; //移动构造函数 ThreadPool& operator=(const ThreadPool &&t);//移动赋值操作符 private: queue<Task> taskQueue; //任务队列 thread managerID;//管理者线程ID vector<thread> threadIDs;//工作中的线程组ID int minNum;//最小线程数量(如果线程池中线程的数目过少,处理器的一些核可能就无法充分利用,浪费) int maxNum;//最大线程数量(如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。) atomic_int busyNum;//工作中的线程数量(atomic_int保证其赋值,取值操作的原子性) atomic_int liveNum;//活着的线程数量 atomic_int exitNum;//将要被销毁的线程数量 mutex mutexPool;//线程池的锁 condition_variable cond;//条件变量 bool shutDown;//是不是要销毁线程池, 销毁为true, 不销毁为false static void worker(void* arg);//工作的线程任务函数 static void manager(void* arg);//管理者线程任务函数 static const int NUMBER = 2;//管理者线程每次增加/销毁的线程数 }; 2.1.3 线程池类实现ThreadPool.cpp

#include "ThreadPool.h" #include <unistd.h> //pthread_self #include <iostream> using namespace std; ThreadPool::ThreadPool(int minSize, int maxSize) { do{ minNum = minSize; maxNum = maxSize; busyNum = 0; liveNum = minSize; exitNum = 0; shutDown = false; //初始化管理者线程和工作线程组 managerID = thread(manager, this); threadIDs.resize(maxSize); for(int i = 0; i < minSize; ++i) { threadIDs[i] = thread(worker, this); } return; } while(0);//do{...}while(0)结构提高代码健壮性 } ThreadPool::~ThreadPool() { shutDown = true; if(managerID.joinable()) {//阻塞在管理者线程,直到其执行完,再向下进行 managerID.join(); } cond.notify_all();//唤醒所有等待的线程 for(int i = 0; i < maxNum; ++i) {//依次执行工作者的线程 if(threadIDs[i].joinable()) { threadIDs[i].join(); } } } //添加新任务 void ThreadPool::AddTask(Task task) { unique_lock<mutex> poolLock(mutexPool); if(shutDown) { return; } taskQueue.emplace(task); cond.notify_all(); } int ThreadPool::GetBusyNum() { return busyNum; } int ThreadPool::GetAliveNum() { return liveNum; } int ThreadPool::GetTaskQueueSize() { unique_lock<mutex> poolLock(mutexPool); int queueSize = taskQueue.size(); poolLock.unlock(); return queueSize; } //工作者线程 void ThreadPool::worker(void* arg) { ThreadPool* pool = static_cast<ThreadPool*>(arg); while(true) { unique_lock<mutex> poolLock(pool->mutexPool); //若当前任务队列为空且线程池处于开启状态 while(pool->taskQueue.empty() && !pool->shutDown) { pool->cond.wait(poolLock);//阻塞工作线程 //若存在待销毁线程 if(pool->exitNum > 0) { --pool->exitNum; if(pool->liveNum > pool->minNum) {//若活着的线程数大于最小线程数,则可以进行销毁 --pool->liveNum; cout << "threadID: " << pthread_self() << " has exited." << endl; return; } } } //判断线程池是否关闭了 if(pool->shutDown) { cout << "threadID: " << pthread_self() << " has exited." << endl; return; } //从任务队列中取出一个任务 Task task = pool->taskQueue.front(); pool->taskQueue.pop(); ++pool->busyNum; //解锁 poolLock.unlock(); //执行任务 cout << "threadID: " << pthread_self() << " start to work." << endl; task.func(task.arg); task.arg = nullptr; //执行完后,工作线程数-1 cout << "threadID: " << pthread_self() << " stop working." << endl; --pool->busyNum; } } //管理者线程 void ThreadPool::manager(void* arg) { ThreadPool* pool = static_cast<ThreadPool*>(arg); while(!pool->shutDown) { //每隔3秒检测一次 sleep(3); //添加新线程 //若任务个数大于活着的线程数,且活着的线程数小于最大线程数 if(pool->GetTaskQueueSize() > pool->liveNum && pool->liveNum < pool->maxNum) { unique_lock<mutex> poolLock(pool->mutexPool); poolLock.lock(); int count = 0; for(int i = 0; i < pool->maxNum && count < ThreadPool::NUMBER && pool->liveNum < pool->maxNum; ++i) { if(pool->threadIDs[i].get_id() == thread::id()) { cout << "Create a new thread." << endl; pool->threadIDs[i] = thread(worker, pool); ++count; ++pool->liveNum; } } poolLock.unlock(); } //销毁线程 //若忙的线程*2小于存活的线程数,且存活的线程数大于最小的线程数 if(pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) { pool->exitNum = ThreadPool::NUMBER; for(int i = 0; i < ThreadPool::NUMBER; ++i) {//让工作的线程自杀 pool->cond.notify_all(); } } } } 2.2 测试方法:

将上述文件放在Linux下的一个文件夹(我这里是\Share\study_threadPool\myself)

  • 进入该文件夹:cd /share/study_threadPool/myself/
  • 编译:g++ main.cpp ThreadPool.cpp -o ThreadPool.o -pthread
  • 运行:./ThreadPool.o
2.2 C++11相关函数
  1. thread类
  • ThreadPool.cpp第17行:managerID = thread(manager, this);表示创建一个新线程,manager是该线程执行的函数,this是该线程执行函数的参数。
  • ThreadPool.cpp第29行:managerID.joinable() 判断该线程是否可以join
  • ThreadPool.cpp第30行:managerID.join() 阻塞在该线程,直到其执行完
  • ThreadPool.cpp第123行:pool->threadIDs[i].get_id()表示获取该线程的ID
  1. mutex
  • ThreadPool.cpp第42行:unique_lock<mutex> poolLock(mutexPool); 自动加锁与解锁
  • ThreadPool.cpp第61行:poolLock.unlock();解锁
  • ThreadPool.cpp第120行:poolLock.lock();加锁
  1. condition_variable
  • ThreadPool.cpp第32行:nd.notify_all();唤醒所有等待的线程
  1. atomic
  • ThreadPool.h第34行:atomic_int busyNum;本质还是int,只是每次对其操作时,都能保证是原子操作
  1. using
  • Task.h第2行:sing callback = void(*)(void*);函数的别名
3.调试过程中出现的问题及解决方法 3.1 warning:#pragma once in main file


解决方案:g++编译时不要编译头文件

3.2 移动构造函数出错


解决方案:移动构造函数的参数不能加const

4.参考
  • 苏丙榅大佬的线程池工作原理和实现 - C/C++,

  • 基于C++11的线程池