Go并发编程难道不是一件让人头疼的复杂任务吗?

2026-04-11 08:231阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2450个文字,预计阅读时间需要10分钟。

Go并发编程难道不是一件让人头疼的复杂任务吗?

前言:Go语言的重大卖点可以说是并发编程。

作为一门非常年轻的语言(诞生于2006年),在Google的培育下,为了充分利用多核机器资源,Go语言特别强调并发编程。从底层原生支持并发,实现了高效的并发优势。

前言

Go 语言的一大卖点可以说是并发编程。作为一门非常年轻的语言(诞生于2006年),在Google的培育下,为了充分利用多核机器资源的并发优势,从底层原生支持并发。

实现并发很“简单”

Go 语言通过协程(Goroutine)实现并发。启动一个协程只需要在函数调用前加上关键字go即可,协程将会以异步方式执行,不会阻塞。

举个例子,我们实现一个商城商品的接口,这个接口会返回商品价格和下单量,这两部分数据由两个不同的RPC服务提供。最简单的做法是同步去分别调用:

func GetItemPrice(itemId int64) int64 { // 模拟Item RPC 调用 time.Sleep(1 * time.Second) return 100 } func GetOrderCount(itemId int64) int64 { // 模拟Order RPC 调用 time.Sleep(1 * time.Second) return 200 } type ItemInfo struct { ItemPrice int64 OrderCount int64 } func GetItem(itemId int64) ItemInfo { // 内部实现是 item rpc 调用 itemPrice := GetItemPrice(itemId) // 内部实现是 order rpc 调用 orderCount := GetOrderCount(itemId) return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

上面这种实现的缺点显而易见:第二个RPC请求需要等到第一个Rpc返回后再开始调用,这个接口的耗时与调用的RPC接口量成正比。当然这是一个错位的示范,我们可以使用协程来解决这个问题:

func GetItemByGoroutine(itemId int64) ItemInfo { var itemPrice, orderCount int64 // 协程方式执行 go func() { itemPrice = GetItemPrice(itemId) }() go func() { orderCount = GetOrderCount(itemId) }() // 等待协程执行完 time.Sleep(1050 * time.Millisecond) return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

我们来比较一下两个的区别:

startAt := time.Now().UnixMilli() itemInfo := GetItem(1) endAt := time.Now().UnixMilli() fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutine(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt) /* output: get itemInfo: {100 200}, cost: 2000ms get itemInfo by groutine: {100 200}, cost: 1050ms */

可以看到,通过使用协程,我们可以并发地执行两个请求,缩短整体的执行时间。但是现在的实现并不优雅,因为协程是异步执行,不阻塞主流程的,我加了一行 time.Sleep(1050 * time.Millisecond)来强制主流程等待1.05s,这时我认为两个协程应该都执行完了,然后去做后面的数据组装。这里 sleep 的时间有点异想天开了,因为我们设想每个rpc接口的耗时都是1s,而实际情况肯定不会这么幸运:如果突发的网络延迟到时接口响应时间变成了1.5s,那岂不是GG。如果每个协程能够告诉我们他已经执行完成了,我在他们都完成时再去打包数据不就好了?

让它告诉我:它完了

这也就引出了协程间是如何通信的?,在 Go 语言里,协程通过 channel 实现通信,channel 是 Go 语言特有的一种数据类型,声明方式很简单

a := make(chan int) b := make(chan bool) c := make(chan map[int]string)

向 channel 中写入数据的类型必须和 channel 声明时的类型一样,写入方式:

a <- 1 b <- true c <- map[int]string{1: "1"}

读取方式为:

aValue := <- a bValue := <- b cVaule := <- c

如果当前 channel 内没有数据,会一直阻塞在这里。通过这个特性,我们就可以实现到上面讲的:让协程告诉我们它是否执行结束:

func GetItemByGoroutineWithChan(itemId int64) ItemInfo { var itemPrice, orderCount int64 itemCallDone := make(chan bool) orderCallDone := make(chan bool) go func() { itemPrice = GetItemPrice(itemId) itemCallDone <- true }() go func() { orderCount = GetOrderCount(itemId) orderCallDone <- true }() <-itemCallDone <-orderCallDone return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

来看一下效果,果然使用了 channel 效果更好

get itemInfo: {100 200}, cost: 2000ms get itemInfo by groutine: {100 200}, cost: 1051ms get itemInfo by groutine with chan: {100 200}, cost: 1001ms

假设因为网络原因,订单服务的接口延迟变成了3s,商品服务的接口延迟变成了2s

func GetItemPrice(itemId int64) int64 { // 模拟Item RPC 调用延迟 time.Sleep(2 * time.Second) return 100 } func GetOrderCount(itemId int64) int64 { // 模拟Order RPC 调用延迟 time.Sleep(3 * time.Second) return 200 }

再看一下效果,同步调用方式的耗时变成了5秒,只使用协程已无法取到正确数据,通过 channel 方式可以取到数据,并且耗时3秒。

get itemInfo: {100 200}, cost: 5000ms get itemInfo by groutine: {0 0}, cost: 1050ms get itemInfo by groutine with chan: {100 200}, cost: 3001ms 到时间了,准备交卷

实际上,我们的接口响应是有一定时间要求的,比如:2s,现象一下:当我们看到网页一直在转圈时,我们会怎么做?刷新一下,重新发起请求,第一次的请求在3s时返回数据就没有意义了。所以,我们需要对接口调用有一定的超时控制,如果超过约定时间,服务返回兜底数据或直接报错(这依赖于业务上决策)。

因此,我们要在这个订单接口实现超时控制,如果到了规定时间,我们就直接返回,不再等待还没完成的请求了:

func GetItemWithTimeout(itemId int64, timeOut int64) ItemInfo { var itemPrice, orderCount int64 itemCallDone := make(chan bool) orderCallDone := make(chan bool) go func() { itemPrice = GetItemPrice(itemId) itemCallDone <- true }() go func() { orderCount = GetOrderCount(itemId) orderCallDone <- true }() // 这是一个闹钟,到点后 channel time.C 会有数据 ticker := time.NewTicker(time.Duration(timeOut) * time.Second) defer ticker.Stop() // 计数有没有完成 doneCnt := 0 // 标记是否超时 isTimeOut := false for { select { case <-itemCallDone: doneCnt += 1 break case <-orderCallDone: doneCnt += 1 break case <-ticker.C: isTimeOut = true break } if doneCnt == 2 { break } if isTimeOut { fmt.Printf("timeout\n") break } } return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

我们原来运行对比一下:

Go并发编程难道不是一件让人头疼的复杂任务吗?

func main() { startAt := time.Now().UnixMilli() itemInfo := GetItem(1) endAt := time.Now().UnixMilli() fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutine(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutineWithChan(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine with chan: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemWithTimeout(1, 2) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine with timeout: %v, cost: %dms\n", itemInfo, endAt-startAt) } ```output get itemInfo: {100 200}, cost: 5001ms get itemInfo by groutine: {0 0}, cost: 1051ms get itemInfo by groutine with chan: {100 200}, cost: 3001ms timeout get itemInfo by groutine with timeout: {100 0}, cost: 2001ms

订单服务的接口延迟是3s,商品服务的接口延迟是2s。函数传入的超时时间是2s,所以只取到了商品服务的接口数据后就返回了。

这就是一个简单的并发编程模式,相似的场景都可以套用。

等等,还有坑

通过协程的并发编程好是好,但它还是有坑的,使用是要多加注意

  1. 避免死锁
    使用 channel 通信时,特别是使用无缓冲区类型的 channel 时,一定要主要注意:
  • 千万不要自己写完自己读

func main() { ch := make(chan int, 0) ch <- 666 x := <- ch fmt.Println(x) }

  • 确认协程可以执行的起来,比如下面的协程就起不起来

func main() { ch := make(chan int,0) ch <- 666 go func() { <- ch }() }

  • channel 读写时,逻辑上不要发生死锁

func main() { // 男孩要女孩先说再见,女孩要男孩先说 chBoy := make(chan int,0) chGirl := make(chan int,0) go func() { select { case <- chGirl: chBoy<-888 } }() select { case <- chBoy: chGirl <- 888 } }

  1. 避免协程泄露
    主程序退出时,协程有可能还没退出,如果一直不退出,那就有可能是协程泄露了。常见的协程泄露常见有:
  • 协程中因为 channel 阻塞,无法退出

func goroutineLeak() { p := make(chan int) go func() { time.Sleep(2 * time.Second) // 注意这里,当到达1秒后程序后自动返回,没有人去读channel的数据,就会一直阻塞到这里 p <- 1 fmt.Println("finished...") }() select { case a := <-p: fmt.Printf("got a %v", a) case <-time.After(1 * time.Second): return } } func main() { for i := 0; i <= 5; i++ { goroutineLeak() } fmt.Println(runtime.NumGoroutine()) } /* output: 7 */

所以说,上面实现的方法是有可能出现协程泄露的,那应该如果处理呢,一种方式是使用有缓存channel,另一种是使用WaitGroup来代替。

func GetItemWithTimeoutV2(itemId int64, timeOut int64) ItemInfo { var itemPrice, orderCount int64 var wg sync.WaitGroup wg.Add(2) go func() { itemPrice = GetItemPrice(itemId) wg.Done() fmt.Println("111") }() go func() { orderCount = GetOrderCount(itemId) wg.Done() fmt.Println("222") }() ticker := time.NewTicker(time.Duration(timeOut) * time.Second) defer ticker.Stop() done := make(chan bool) go func() { defer close(done) wg.Wait() }() select { case <-done: case <-ticker.C: fmt.Println("timeout") } return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

运行效果:

func main() { startAt := time.Now().UnixMilli() itemInfo := GetItemWithTimeoutV2(1, 2) endAt := time.Now().UnixMilli() time.Sleep(time.Second * 2) fmt.Printf("get itemInfo by groutine with tiemeoutv2: %v, cost: %dms, goroutines:%d\n", itemInfo, endAt-startAt, runtime.NumGoroutine()) } /* output timeout 111 222 get itemInfo by groutine with tiemeoutv2: {100 0}, cost: 2001ms, goroutines:1 /

标签:一大卖点

本文共计2450个文字,预计阅读时间需要10分钟。

Go并发编程难道不是一件让人头疼的复杂任务吗?

前言:Go语言的重大卖点可以说是并发编程。

作为一门非常年轻的语言(诞生于2006年),在Google的培育下,为了充分利用多核机器资源,Go语言特别强调并发编程。从底层原生支持并发,实现了高效的并发优势。

前言

Go 语言的一大卖点可以说是并发编程。作为一门非常年轻的语言(诞生于2006年),在Google的培育下,为了充分利用多核机器资源的并发优势,从底层原生支持并发。

实现并发很“简单”

Go 语言通过协程(Goroutine)实现并发。启动一个协程只需要在函数调用前加上关键字go即可,协程将会以异步方式执行,不会阻塞。

举个例子,我们实现一个商城商品的接口,这个接口会返回商品价格和下单量,这两部分数据由两个不同的RPC服务提供。最简单的做法是同步去分别调用:

func GetItemPrice(itemId int64) int64 { // 模拟Item RPC 调用 time.Sleep(1 * time.Second) return 100 } func GetOrderCount(itemId int64) int64 { // 模拟Order RPC 调用 time.Sleep(1 * time.Second) return 200 } type ItemInfo struct { ItemPrice int64 OrderCount int64 } func GetItem(itemId int64) ItemInfo { // 内部实现是 item rpc 调用 itemPrice := GetItemPrice(itemId) // 内部实现是 order rpc 调用 orderCount := GetOrderCount(itemId) return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

上面这种实现的缺点显而易见:第二个RPC请求需要等到第一个Rpc返回后再开始调用,这个接口的耗时与调用的RPC接口量成正比。当然这是一个错位的示范,我们可以使用协程来解决这个问题:

func GetItemByGoroutine(itemId int64) ItemInfo { var itemPrice, orderCount int64 // 协程方式执行 go func() { itemPrice = GetItemPrice(itemId) }() go func() { orderCount = GetOrderCount(itemId) }() // 等待协程执行完 time.Sleep(1050 * time.Millisecond) return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

我们来比较一下两个的区别:

startAt := time.Now().UnixMilli() itemInfo := GetItem(1) endAt := time.Now().UnixMilli() fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutine(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt) /* output: get itemInfo: {100 200}, cost: 2000ms get itemInfo by groutine: {100 200}, cost: 1050ms */

可以看到,通过使用协程,我们可以并发地执行两个请求,缩短整体的执行时间。但是现在的实现并不优雅,因为协程是异步执行,不阻塞主流程的,我加了一行 time.Sleep(1050 * time.Millisecond)来强制主流程等待1.05s,这时我认为两个协程应该都执行完了,然后去做后面的数据组装。这里 sleep 的时间有点异想天开了,因为我们设想每个rpc接口的耗时都是1s,而实际情况肯定不会这么幸运:如果突发的网络延迟到时接口响应时间变成了1.5s,那岂不是GG。如果每个协程能够告诉我们他已经执行完成了,我在他们都完成时再去打包数据不就好了?

让它告诉我:它完了

这也就引出了协程间是如何通信的?,在 Go 语言里,协程通过 channel 实现通信,channel 是 Go 语言特有的一种数据类型,声明方式很简单

a := make(chan int) b := make(chan bool) c := make(chan map[int]string)

向 channel 中写入数据的类型必须和 channel 声明时的类型一样,写入方式:

a <- 1 b <- true c <- map[int]string{1: "1"}

读取方式为:

aValue := <- a bValue := <- b cVaule := <- c

如果当前 channel 内没有数据,会一直阻塞在这里。通过这个特性,我们就可以实现到上面讲的:让协程告诉我们它是否执行结束:

func GetItemByGoroutineWithChan(itemId int64) ItemInfo { var itemPrice, orderCount int64 itemCallDone := make(chan bool) orderCallDone := make(chan bool) go func() { itemPrice = GetItemPrice(itemId) itemCallDone <- true }() go func() { orderCount = GetOrderCount(itemId) orderCallDone <- true }() <-itemCallDone <-orderCallDone return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

来看一下效果,果然使用了 channel 效果更好

get itemInfo: {100 200}, cost: 2000ms get itemInfo by groutine: {100 200}, cost: 1051ms get itemInfo by groutine with chan: {100 200}, cost: 1001ms

假设因为网络原因,订单服务的接口延迟变成了3s,商品服务的接口延迟变成了2s

func GetItemPrice(itemId int64) int64 { // 模拟Item RPC 调用延迟 time.Sleep(2 * time.Second) return 100 } func GetOrderCount(itemId int64) int64 { // 模拟Order RPC 调用延迟 time.Sleep(3 * time.Second) return 200 }

再看一下效果,同步调用方式的耗时变成了5秒,只使用协程已无法取到正确数据,通过 channel 方式可以取到数据,并且耗时3秒。

get itemInfo: {100 200}, cost: 5000ms get itemInfo by groutine: {0 0}, cost: 1050ms get itemInfo by groutine with chan: {100 200}, cost: 3001ms 到时间了,准备交卷

实际上,我们的接口响应是有一定时间要求的,比如:2s,现象一下:当我们看到网页一直在转圈时,我们会怎么做?刷新一下,重新发起请求,第一次的请求在3s时返回数据就没有意义了。所以,我们需要对接口调用有一定的超时控制,如果超过约定时间,服务返回兜底数据或直接报错(这依赖于业务上决策)。

因此,我们要在这个订单接口实现超时控制,如果到了规定时间,我们就直接返回,不再等待还没完成的请求了:

func GetItemWithTimeout(itemId int64, timeOut int64) ItemInfo { var itemPrice, orderCount int64 itemCallDone := make(chan bool) orderCallDone := make(chan bool) go func() { itemPrice = GetItemPrice(itemId) itemCallDone <- true }() go func() { orderCount = GetOrderCount(itemId) orderCallDone <- true }() // 这是一个闹钟,到点后 channel time.C 会有数据 ticker := time.NewTicker(time.Duration(timeOut) * time.Second) defer ticker.Stop() // 计数有没有完成 doneCnt := 0 // 标记是否超时 isTimeOut := false for { select { case <-itemCallDone: doneCnt += 1 break case <-orderCallDone: doneCnt += 1 break case <-ticker.C: isTimeOut = true break } if doneCnt == 2 { break } if isTimeOut { fmt.Printf("timeout\n") break } } return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

我们原来运行对比一下:

Go并发编程难道不是一件让人头疼的复杂任务吗?

func main() { startAt := time.Now().UnixMilli() itemInfo := GetItem(1) endAt := time.Now().UnixMilli() fmt.Printf("get itemInfo: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutine(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemByGoroutineWithChan(1) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine with chan: %v, cost: %dms\n", itemInfo, endAt-startAt) startAt = time.Now().UnixMilli() itemInfo = GetItemWithTimeout(1, 2) endAt = time.Now().UnixMilli() fmt.Printf("get itemInfo by groutine with timeout: %v, cost: %dms\n", itemInfo, endAt-startAt) } ```output get itemInfo: {100 200}, cost: 5001ms get itemInfo by groutine: {0 0}, cost: 1051ms get itemInfo by groutine with chan: {100 200}, cost: 3001ms timeout get itemInfo by groutine with timeout: {100 0}, cost: 2001ms

订单服务的接口延迟是3s,商品服务的接口延迟是2s。函数传入的超时时间是2s,所以只取到了商品服务的接口数据后就返回了。

这就是一个简单的并发编程模式,相似的场景都可以套用。

等等,还有坑

通过协程的并发编程好是好,但它还是有坑的,使用是要多加注意

  1. 避免死锁
    使用 channel 通信时,特别是使用无缓冲区类型的 channel 时,一定要主要注意:
  • 千万不要自己写完自己读

func main() { ch := make(chan int, 0) ch <- 666 x := <- ch fmt.Println(x) }

  • 确认协程可以执行的起来,比如下面的协程就起不起来

func main() { ch := make(chan int,0) ch <- 666 go func() { <- ch }() }

  • channel 读写时,逻辑上不要发生死锁

func main() { // 男孩要女孩先说再见,女孩要男孩先说 chBoy := make(chan int,0) chGirl := make(chan int,0) go func() { select { case <- chGirl: chBoy<-888 } }() select { case <- chBoy: chGirl <- 888 } }

  1. 避免协程泄露
    主程序退出时,协程有可能还没退出,如果一直不退出,那就有可能是协程泄露了。常见的协程泄露常见有:
  • 协程中因为 channel 阻塞,无法退出

func goroutineLeak() { p := make(chan int) go func() { time.Sleep(2 * time.Second) // 注意这里,当到达1秒后程序后自动返回,没有人去读channel的数据,就会一直阻塞到这里 p <- 1 fmt.Println("finished...") }() select { case a := <-p: fmt.Printf("got a %v", a) case <-time.After(1 * time.Second): return } } func main() { for i := 0; i <= 5; i++ { goroutineLeak() } fmt.Println(runtime.NumGoroutine()) } /* output: 7 */

所以说,上面实现的方法是有可能出现协程泄露的,那应该如果处理呢,一种方式是使用有缓存channel,另一种是使用WaitGroup来代替。

func GetItemWithTimeoutV2(itemId int64, timeOut int64) ItemInfo { var itemPrice, orderCount int64 var wg sync.WaitGroup wg.Add(2) go func() { itemPrice = GetItemPrice(itemId) wg.Done() fmt.Println("111") }() go func() { orderCount = GetOrderCount(itemId) wg.Done() fmt.Println("222") }() ticker := time.NewTicker(time.Duration(timeOut) * time.Second) defer ticker.Stop() done := make(chan bool) go func() { defer close(done) wg.Wait() }() select { case <-done: case <-ticker.C: fmt.Println("timeout") } return ItemInfo{ ItemPrice: itemPrice, OrderCount: orderCount, } }

运行效果:

func main() { startAt := time.Now().UnixMilli() itemInfo := GetItemWithTimeoutV2(1, 2) endAt := time.Now().UnixMilli() time.Sleep(time.Second * 2) fmt.Printf("get itemInfo by groutine with tiemeoutv2: %v, cost: %dms, goroutines:%d\n", itemInfo, endAt-startAt, runtime.NumGoroutine()) } /* output timeout 111 222 get itemInfo by groutine with tiemeoutv2: {100 0}, cost: 2001ms, goroutines:1 /

标签:一大卖点