Source file
src/runtime/select.go
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "internal/abi"
11 "internal/runtime/sys"
12 "unsafe"
13 )
14
15 const debugSelect = false
16
17
18
19
20 type scase struct {
21 c *hchan
22 elem unsafe.Pointer
23 }
24
25 var (
26 chansendpc = abi.FuncPCABIInternal(chansend)
27 chanrecvpc = abi.FuncPCABIInternal(chanrecv)
28 )
29
30 func selectsetpc(pc *uintptr) {
31 *pc = sys.GetCallerPC()
32 }
33
34 func sellock(scases []scase, lockorder []uint16) {
35 var c *hchan
36 for _, o := range lockorder {
37 c0 := scases[o].c
38 if c0 != c {
39 c = c0
40 lock(&c.lock)
41 }
42 }
43 }
44
45 func selunlock(scases []scase, lockorder []uint16) {
46
47
48
49
50
51
52
53
54 for i := len(lockorder) - 1; i >= 0; i-- {
55 c := scases[lockorder[i]].c
56 if i > 0 && c == scases[lockorder[i-1]].c {
57 continue
58 }
59 unlock(&c.lock)
60 }
61 }
62
63 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
64
65
66
67
68
69 gp.activeStackChans = true
70
71
72
73 gp.parkingOnChan.Store(false)
74
75
76
77
78
79
80
81
82
83
84 var lastc *hchan
85 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
86 if sg.c != lastc && lastc != nil {
87
88
89
90
91
92
93 unlock(&lastc.lock)
94 }
95 lastc = sg.c
96 }
97 if lastc != nil {
98 unlock(&lastc.lock)
99 }
100 return true
101 }
102
103 func block() {
104 gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1)
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
123 gp := getg()
124 if debugSelect {
125 print("select: cas0=", cas0, "\n")
126 }
127
128
129
130 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
131 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
132
133 ncases := nsends + nrecvs
134 scases := cas1[:ncases:ncases]
135 pollorder := order1[:ncases:ncases]
136 lockorder := order1[ncases:][:ncases:ncases]
137
138
139
140
141
142 var pcs []uintptr
143 if raceenabled && pc0 != nil {
144 pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
145 pcs = pc1[:ncases:ncases]
146 }
147 casePC := func(casi int) uintptr {
148 if pcs == nil {
149 return 0
150 }
151 return pcs[casi]
152 }
153
154 var t0 int64
155 if blockprofilerate > 0 {
156 t0 = cputicks()
157 }
158
159
160
161
162
163
164
165
166
167
168 norder := 0
169 allSynctest := true
170 for i := range scases {
171 cas := &scases[i]
172
173
174 if cas.c == nil {
175 cas.elem = nil
176 continue
177 }
178
179 if cas.c.synctest {
180 if getg().syncGroup == nil {
181 panic(plainError("select on synctest channel from outside bubble"))
182 }
183 } else {
184 allSynctest = false
185 }
186
187 if cas.c.timer != nil {
188 cas.c.timer.maybeRunChan()
189 }
190
191 j := cheaprandn(uint32(norder + 1))
192 pollorder[norder] = pollorder[j]
193 pollorder[j] = uint16(i)
194 norder++
195 }
196 pollorder = pollorder[:norder]
197 lockorder = lockorder[:norder]
198
199 waitReason := waitReasonSelect
200 if gp.syncGroup != nil && allSynctest {
201
202
203 waitReason = waitReasonSynctestSelect
204 }
205
206
207
208 for i := range lockorder {
209 j := i
210
211 c := scases[pollorder[i]].c
212 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
213 k := (j - 1) / 2
214 lockorder[j] = lockorder[k]
215 j = k
216 }
217 lockorder[j] = pollorder[i]
218 }
219 for i := len(lockorder) - 1; i >= 0; i-- {
220 o := lockorder[i]
221 c := scases[o].c
222 lockorder[i] = lockorder[0]
223 j := 0
224 for {
225 k := j*2 + 1
226 if k >= i {
227 break
228 }
229 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
230 k++
231 }
232 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
233 lockorder[j] = lockorder[k]
234 j = k
235 continue
236 }
237 break
238 }
239 lockorder[j] = o
240 }
241
242 if debugSelect {
243 for i := 0; i+1 < len(lockorder); i++ {
244 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
245 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
246 throw("select: broken sort")
247 }
248 }
249 }
250
251
252 sellock(scases, lockorder)
253
254 var (
255 sg *sudog
256 c *hchan
257 k *scase
258 sglist *sudog
259 sgnext *sudog
260 qp unsafe.Pointer
261 nextp **sudog
262 )
263
264
265 var casi int
266 var cas *scase
267 var caseSuccess bool
268 var caseReleaseTime int64 = -1
269 var recvOK bool
270 for _, casei := range pollorder {
271 casi = int(casei)
272 cas = &scases[casi]
273 c = cas.c
274
275 if casi >= nsends {
276 sg = c.sendq.dequeue()
277 if sg != nil {
278 goto recv
279 }
280 if c.qcount > 0 {
281 goto bufrecv
282 }
283 if c.closed != 0 {
284 goto rclose
285 }
286 } else {
287 if raceenabled {
288 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
289 }
290 if c.closed != 0 {
291 goto sclose
292 }
293 sg = c.recvq.dequeue()
294 if sg != nil {
295 goto send
296 }
297 if c.qcount < c.dataqsiz {
298 goto bufsend
299 }
300 }
301 }
302
303 if !block {
304 selunlock(scases, lockorder)
305 casi = -1
306 goto retc
307 }
308
309
310 if gp.waiting != nil {
311 throw("gp.waiting != nil")
312 }
313 nextp = &gp.waiting
314 for _, casei := range lockorder {
315 casi = int(casei)
316 cas = &scases[casi]
317 c = cas.c
318 sg := acquireSudog()
319 sg.g = gp
320 sg.isSelect = true
321
322
323 sg.elem = cas.elem
324 sg.releasetime = 0
325 if t0 != 0 {
326 sg.releasetime = -1
327 }
328 sg.c = c
329
330 *nextp = sg
331 nextp = &sg.waitlink
332
333 if casi < nsends {
334 c.sendq.enqueue(sg)
335 } else {
336 c.recvq.enqueue(sg)
337 }
338
339 if c.timer != nil {
340 blockTimerChan(c)
341 }
342 }
343
344
345 gp.param = nil
346
347
348
349
350 gp.parkingOnChan.Store(true)
351 gopark(selparkcommit, nil, waitReason, traceBlockSelect, 1)
352 gp.activeStackChans = false
353
354 sellock(scases, lockorder)
355
356 gp.selectDone.Store(0)
357 sg = (*sudog)(gp.param)
358 gp.param = nil
359
360
361
362
363
364 casi = -1
365 cas = nil
366 caseSuccess = false
367 sglist = gp.waiting
368
369 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
370 sg1.isSelect = false
371 sg1.elem = nil
372 sg1.c = nil
373 }
374 gp.waiting = nil
375
376 for _, casei := range lockorder {
377 k = &scases[casei]
378 if k.c.timer != nil {
379 unblockTimerChan(k.c)
380 }
381 if sg == sglist {
382
383 casi = int(casei)
384 cas = k
385 caseSuccess = sglist.success
386 if sglist.releasetime > 0 {
387 caseReleaseTime = sglist.releasetime
388 }
389 } else {
390 c = k.c
391 if int(casei) < nsends {
392 c.sendq.dequeueSudoG(sglist)
393 } else {
394 c.recvq.dequeueSudoG(sglist)
395 }
396 }
397 sgnext = sglist.waitlink
398 sglist.waitlink = nil
399 releaseSudog(sglist)
400 sglist = sgnext
401 }
402
403 if cas == nil {
404 throw("selectgo: bad wakeup")
405 }
406
407 c = cas.c
408
409 if debugSelect {
410 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
411 }
412
413 if casi < nsends {
414 if !caseSuccess {
415 goto sclose
416 }
417 } else {
418 recvOK = caseSuccess
419 }
420
421 if raceenabled {
422 if casi < nsends {
423 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
424 } else if cas.elem != nil {
425 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
426 }
427 }
428 if msanenabled {
429 if casi < nsends {
430 msanread(cas.elem, c.elemtype.Size_)
431 } else if cas.elem != nil {
432 msanwrite(cas.elem, c.elemtype.Size_)
433 }
434 }
435 if asanenabled {
436 if casi < nsends {
437 asanread(cas.elem, c.elemtype.Size_)
438 } else if cas.elem != nil {
439 asanwrite(cas.elem, c.elemtype.Size_)
440 }
441 }
442
443 selunlock(scases, lockorder)
444 goto retc
445
446 bufrecv:
447
448 if raceenabled {
449 if cas.elem != nil {
450 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
451 }
452 racenotify(c, c.recvx, nil)
453 }
454 if msanenabled && cas.elem != nil {
455 msanwrite(cas.elem, c.elemtype.Size_)
456 }
457 if asanenabled && cas.elem != nil {
458 asanwrite(cas.elem, c.elemtype.Size_)
459 }
460 recvOK = true
461 qp = chanbuf(c, c.recvx)
462 if cas.elem != nil {
463 typedmemmove(c.elemtype, cas.elem, qp)
464 }
465 typedmemclr(c.elemtype, qp)
466 c.recvx++
467 if c.recvx == c.dataqsiz {
468 c.recvx = 0
469 }
470 c.qcount--
471 selunlock(scases, lockorder)
472 goto retc
473
474 bufsend:
475
476 if raceenabled {
477 racenotify(c, c.sendx, nil)
478 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
479 }
480 if msanenabled {
481 msanread(cas.elem, c.elemtype.Size_)
482 }
483 if asanenabled {
484 asanread(cas.elem, c.elemtype.Size_)
485 }
486 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
487 c.sendx++
488 if c.sendx == c.dataqsiz {
489 c.sendx = 0
490 }
491 c.qcount++
492 selunlock(scases, lockorder)
493 goto retc
494
495 recv:
496
497 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
498 if debugSelect {
499 print("syncrecv: cas0=", cas0, " c=", c, "\n")
500 }
501 recvOK = true
502 goto retc
503
504 rclose:
505
506 selunlock(scases, lockorder)
507 recvOK = false
508 if cas.elem != nil {
509 typedmemclr(c.elemtype, cas.elem)
510 }
511 if raceenabled {
512 raceacquire(c.raceaddr())
513 }
514 goto retc
515
516 send:
517
518 if raceenabled {
519 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
520 }
521 if msanenabled {
522 msanread(cas.elem, c.elemtype.Size_)
523 }
524 if asanenabled {
525 asanread(cas.elem, c.elemtype.Size_)
526 }
527 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
528 if debugSelect {
529 print("syncsend: cas0=", cas0, " c=", c, "\n")
530 }
531 goto retc
532
533 retc:
534 if caseReleaseTime > 0 {
535 blockevent(caseReleaseTime-t0, 1)
536 }
537 return casi, recvOK
538
539 sclose:
540
541 selunlock(scases, lockorder)
542 panic(plainError("send on closed channel"))
543 }
544
545 func (c *hchan) sortkey() uintptr {
546 return uintptr(unsafe.Pointer(c))
547 }
548
549
550
551 type runtimeSelect struct {
552 dir selectDir
553 typ unsafe.Pointer
554 ch *hchan
555 val unsafe.Pointer
556 }
557
558
559 type selectDir int
560
561 const (
562 _ selectDir = iota
563 selectSend
564 selectRecv
565 selectDefault
566 )
567
568
569 func reflect_rselect(cases []runtimeSelect) (int, bool) {
570 if len(cases) == 0 {
571 block()
572 }
573 sel := make([]scase, len(cases))
574 orig := make([]int, len(cases))
575 nsends, nrecvs := 0, 0
576 dflt := -1
577 for i, rc := range cases {
578 var j int
579 switch rc.dir {
580 case selectDefault:
581 dflt = i
582 continue
583 case selectSend:
584 j = nsends
585 nsends++
586 case selectRecv:
587 nrecvs++
588 j = len(cases) - nrecvs
589 }
590
591 sel[j] = scase{c: rc.ch, elem: rc.val}
592 orig[j] = i
593 }
594
595
596 if nsends+nrecvs == 0 {
597 return dflt, false
598 }
599
600
601 if nsends+nrecvs < len(cases) {
602 copy(sel[nsends:], sel[len(cases)-nrecvs:])
603 copy(orig[nsends:], orig[len(cases)-nrecvs:])
604 }
605
606 order := make([]uint16, 2*(nsends+nrecvs))
607 var pc0 *uintptr
608 if raceenabled {
609 pcs := make([]uintptr, nsends+nrecvs)
610 for i := range pcs {
611 selectsetpc(&pcs[i])
612 }
613 pc0 = &pcs[0]
614 }
615
616 chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
617
618
619 if chosen < 0 {
620 chosen = dflt
621 } else {
622 chosen = orig[chosen]
623 }
624 return chosen, recvOK
625 }
626
627 func (q *waitq) dequeueSudoG(sgp *sudog) {
628 x := sgp.prev
629 y := sgp.next
630 if x != nil {
631 if y != nil {
632
633 x.next = y
634 y.prev = x
635 sgp.next = nil
636 sgp.prev = nil
637 return
638 }
639
640 x.next = nil
641 q.last = x
642 sgp.prev = nil
643 return
644 }
645 if y != nil {
646
647 y.prev = nil
648 q.first = y
649 sgp.next = nil
650 return
651 }
652
653
654
655 if q.first == sgp {
656 q.first = nil
657 q.last = nil
658 }
659 }
660
View as plain text