如何深入理解go语言中select语句的源码实现?

2026-05-27 16:202阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计5545个文字,预计阅读时间需要23分钟。

如何深入理解go语言中select语句的源码实现?

深入理解Go中的select关键字

1.标签一:1

2.标签二:2

3.标签三:3

4.查看源码实现:

1. 不存在case 2. select中仅存在一个case 3. select中存在两个case,其中一个是default,发送值4 4. 多个4

  • 深入了解下 go 中的 select
    • 前言
      • 1、栗子一
      • 2、栗子二
      • 3、栗子三
    • 看下源码实现
      • 1、不存在 case
      • 2、select 中仅存在一个 case
      • 3、select 中存在两个 case,其中一个是 default
        • 发送值
        • 接收值
      • 4、多个 case 的场景
        • 具体的实现逻辑
        • 1、打乱 case 的顺序
        • 2、找出已经 ready 的 case
        • 3、case 都没 ready,且没有 default
        • 4、唤醒后返回 channel 对应的 case
    • 总结
    • 参考
深入了解下 go 中的 select 前言

这里借助于几个经常遇到的 select 的使用 demo 来作为开始,先来看看,下面几个 demo 的输出情况

1、栗子一

func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { chan1 <- 1 }() go func() { chan2 <- 1 }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") default: fmt.Println("default") } }

select 中的 case 执行是随机的,所以当 case 监听的 channel 有数据传入,就执行相应的流程并退出 select,如果对应的 case 没有收到 channel 的数据,就执行 default 语句,然后退出 select。

上面的协程启动时间是无法预估的,所以上面的两个 case 和 default ,都有机会执行。

可能的输出

可能输出1、

chan1 ready.

可能输出2、

chan2 ready.

可能输出3、

default 2、栗子二

func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { close(chan1) }() go func() { close(chan2) }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") default: fmt.Println("default") } }

已经关闭的 channel ,使用 select 是可以从中读出对应的零值,同时两面关闭 channel 的协程的执行实际也是不可控的,原则上,上面两个 case 和 default 都有可能被执行。

可能的输出

可能输出1、

chan1 ready.

可能输出2、

chan2 ready.

可能输出3、

default 3、栗子三

func main() { select {} }

上面这个,应为没有机会退出,所以会发生死锁

看下源码实现

select 中的多个 case 是随机触发执行的,一次只有一个 case 得到执行。如果我们按照顺序依次判断,那么后面的条件永远都会得不到执行,而随机的引入就是为了避免饥饿问题的发生。

1、如果没有 default 分支

如果没有 default 分支,select 将会一直处于阻塞状态,直到其中的一个 case 就绪;

2、如果有 default 分支

如果有 default 分支,随机将 case 分支遍历一遍,如果有 case 分支可执行,处理对应的 case 分支;

如果遍历完 case 分支,没有可执行的分支,执行 default 分支。

源码版本 go version go1.16.13 darwin/amd64

源码包 src/runtime/select.go 定义了表示case语句的数据结构:

