Linux系统中的线程池是如何实现的?

2026-05-05 20:191阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2993个文字,预计阅读时间需要12分钟。

Linux系统中的线程池是如何实现的?

本文简要介绍了线程池的概念和特点,对线程池的结构和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序展示了线程池的运行过程。

概述:线程池是一个进程中的线程集合,用于高效地管理线程资源。它通过复用一定数量的线程来执行任务,避免了频繁创建和销毁线程的开销。

设计:线程池的结构主要包括以下几个部分:

1.线程池管理器:负责创建线程、分配任务、回收线程等。

2.工作线程:执行具体任务的线程。

3.任务队列:存储待执行的任务。

4.线程工厂:用于创建工作线程。

操作接口:

1.execute(Runnable task):提交一个任务到线程池执行。

2.shutdown():关闭线程池,不再接受新任务,等待已提交的任务执行完毕。

3.isShutdown():判断线程池是否已关闭。

实现:

javapublic class ThreadPool { private final int corePoolSize; private final int maximumPoolSize; private final long keepAliveTime; private final BlockingQueue workQueue; private final ExecutorService executorService;

public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this.corePoolSize=corePoolSize; this.maximumPoolSize=maximumPoolSize; this.keepAliveTime=unit.toMillis(keepAliveTime); this.workQueue=workQueue; this.executorService=Executors.newFixedThreadPool(corePoolSize); }

public void execute(Runnable task) { executorService.execute(task); }

public void shutdown() { executorService.shutdown(); }

public boolean isShutdown() { return executorService.isShutdown(); }}

示例程序:javapublic class Main { public static void main(String[] args) { ThreadPool threadPool=new ThreadPool(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());

for (int i=0; i { System.out.println(线程 + Thread.currentThread().getName() + 正在执行任务 + finalI); }); }

threadPool.shutdown(); while (!threadPool.isShutdown()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(所有任务执行完毕!); }}

运行结果:线程 pool-1-thread-1 正在执行任务 0线程 pool-1-thread-2 正在执行任务 1线程 pool-1-thread-1 正在执行任务 2线程 pool-1-thread-2 正在执行任务 3线程 pool-1-thread-1 正在执行任务 4线程 pool-1-thread-2 正在执行任务 5线程 pool-1-thread-1 正在执行任务 6线程 pool-1-thread-2 正在执行任务 7线程 pool-1-thread-1 正在执行任务 8线程 pool-1-thread-2 正在执行任务 9所有任务执行完毕!

本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。 简述

一个进程中的线程就好比是一家公司里的员工,员工的数目应该根据公司的业务多少来定,太少了忙不过来,但是太多了也浪费资源。最理想的情况是让进程有一些初始数目的线程(线程池),当没有任务时这些线程自动进入睡眠,有了任务它们会立即执行任务,不断循环。进程还应根据自身任务的繁重与否来增删线程的数目,当所有的任务都完成之后,所有的线程还能妥当地收官。

如下图所示是一个处于初始状态的线程池。

图1 线程池示意图

有以下几点需要注意:

(1)任务队列中刚开始没有任何任务,是一个具有头结点的空链队列

(2)使用互斥锁来保护这个队列

(3)使用条件变量来代表任务队列中的任务个数的变化,将来如果主线程向队列中投放任务,那么可以通过条件变量来唤醒那么睡着了的线程

(4)通过一个公共开关----shutdown,来控制线程退出,进而销毁整个线程池

接口设计

线程池相关结构体如下表所示。

Linux系统中的线程池是如何实现的?

原型 struct task 功能描述 任务节点,包含需要执行的函数及其参数,通过链表连成一个任务队列 成员列表 void * (* task)(void *arg); void *arg; struct task *next; 备注 任务实例最终形成一条单向链表 原型 thread_pool 功能描述 线程池实例,包含一个线程池的所有信息 成员列表 pthread_mutex_t lock; //互斥锁,保护任务队列 pthread_cond_t cond; //条件变量,同步所有线程 bool shutdown; //线程池销毁标记 struct task *task_list; //任务链队列指针 pthread_t *tids; //线程ID存放位置 unsigned int waiting tasks; //任务链队列中等待的任务个数 unsigned int active_threads; //当前活跃线程个数 备注 活跃线程个数可以修改,但至少有1条活跃线程

下面是线程池的接口说明

(1)线程池初始化:init_pool()

