Golang中WaitGroup如何实现并发同步及其工作原理?

2026-05-25 12:331阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1624个文字,预计阅读时间需要7分钟。

Golang中WaitGroup如何实现并发同步及其工作原理?

Golang 中的 `sync.WaitGroup` 是一个用于等待一组协程(goroutine)完成的同步原语。下面是使用 `sync.WaitGroup` 的简化示例:

gopackage main

import (fmtsync)

func main() {var wg sync.WaitGroup

for i :=0; i <10; i++ {wg.Add(1)go func() {defer wg.Done()fmt.Println(Hello WaitGroup!)}()}wg.Wait()}

在这个示例中,`WaitGroup` 是一个数据结构,包含三个主要的操作:

1. `Add(int)`:增加计数器的值。

2.`Done()`:将计数器的值减一,如果计数器变为零,则通知所有等待的 `Wait()` 调用者。

3.`Wait()`:阻塞当前goroutine,直到计数器变为零。

`WaitGroup` 实际上是一个 `sync.WaitGroup` 结构体,其定义如下:

go

type WaitGroup struct {n int // n is the number of goroutines that have called Add. It is negative if Wait has been called more times than Add.mu sync.Mutexsema uint32}

在这里,`n` 是 `WaitGroup` 的核心字段,表示等待的goroutine数量。当 `n` 为零时,表示所有goroutine都已完成,`Wait()` 可以返回。

Golang Sync.WaitGroup 使用及原理 使用

func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() fmt.Println("Hello WaitGroup!") }() } wg.Wait() } 实现

首先看 waitgroup 到底是什么数据结构

type WaitGroup struct { noCopy noCopy state1 [3]uint32 }

nocopy 避免这个结构体被复制的一个技巧,可以告诉go vet工具违反了复制使用的规则
state1 [3]uint32 字段中包含了 waitgroup 的所有状态信息, 根据标准库上自带的注释简单翻译是:state1 由 12 个字节组成,其中将8字节看作64位值,其中高32位存放的是 counter 计数器, 代表目前还未完成的 goroutine个数,低32位存放的是 waiter 计数器, 可以理解成下面这个结构体