type scase struct { c *hchan // chan elem unsafe.Pointer // data element }

c为当前 case 语句所操作的 channel 指针,这也说明了一个 case 语句只能操作一个 channel。

编译阶段,select 对应的 opType 是 OSELECT,select 语句在编译期间会被转换成 OSELECT 节点。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/syntax.go#L922 OSELECT // select { List } (List is list of OCASE)

如果是 OSELECT 就会调用 walkselect(),然后 walkselect() 最后调用 walkselectcases()

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/walk.go#L104 // The result of walkstmt MUST be assigned back to n, e.g. // n.Left = walkstmt(n.Left) func walkstmt(n *Node) *Node { if n == nil { return n } setlineno(n) walkstmtlist(n.Ninit.Slice()) switch n.Op { ... case OSELECT: walkselect(n) case OSWITCH: walkswitch(n) case ORANGE: n = walkrange(n) } if n.Op == ONAME { Fatalf("walkstmt ended up with name: %+v", n) } return n } // github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L90 func walkselect(sel *Node) { lno := setlineno(sel) if sel.Nbody.Len() != 0 { Fatalf("double walkselect") } init := sel.Ninit.Slice() sel.Ninit.Set(nil) // 调用walkselectcases init = append(init, walkselectcases(&sel.List)...) sel.List.Set(nil) sel.Nbody.Set(init) walkstmtlist(sel.Nbody.Slice()) lineno = lno }

上面的调用逻辑,select 的逻辑是在 walkselectcases() 函数中完成的,这里来重点看下

walkselectcases() 在处理中会分成下面几种情况来处理

1、select 中不存在 case, 直接堵塞;

2、select 中仅存在一个 case;

3、select 中存在两个 case,其中一个是 default;

4、其他 select 情况如: 包含多个 case 并且有 default 等。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L108 func walkselectcases(cases *Nodes) []*Node { // 获取 case 分支的数量 n := cases.Len() // 优化: 没有 case 的情况 if n == 0 { // 翻译为:block() ... return } // 优化: 只有一个 case 的情况 if n == 1 { // 翻译为:if ch == nil { block() }; n; ... return } // 优化: select 中存在两个 case,其中一个是 default 的情况 if n == 2 { // 翻译为:发送或接收 // if selectnbsend(c, v) { body } else { default body } // 接收 // if selectnbrecv(&v, &received, c) { body } else { default body } return } // 一般情况,调用 selecggo ... } 1、不存在 case

如果不存在 case ,空的 select 语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L108 func walkselectcases(cases *Nodes) []*Node { n := cases.Len() if n == 0 { return []*Node{mkcall("block", nil, nil)} } ... } // 调用 runtime.gopark 让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases。 func block() { gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) }

如果没有 case,导致 Goroutine 进入无法被唤醒的永久休眠状态,会触发 deadlock!

2、select 中仅存在一个 case

如果只有一个 case ,编译器会将 select 改写成 if 条件语句。

// 改写前 select { case v, ok <-ch: // case ch <- v ... } // 改写后 if ch == nil { block() } v, ok := <-ch // case ch <- v ...

如果只有一个 case ,walkselectcases 会将 select 根据收发情况装换成 if 语句,如果 case 中的 Channel 是空指针时,会直接挂起当前 Goroutine 并陷入永久休眠。

3、select 中存在两个 case,其中一个是 default 发送值

在 walkselectcases 中 OSEND,对应的就是向 channel 中发送数据,如果是发送的话,会翻译成下面的语句

如何深入理解go语言中select语句的源码实现?

select { case ch <- i: ... default: ... } if selectnbsend(ch, i) { ... } else { // default body ... } func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }

如果是发送,这里翻译之后最终调用 chansend 向 channel 中发送数据

// 这里提供了一个 block,参数设置成 true,那么表示当前发送操作是阻塞的 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 对于不阻塞的 send,快速检测失败场景 // // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是: // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine // 2. channel 是缓冲型的,但循环数组已经装满了元素 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... }

总结下

1、如果 block 为 true 表示当前向 channel 中的数据发送是阻塞的。这里可以看到 selectnbsend 中传入的是 false,说明 channel 的发送不会阻塞 select。

2、对于不阻塞的发送,会进行下面的检测,如果 channel 未关闭且 channel 没有多余的缓冲空间,就会发送失败,然后跳出当前的 case,走到 default 的逻辑。

如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:

  • 1、channel 是非缓冲型的,且等待接收队列里没有 goroutine;

  • 2、channel 是缓冲型的,但循环数组已经装满了元素;

接收值

在 walkselectcases 函数中可以看到,接收方式会有两个,分别是 OSELRECV 和 OSELRECV2

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/walk.go#L104 func walkselectcases(cases *Nodes) []*Node { ... // optimization: two-case select but one is default: single non-blocking op. if ncas == 2 && dflt != nil { switch n.Op { default: Fatalf("select %v", n.Op) case OSELRECV: // if selectnbrecv(&v, c) { body } else { default body } ... r.Left = mkcall1(chanfn("selectnbrecv", 2, ch.Type), types.Types[TBOOL], &r.Ninit, elem, ch) case OSELRECV2: // if selectnbrecv2(&v, &received, c) { body } else { default body } ... r.Left = mkcall1(chanfn("selectnbrecv2", 2, ch.Type), types.Types[TBOOL], &r.Ninit, elem, receivedp, ch) } r.Left = typecheck(r.Left, ctxExpr) r.Nbody.Set(cas.Nbody.Slice()) r.Rlist.Set(append(dflt.Ninit.Slice(), dflt.Nbody.Slice()...)) return []*Node{r, nod(OBREAK, nil, nil)} } ... }

walkselectcases 对这两种情况的改写

selectnbrecv

select { case v = <-c: ... default: ... } // 改写后 if selectnbrecv(&v, c) { ... } else { // default body ... }

selectnbrecv2

select { case v, ok = <-c: ... foo default: ... bar } // 改写后 if c != nil && selectnbrecv2(&v, &ok, c) { ... foo } else { // default body ... bar }

selectnbrecv 和 selectnbrecv2 有什么区别呢?

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/chan.go#L707 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { // TODO(khr): just return 2 values from this function, now that it is in Go. selected, *received = chanrecv(c, elem, false) return }

可以发现只是针对返回的值处理不同,selectnbrecv2 多了一个是否 received 的 bool 值

总结下:

对于接收值的 case 会有两种处理方式,这两种,区别在于是否将 received 的 bool 值传送给调用方

4、多个 case 的场景

多个 case 的场景

1、会将其中所有 case 转化为 scase 结构体;

2、调用运行时函数 selectgo 选取触发的 scase 结构体;

3、通过 for 循环生成一组 if 语句,来判断是否选中 case;

这里来看下 selectgo 的实现

这里看下函数的几个参数

cas0:为 scase 数组的首地址,selectgo() 就是从这些 scase 中找出一个返回;

order0:为一个两倍 cas0 数组长度的 buffer,保存 scase 随机序列 pollorder 和 scase 中 channel 地址序列 lockorder,数组前一半是 pollorder,后一半用来 lockorder;

  • pollorder:每次 selectgo 执行都会把 scase 序列打乱,以达到随机检测 case 的目的;

  • lockorder:所有 case 语句中 channel 序列,以达到去重防止对 channel 加锁时重复加锁的目的;

pc0:对于竞态检测器构建,pc0 指向一个数组类型[ncases]uintptr (也在栈上);对于其他版本,它设置为 nil;

nsends: 发送的 case 的个数;

nrecvs: 接收的 case 的个数;

block: 表示是否存在 default,没有 default 就表示 select 是阻塞的。

看下返回的数据

int: 选中case的编号,这个case编号跟代码一致;

bool: 是否成功从channle中读取了数据,如果选中的case是从channel中读数据,则该返回值表示是否读取成功。

具体的实现逻辑

1、打乱 scase 的顺序,锁定 scase 语句中所有的 channel;

2、按照随机顺序检测 scase 中的 channel 是否ready;

  • 2.1 如果 case 可读,则读取 channel 中数据,解锁所有的 channel,然后返回(case index, true)

  • 2.2 如果 case 可写,则将数据写入 channel,解锁所有的channel,然后返回(case index, false)

  • 2.3 所有 case 都未 ready,并且有 default 语句,则解锁所有的channel,然后返回 (default index, false)

3、所有 case 都未 ready,且没有 default 语句

  • 3.1 将当前协程加入到所有 channel 的等待队列

  • 3.2 当将协程转入阻塞,等待被唤醒

4、唤醒后返回 channel 对应的case index

  • 4.1 如果是读操作,解锁所有的 channel,然后返回(case index, true)

  • 4.2 如果是写操作,解锁所有的 channel,然后返回(case index, false)

这里来分析下 selectgo 的具体实现

1、打乱 case 的顺序

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... // 生成随机顺序 norder := 0 for i := range scases { cas := &scases[i] // 忽略轮询和锁定命令中没有通道的情况 if cas.c == nil { cas.elem = nil // allow GC continue } j := fastrandn(uint32(norder + 1)) pollorder[norder] = pollorder[j] pollorder[j] = uint16(i) norder++ } pollorder = pollorder[:norder] lockorder = lockorder[:norder] // 根据 channel 地址进行排序,决定获取锁的顺序 for i := range lockorder { j := i // Start with the pollorder to permute cases on the same channel. c := scases[pollorder[i]].c for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { k := (j - 1) / 2 lockorder[j] = lockorder[k] j = k } lockorder[j] = pollorder[i] } ... // 锁定选中的 channel sellock(scases, lockorder) ... }

select 中的多个 case 是随机触发执行的,一次只有一个 case 得到执行。如果我们按照顺序依次判断,那么后面的条件永远都会得不到执行,而随机的引入就是为了避免饥饿问题的发生。

所以可以看到上面会将 scase 序列打乱,以达到随机检测 case 的目的,然后记录到 pollorder 中。

2、找出已经 ready 的 case

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog ) // pass 1 - 遍历所有 scase,确定已经准备好的 scase var casi int var cas *scase var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool // 因为上面已经将scases随机写入到pollorder中 // 所以这里的遍历相比于原 cas0的顺序,就是随机的 for _, casei := range pollorder { casi = int(casei) cas = &scases[casi] c = cas.c // 接收数据 if casi >= nsends { // 有 goroutine 等待发送数据 sg = c.sendq.dequeue() if sg != nil { goto recv } // 缓冲区有数据 if c.qcount > 0 { goto bufrecv } // 通道关闭 if c.closed != 0 { goto rclose } // 发送数据 } else { if raceenabled { racereadpc(c.raceaddr(), casePC(casi), chansendpc) } // 判断通道的关闭情况 if c.closed != 0 { goto sclose } // 接收等待队列有 goroutine sg = c.recvq.dequeue() if sg != nil { goto send } // 缓冲区有空位置 if c.qcount < c.dataqsiz { goto bufsend } } } // 如果不阻塞,意味着有 default,准备退出select if !block { selunlock(scases, lockorder) casi = -1 goto retc } ... bufrecv: // 可以从 buffer 接收 if raceenabled { if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } racenotify(c, c.recvx, nil) } if msanenabled && cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) } recvOK = true qp = chanbuf(c, c.recvx) if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- selunlock(scases, lockorder) goto retc bufsend: // 可以发送到 buffer if raceenabled { racenotify(c, c.sendx, nil) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ selunlock(scases, lockorder) goto retc recv: // 可以从一个休眠的发送方 (sg)直接接收 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true goto retc rclose: // 在已经关闭的 channel 末尾进行读 selunlock(scases, lockorder) recvOK = false if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } if raceenabled { raceacquire(c.raceaddr()) } goto retc send: // 可以向一个休眠的接收方 (sg) 发送 if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncsend: cas0=", cas0, " c=", c, "\n") } goto retc retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK sclose: // 向已关闭的 channel 进行发送 selunlock(scases, lockorder) panic(plainError("send on closed channel")) }

1、因为上面已经将 scases 随机写入到 pollorder 中,所以这里的遍历相比于原 cas0 的顺序,就是随机的;

2、case 监听的 channel 有两种操作,读取或者写入;

读取数据

  • 1、如果有发送的 goroutine 在等待数据的接收,那么直接从这个 goroutine 中读出数据,结束 select;

  • 2、如果 channel 的缓冲区有数据,在缓冲去读出数据, 结束 select;

  • 3、如果 channel 关闭了,读出零值,结束 select。

所以可看出,已经关闭的 channel ,用 select 是可以读出数据的。

发送数据

  • 1、如果 channel 关闭了,这时候会触发 panic,因为已经关闭的 channel 是不能发送数据的;

  • 2、如果 channel 的接收等待队列有 goroutine,说明有 goroutine ,正在阻塞等待从该 channel 中接收数据,那么数据直接发送给该 goroutine,结束 select;

  • 3、如果 channel 的缓冲区有数据,发送到数据到 channel 的缓冲区中,结束 select。

如果发送的 channel 中没有缓存空间,接收 channel 的缓存空间为空。这时候该 select 将会阻塞。

如果有 block 为 false ,就表示 select 中有 default,然后执行 default 结束 select。

如果 block 为 true 表示没有 default,需要在阻塞 select,细节见下文。

3、case 都没 ready,且没有 default

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... // pass 2 - 所有 channel 入队,等待处理 gp = getg() if gp.waiting != nil { throw("gp.waiting != nil") } nextp = &gp.waiting for _, casei := range lockorder { casi = int(casei) // 获取一个 scase cas = &scases[casi] // 监听的 channel c = cas.c // 构建sudog,设置这一次阻塞发送的相关信息 sg := acquireSudog() sg.g = gp sg.isSelect = true // No stack splits between assigning elem and enqueuing // sg on gp.waiting where copystack can find it. sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } sg.c = c // 按锁定顺序构造等待列表。 *nextp = sg nextp = &sg.waitlink if casi < nsends { c.sendq.enqueue(sg) } else { c.recvq.enqueue(sg) } } // goroutine 陷入睡眠,等待某一个 channel 唤醒 goroutine gp.param = nil // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 将当前的 Goroutine 陷入沉睡等待唤醒 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) gp.activeStackChans = false sellock(scases, lockorder) gp.selectDone = 0 sg = (*sudog)(gp.param) gp.param = nil ... }

如果 case 都没有 ready ,并没有 default

这时候会循环构建 sudog 的队列,并且按锁定顺序构造等待列表,附在 goroutine 中,然后使用 gopark 挂起当前 goroutine 等待调度器的唤醒。

4、唤醒后返回 channel 对应的 case

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... casi = -1 cas = nil caseSuccess = false sglist = gp.waiting // 在从 gp.waiting 取消链接之前清除所有元素。 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil for _, casei := range lockorder { k = &scases[casei] if sg == sglist { // sg has already been dequeued by the G that woke us up. casi = int(casei) cas = k caseSuccess = sglist.success if sglist.releasetime > 0 { caseReleaseTime = sglist.releasetime } } else { c = k.c if int(casei) < nsends { c.sendq.dequeueSudoG(sglist) } else { c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } ... selunlock(scases, lockorder) goto retc ... retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK ... }

遍历全部 case 时,先获取当前 Goroutine 接收到的参数 sudog 结构,然后依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

因为已经找到了一个可执行的 case,剩下的 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

总结

1、空的 select 会发生死锁;

2、select 中的 case 分支的执行是随机的;

3、如果没有 default 分支

如果没有 default 分支,select 将会一直处于阻塞状态,直到其中的一个 case 就绪;

4、如果有 default 分支

如果有 default 分支,随机将 case 分支遍历一遍,如果有 case 分支可执行,处理对应的 case 分支;

如果遍历完 case 分支,没有可执行的分支,执行 default 分支。

5、select 中向 channel 的发送不会阻塞 select;

6、select 语句中读操作要判断是否成功读取,关闭的 channel 也可以读取。

参考

golang.design/under-the-hood/zh-cn/part1basic/ch03lang/chan/#select-
draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/#52-select
nercoeus.github.io/2020/01/13/go源码阅读之Select/
book.douban.com/subject/35144587/
github.com/boilingfrog/Go-POINT/blob/master/golang/select/select源码阅读.md
boilingfrog.github.io/2022/04/16/go中select源码阅读/

标签:sele

本文共计5545个文字,预计阅读时间需要23分钟。

如何深入理解go语言中select语句的源码实现?

深入理解Go中的select关键字

1.标签一:1

2.标签二:2

3.标签三:3

4.查看源码实现:

1. 不存在case 2. select中仅存在一个case 3. select中存在两个case,其中一个是default,发送值4 4. 多个4

  • 深入了解下 go 中的 select
    • 前言
      • 1、栗子一
      • 2、栗子二
      • 3、栗子三
    • 看下源码实现
      • 1、不存在 case
      • 2、select 中仅存在一个 case
      • 3、select 中存在两个 case,其中一个是 default
        • 发送值
        • 接收值
      • 4、多个 case 的场景
        • 具体的实现逻辑
        • 1、打乱 case 的顺序
        • 2、找出已经 ready 的 case
        • 3、case 都没 ready,且没有 default
        • 4、唤醒后返回 channel 对应的 case
    • 总结
    • 参考
深入了解下 go 中的 select 前言

这里借助于几个经常遇到的 select 的使用 demo 来作为开始,先来看看,下面几个 demo 的输出情况

1、栗子一

func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { chan1 <- 1 }() go func() { chan2 <- 1 }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") default: fmt.Println("default") } }

select 中的 case 执行是随机的,所以当 case 监听的 channel 有数据传入,就执行相应的流程并退出 select,如果对应的 case 没有收到 channel 的数据,就执行 default 语句,然后退出 select。

上面的协程启动时间是无法预估的,所以上面的两个 case 和 default ,都有机会执行。

可能的输出

可能输出1、

chan1 ready.

可能输出2、

chan2 ready.

可能输出3、

default 2、栗子二

func main() { chan1 := make(chan int) chan2 := make(chan int) go func() { close(chan1) }() go func() { close(chan2) }() select { case <-chan1: fmt.Println("chan1 ready.") case <-chan2: fmt.Println("chan2 ready.") default: fmt.Println("default") } }

已经关闭的 channel ,使用 select 是可以从中读出对应的零值,同时两面关闭 channel 的协程的执行实际也是不可控的,原则上,上面两个 case 和 default 都有可能被执行。

可能的输出

可能输出1、

chan1 ready.

可能输出2、

chan2 ready.

可能输出3、

default 3、栗子三

func main() { select {} }

上面这个,应为没有机会退出,所以会发生死锁

看下源码实现

select 中的多个 case 是随机触发执行的,一次只有一个 case 得到执行。如果我们按照顺序依次判断,那么后面的条件永远都会得不到执行,而随机的引入就是为了避免饥饿问题的发生。

1、如果没有 default 分支

如果没有 default 分支,select 将会一直处于阻塞状态,直到其中的一个 case 就绪;

2、如果有 default 分支

如果有 default 分支,随机将 case 分支遍历一遍,如果有 case 分支可执行,处理对应的 case 分支;

如果遍历完 case 分支,没有可执行的分支,执行 default 分支。

源码版本 go version go1.16.13 darwin/amd64

源码包 src/runtime/select.go 定义了表示case语句的数据结构:

type scase struct { c *hchan // chan elem unsafe.Pointer // data element }

c为当前 case 语句所操作的 channel 指针,这也说明了一个 case 语句只能操作一个 channel。

编译阶段,select 对应的 opType 是 OSELECT,select 语句在编译期间会被转换成 OSELECT 节点。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/syntax.go#L922 OSELECT // select { List } (List is list of OCASE)

如果是 OSELECT 就会调用 walkselect(),然后 walkselect() 最后调用 walkselectcases()

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/walk.go#L104 // The result of walkstmt MUST be assigned back to n, e.g. // n.Left = walkstmt(n.Left) func walkstmt(n *Node) *Node { if n == nil { return n } setlineno(n) walkstmtlist(n.Ninit.Slice()) switch n.Op { ... case OSELECT: walkselect(n) case OSWITCH: walkswitch(n) case ORANGE: n = walkrange(n) } if n.Op == ONAME { Fatalf("walkstmt ended up with name: %+v", n) } return n } // github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L90 func walkselect(sel *Node) { lno := setlineno(sel) if sel.Nbody.Len() != 0 { Fatalf("double walkselect") } init := sel.Ninit.Slice() sel.Ninit.Set(nil) // 调用walkselectcases init = append(init, walkselectcases(&sel.List)...) sel.List.Set(nil) sel.Nbody.Set(init) walkstmtlist(sel.Nbody.Slice()) lineno = lno }

上面的调用逻辑,select 的逻辑是在 walkselectcases() 函数中完成的,这里来重点看下

walkselectcases() 在处理中会分成下面几种情况来处理

1、select 中不存在 case, 直接堵塞;

2、select 中仅存在一个 case;

3、select 中存在两个 case,其中一个是 default;

4、其他 select 情况如: 包含多个 case 并且有 default 等。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L108 func walkselectcases(cases *Nodes) []*Node { // 获取 case 分支的数量 n := cases.Len() // 优化: 没有 case 的情况 if n == 0 { // 翻译为:block() ... return } // 优化: 只有一个 case 的情况 if n == 1 { // 翻译为:if ch == nil { block() }; n; ... return } // 优化: select 中存在两个 case,其中一个是 default 的情况 if n == 2 { // 翻译为:发送或接收 // if selectnbsend(c, v) { body } else { default body } // 接收 // if selectnbrecv(&v, &received, c) { body } else { default body } return } // 一般情况,调用 selecggo ... } 1、不存在 case

如果不存在 case ,空的 select 语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/select.go#L108 func walkselectcases(cases *Nodes) []*Node { n := cases.Len() if n == 0 { return []*Node{mkcall("block", nil, nil)} } ... } // 调用 runtime.gopark 让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases。 func block() { gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) }

如果没有 case,导致 Goroutine 进入无法被唤醒的永久休眠状态,会触发 deadlock!

2、select 中仅存在一个 case

如果只有一个 case ,编译器会将 select 改写成 if 条件语句。

// 改写前 select { case v, ok <-ch: // case ch <- v ... } // 改写后 if ch == nil { block() } v, ok := <-ch // case ch <- v ...

如果只有一个 case ,walkselectcases 会将 select 根据收发情况装换成 if 语句,如果 case 中的 Channel 是空指针时,会直接挂起当前 Goroutine 并陷入永久休眠。

3、select 中存在两个 case,其中一个是 default 发送值

在 walkselectcases 中 OSEND,对应的就是向 channel 中发送数据,如果是发送的话,会翻译成下面的语句

如何深入理解go语言中select语句的源码实现?

select { case ch <- i: ... default: ... } if selectnbsend(ch, i) { ... } else { // default body ... } func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }

如果是发送,这里翻译之后最终调用 chansend 向 channel 中发送数据

// 这里提供了一个 block,参数设置成 true,那么表示当前发送操作是阻塞的 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 对于不阻塞的 send,快速检测失败场景 // // 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是: // 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine // 2. channel 是缓冲型的,但循环数组已经装满了元素 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... }

总结下

1、如果 block 为 true 表示当前向 channel 中的数据发送是阻塞的。这里可以看到 selectnbsend 中传入的是 false,说明 channel 的发送不会阻塞 select。

2、对于不阻塞的发送,会进行下面的检测,如果 channel 未关闭且 channel 没有多余的缓冲空间,就会发送失败,然后跳出当前的 case,走到 default 的逻辑。

如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:

  • 1、channel 是非缓冲型的,且等待接收队列里没有 goroutine;

  • 2、channel 是缓冲型的,但循环数组已经装满了元素;

接收值

在 walkselectcases 函数中可以看到,接收方式会有两个,分别是 OSELRECV 和 OSELRECV2

// github.com/golang/go/blob/release-branch.go1.16/src/cmd/compile/internal/gc/walk.go#L104 func walkselectcases(cases *Nodes) []*Node { ... // optimization: two-case select but one is default: single non-blocking op. if ncas == 2 && dflt != nil { switch n.Op { default: Fatalf("select %v", n.Op) case OSELRECV: // if selectnbrecv(&v, c) { body } else { default body } ... r.Left = mkcall1(chanfn("selectnbrecv", 2, ch.Type), types.Types[TBOOL], &r.Ninit, elem, ch) case OSELRECV2: // if selectnbrecv2(&v, &received, c) { body } else { default body } ... r.Left = mkcall1(chanfn("selectnbrecv2", 2, ch.Type), types.Types[TBOOL], &r.Ninit, elem, receivedp, ch) } r.Left = typecheck(r.Left, ctxExpr) r.Nbody.Set(cas.Nbody.Slice()) r.Rlist.Set(append(dflt.Ninit.Slice(), dflt.Nbody.Slice()...)) return []*Node{r, nod(OBREAK, nil, nil)} } ... }

walkselectcases 对这两种情况的改写

selectnbrecv

select { case v = <-c: ... default: ... } // 改写后 if selectnbrecv(&v, c) { ... } else { // default body ... }

selectnbrecv2

select { case v, ok = <-c: ... foo default: ... bar } // 改写后 if c != nil && selectnbrecv2(&v, &ok, c) { ... foo } else { // default body ... bar }

selectnbrecv 和 selectnbrecv2 有什么区别呢?

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/chan.go#L707 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) { // TODO(khr): just return 2 values from this function, now that it is in Go. selected, *received = chanrecv(c, elem, false) return }

可以发现只是针对返回的值处理不同,selectnbrecv2 多了一个是否 received 的 bool 值

总结下:

对于接收值的 case 会有两种处理方式,这两种,区别在于是否将 received 的 bool 值传送给调用方

4、多个 case 的场景

多个 case 的场景

1、会将其中所有 case 转化为 scase 结构体;

2、调用运行时函数 selectgo 选取触发的 scase 结构体;

3、通过 for 循环生成一组 if 语句,来判断是否选中 case;

这里来看下 selectgo 的实现

这里看下函数的几个参数

cas0:为 scase 数组的首地址,selectgo() 就是从这些 scase 中找出一个返回;

order0:为一个两倍 cas0 数组长度的 buffer,保存 scase 随机序列 pollorder 和 scase 中 channel 地址序列 lockorder,数组前一半是 pollorder,后一半用来 lockorder;

  • pollorder:每次 selectgo 执行都会把 scase 序列打乱,以达到随机检测 case 的目的;

  • lockorder:所有 case 语句中 channel 序列,以达到去重防止对 channel 加锁时重复加锁的目的;

pc0:对于竞态检测器构建,pc0 指向一个数组类型[ncases]uintptr (也在栈上);对于其他版本,它设置为 nil;

nsends: 发送的 case 的个数;

nrecvs: 接收的 case 的个数;

block: 表示是否存在 default,没有 default 就表示 select 是阻塞的。

看下返回的数据

int: 选中case的编号,这个case编号跟代码一致;

bool: 是否成功从channle中读取了数据,如果选中的case是从channel中读数据,则该返回值表示是否读取成功。

具体的实现逻辑

1、打乱 scase 的顺序,锁定 scase 语句中所有的 channel;

2、按照随机顺序检测 scase 中的 channel 是否ready;

  • 2.1 如果 case 可读,则读取 channel 中数据,解锁所有的 channel,然后返回(case index, true)

  • 2.2 如果 case 可写,则将数据写入 channel,解锁所有的channel,然后返回(case index, false)

  • 2.3 所有 case 都未 ready,并且有 default 语句,则解锁所有的channel,然后返回 (default index, false)

3、所有 case 都未 ready,且没有 default 语句

  • 3.1 将当前协程加入到所有 channel 的等待队列

  • 3.2 当将协程转入阻塞,等待被唤醒

4、唤醒后返回 channel 对应的case index

  • 4.1 如果是读操作,解锁所有的 channel,然后返回(case index, true)

  • 4.2 如果是写操作,解锁所有的 channel,然后返回(case index, false)

这里来分析下 selectgo 的具体实现

1、打乱 case 的顺序

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... // 生成随机顺序 norder := 0 for i := range scases { cas := &scases[i] // 忽略轮询和锁定命令中没有通道的情况 if cas.c == nil { cas.elem = nil // allow GC continue } j := fastrandn(uint32(norder + 1)) pollorder[norder] = pollorder[j] pollorder[j] = uint16(i) norder++ } pollorder = pollorder[:norder] lockorder = lockorder[:norder] // 根据 channel 地址进行排序,决定获取锁的顺序 for i := range lockorder { j := i // Start with the pollorder to permute cases on the same channel. c := scases[pollorder[i]].c for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { k := (j - 1) / 2 lockorder[j] = lockorder[k] j = k } lockorder[j] = pollorder[i] } ... // 锁定选中的 channel sellock(scases, lockorder) ... }

select 中的多个 case 是随机触发执行的,一次只有一个 case 得到执行。如果我们按照顺序依次判断,那么后面的条件永远都会得不到执行,而随机的引入就是为了避免饥饿问题的发生。

所以可以看到上面会将 scase 序列打乱,以达到随机检测 case 的目的,然后记录到 pollorder 中。

2、找出已经 ready 的 case

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog ) // pass 1 - 遍历所有 scase,确定已经准备好的 scase var casi int var cas *scase var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool // 因为上面已经将scases随机写入到pollorder中 // 所以这里的遍历相比于原 cas0的顺序,就是随机的 for _, casei := range pollorder { casi = int(casei) cas = &scases[casi] c = cas.c // 接收数据 if casi >= nsends { // 有 goroutine 等待发送数据 sg = c.sendq.dequeue() if sg != nil { goto recv } // 缓冲区有数据 if c.qcount > 0 { goto bufrecv } // 通道关闭 if c.closed != 0 { goto rclose } // 发送数据 } else { if raceenabled { racereadpc(c.raceaddr(), casePC(casi), chansendpc) } // 判断通道的关闭情况 if c.closed != 0 { goto sclose } // 接收等待队列有 goroutine sg = c.recvq.dequeue() if sg != nil { goto send } // 缓冲区有空位置 if c.qcount < c.dataqsiz { goto bufsend } } } // 如果不阻塞,意味着有 default,准备退出select if !block { selunlock(scases, lockorder) casi = -1 goto retc } ... bufrecv: // 可以从 buffer 接收 if raceenabled { if cas.elem != nil { raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc) } racenotify(c, c.recvx, nil) } if msanenabled && cas.elem != nil { msanwrite(cas.elem, c.elemtype.size) } recvOK = true qp = chanbuf(c, c.recvx) if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- selunlock(scases, lockorder) goto retc bufsend: // 可以发送到 buffer if raceenabled { racenotify(c, c.sendx, nil) raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ selunlock(scases, lockorder) goto retc recv: // 可以从一个休眠的发送方 (sg)直接接收 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true goto retc rclose: // 在已经关闭的 channel 末尾进行读 selunlock(scases, lockorder) recvOK = false if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } if raceenabled { raceacquire(c.raceaddr()) } goto retc send: // 可以向一个休眠的接收方 (sg) 发送 if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc) } if msanenabled { msanread(cas.elem, c.elemtype.size) } send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncsend: cas0=", cas0, " c=", c, "\n") } goto retc retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK sclose: // 向已关闭的 channel 进行发送 selunlock(scases, lockorder) panic(plainError("send on closed channel")) }

1、因为上面已经将 scases 随机写入到 pollorder 中,所以这里的遍历相比于原 cas0 的顺序,就是随机的;

2、case 监听的 channel 有两种操作,读取或者写入;

读取数据

  • 1、如果有发送的 goroutine 在等待数据的接收,那么直接从这个 goroutine 中读出数据,结束 select;

  • 2、如果 channel 的缓冲区有数据,在缓冲去读出数据, 结束 select;

  • 3、如果 channel 关闭了,读出零值,结束 select。

所以可看出,已经关闭的 channel ,用 select 是可以读出数据的。

发送数据

  • 1、如果 channel 关闭了,这时候会触发 panic,因为已经关闭的 channel 是不能发送数据的;

  • 2、如果 channel 的接收等待队列有 goroutine,说明有 goroutine ,正在阻塞等待从该 channel 中接收数据,那么数据直接发送给该 goroutine,结束 select;

  • 3、如果 channel 的缓冲区有数据,发送到数据到 channel 的缓冲区中,结束 select。

如果发送的 channel 中没有缓存空间,接收 channel 的缓存空间为空。这时候该 select 将会阻塞。

如果有 block 为 false ,就表示 select 中有 default,然后执行 default 结束 select。

如果 block 为 true 表示没有 default,需要在阻塞 select,细节见下文。

3、case 都没 ready,且没有 default

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... // pass 2 - 所有 channel 入队,等待处理 gp = getg() if gp.waiting != nil { throw("gp.waiting != nil") } nextp = &gp.waiting for _, casei := range lockorder { casi = int(casei) // 获取一个 scase cas = &scases[casi] // 监听的 channel c = cas.c // 构建sudog,设置这一次阻塞发送的相关信息 sg := acquireSudog() sg.g = gp sg.isSelect = true // No stack splits between assigning elem and enqueuing // sg on gp.waiting where copystack can find it. sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } sg.c = c // 按锁定顺序构造等待列表。 *nextp = sg nextp = &sg.waitlink if casi < nsends { c.sendq.enqueue(sg) } else { c.recvq.enqueue(sg) } } // goroutine 陷入睡眠,等待某一个 channel 唤醒 goroutine gp.param = nil // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 将当前的 Goroutine 陷入沉睡等待唤醒 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) gp.activeStackChans = false sellock(scases, lockorder) gp.selectDone = 0 sg = (*sudog)(gp.param) gp.param = nil ... }

如果 case 都没有 ready ,并没有 default

这时候会循环构建 sudog 的队列,并且按锁定顺序构造等待列表,附在 goroutine 中,然后使用 gopark 挂起当前 goroutine 等待调度器的唤醒。

4、唤醒后返回 channel 对应的 case

// github.com/golang/go/blob/release-branch.go1.16/src/runtime/select.go#L121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... casi = -1 cas = nil caseSuccess = false sglist = gp.waiting // 在从 gp.waiting 取消链接之前清除所有元素。 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil for _, casei := range lockorder { k = &scases[casei] if sg == sglist { // sg has already been dequeued by the G that woke us up. casi = int(casei) cas = k caseSuccess = sglist.success if sglist.releasetime > 0 { caseReleaseTime = sglist.releasetime } } else { c = k.c if int(casei) < nsends { c.sendq.dequeueSudoG(sglist) } else { c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } ... selunlock(scases, lockorder) goto retc ... retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK ... }

遍历全部 case 时,先获取当前 Goroutine 接收到的参数 sudog 结构,然后依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

因为已经找到了一个可执行的 case,剩下的 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

总结

1、空的 select 会发生死锁;

2、select 中的 case 分支的执行是随机的;

3、如果没有 default 分支

如果没有 default 分支,select 将会一直处于阻塞状态,直到其中的一个 case 就绪;

4、如果有 default 分支

如果有 default 分支,随机将 case 分支遍历一遍,如果有 case 分支可执行,处理对应的 case 分支;

如果遍历完 case 分支,没有可执行的分支,执行 default 分支。

5、select 中向 channel 的发送不会阻塞 select;

6、select 语句中读操作要判断是否成功读取,关闭的 channel 也可以读取。

参考

golang.design/under-the-hood/zh-cn/part1basic/ch03lang/chan/#select-
draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/#52-select
nercoeus.github.io/2020/01/13/go源码阅读之Select/
book.douban.com/subject/35144587/
github.com/boilingfrog/Go-POINT/blob/master/golang/select/select源码阅读.md
boilingfrog.github.io/2022/04/16/go中select源码阅读/

标签:sele