原型 bool init_pool(thread_pool *pool, unsigned int threads_number); 功能描述 创建一个新的线程池,包含threads_number个活跃线程 参数 pool:线程池指针 threads_number:初始活跃线程个数(大于或等于1) 返回值 成功返回true,失败返回false 头文件 thread_pool.h 备注 线程池最少线程个数为1

(2)投放任务:add_task()

原型 bool add_task(thread_pool pool, void * (do_task)(void *arg), void *arg); 功能描述 往线程池投放任务 参数 pool:线程池指针 do_task:投放至线程池的执行例程 arg:执行例程do_task的参数,若该执行例程不需要参数可设置为NULL 返回值 成功返回true,失败返回false 头文件 thread_pool.h 备注 任务队列中最大任务个数为MAX_WAITING_TASKS

(3) 增加活跃线程:add_thread()

原型 int add_thread(thread_pool *pool, unsigned int additional_threads); 功能描述 增加线程池中活跃线程的个数 参数 pool:需要增加线程的线程池指针 additional_threads:新增线程个数 返回值 >0 : 实际新增线程个数 -1 :失败 头文件 thread_pool.h

(4)删除活跃线程:remove_thread()

原型 int remove_thread(thread_pool *pool, unsigned int removing threads); 功能描述 删除线程池中活跃线程的个数 参数 pool:需要删除线程的线程池指针 removing_threads:要删除的线程个数;该参数设置为0时直接返回当前线程池线程总数,对线程池不造成任何其他影响 返回值 >0 : 当前线程池剩余线程个数 -1: 失败 头文件 thread_pool.h 备注 线程池至少会存在1条活跃线程 如果被删除的线程正在执行任务,就等待其完成任务之后删除

(5)销毁线程池:destroy_pool()

原型 bool destroy_pool(thread_pool *pool); 功能描述 阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存 参数 pool:将要销毁的线程池 返回值 成功返回true,失败返回false 头文件 thread_pool.h 源代码实现

头文件thread_pool.h

////////////////////////////////////////////////////////////////// // // Description: 本文件包含了线程池基本结构体定义,以及各个操作函 // 数的声明 // ////////////////////////////////////////////////////////////////// #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include <stdio.h> #include <stdbool.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <errno.h> #include <pthread.h> #define MAX_WAITING_TASKS 1000 #define MAX_ACTIVE_THREADS 20 struct task { void *(*task)(void *arg); void *arg; struct task *next; }; typedef struct thread_pool { pthread_mutex_t lock; pthread_cond_t cond; struct task *task_list; pthread_t *tids; unsigned waiting_tasks; unsigned active_threads; bool shutdown; }thread_pool; bool init_pool(thread_pool *pool, unsigned int threads_number); bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg); int add_thread(thread_pool *pool, unsigned int additional_threads_number); int remove_thread(thread_pool *pool, unsigned int removing_threads_number); bool destroy_pool(thread_pool *pool); void *routine(void *arg); #endif

接口实现:thread_pool.c

