Source file src/runtime/coro.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package runtime

import (
	"internal/runtime/sys"
	"unsafe"
)

// A coro represents extra concurrency without extra parallelism,
// as would be needed for a coroutine implementation.
// The coro does not represent a specific coroutine, only the ability
// to do coroutine-style control transfers.
// It can be thought of as a special channel that always has
// a goroutine blocked on it. If another goroutine calls coroswitch(c),
// the caller becomes the goroutine blocked in c, and the goroutine
// formerly blocked in c starts running.
// These switches continue until a call to coroexit(c),
// which ends the use of the coro by releasing the blocked
// goroutine in c and exiting the current goroutine.
//
// Coros are heap allocated and garbage collected, so that user code
// can hold a pointer to a coro without causing potential dangling
// pointer errors.
type coro struct {
	gp guintptr
	f  func(*coro)

	// State for validating thread-lock interactions.
	mp        *m
	lockedExt uint32 // mp's external LockOSThread counter at coro creation time.
	lockedInt uint32 // mp's internal lockOSThread counter at coro creation time.
}

//go:linkname newcoro

// newcoro creates a new coro containing a
// goroutine blocked waiting to run f
// and returns that coro.
func newcoro(f func(*coro)) *coro {
	c := new(coro)
	c.f = f
	pc := sys.GetCallerPC()
	gp := getg()
	systemstack(func() {
		mp := gp.m
		start := corostart
		startfv := *(**funcval)(unsafe.Pointer(&start))
		gp = newproc1(startfv, gp, pc, true, waitReasonCoroutine)

		// Scribble down locked thread state if needed and/or donate
		// thread-lock state to the new goroutine.
		if mp.lockedExt+mp.lockedInt != 0 {
			c.mp = mp
			c.lockedExt = mp.lockedExt
			c.lockedInt = mp.lockedInt
		}
	})
	gp.coroarg = c
	c.gp.set(gp)
	return c
}

// corostart is the entry func for a new coroutine.
// It runs the coroutine user function f passed to corostart
// and then calls coroexit to remove the extra concurrency.
func corostart() {
	gp := getg()
	c := gp.coroarg
	gp.coroarg = nil

	defer coroexit(c)
	c.f(c)
}

// coroexit is like coroswitch but closes the coro
// and exits the current goroutine.
func coroexit(c *coro) {
	gp := getg()
	gp.coroarg = c
	gp.coroexit = true
	mcall(coroswitch_m)
}

//go:linkname coroswitch

// coroswitch switches to the goroutine blocked on c
// and then blocks the current goroutine on c.
func coroswitch(c *coro) {
	gp := getg()
	gp.coroarg = c
	mcall(coroswitch_m)
}

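// As a concrete illustration of the control-transfer semantics described
// above, here is a minimal sketch (hypothetical code, not part of the
// runtime; the function name pingPong is invented for illustration). It
// prints "A B C D E": each coroswitch parks the caller in c and resumes
// the goroutine previously parked there, and when f returns, corostart's
// deferred coroexit releases whichever goroutine is still blocked in c:
//
//	func pingPong() {
//		c := newcoro(func(c *coro) {
//			print("B ")
//			coroswitch(c) // park here; the creator resumes
//			print("D ")
//		}) // f returning triggers coroexit via corostart's defer
//		print("A ")
//		coroswitch(c) // run the new goroutine until it switches back
//		print("C ")
//		coroswitch(c) // resume it; it finishes and coroexit releases us
//		print("E ")
//	}
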
// coroswitch_m is the implementation of coroswitch
// that runs on the m stack.
//
// Note: Coroutine switches are expected to happen at
// an order of magnitude (or more) higher frequency
// than regular goroutine switches, so this path is heavily
// optimized to remove unnecessary work.
// The fast path here is three CAS operations: the one at the top on
// gp.atomicstatus, the one in the middle to choose the next g,
// and the one at the bottom on gnext.atomicstatus.
// It is important not to add more atomic operations or other
// expensive operations to the fast path.
func coroswitch_m(gp *g) {
	c := gp.coroarg
	gp.coroarg = nil
	exit := gp.coroexit
	gp.coroexit = false
	mp := gp.m

	// Track and validate thread-lock interactions.
	//
	// The rules for thread-lock interactions are simple. When a coro goroutine is switched to,
	// the same thread must be used, and the locked state must match the thread-lock state of
	// the goroutine which called newcoro. Thread-lock state consists of the thread and the number
	// of internal (cgo callback, etc.) and external (LockOSThread) thread locks.
	locked := gp.lockedm != 0
	if c.mp != nil || locked {
		if mp != c.mp || mp.lockedInt != c.lockedInt || mp.lockedExt != c.lockedExt {
			print("coro: got thread ", unsafe.Pointer(mp), ", want ", unsafe.Pointer(c.mp), "\n")
			print("coro: got lock internal ", mp.lockedInt, ", want ", c.lockedInt, "\n")
			print("coro: got lock external ", mp.lockedExt, ", want ", c.lockedExt, "\n")
			throw("coro: OS thread locking must match locking at coroutine creation")
		}
	}

	// Acquire the tracer for writing for the duration of this call.
	//
	// There's a lot of state manipulation performed with shortcuts,
	// but we need to make sure the tracer can only observe the
	// start and end states to maintain a coherent model and avoid
	// emitting an event for every single transition.
	trace := traceAcquire()

	canCAS := true
	sg := gp.syncGroup
	if sg != nil {
		// If we're in a synctest group, always use casgstatus (which tracks
		// group idleness) rather than directly CASing. Mark the group as active
		// while we're in the process of transferring control.
		canCAS = false
		sg.incActive()
	}

	if locked {
		// Detach the goroutine from the thread; we'll attach to the goroutine we're
		// switching to before returning.
		gp.lockedm.set(nil)
	}

	if exit {
		// The M might have a non-zero OS thread lock count when we get here; gdestroy
		// will avoid destroying the M if the G isn't explicitly locked to it via lockedm,
		// which we cleared above. It's fine to gdestroy here also, even when locked to
		// the thread, because we'll be switching back to another goroutine anyway, which
		// will take back its thread-lock state before returning.
		gdestroy(gp)
		gp = nil
	} else {
		// If we can CAS ourselves directly from running to waiting, do so,
		// keeping the control transfer as lightweight as possible.
		gp.waitreason = waitReasonCoroutine
		if !canCAS || !gp.atomicstatus.CompareAndSwap(_Grunning, _Gwaiting) {
			// The CAS failed: use casgstatus, which will take care of
			// coordinating with the garbage collector about the state change.
			casgstatus(gp, _Grunning, _Gwaiting)
		}

		// Clear gp.m.
		setMNoWB(&gp.m, nil)
	}

	// The goroutine stored in c is the one to run next.
	// Swap it with ourselves.
	var gnext *g
	for {
		// Note: this is a racy load, but it will eventually
		// get the right value, and if it gets the wrong value,
		// the c.gp.cas will fail, so no harm done other than
		// a wasted loop iteration.
		// The cas will also sync c.gp's
		// memory enough that the next iteration of the racy load
		// should see the correct value.
		// We are avoiding the atomic load to keep this path
		// as lightweight as absolutely possible.
		// (The atomic load is free on x86 but not free elsewhere.)
		next := c.gp
		if next.ptr() == nil {
			throw("coroswitch on exited coro")
		}
		var self guintptr
		self.set(gp)
		if c.gp.cas(next, self) {
			gnext = next.ptr()
			break
		}
	}

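	// At this point the swap has published gp in c.gp, or nil if we're
	// exiting: the next coroswitch will resume gp, while a nil c.gp makes
	// any later coroswitch hit the "coroswitch on exited coro" throw in
	// the loop above. Either way, gnext is now exclusively ours to run.
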
	// Check if we're switching to ourselves. This case is able to break our
	// thread-lock invariants and an unbuffered channel implementation of
	// coroswitch would deadlock. It's clear that this case should just not
	// work.
	if gnext == gp {
		throw("coroswitch of a goroutine to itself")
	}

	// Emit the trace event after getting gnext but before changing curg.
	// GoSwitch expects that the current G is running and that we haven't
	// switched yet for correct status emission.
	if trace.ok() {
		trace.GoSwitch(gnext, exit)
	}

	// Start running next, without heavy scheduling machinery.
	// Set mp.curg and gnext.m and then update scheduling state
	// directly if possible.
	setGNoWB(&mp.curg, gnext)
	setMNoWB(&gnext.m, mp)

	// Synchronize with any outstanding goroutine profile. We're about to start
	// executing, and an invariant of the profiler is that we tryRecordGoroutineProfile
	// whenever a goroutine is about to start running.
	//
	// N.B. We must do this before transitioning to _Grunning but after installing gnext
	// in curg, so that we have a valid curg for allocation (tryRecordGoroutineProfile
	// may allocate).
	if goroutineProfile.active {
		tryRecordGoroutineProfile(gnext, nil, osyield)
	}

	if !canCAS || !gnext.atomicstatus.CompareAndSwap(_Gwaiting, _Grunning) {
		// The CAS failed: use casgstatus, which will take care of
		// coordinating with the garbage collector about the state change.
		casgstatus(gnext, _Gwaiting, _Grunnable)
		casgstatus(gnext, _Grunnable, _Grunning)
	}

	// Donate locked state.
	if locked {
		mp.lockedg.set(gnext)
		gnext.lockedm.set(mp)
	}

	// Release the trace locker. We've completed all the necessary transitions.
	if trace.ok() {
		traceRelease(trace)
	}

	if sg != nil {
		sg.decActive()
	}

	// Switch to gnext. Does not return.
	gogo(&gnext.sched)
}
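
// The //go:linkname directives on newcoro and coroswitch above expose them
// for pull-style linknaming from outside the runtime; the iter package's
// Pull is the expected consumer. A hedged sketch of what that consumer side
// could look like (the opaque coro mirror and these exact declarations are
// illustrative assumptions, not necessarily the iter package's actual code):
//
//	package iter
//
//	import _ "unsafe" // for go:linkname
//
//	type coro struct{} // opaque mirror of runtime.coro
//
//	//go:linkname newcoro runtime.newcoro
//	func newcoro(func(*coro)) *coro
//
//	//go:linkname coroswitch runtime.coroswitch
//	func coroswitch(*coro)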