Source file
src/runtime/chan.go
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "internal/abi"
22 "internal/runtime/atomic"
23 "internal/runtime/math"
24 "internal/runtime/sys"
25 "unsafe"
26 )
27
28 const (
// maxAlign is the alignment that hchanSize is rounded up to. makechan throws
// if an element type requires a larger alignment than this.
29 maxAlign = 8
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
// maxAlign. makechan uses it as the offset of the element buffer when the
// header and buffer share one allocation.
30 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// debugChan enables the print-based tracing in makechan, chansend and chanrecv.
31 debugChan = false
32 )
33
// hchan is the runtime representation of a channel: a circular element buffer
// plus queues of goroutines waiting to send or receive.
34 type hchan struct {
35 qcount uint // number of elements currently stored in the buffer
36 dataqsiz uint // capacity of the circular buffer, in elements
37 buf unsafe.Pointer // start of the buffer: dataqsiz slots of elemsize bytes each
38 elemsize uint16 // size in bytes of one element (makechan rejects sizes >= 1<<16)
39 synctest bool // set by makechan when the creating goroutine has a non-nil syncGroup
40 closed uint32 // nonzero once closechan has run
41 timer *timer // NOTE(review): non-nil only for timer channels; it is never assigned in this file's visible code
42 elemtype *_type // element type
43 sendx uint // buffer index the next buffered send writes to
44 recvx uint // buffer index the next buffered receive reads from
45 recvq waitq // goroutines parked waiting to receive
46 sendq waitq // goroutines parked waiting to send
47
// lock is held while chansend, chanrecv, closechan and timerchandrain modify
// the fields above and the wait queues.
48
49
50
51
52
53
54 lock mutex
55 }
56
// waitq is a doubly linked queue of sudogs, chained through their next and
// prev fields. enqueue appends at last; dequeue removes from first.
57 type waitq struct {
58 first *sudog // head of the queue; nil when the queue is empty
59 last *sudog // tail of the queue; nil when the queue is empty
60 }
61
62
// reflect_makechan creates a channel by forwarding its arguments to makechan.
// NOTE(review): the name suggests this is the entry point used by package
// reflect; the linkage is not visible in this view.
63 func reflect_makechan(t *chantype, size int) *hchan {
64 return makechan(t, size)
65 }
66
67 func makechan64(t *chantype, size int64) *hchan {
68 if int64(int(size)) != size {
69 panic(plainError("makechan: size out of range"))
70 }
71
72 return makechan(t, int(size))
73 }
74
75 func makechan(t *chantype, size int) *hchan {
76 elem := t.Elem
77
78
79 if elem.Size_ >= 1<<16 {
80 throw("makechan: invalid channel element type")
81 }
82 if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
83 throw("makechan: bad alignment")
84 }
85
86 mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
87 if overflow || mem > maxAlloc-hchanSize || size < 0 {
88 panic(plainError("makechan: size out of range"))
89 }
90
91
92
93
94
95 var c *hchan
96 switch {
97 case mem == 0:
98
99 c = (*hchan)(mallocgc(hchanSize, nil, true))
100
101 c.buf = c.raceaddr()
102 case !elem.Pointers():
103
104
105 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
106 c.buf = add(unsafe.Pointer(c), hchanSize)
107 default:
108
109 c = new(hchan)
110 c.buf = mallocgc(mem, elem, true)
111 }
112
113 c.elemsize = uint16(elem.Size_)
114 c.elemtype = elem
115 c.dataqsiz = uint(size)
116 if getg().syncGroup != nil {
117 c.synctest = true
118 }
119 lockInit(&c.lock, lockRankHchan)
120
121 if debugChan {
122 print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
123 }
124 return c
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138 func chanbuf(c *hchan, i uint) unsafe.Pointer {
139 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
140 }
141
142
143
144
145
146 func full(c *hchan) bool {
147
148
149 if c.dataqsiz == 0 {
150
151 return c.recvq.first == nil
152 }
153
154 return c.qcount == c.dataqsiz
155 }
156
157
158
159
// chansend1 performs a blocking send of the value at elem on c, passing the
// PC of its own caller to chansend. The result of chansend is discarded.
// NOTE(review): presumably the entry point the compiler emits for `c <- x`;
// confirm against the compiler.
160 func chansend1(c *hchan, elem unsafe.Pointer) {
161 chansend(c, elem, true, sys.GetCallerPC())
162 }
163
164
// chansend sends the value pointed to by ep on channel c.
//
// When block is false it never parks: it returns false if the send cannot
// complete immediately. Otherwise it returns true once the value has been
// handed to a waiting receiver or stored in the buffer.
//
// A nil c blocks forever when block is true and returns false otherwise.
// chansend panics with "send on closed channel" if c is closed, including
// when it is closed while this goroutine is parked, and with "send on synctest
// channel from outside bubble" if c.synctest is set and the calling goroutine
// has no syncGroup. callerpc is used only for race-detector reporting.
176 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
177 if c == nil {
178 if !block {
179 return false
180 }
// Park with no unlock function; throw guards against gopark ever returning.
181 gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
182 throw("unreachable")
183 }
184
185 if debugChan {
186 print("chansend: chan=", c, "\n")
187 }
188
189 if raceenabled {
190 racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
191 }
192
193 if c.synctest && getg().syncGroup == nil {
194 panic(plainError("send on synctest channel from outside bubble"))
195 }
196
197 // Fast path: a non-blocking send on a channel that looks open and full
198 // fails without acquiring the lock. c.closed and full(c) are read without
199 // synchronization here; the closed flag, the receive queue and the buffer
200 // occupancy are all examined again under c.lock below whenever this path
201 // is not taken.
202
203
204
205
206
207
208
209
210
211
212
213 if !block && c.closed == 0 && full(c) {
214 return false
215 }
216
// t0 stays zero unless block profiling is enabled.
217 var t0 int64
218 if blockprofilerate > 0 {
219 t0 = cputicks()
220 }
221
222 lock(&c.lock)
223
224 if c.closed != 0 {
225 unlock(&c.lock)
226 panic(plainError("send on closed channel"))
227 }
228
229 if sg := c.recvq.dequeue(); sg != nil {
230 // A receiver is already waiting: hand the value to it via send, which
231 // releases c.lock through the callback before readying the receiver.
232 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
233 return true
234 }
235
236 if c.qcount < c.dataqsiz {
237 // Buffer has room: store the value in slot sendx and advance sendx circularly.
238 qp := chanbuf(c, c.sendx)
239 if raceenabled {
240 racenotify(c, c.sendx, nil)
241 }
242 typedmemmove(c.elemtype, qp, ep)
243 c.sendx++
244 if c.sendx == c.dataqsiz {
245 c.sendx = 0
246 }
247 c.qcount++
248 unlock(&c.lock)
249 return true
250 }
251
252 if !block {
253 unlock(&c.lock)
254 return false
255 }
256
257 // Must block: describe this send in a sudog, queue it on c.sendq and park.
258 gp := getg()
259 mysg := acquireSudog()
260 mysg.releasetime = 0
261 if t0 != 0 {
// Nonzero releasetime makes the waker record cputicks() in it (see recv and closechan).
262 mysg.releasetime = -1
263 }
264 // NOTE(review): the order of the assignments below looks deliberate
265 // (runtime stack-copying constraints?); do not reorder without confirming.
266 mysg.elem = ep
267 mysg.waitlink = nil
268 mysg.g = gp
269 mysg.isSelect = false
270 mysg.c = c
271 gp.waiting = mysg
272 gp.param = nil
273 c.sendq.enqueue(mysg)
274 // parkingOnChan is set here and cleared by chanparkcommit, which gopark
275 // is given as its commit function together with &c.lock; chanparkcommit
276 // is also what releases c.lock.
277
278 gp.parkingOnChan.Store(true)
279 reason := waitReasonChanSend
280 if c.synctest {
281 reason = waitReasonSynctestChanSend
282 }
283 gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
284 // Keep the value being sent alive until this point, after the wakeup.
285 // NOTE(review): presumably needed because the sudog's elem pointer does
286 // not by itself keep the object reachable; confirm.
287
288 KeepAlive(ep)
289
290 // Woken up: either recv took the value (mysg.success true) or closechan ran (mysg.success false).
291 if mysg != gp.waiting {
292 throw("G waiting list is corrupted")
293 }
294 gp.waiting = nil
295 gp.activeStackChans = false
296 closed := !mysg.success
297 gp.param = nil
298 if mysg.releasetime > 0 {
299 blockevent(mysg.releasetime-t0, 2)
300 }
301 mysg.c = nil
302 releaseSudog(mysg)
303 if closed {
304 if c.closed == 0 {
305 throw("chansend: spurious wakeup")
306 }
307 panic(plainError("send on closed channel"))
308 }
309 return true
310 }
311
312
313
314
315
316
317
318 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
319 if c.synctest && sg.g.syncGroup != getg().syncGroup {
320 unlockf()
321 panic(plainError("send on synctest channel from outside bubble"))
322 }
323 if raceenabled {
324 if c.dataqsiz == 0 {
325 racesync(c, sg)
326 } else {
327
328
329
330 racenotify(c, c.recvx, nil)
331 racenotify(c, c.recvx, sg)
332 c.recvx++
333 if c.recvx == c.dataqsiz {
334 c.recvx = 0
335 }
336 c.sendx = c.recvx
337 }
338 }
339 if sg.elem != nil {
340 sendDirect(c.elemtype, sg, ep)
341 sg.elem = nil
342 }
343 gp := sg.g
344 unlockf()
345 gp.param = unsafe.Pointer(sg)
346 sg.success = true
347 if sg.releasetime != 0 {
348 sg.releasetime = cputicks()
349 }
350 goready(gp, skip+1)
351 }
352
353
354
355
356
357
358 func timerchandrain(c *hchan) bool {
359
360
361
362
363
364 if atomic.Loaduint(&c.qcount) == 0 {
365 return false
366 }
367 lock(&c.lock)
368 any := false
369 for c.qcount > 0 {
370 any = true
371 typedmemclr(c.elemtype, chanbuf(c, c.recvx))
372 c.recvx++
373 if c.recvx == c.dataqsiz {
374 c.recvx = 0
375 }
376 c.qcount--
377 }
378 unlock(&c.lock)
379 return any
380 }
381
382
383
384
385
386
387
388
389
390
391
392 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
393
394
395
396
397
398 dst := sg.elem
399 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
400
401
402 memmove(dst, src, t.Size_)
403 }
404
405 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
406
407
408
409 src := sg.elem
410 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
411 memmove(dst, src, t.Size_)
412 }
413
414 func closechan(c *hchan) {
415 if c == nil {
416 panic(plainError("close of nil channel"))
417 }
418
419 lock(&c.lock)
420 if c.closed != 0 {
421 unlock(&c.lock)
422 panic(plainError("close of closed channel"))
423 }
424
425 if raceenabled {
426 callerpc := sys.GetCallerPC()
427 racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
428 racerelease(c.raceaddr())
429 }
430
431 c.closed = 1
432
433 var glist gList
434
435
436 for {
437 sg := c.recvq.dequeue()
438 if sg == nil {
439 break
440 }
441 if sg.elem != nil {
442 typedmemclr(c.elemtype, sg.elem)
443 sg.elem = nil
444 }
445 if sg.releasetime != 0 {
446 sg.releasetime = cputicks()
447 }
448 gp := sg.g
449 gp.param = unsafe.Pointer(sg)
450 sg.success = false
451 if raceenabled {
452 raceacquireg(gp, c.raceaddr())
453 }
454 glist.push(gp)
455 }
456
457
458 for {
459 sg := c.sendq.dequeue()
460 if sg == nil {
461 break
462 }
463 sg.elem = nil
464 if sg.releasetime != 0 {
465 sg.releasetime = cputicks()
466 }
467 gp := sg.g
468 gp.param = unsafe.Pointer(sg)
469 sg.success = false
470 if raceenabled {
471 raceacquireg(gp, c.raceaddr())
472 }
473 glist.push(gp)
474 }
475 unlock(&c.lock)
476
477
478 for !glist.empty() {
479 gp := glist.pop()
480 gp.schedlink = 0
481 goready(gp, 3)
482 }
483 }
484
485
486
487
488
489 func empty(c *hchan) bool {
490
491 if c.dataqsiz == 0 {
492 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
493 }
494
495
496 if c.timer != nil {
497 c.timer.maybeRunChan()
498 }
499 return atomic.Loaduint(&c.qcount) == 0
500 }
501
502
503
504
// chanrecv1 performs a blocking receive from c into elem and discards both
// results of chanrecv.
// NOTE(review): presumably the entry point the compiler emits for `<-c`;
// confirm against the compiler.
505 func chanrecv1(c *hchan, elem unsafe.Pointer) {
506 chanrecv(c, elem, true)
507 }
508
509
510 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
511 _, received = chanrecv(c, elem, true)
512 return
513 }
514
515
516
517
518
519
520
// chanrecv receives from channel c and writes the value to ep. ep may be nil,
// in which case the received value is discarded.
//
// Results:
//   - (false, false): block is false and nothing could be received immediately.
//   - (true, false): c is closed and holds no data; *ep is zeroed if ep is non-nil.
//   - (true, true): a value was received from the buffer or directly from a sender.
//
// A nil c blocks forever when block is true and returns (false, false)
// otherwise. chanrecv panics with "receive on synctest channel from outside
// bubble" if c.synctest is set and the calling goroutine has no syncGroup.
521 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
522
523
524
525 if debugChan {
526 print("chanrecv: chan=", c, "\n")
527 }
528
529 if c == nil {
530 if !block {
531 return
532 }
// Park with no unlock function; throw guards against gopark ever returning.
533 gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
534 throw("unreachable")
535 }
536
537 if c.synctest && getg().syncGroup == nil {
538 panic(plainError("receive on synctest channel from outside bubble"))
539 }
540
541 if c.timer != nil {
542 c.timer.maybeRunChan()
543 }
544
545 // Fast path: a non-blocking receive that finds nothing to take, without acquiring the lock.
546 if !block && empty(c) {
547 // The channel was observed empty. Now look at the closed flag: if it is
548 // still open, the receive simply fails. No code in this file ever resets
549 // c.closed to zero, so a nonzero value read here stays valid.
550
551
552
553
554
555
556 if atomic.Load(&c.closed) == 0 {
557
558
559
560
561 return
562 }
563 // The channel is closed. Check for data again, since elements may have
564 // arrived between the empty check and the closed check above.
565
566 if empty(c) {
567 // Closed and drained: report a zero value with received == false.
568 if raceenabled {
569 raceacquire(c.raceaddr())
570 }
571 if ep != nil {
572 typedmemclr(c.elemtype, ep)
573 }
574 return true, false
575 }
576 }
577
// t0 stays zero unless block profiling is enabled.
578 var t0 int64
579 if blockprofilerate > 0 {
580 t0 = cputicks()
581 }
582
583 lock(&c.lock)
584
585 if c.closed != 0 {
586 if c.qcount == 0 {
587 if raceenabled {
588 raceacquire(c.raceaddr())
589 }
590 unlock(&c.lock)
591 if ep != nil {
592 typedmemclr(c.elemtype, ep)
593 }
594 return true, false
595 }
596 // Closed but the buffer still holds data: fall through to the buffered receive below.
597 } else {
598 // Open channel: look for a parked sender first.
599 if sg := c.sendq.dequeue(); sg != nil {
600 // A sender is waiting. recv either copies straight from the sender
601 // (unbuffered) or takes the head of the buffer and stores the
602 // sender's value at the tail; it releases c.lock via the callback.
603
604 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
605 return true, true
606 }
607 }
608
609 if c.qcount > 0 {
610 // Receive from the buffer: copy out slot recvx, clear it, and advance recvx circularly.
611 qp := chanbuf(c, c.recvx)
612 if raceenabled {
613 racenotify(c, c.recvx, nil)
614 }
615 if ep != nil {
616 typedmemmove(c.elemtype, ep, qp)
617 }
618 typedmemclr(c.elemtype, qp)
619 c.recvx++
620 if c.recvx == c.dataqsiz {
621 c.recvx = 0
622 }
623 c.qcount--
624 unlock(&c.lock)
625 return true, true
626 }
627
628 if !block {
629 unlock(&c.lock)
630 return false, false
631 }
632
633 // Must block: describe this receive in a sudog, queue it on c.recvq and park.
634 gp := getg()
635 mysg := acquireSudog()
636 mysg.releasetime = 0
637 if t0 != 0 {
// Nonzero releasetime makes the waker record cputicks() in it (see send and closechan).
638 mysg.releasetime = -1
639 }
640 // NOTE(review): the order of the assignments below looks deliberate
641 // (runtime stack-copying constraints?); do not reorder without confirming.
642 mysg.elem = ep
643 mysg.waitlink = nil
644 gp.waiting = mysg
645
646 mysg.g = gp
647 mysg.isSelect = false
648 mysg.c = c
649 gp.param = nil
650 c.recvq.enqueue(mysg)
651 if c.timer != nil {
652 blockTimerChan(c)
653 }
654
655 // parkingOnChan is set here and cleared by chanparkcommit, which gopark
656 // is given as its commit function together with &c.lock; chanparkcommit
657 // is also what releases c.lock.
658
659 gp.parkingOnChan.Store(true)
660 reason := waitReasonChanReceive
661 if c.synctest {
662 reason = waitReasonSynctestChanReceive
663 }
664 gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)
665
666 // Woken up: either send delivered a value (mysg.success true) or closechan ran (mysg.success false).
667 if mysg != gp.waiting {
668 throw("G waiting list is corrupted")
669 }
670 if c.timer != nil {
671 unblockTimerChan(c)
672 }
673 gp.waiting = nil
674 gp.activeStackChans = false
675 if mysg.releasetime > 0 {
676 blockevent(mysg.releasetime-t0, 2)
677 }
678 success := mysg.success
679 gp.param = nil
680 mysg.c = nil
681 releaseSudog(mysg)
682 return true, success
683 }
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
700 if c.synctest && sg.g.syncGroup != getg().syncGroup {
701 unlockf()
702 panic(plainError("receive on synctest channel from outside bubble"))
703 }
704 if c.dataqsiz == 0 {
705 if raceenabled {
706 racesync(c, sg)
707 }
708 if ep != nil {
709
710 recvDirect(c.elemtype, sg, ep)
711 }
712 } else {
713
714
715
716
717 qp := chanbuf(c, c.recvx)
718 if raceenabled {
719 racenotify(c, c.recvx, nil)
720 racenotify(c, c.recvx, sg)
721 }
722
723 if ep != nil {
724 typedmemmove(c.elemtype, ep, qp)
725 }
726
727 typedmemmove(c.elemtype, qp, sg.elem)
728 c.recvx++
729 if c.recvx == c.dataqsiz {
730 c.recvx = 0
731 }
732 c.sendx = c.recvx
733 }
734 sg.elem = nil
735 gp := sg.g
736 unlockf()
737 gp.param = unsafe.Pointer(sg)
738 sg.success = true
739 if sg.releasetime != 0 {
740 sg.releasetime = cputicks()
741 }
742 goready(gp, skip+1)
743 }
744
// chanparkcommit is the commit function chansend and chanrecv pass to gopark,
// with chanLock pointing at the channel's lock. It flags gp as having active
// channel waits, clears parkingOnChan, releases the channel lock, and always
// returns true.
745 func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
746 // NOTE(review): presumably tells the stack-copying code that sudogs
747 // pointing into gp's stack are queued on channels; confirm.
748
749
750
751 gp.activeStackChans = true
752 // Clears the flag that chansend/chanrecv set just before calling gopark.
753
754
755 gp.parkingOnChan.Store(false)
756 // The unlock comes last: once the channel lock is released, another
757 // goroutine can dequeue gp's sudog and ready gp, so both stores above
758 // are made before that can happen.
759
760
761 unlock((*mutex)(chanLock))
762 return true
763 }
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
// selectnbsend attempts a non-blocking send of the value at elem on c
// (chansend with block == false) and reports whether the send happened.
// NOTE(review): presumably what the compiler emits for a select with a single
// send case plus default; confirm.
781 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
782 return chansend(c, elem, false, sys.GetCallerPC())
783 }
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
// selectnbrecv attempts a non-blocking receive from c into elem (chanrecv with
// block == false) and returns chanrecv's results unchanged. Note that elem
// comes before c in the parameter list.
// NOTE(review): presumably what the compiler emits for a select with a single
// receive case plus default; confirm.
801 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
802 return chanrecv(c, elem, false)
803 }
804
805
// reflect_chansend sends the value at elem on c via chansend. nb selects
// non-blocking mode, so chansend is called with block == !nb. It reports
// whether the send happened.
// NOTE(review): presumably linked into package reflect; the linkage is not
// visible in this view.
806 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
807 return chansend(c, elem, !nb, sys.GetCallerPC())
808 }
809
810
// reflect_chanrecv receives from c into elem via chanrecv. nb selects
// non-blocking mode, so chanrecv is called with block == !nb. It returns
// chanrecv's results unchanged.
// NOTE(review): presumably linked into package reflect; the linkage is not
// visible in this view.
811 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
812 return chanrecv(c, elem, !nb)
813 }
814
815 func chanlen(c *hchan) int {
816 if c == nil {
817 return 0
818 }
819 async := debug.asynctimerchan.Load() != 0
820 if c.timer != nil && async {
821 c.timer.maybeRunChan()
822 }
823 if c.timer != nil && !async {
824
825
826
827 return 0
828 }
829 return int(c.qcount)
830 }
831
832 func chancap(c *hchan) int {
833 if c == nil {
834 return 0
835 }
836 if c.timer != nil {
837 async := debug.asynctimerchan.Load() != 0
838 if async {
839 return int(c.dataqsiz)
840 }
841
842
843
844 return 0
845 }
846 return int(c.dataqsiz)
847 }
848
849
// reflect_chanlen returns chanlen(c).
// NOTE(review): presumably linked into package reflect; the linkage is not
// visible in this view.
850 func reflect_chanlen(c *hchan) int {
851 return chanlen(c)
852 }
853
854
// reflectlite_chanlen returns chanlen(c).
// NOTE(review): presumably linked into internal/reflectlite; the linkage is
// not visible in this view.
855 func reflectlite_chanlen(c *hchan) int {
856 return chanlen(c)
857 }
858
859
// reflect_chancap returns chancap(c).
// NOTE(review): presumably linked into package reflect; the linkage is not
// visible in this view.
860 func reflect_chancap(c *hchan) int {
861 return chancap(c)
862 }
863
864
// reflect_chanclose closes c by calling closechan, and so panics in the same
// cases: a nil or already-closed channel.
// NOTE(review): presumably linked into package reflect; the linkage is not
// visible in this view.
865 func reflect_chanclose(c *hchan) {
866 closechan(c)
867 }
868
869 func (q *waitq) enqueue(sgp *sudog) {
870 sgp.next = nil
871 x := q.last
872 if x == nil {
873 sgp.prev = nil
874 q.first = sgp
875 q.last = sgp
876 return
877 }
878 sgp.prev = x
879 x.next = sgp
880 q.last = sgp
881 }
882
883 func (q *waitq) dequeue() *sudog {
884 for {
885 sgp := q.first
886 if sgp == nil {
887 return nil
888 }
889 y := sgp.next
890 if y == nil {
891 q.first = nil
892 q.last = nil
893 } else {
894 y.prev = nil
895 q.first = y
896 sgp.next = nil
897 }
898
899
900
901
902
903
904
905
906
907 if sgp.isSelect {
908 if !sgp.g.selectDone.CompareAndSwap(0, 1) {
909
910 continue
911 }
912 }
913
914 return sgp
915 }
916 }
917
// raceaddr returns the address of c.buf. That address is the location passed
// to the race detector for channel-wide operations such as racereadpc in
// chansend and racewritepc/racerelease in closechan.
// NOTE(review): buf is presumably chosen over qcount or dataqsiz because
// chanlen and chancap read those fields directly; confirm.
918 func (c *hchan) raceaddr() unsafe.Pointer {
919
920
921
922
923
924 return unsafe.Pointer(&c.buf)
925 }
926
927 func racesync(c *hchan, sg *sudog) {
928 racerelease(chanbuf(c, 0))
929 raceacquireg(sg.g, chanbuf(c, 0))
930 racereleaseg(sg.g, chanbuf(c, 0))
931 raceacquire(chanbuf(c, 0))
932 }
933
934
935
936
937 func racenotify(c *hchan, idx uint, sg *sudog) {
938
939
940
941
942
943
944
945 qp := chanbuf(c, idx)
946
947
948
949
950
951
952 if c.elemsize == 0 {
953 if sg == nil {
954 raceacquire(qp)
955 racerelease(qp)
956 } else {
957 raceacquireg(sg.g, qp)
958 racereleaseg(sg.g, qp)
959 }
960 } else {
961 if sg == nil {
962 racereleaseacquire(qp)
963 } else {
964 racereleaseacquireg(sg.g, qp)
965 }
966 }
967 }
968
View as plain text