////////////////////////////////////////////////////////////////// // Description: 本文件包含了线程池操作函数的定义 ////////////////////////////////////////////////////////////////// #include "thread_pool.h" void handler(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); } void *routine(void *arg) { thread_pool *pool = (thread_pool *)arg; struct task *p; while(1) { pthread_cleanup_push(handler, (void *)&pool->lock); pthread_mutex_lock(&pool->lock); while(pool->waiting_tasks == 0 && !pool->shutdown) { pthread_cond_wait(&pool->cond, &pool->lock); } if(pool->waiting_tasks == 0 && pool->shutdown == true) { pthread_mutex_unlock(&pool->lock); pthread_exit(NULL); } p = pool->task_list->next; pool->task_list->next = p->next; pool->waiting_tasks--; pthread_mutex_unlock(&pool->lock); pthread_cleanup_pop(0); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); (p->task)(p->arg); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); free(p); } pthread_exit(NULL); } bool init_pool(thread_pool *pool, unsigned int threads_number) { pthread_mutex_init(&pool->lock, NULL); pthread_cond_init(&pool->cond, NULL); pool->shutdown = false; pool->task_list = malloc(sizeof(struct task)); pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); if(pool->task_list == NULL || pool->tids == NULL) { perror("allocate memory error"); return false; } pool->task_list->next = NULL; pool->waiting_tasks = 0; pool->active_threads = threads_number; int i; for(i=0; i<pool->active_threads; i++) { if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) { perror("create threads error"); return false; } } return true; } bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg) { struct task *new_task = malloc(sizeof(struct task)); if(new_task == NULL) { perror("allocate memory error"); return false; } new_task->task = task; new_task->arg = arg; new_task->next = NULL; pthread_mutex_lock(&pool->lock); if(pool->waiting_tasks >= MAX_WAITING_TASKS) { pthread_mutex_unlock(&pool->lock); fprintf(stderr, "too many tasks.\n"); free(new_task); return false; } struct task *tmp = pool->task_list; while(tmp->next != NULL) tmp = tmp->next; tmp->next = new_task; pool->waiting_tasks++; pthread_mutex_unlock(&pool->lock); pthread_cond_signal(&pool->cond); return true; } int add_thread(thread_pool *pool, unsigned additional_threads) { if(additional_threads == 0) return 0; unsigned total_threads = pool->active_threads + additional_threads; int i, actual_increment = 0; for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) { if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) { perror("add threads error"); if(actual_increment == 0) return -1; break; } actual_increment++; } pool->active_threads += actual_increment; return actual_increment; } int remove_thread(thread_pool *pool, unsigned int removing_threads) { if(removing_threads == 0) return pool->active_threads; int remain_threads = pool->active_threads - removing_threads; remain_threads = remain_threads>0 ? remain_threads:1; int i; for(i=pool->active_threads-1; i>remain_threads-1; i--) { errno = pthread_cancel(pool->tids[i]); if(errno != 0) break; } if(i == pool->active_threads-1) return -1; else { pool->active_threads = i+1; return i+1; } } bool destroy_pool(thread_pool *pool) { pool->shutdown = true; pthread_cond_broadcast(&pool->cond); int i; for(i=0; i<pool->active_threads; i++) { errno = pthread_join(pool->tids[i], NULL); if(errno != 0) { printf("join tids[%d] error: %s\n", i, strerror(errno)); } else printf("[%u] is joined\n", (unsigned)pool->tids[i]); } free(pool->task_list); free(pool->tids); free(pool); return true; } 测试程序

main.c

////////////////////////////////////////////////////////////////// // Description: 一个使用了线程池的示例 ////////////////////////////////////////////////////////////////// #include "thread_pool.h" void *mytask(void *arg) { int n = (int)arg; printf("[%u][%s] ==> job will be done in %d sec...\n", (unsigned)pthread_self(), __FUNCTION__, n); sleep(n); printf("[%u][%s] ==> job done!\n", (unsigned)pthread_self(), __FUNCTION__); return NULL; } void *count_time(void *arg) { int i = 0; while(1) { sleep(1); printf("sec: %d\n", ++i); } } int main(void) { pthread_t a; pthread_create(&a, NULL, count_time, NULL); // 1, initialize the pool thread_pool *pool = malloc(sizeof(thread_pool)); init_pool(pool, 2); // 2, throw tasks printf("throwing 3 tasks...\n"); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); // 3, check active threads number printf("current thread number: %d\n", remove_thread(pool, 0)); sleep(9); // 4, throw tasks printf("throwing another 2 tasks...\n"); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); // 5, add threads add_thread(pool, 2); sleep(5); // 6, remove threads printf("remove 3 threads from the pool, " "current thread number: %d\n", remove_thread(pool, 3)); // 7, destroy the pool destroy_pool(pool); return 0; } 运行结果

zzc@zzc-virtual-machine:~/share/example/thread$ ./test throwing 3 tasks... current thread number: 2 [3981727488][mytask] ==> job will be done in 3 sec... [3990120192][mytask] ==> job will be done in 6 sec... sec: 1 sec: 2 [3981727488][mytask] ==> job done! [3981727488][mytask] ==> job will be done in 7 sec... sec: 3 sec: 4 sec: 5 [3990120192][mytask] ==> job done! sec: 6 sec: 7 sec: 8 throwing another 2 tasks... [3990120192][mytask] ==> job will be done in 5 sec... [3892311808][mytask] ==> job will be done in 3 sec... sec: 9 [3981727488][mytask] ==> job done! sec: 10 sec: 11 [3892311808][mytask] ==> job done! sec: 12 sec: 13 remove 3 threads from the pool, current thread number: 1 [3990120192][mytask] ==> job done! [3990120192] is joined 延伸

实际使用的时候,只需将上述代码中的mytask函数改成我们需要实现的功能函数即可。

总结

本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。

本文共计2993个文字,预计阅读时间需要12分钟。

Linux系统中的线程池是如何实现的?

