如何用Go语言构建一个简易的协程池实现?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1041个文字,预计阅读时间需要5分钟。
直接输出结果,不超过100字:
协程池的核心目标不是“复用协程”,而是“限制并发数 + 复用执行器”。它本质是个带缓冲的生产者-消费者模型:任务入队,固定数量的工作协程持续取任务执行。
- 典型适用场景:
http.Handler中处理短时 IO 任务(如发 HTTP 请求、查 Redis)、批量数据转换、日志异步刷盘 - 不适用场景:长阻塞操作(如
time.Sleep(1h))、需强顺序保证的任务 - 关键参数只有两个:
capacity(工作协程数)和queueSize(待处理任务缓冲区长度),后者建议设为capacity * 2 ~ capacity * 10
用 chan 实现最简可用的协程池
不用第三方库,50 行内可写出线程安全、可关闭、带错误反馈的基础版本。核心是三个 channel:tasks(接收任务)、results(返回结果)、quit(通知退出)。
type Task func() error <p>type Pool struct { tasks chan Task results chan error quit chan struct{} }</p><p>func NewPool(capacity, queueSize int) *Pool { return &Pool{ tasks: make(chan Task, queueSize), results: make(chan error, queueSize), quit: make(chan struct{}), } }</p><p>func (p *Pool) Start() { for i := 0; i < capacity; i++ { go p.worker() } }</p><p>func (p *Pool) worker() { for { select { case task := <-p.tasks: p.results <- task() case <-p.quit: return } } }</p><p>func (p *Pool) Submit(task Task) { select { case p.tasks <- task: default: // 队列满,可丢弃、阻塞或返回错误,按需选 } }</p><p>func (p *Pool) Result() error { select { case err := <-p.results: return err default: return nil // 非阻塞读取 } }</p><p>func (p *Pool) Close() { close(p.quit) close(p.tasks) close(p.results) }
注意:Submit 使用 select{default:} 实现非阻塞提交,避免调用方卡死;若需背压,应改用带超时的 select{case p.tasks 。
sync.WaitGroup 和 context.Context 怎么补足短板
上面的版本无法等待所有任务完成,也无法响应取消信号。实际项目中必须加上这两块:
- 用
sync.WaitGroup跟踪待处理任务数:每次Submit前wg.Add(1),worker执行完后wg.Done();Close改为wg.Wait()+ 关 channel - 用
context.Context替换quit chan struct{}:worker 中用select{case ,方便上游统一控制生命周期(比如 HTTP handler 的 <code>r.Context()) - 错误处理要区分:任务自身错误走
resultschannel,池级错误(如 context cancel)应在Submit或Result中显式返回,不要静默吞掉
什么时候该换成熟轮子:比如 panjf2000/ants
自己写的池够教学和简单场景用,但线上服务要面对:panic 恢复、任务超时控制、空闲协程自动销毁、指标暴露(当前运行数/排队数)、更细粒度的拒绝策略。这时直接用 ants 更省心:
p, _ := ants.NewPool(100) defer p.Release() <p>err := p.Submit(func() { // 你的任务 })
它默认 recover panic,支持 NewPoolWithFunc 复用参数对象,还提供 Running()、Free() 等诊断方法。但注意:它的 Submit 是阻塞的,队列满会等,如果需要非阻塞语义,得自己包一层加 select{default:} 判断。
真正容易被忽略的是池的**作用域生命周期**——别在 HTTP handler 里每次新建池,而应作为全局变量或依赖注入;也别让一个池混用差异巨大的任务类型(比如既有毫秒级 Redis 查询,又有秒级文件解析),否则小任务会被大任务饿死。
本文共计1041个文字,预计阅读时间需要5分钟。
直接输出结果,不超过100字:
协程池的核心目标不是“复用协程”,而是“限制并发数 + 复用执行器”。它本质是个带缓冲的生产者-消费者模型:任务入队,固定数量的工作协程持续取任务执行。
- 典型适用场景:
http.Handler中处理短时 IO 任务(如发 HTTP 请求、查 Redis)、批量数据转换、日志异步刷盘 - 不适用场景:长阻塞操作(如
time.Sleep(1h))、需强顺序保证的任务 - 关键参数只有两个:
capacity(工作协程数)和queueSize(待处理任务缓冲区长度),后者建议设为capacity * 2 ~ capacity * 10
用 chan 实现最简可用的协程池
不用第三方库,50 行内可写出线程安全、可关闭、带错误反馈的基础版本。核心是三个 channel:tasks(接收任务)、results(返回结果)、quit(通知退出)。
type Task func() error <p>type Pool struct { tasks chan Task results chan error quit chan struct{} }</p><p>func NewPool(capacity, queueSize int) *Pool { return &Pool{ tasks: make(chan Task, queueSize), results: make(chan error, queueSize), quit: make(chan struct{}), } }</p><p>func (p *Pool) Start() { for i := 0; i < capacity; i++ { go p.worker() } }</p><p>func (p *Pool) worker() { for { select { case task := <-p.tasks: p.results <- task() case <-p.quit: return } } }</p><p>func (p *Pool) Submit(task Task) { select { case p.tasks <- task: default: // 队列满,可丢弃、阻塞或返回错误,按需选 } }</p><p>func (p *Pool) Result() error { select { case err := <-p.results: return err default: return nil // 非阻塞读取 } }</p><p>func (p *Pool) Close() { close(p.quit) close(p.tasks) close(p.results) }
注意:Submit 使用 select{default:} 实现非阻塞提交,避免调用方卡死;若需背压,应改用带超时的 select{case p.tasks 。
sync.WaitGroup 和 context.Context 怎么补足短板
上面的版本无法等待所有任务完成,也无法响应取消信号。实际项目中必须加上这两块:
- 用
sync.WaitGroup跟踪待处理任务数:每次Submit前wg.Add(1),worker执行完后wg.Done();Close改为wg.Wait()+ 关 channel - 用
context.Context替换quit chan struct{}:worker 中用select{case ,方便上游统一控制生命周期(比如 HTTP handler 的 <code>r.Context()) - 错误处理要区分:任务自身错误走
resultschannel,池级错误(如 context cancel)应在Submit或Result中显式返回,不要静默吞掉
什么时候该换成熟轮子:比如 panjf2000/ants
自己写的池够教学和简单场景用,但线上服务要面对:panic 恢复、任务超时控制、空闲协程自动销毁、指标暴露(当前运行数/排队数)、更细粒度的拒绝策略。这时直接用 ants 更省心:
p, _ := ants.NewPool(100) defer p.Release() <p>err := p.Submit(func() { // 你的任务 })
它默认 recover panic,支持 NewPoolWithFunc 复用参数对象,还提供 Running()、Free() 等诊断方法。但注意:它的 Submit 是阻塞的,队列满会等,如果需要非阻塞语义,得自己包一层加 select{default:} 判断。
真正容易被忽略的是池的**作用域生命周期**——别在 HTTP handler 里每次新建池,而应作为全局变量或依赖注入;也别让一个池混用差异巨大的任务类型(比如既有毫秒级 Redis 查询,又有秒级文件解析),否则小任务会被大任务饿死。