type WaitGroup struct { // 代表目前尚未完成的个数 // WaitGroup.Add(n) 将会导致 counter += n // WaitGroup.Done() 将导致 counter-- counter uint32 // 目前已调用 WaitGroup.Wait 的 goroutine 的个数 waiter uint32 // 对应于 golang 中 runtime 内部的信号量的实现 // runtime_Semacquire 表示增加一个信号量,并挂起当前 goroutine // runtime_Semrelease 表示减少一个信号量,并唤醒 sema 上其中一个正在等待的 goroutine sema uint32 }

整个使用流程为:

  1. 当调用 WaitGroup.Add(n) 时,counter 将会自增: counter += n
  2. 当调用 WaitGroup.Wait() 时,会将 waiter++。同时调用 runtime_Semacquire(semap), 增加信号量,并挂起当前 goroutine。
  3. 当调用 WaitGroup.Done() 时,将会 counter--。如果自减后的 counter 等于 0,说明 WaitGroup 的等待过程已经结束,则需要调用 runtime_Semrelease 释放信号量,唤醒正在 WaitGroup.Wait 的 goroutine。

源码中是如何拆分 state 字段的

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 如果地址是32bit对齐的,数组后两个元素用来做state // 它可以用来做64bit的原子操作,第一个元素32bit用来做信号量 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }

由于我们能使用到的就是 waitgroup.Add(), waitgroup.Done(), waitgroup.Wait() 这三个方法,就按这三个方法分析

Add(), Done()

Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。

func (wg *WaitGroup) Add(delta int) { // 获取拆开后的 state 字段 statep, semap := wg.state() ... ... ... // 在刚刚说的 int64 的高32位上加伤传进来的 delta 的值, 这一步是原子操作 state := atomic.AddUint64(statep, uint64(delta)<<32) // 加好后,获取 counter 也就是 v, 和 waiter 也就是 w 的值 // 此时 int64 变为两个 int32 v := int32(state >> 32) w := uint32(state) // 如果 v 变为负数了,程序异常 if v < 0 { panic("sync: negative WaitGroup counter") } // 在 wait 没结束之前, 不允许调用 Add 方法 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 调用 add() 之后, 还有正在执行的 goroutine 或者 waiter 等于 0, 正常返回 if v > 0 || w == 0 { return } // 下面就是非正常返回, 理解到的就是 v 已经等于 0 了,执行释放操作 // 首先就是将 counter 和 waiter 全部重置为 0 *statep = 0 // 然后循环调用还在等待的 waiter, 释放信号量 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } wait()

Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。

func (wg *WaitGroup) Wait() { // 获取信号量和两个计数值 statep, semap := wg.state() // 不停的循环检查 counter 和 waiter for { // 先原子性的取出 counter 和 waiter state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // counter 已经没有了,函数可以返回 return } // 将 waiter 数 + 1 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 放到信号量队列, 并且阻塞住自己 runtime_Semacquire(semap) // 如果被唤醒,检查 两个计数是否已经为0 了, 如果不为0 ,则触发恐慌 if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 函数返回 return } } } 总结

  1. 保证计数器不能为负值
  2. 保证 Add() 方法全部调用完成之后再调用 Wait()
  3. waitgroup 可以重复使用
  4. atomic 原子操作代替锁, 提高并发性
  5. 合并两个 int32 为一个 int64 提高读取存入数据性能
  6. 对于不希望被复制的结构体, 可以使用 noCopy 字段
reference

www.cyhone.com/articles/golang-waitgroup/
time.geekbang.org/column/intro/100061801?tab=catalog

Golang中WaitGroup如何实现并发同步及其工作原理?

本文共计1624个文字,预计阅读时间需要7分钟。

Golang中WaitGroup如何实现并发同步及其工作原理?

Golang 中的 `sync.WaitGroup` 是一个用于等待一组协程(goroutine)完成的同步原语。下面是使用 `sync.WaitGroup` 的简化示例:

gopackage main

import (fmtsync)

func main() {var wg sync.WaitGroup

for i :=0; i <10; i++ {wg.Add(1)go func() {defer wg.Done()fmt.Println(Hello WaitGroup!)}()}wg.Wait()}

在这个示例中,`WaitGroup` 是一个数据结构,包含三个主要的操作:

1. `Add(int)`:增加计数器的值。

2.`Done()`:将计数器的值减一,如果计数器变为零,则通知所有等待的 `Wait()` 调用者。

3.`Wait()`:阻塞当前goroutine,直到计数器变为零。

`WaitGroup` 实际上是一个 `sync.WaitGroup` 结构体,其定义如下:

go

type WaitGroup struct {n int // n is the number of goroutines that have called Add. It is negative if Wait has been called more times than Add.mu sync.Mutexsema uint32}

在这里,`n` 是 `WaitGroup` 的核心字段,表示等待的goroutine数量。当 `n` 为零时,表示所有goroutine都已完成,`Wait()` 可以返回。

Golang Sync.WaitGroup 使用及原理 使用

func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() fmt.Println("Hello WaitGroup!") }() } wg.Wait() } 实现

首先看 waitgroup 到底是什么数据结构

type WaitGroup struct { noCopy noCopy state1 [3]uint32 }

nocopy 避免这个结构体被复制的一个技巧,可以告诉go vet工具违反了复制使用的规则
state1 [3]uint32 字段中包含了 waitgroup 的所有状态信息, 根据标准库上自带的注释简单翻译是:state1 由 12 个字节组成,其中将8字节看作64位值,其中高32位存放的是 counter 计数器, 代表目前还未完成的 goroutine个数,低32位存放的是 waiter 计数器, 可以理解成下面这个结构体

type WaitGroup struct { // 代表目前尚未完成的个数 // WaitGroup.Add(n) 将会导致 counter += n // WaitGroup.Done() 将导致 counter-- counter uint32 // 目前已调用 WaitGroup.Wait 的 goroutine 的个数 waiter uint32 // 对应于 golang 中 runtime 内部的信号量的实现 // runtime_Semacquire 表示增加一个信号量,并挂起当前 goroutine // runtime_Semrelease 表示减少一个信号量,并唤醒 sema 上其中一个正在等待的 goroutine sema uint32 }

整个使用流程为:

  1. 当调用 WaitGroup.Add(n) 时,counter 将会自增: counter += n
  2. 当调用 WaitGroup.Wait() 时,会将 waiter++。同时调用 runtime_Semacquire(semap), 增加信号量,并挂起当前 goroutine。
  3. 当调用 WaitGroup.Done() 时,将会 counter--。如果自减后的 counter 等于 0,说明 WaitGroup 的等待过程已经结束,则需要调用 runtime_Semrelease 释放信号量,唤醒正在 WaitGroup.Wait 的 goroutine。

源码中是如何拆分 state 字段的

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // 如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量 return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { // 如果地址是32bit对齐的,数组后两个元素用来做state // 它可以用来做64bit的原子操作,第一个元素32bit用来做信号量 return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }

由于我们能使用到的就是 waitgroup.Add(), waitgroup.Done(), waitgroup.Wait() 这三个方法,就按这三个方法分析

Add(), Done()

Add 方法主要操作的是 state 的计数部分。你可以为计数值增加一个 delta 值,内部通过原子操作把这个值加到计数值上。需要注意的是,这个 delta 也可以是个负数,相当于为计数值减去一个值,Done 方法内部其实就是通过 Add(-1) 实现的。

func (wg *WaitGroup) Add(delta int) { // 获取拆开后的 state 字段 statep, semap := wg.state() ... ... ... // 在刚刚说的 int64 的高32位上加伤传进来的 delta 的值, 这一步是原子操作 state := atomic.AddUint64(statep, uint64(delta)<<32) // 加好后,获取 counter 也就是 v, 和 waiter 也就是 w 的值 // 此时 int64 变为两个 int32 v := int32(state >> 32) w := uint32(state) // 如果 v 变为负数了,程序异常 if v < 0 { panic("sync: negative WaitGroup counter") } // 在 wait 没结束之前, 不允许调用 Add 方法 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 调用 add() 之后, 还有正在执行的 goroutine 或者 waiter 等于 0, 正常返回 if v > 0 || w == 0 { return } // 下面就是非正常返回, 理解到的就是 v 已经等于 0 了,执行释放操作 // 首先就是将 counter 和 waiter 全部重置为 0 *statep = 0 // 然后循环调用还在等待的 waiter, 释放信号量 for ; w != 0; w-- { runtime_Semrelease(semap, false, 0) } } wait()

Wait 方法的实现逻辑是:不断检查 state 的值。如果其中的计数值变为了 0,那么说明所有的任务已完成,调用者不必再等待,直接返回。如果计数值大于 0,说明此时还有任务没完成,那么调用者就变成了等待者,需要加入 waiter 队列,并且阻塞住自己。

func (wg *WaitGroup) Wait() { // 获取信号量和两个计数值 statep, semap := wg.state() // 不停的循环检查 counter 和 waiter for { // 先原子性的取出 counter 和 waiter state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // counter 已经没有了,函数可以返回 return } // 将 waiter 数 + 1 if atomic.CompareAndSwapUint64(statep, state, state+1) { // 放到信号量队列, 并且阻塞住自己 runtime_Semacquire(semap) // 如果被唤醒,检查 两个计数是否已经为0 了, 如果不为0 ,则触发恐慌 if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 函数返回 return } } } 总结

  1. 保证计数器不能为负值
  2. 保证 Add() 方法全部调用完成之后再调用 Wait()
  3. waitgroup 可以重复使用
  4. atomic 原子操作代替锁, 提高并发性
  5. 合并两个 int32 为一个 int64 提高读取存入数据性能
  6. 对于不希望被复制的结构体, 可以使用 noCopy 字段
reference

www.cyhone.com/articles/golang-waitgroup/
time.geekbang.org/column/intro/100061801?tab=catalog

Golang中WaitGroup如何实现并发同步及其工作原理?