本文简要介绍了线程池的概念和特点,对线程池的结构和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序展示了线程池的运行过程。

概述:线程池是一个进程中的线程集合,用于高效地管理线程资源。它通过复用一定数量的线程来执行任务,避免了频繁创建和销毁线程的开销。

设计:线程池的结构主要包括以下几个部分:

1.线程池管理器:负责创建线程、分配任务、回收线程等。

2.工作线程:执行具体任务的线程。

3.任务队列:存储待执行的任务。

4.线程工厂:用于创建工作线程。

操作接口:

1.execute(Runnable task):提交一个任务到线程池执行。

2.shutdown():关闭线程池,不再接受新任务,等待已提交的任务执行完毕。

3.isShutdown():判断线程池是否已关闭。

实现:

javapublic class ThreadPool { private final int corePoolSize; private final int maximumPoolSize; private final long keepAliveTime; private final BlockingQueue workQueue; private final ExecutorService executorService;

public ThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this.corePoolSize=corePoolSize; this.maximumPoolSize=maximumPoolSize; this.keepAliveTime=unit.toMillis(keepAliveTime); this.workQueue=workQueue; this.executorService=Executors.newFixedThreadPool(corePoolSize); }

public void execute(Runnable task) { executorService.execute(task); }

public void shutdown() { executorService.shutdown(); }

public boolean isShutdown() { return executorService.isShutdown(); }}

示例程序:javapublic class Main { public static void main(String[] args) { ThreadPool threadPool=new ThreadPool(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue());

for (int i=0; i { System.out.println(线程 + Thread.currentThread().getName() + 正在执行任务 + finalI); }); }

threadPool.shutdown(); while (!threadPool.isShutdown()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(所有任务执行完毕!); }}

运行结果:线程 pool-1-thread-1 正在执行任务 0线程 pool-1-thread-2 正在执行任务 1线程 pool-1-thread-1 正在执行任务 2线程 pool-1-thread-2 正在执行任务 3线程 pool-1-thread-1 正在执行任务 4线程 pool-1-thread-2 正在执行任务 5线程 pool-1-thread-1 正在执行任务 6线程 pool-1-thread-2 正在执行任务 7线程 pool-1-thread-1 正在执行任务 8线程 pool-1-thread-2 正在执行任务 9所有任务执行完毕!

本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。 简述

一个进程中的线程就好比是一家公司里的员工,员工的数目应该根据公司的业务多少来定,太少了忙不过来,但是太多了也浪费资源。最理想的情况是让进程有一些初始数目的线程(线程池),当没有任务时这些线程自动进入睡眠,有了任务它们会立即执行任务,不断循环。进程还应根据自身任务的繁重与否来增删线程的数目,当所有的任务都完成之后,所有的线程还能妥当地收官。

如下图所示是一个处于初始状态的线程池。

图1 线程池示意图

有以下几点需要注意:

(1)任务队列中刚开始没有任何任务,是一个具有头结点的空链队列

(2)使用互斥锁来保护这个队列

(3)使用条件变量来代表任务队列中的任务个数的变化,将来如果主线程向队列中投放任务,那么可以通过条件变量来唤醒那么睡着了的线程

(4)通过一个公共开关----shutdown,来控制线程退出,进而销毁整个线程池

接口设计

线程池相关结构体如下表所示。

Linux系统中的线程池是如何实现的?

原型 struct task 功能描述 任务节点,包含需要执行的函数及其参数,通过链表连成一个任务队列 成员列表 void * (* task)(void *arg); void *arg; struct task *next; 备注 任务实例最终形成一条单向链表 原型 thread_pool 功能描述 线程池实例,包含一个线程池的所有信息 成员列表 pthread_mutex_t lock; //互斥锁,保护任务队列 pthread_cond_t cond; //条件变量,同步所有线程 bool shutdown; //线程池销毁标记 struct task *task_list; //任务链队列指针 pthread_t *tids; //线程ID存放位置 unsigned int waiting tasks; //任务链队列中等待的任务个数 unsigned int active_threads; //当前活跃线程个数 备注 活跃线程个数可以修改,但至少有1条活跃线程

下面是线程池的接口说明

(1)线程池初始化:init_pool()

原型 bool init_pool(thread_pool *pool, unsigned int threads_number); 功能描述 创建一个新的线程池,包含threads_number个活跃线程 参数 pool:线程池指针 threads_number:初始活跃线程个数(大于或等于1) 返回值 成功返回true,失败返回false 头文件 thread_pool.h 备注 线程池最少线程个数为1

(2)投放任务:add_task()

原型 bool add_task(thread_pool pool, void * (do_task)(void *arg), void *arg); 功能描述 往线程池投放任务 参数 pool:线程池指针 do_task:投放至线程池的执行例程 arg:执行例程do_task的参数,若该执行例程不需要参数可设置为NULL 返回值 成功返回true,失败返回false 头文件 thread_pool.h 备注 任务队列中最大任务个数为MAX_WAITING_TASKS

(3) 增加活跃线程:add_thread()

原型 int add_thread(thread_pool *pool, unsigned int additional_threads); 功能描述 增加线程池中活跃线程的个数 参数 pool:需要增加线程的线程池指针 additional_threads:新增线程个数 返回值 >0 : 实际新增线程个数 -1 :失败 头文件 thread_pool.h

(4)删除活跃线程:remove_thread()

原型 int remove_thread(thread_pool *pool, unsigned int removing threads); 功能描述 删除线程池中活跃线程的个数 参数 pool:需要删除线程的线程池指针 removing_threads:要删除的线程个数;该参数设置为0时直接返回当前线程池线程总数,对线程池不造成任何其他影响 返回值 >0 : 当前线程池剩余线程个数 -1: 失败 头文件 thread_pool.h 备注 线程池至少会存在1条活跃线程 如果被删除的线程正在执行任务,就等待其完成任务之后删除

(5)销毁线程池:destroy_pool()

原型 bool destroy_pool(thread_pool *pool); 功能描述 阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存 参数 pool:将要销毁的线程池 返回值 成功返回true,失败返回false 头文件 thread_pool.h 源代码实现

头文件thread_pool.h

////////////////////////////////////////////////////////////////// // // Description: 本文件包含了线程池基本结构体定义,以及各个操作函 // 数的声明 // ////////////////////////////////////////////////////////////////// #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include <stdio.h> #include <stdbool.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <errno.h> #include <pthread.h> #define MAX_WAITING_TASKS 1000 #define MAX_ACTIVE_THREADS 20 struct task { void *(*task)(void *arg); void *arg; struct task *next; }; typedef struct thread_pool { pthread_mutex_t lock; pthread_cond_t cond; struct task *task_list; pthread_t *tids; unsigned waiting_tasks; unsigned active_threads; bool shutdown; }thread_pool; bool init_pool(thread_pool *pool, unsigned int threads_number); bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg); int add_thread(thread_pool *pool, unsigned int additional_threads_number); int remove_thread(thread_pool *pool, unsigned int removing_threads_number); bool destroy_pool(thread_pool *pool); void *routine(void *arg); #endif

接口实现:thread_pool.c

////////////////////////////////////////////////////////////////// // Description: 本文件包含了线程池操作函数的定义 ////////////////////////////////////////////////////////////////// #include "thread_pool.h" void handler(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); } void *routine(void *arg) { thread_pool *pool = (thread_pool *)arg; struct task *p; while(1) { pthread_cleanup_push(handler, (void *)&pool->lock); pthread_mutex_lock(&pool->lock); while(pool->waiting_tasks == 0 && !pool->shutdown) { pthread_cond_wait(&pool->cond, &pool->lock); } if(pool->waiting_tasks == 0 && pool->shutdown == true) { pthread_mutex_unlock(&pool->lock); pthread_exit(NULL); } p = pool->task_list->next; pool->task_list->next = p->next; pool->waiting_tasks--; pthread_mutex_unlock(&pool->lock); pthread_cleanup_pop(0); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); (p->task)(p->arg); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); free(p); } pthread_exit(NULL); } bool init_pool(thread_pool *pool, unsigned int threads_number) { pthread_mutex_init(&pool->lock, NULL); pthread_cond_init(&pool->cond, NULL); pool->shutdown = false; pool->task_list = malloc(sizeof(struct task)); pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); if(pool->task_list == NULL || pool->tids == NULL) { perror("allocate memory error"); return false; } pool->task_list->next = NULL; pool->waiting_tasks = 0; pool->active_threads = threads_number; int i; for(i=0; i<pool->active_threads; i++) { if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) { perror("create threads error"); return false; } } return true; } bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg) { struct task *new_task = malloc(sizeof(struct task)); if(new_task == NULL) { perror("allocate memory error"); return false; } new_task->task = task; new_task->arg = arg; new_task->next = NULL; pthread_mutex_lock(&pool->lock); if(pool->waiting_tasks >= MAX_WAITING_TASKS) { pthread_mutex_unlock(&pool->lock); fprintf(stderr, "too many tasks.\n"); free(new_task); return false; } struct task *tmp = pool->task_list; while(tmp->next != NULL) tmp = tmp->next; tmp->next = new_task; pool->waiting_tasks++; pthread_mutex_unlock(&pool->lock); pthread_cond_signal(&pool->cond); return true; } int add_thread(thread_pool *pool, unsigned additional_threads) { if(additional_threads == 0) return 0; unsigned total_threads = pool->active_threads + additional_threads; int i, actual_increment = 0; for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) { if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) { perror("add threads error"); if(actual_increment == 0) return -1; break; } actual_increment++; } pool->active_threads += actual_increment; return actual_increment; } int remove_thread(thread_pool *pool, unsigned int removing_threads) { if(removing_threads == 0) return pool->active_threads; int remain_threads = pool->active_threads - removing_threads; remain_threads = remain_threads>0 ? remain_threads:1; int i; for(i=pool->active_threads-1; i>remain_threads-1; i--) { errno = pthread_cancel(pool->tids[i]); if(errno != 0) break; } if(i == pool->active_threads-1) return -1; else { pool->active_threads = i+1; return i+1; } } bool destroy_pool(thread_pool *pool) { pool->shutdown = true; pthread_cond_broadcast(&pool->cond); int i; for(i=0; i<pool->active_threads; i++) { errno = pthread_join(pool->tids[i], NULL); if(errno != 0) { printf("join tids[%d] error: %s\n", i, strerror(errno)); } else printf("[%u] is joined\n", (unsigned)pool->tids[i]); } free(pool->task_list); free(pool->tids); free(pool); return true; } 测试程序

main.c

////////////////////////////////////////////////////////////////// // Description: 一个使用了线程池的示例 ////////////////////////////////////////////////////////////////// #include "thread_pool.h" void *mytask(void *arg) { int n = (int)arg; printf("[%u][%s] ==> job will be done in %d sec...\n", (unsigned)pthread_self(), __FUNCTION__, n); sleep(n); printf("[%u][%s] ==> job done!\n", (unsigned)pthread_self(), __FUNCTION__); return NULL; } void *count_time(void *arg) { int i = 0; while(1) { sleep(1); printf("sec: %d\n", ++i); } } int main(void) { pthread_t a; pthread_create(&a, NULL, count_time, NULL); // 1, initialize the pool thread_pool *pool = malloc(sizeof(thread_pool)); init_pool(pool, 2); // 2, throw tasks printf("throwing 3 tasks...\n"); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); // 3, check active threads number printf("current thread number: %d\n", remove_thread(pool, 0)); sleep(9); // 4, throw tasks printf("throwing another 2 tasks...\n"); add_task(pool, mytask, (void *)(rand()%10)); add_task(pool, mytask, (void *)(rand()%10)); // 5, add threads add_thread(pool, 2); sleep(5); // 6, remove threads printf("remove 3 threads from the pool, " "current thread number: %d\n", remove_thread(pool, 3)); // 7, destroy the pool destroy_pool(pool); return 0; } 运行结果

zzc@zzc-virtual-machine:~/share/example/thread$ ./test throwing 3 tasks... current thread number: 2 [3981727488][mytask] ==> job will be done in 3 sec... [3990120192][mytask] ==> job will be done in 6 sec... sec: 1 sec: 2 [3981727488][mytask] ==> job done! [3981727488][mytask] ==> job will be done in 7 sec... sec: 3 sec: 4 sec: 5 [3990120192][mytask] ==> job done! sec: 6 sec: 7 sec: 8 throwing another 2 tasks... [3990120192][mytask] ==> job will be done in 5 sec... [3892311808][mytask] ==> job will be done in 3 sec... sec: 9 [3981727488][mytask] ==> job done! sec: 10 sec: 11 [3892311808][mytask] ==> job done! sec: 12 sec: 13 remove 3 threads from the pool, current thread number: 1 [3990120192][mytask] ==> job done! [3990120192] is joined 延伸

实际使用的时候,只需将上述代码中的mytask函数改成我们需要实现的功能函数即可。

总结

本文简单介绍了线程池的概念和特点,对线程池的结构体和相关操作接口进行了设计,并提供了接口的具体实现,最后通过示例程序演示了线程池的运行过程。