Advanced Go Concurrency Patterns
Sameer Ajmani
This talk was presented at Google I/O in May 2013.
Go supports concurrency in the language and runtime, not as a library.
This changes how you structure your programs.
Goroutines are independently executing functions in the same address space.
go f()
go g(1, 2)
Channels are typed values that allow goroutines to synchronize and exchange information.
c := make(chan int)
go func() { c <- 3 }()
n := <-c
For more on the basics, watch Go Concurrency Patterns (Pike, 2012).
// +build ignore,OMIT
package main
import (
"fmt"
"time"
)
type Ball struct{ hits int }

func main() {
	table := make(chan *Ball)
	go player("ping", table)
	go player("pong", table)

	table <- new(Ball) // game on; toss the ball
	time.Sleep(1 * time.Second)
	<-table // game over; grab the ball
}

func player(name string, table chan *Ball) {
	for {
		ball := <-table
		ball.hits++
		fmt.Println(name, ball.hits)
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}
// +build ignore,OMIT
package main
import (
"fmt"
"time"
)
type Ball struct{ hits int }

func main() {
	table := make(chan *Ball)
	go player("ping", table)
	go player("pong", table)

	// table <- new(Ball) // game on; toss the ball
	time.Sleep(1 * time.Second)
	<-table // game over; grab the ball
}

func player(name string, table chan *Ball) {
	for {
		ball := <-table
		ball.hits++
		fmt.Println(name, ball.hits)
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}
// +build ignore,OMIT
package main
import (
"fmt"
"time"
)
type Ball struct{ hits int }

func main() {
	table := make(chan *Ball)
	go player("ping", table)
	go player("pong", table)

	table <- new(Ball) // game on; toss the ball
	time.Sleep(1 * time.Second)
	<-table // game over; grab the ball

	panic("show me the stacks")
}

func player(name string, table chan *Ball) {
	for {
		ball := <-table
		ball.hits++
		fmt.Println(name, ball.hits)
		time.Sleep(100 * time.Millisecond)
		table <- ball
	}
}
Long-lived programs need to clean up.
Let's look at how to write programs that handle communication, periodic events, and cancellation.
The core is Go's select statement: like a switch, but the decision is made based on the ability to communicate.
select {
case xc <- x:
	// sent x on xc
case y := <-yc:
	// received y from yc
}
My favorite feed reader disappeared. I need a new one.
Why not write one?
Where do we start?
Searching pkg.go.dev for "rss" turns up several hits, including one that provides:
// Fetch fetches Items for uri and returns the time when the next
// fetch should be attempted. On failure, Fetch returns an error.
func Fetch(uri string) (items []Item, next time.Time, err error)

type Item struct {
	Title, Channel, GUID string // a subset of RSS fields
}
But I want a stream:
<-chan Item
And I want multiple subscriptions.
type Fetcher interface {
	Fetch() (items []Item, next time.Time, err error)
}

func Fetch(domain string) Fetcher {...} // fetches Items from domain

type Subscription interface {
	Updates() <-chan Item // stream of Items
	Close() error         // shuts down the stream
}

func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream

func Merge(subs ...Subscription) Subscription {...} // merges several streams
//go:build ignore && OMIT
// +build ignore,OMIT
// fakemain runs the Subscribe example with a fake RSS fetcher.
package main
import (
"fmt"
"math/rand"
"time"
)
// STARTITEM OMIT
// An Item is a stripped-down RSS item.
type Item struct{ Title, Channel, GUID string }
// STOPITEM OMIT
// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted. On failure, Fetch returns a non-nil error.
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel. Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
Updates() <-chan Item
Close() error
}
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT
// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item), // for Updates
closing: make(chan chan error), // for Close
}
go s.loop()
return s
}
// STOPSUBSCRIBE OMIT
// sub implements the Subscription interface.
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // sends items to the user
closing chan chan error // for Close
}
// STARTUPDATES OMIT
func (s *sub) Updates() <-chan Item {
return s.updates
}
// STOPUPDATES OMIT
// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
func (s *sub) Close() error {
// STOPCLOSESIG OMIT
errc := make(chan error)
s.closing <- errc // HLchan
return <-errc // HLchan
}
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic
// that handles Close.
func (s *sub) loopCloseOnly() {
// STARTCLOSEONLY OMIT
var err error // set when Fetch fails
for {
select {
case errc := <-s.closing: // HLchan
errc <- err // HLchan
close(s.updates) // tells receiver we're done
return
}
}
// STOPCLOSEONLY OMIT
}
// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch.
func (s *sub) loopFetchOnly() {
// STARTFETCHONLY OMIT
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 0
var err error
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
// STOPFETCHONLY OMIT
}
// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates.
func (s *sub) loopSendOnly() {
// STARTSENDONLY OMIT
var pending []Item // appended by fetch; consumed by send
for {
var first Item
var updates chan Item // HLupdates
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case // HLupdates
}
select {
case updates <- first:
pending = pending[1:]
}
}
// STOPSENDONLY OMIT
}
// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly.
func (s *sub) mergedLoop() {
// STARTFETCHVARS OMIT
var pending []Item
var next time.Time
var err error
// STOPFETCHVARS OMIT
for {
// STARTNOCAP OMIT
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
// STOPNOCAP OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
// STARTSELECT OMIT
select {
case errc := <-s.closing: // HLcases
errc <- err
close(s.updates)
return
// STARTFETCHCASE OMIT
case <-startFetch: // HLcases
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...) // HLfetch
// STOPFETCHCASE OMIT
case updates <- first: // HLcases
pending = pending[1:]
}
// STOPSELECT OMIT
}
}
// dedupeLoop extends mergedLoop with deduping of fetched items.
func (s *sub) dedupeLoop() {
const maxPending = 10
// STARTSEEN OMIT
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool) // set of item.GUIDs // HLseen
// STOPSEEN OMIT
for {
// STARTCAP OMIT
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time // HLcap
if len(pending) < maxPending { // HLcap
startFetch = time.After(fetchDelay) // enable fetch case // HLcap
} // HLcap
// STOPCAP OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
// STARTDEDUPE OMIT
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] { // HLdupe
pending = append(pending, item) // HLdupe
seen[item.GUID] = true // HLdupe
} // HLdupe
}
// STOPDEDUPE OMIT
case updates <- first:
pending = pending[1:]
}
}
}
// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
const maxPending = 10
type fetchResult struct {
fetched []Item
next time.Time
err error
}
// STARTFETCHDONE OMIT
var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
// STOPFETCHDONE OMIT
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool)
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
// STARTFETCHIF OMIT
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending { // HLfetch
startFetch = time.After(fetchDelay) // enable fetch case
}
// STOPFETCHIF OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
// STARTFETCHASYNC OMIT
select {
case <-startFetch: // HLfetch
fetchDone = make(chan fetchResult, 1) // HLfetch
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone: // HLfetch
fetchDone = nil // HLfetch
// Use result.fetched, result.next, result.err
// STOPFETCHASYNC OMIT
fetched := result.fetched
next, err = result.next, result.err
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if id := item.GUID; !seen[id] { // HLdupe
pending = append(pending, item)
seen[id] = true // HLdupe
}
}
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case updates <- first:
pending = pending[1:]
}
}
}
// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
subs []Subscription
updates chan Item
}
// STARTNAIVEMERGE OMIT
func NaiveMerge(subs ...Subscription) Subscription {
m := &naiveMerge{
subs: subs,
updates: make(chan Item),
}
// STARTNAIVEMERGELOOP OMIT
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Updates() {
m.updates <- it // HL
}
}(sub)
}
// STOPNAIVEMERGELOOP OMIT
return m
}
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates) // HL
return
}
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item {
return m.updates
}
type merge struct {
subs []Subscription
updates chan Item
quit chan struct{}
errs chan error
}
// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
// STOPMERGESIG OMIT
m := &merge{
subs: subs,
updates: make(chan Item),
quit: make(chan struct{}),
errs: make(chan error),
}
// STARTMERGE OMIT
for _, sub := range subs {
go func(s Subscription) {
for {
var it Item
select {
case it = <-s.Updates():
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
select {
case m.updates <- it:
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
}
}(sub)
}
// STOPMERGE OMIT
return m
}
func (m *merge) Updates() <-chan Item {
return m.updates
}
// STARTMERGECLOSE OMIT
func (m *merge) Close() (err error) {
close(m.quit) // HL
for range m.subs {
if e := <-m.errs; e != nil { // HL
err = e
}
}
close(m.updates) // HL
return
}
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
out := make(chan Item)
go func() {
seen := make(map[string]bool)
for it := range in {
if !seen[it.GUID] {
// BUG: this send blocks if the
// receiver closes the Subscription
// and stops receiving.
out <- it // HL
seen[it.GUID] = true
}
}
close(out)
}()
return out
}
type deduper struct {
s Subscription
updates chan Item
closing chan chan error
}
// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
d := &deduper{
s: s,
updates: make(chan Item),
closing: make(chan chan error),
}
go d.loop()
return d
}
func (d *deduper) loop() {
in := d.s.Updates() // enable receive
var pending Item
var out chan Item // disable send
seen := make(map[string]bool)
for {
select {
case it := <-in:
if !seen[it.GUID] {
pending = it
in = nil // disable receive
out = d.updates // enable send
seen[it.GUID] = true
}
case out <- pending:
in = d.s.Updates() // enable receive
out = nil // disable send
case errc := <-d.closing:
err := d.s.Close()
errc <- err
close(d.updates)
return
}
}
}
func (d *deduper) Close() error {
errc := make(chan error)
d.closing <- errc
return <-errc
}
func (d *deduper) Updates() <-chan Item {
return d.updates
}
// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
return fakeFetch(domain)
}
func fakeFetch(domain string) Fetcher {
return &fakeFetcher{channel: domain}
}
type fakeFetcher struct {
channel string
items []Item
}
// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
now := time.Now()
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
item := Item{
Channel: f.channel,
Title: fmt.Sprintf("Item %d", len(f.items)),
}
item.GUID = item.Channel + "/" + item.Title
f.items = append(f.items, item)
if FakeDuplicates {
items = f.items
} else {
items = []Item{item}
}
return
}
func init() {
rand.Seed(time.Now().UnixNano())
}
// STARTMAIN OMIT
func main() {
	// Subscribe to some feeds, and create a merged update stream.
	merged := Merge(
		Subscribe(Fetch("blog.golang.org")),
		Subscribe(Fetch("googleblog.blogspot.com")),
		Subscribe(Fetch("googledevelopers.blogspot.com")))

	// Close the subscriptions after some time.
	time.AfterFunc(3*time.Second, func() {
		fmt.Println("closed:", merged.Close())
	})

	// Print the stream.
	for it := range merged.Updates() {
		fmt.Println(it.Channel, it.Title)
	}

	panic("show me the stacks")
}
// STOPMAIN OMIT
Subscribe creates a new Subscription that repeatedly fetches items until Close is called.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item), // for Updates
	}
	go s.loop()
	return s
}

// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher   // fetches items
	updates chan Item // delivers items to the user
}

// loop fetches items using s.fetcher and sends them
// on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {...}
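The elided loop has to accept new items, deliver them, and watch for shutdown, all at once. Here is a minimal sketch of the shape such a loop could take. The function relay and its int payload are stand-ins, not the talk's sub type. It relies on the fact that a nil channel is never ready in a select, so leaving send nil switches the send case off while there is nothing to deliver.

```go
package main

import "fmt"

// relay is a hypothetical stand-in for sub.loop. It queues values from in,
// forwards them on out in order, and exits when quit is closed.
func relay(in <-chan int, out chan<- int, quit <-chan struct{}) {
	var pending []int
	for {
		var first int
		var send chan<- int // nil: the send case is disabled
		if len(pending) > 0 {
			first = pending[0]
			send = out // enable the send case
		}
		select {
		case v := <-in:
			pending = append(pending, v)
		case send <- first:
			pending = pending[1:]
		case <-quit:
			close(out) // tells the receiver we're done
			return
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	quit := make(chan struct{})
	go relay(in, out, quit)

	for i := 1; i <= 3; i++ {
		in <- i // accepted even though nobody is receiving from out yet
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-out)
	}
	close(quit)
	_, ok := <-out
	fmt.Println("open:", ok)
}
```

Because only the relay goroutine touches pending, no lock is needed.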
To implement the Subscription interface, define Updates and Close.
func (s *sub) Updates() <-chan Item {
	return s.updates
}

func (s *sub) Close() error {
	// TODO: make loop exit
	// TODO: find out about any error
	return err
}
loop periodically calls Fetch, sends fetched items on the Updates channel, and exits when Close is called, reporting any error.
// naivemain runs the Subscribe example with the naive Subscribe
// implementation and a fake RSS fetcher.
//go:build ignore && OMIT
// +build ignore,OMIT
package main
import (
"fmt"
"math/rand"
"time"
)
func NaiveSubscribe(fetcher Fetcher) Subscription {
s := &naiveSub{
fetcher: fetcher,
updates: make(chan Item),
}
go s.loop()
return s
}
type naiveSub struct {
fetcher Fetcher
updates chan Item
closed bool
err error
}
func (s *naiveSub) Updates() <-chan Item {
return s.updates
}
func (s *naiveSub) loop() {
	for {
		if s.closed {
			close(s.updates)
			return
		}
		items, next, err := s.fetcher.Fetch()
		if err != nil {
			s.err = err
			time.Sleep(10 * time.Second)
			continue
		}
		for _, item := range items {
			s.updates <- item
		}
		if now := time.Now(); next.After(now) {
			time.Sleep(next.Sub(now))
		}
	}
}
func (s *naiveSub) Close() error {
s.closed = true // HLsync
return s.err // HLsync
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Subscribe to some feeds, and create a merged update stream.
merged := Merge(
NaiveSubscribe(Fetch("blog.golang.org")),
NaiveSubscribe(Fetch("googleblog.blogspot.com")),
NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
// The loops are still running. Let the race detector notice.
time.Sleep(1 * time.Second)
panic("show me the stacks")
}
func (s *naiveSub) Close() error {
	s.closed = true
	return s.err
}
for {
	if s.closed {
		close(s.updates)
		return
	}
	items, next, err := s.fetcher.Fetch()
	if err != nil {
		s.err = err
		time.Sleep(10 * time.Second)
		continue
	}
	for _, item := range items {
		s.updates <- item
	}
	if now := time.Now(); next.After(now) {
		time.Sleep(next.Sub(now))
	}
}
go run -race naivemain.go
// STOPFETCHASYNC OMIT
fetched := result.fetched
next, err = result.next, result.err
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if id := item.GUID; !seen[id] { // HLdupe
pending = append(pending, item)
seen[id] = true // HLdupe
}
}
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case updates <- first:
pending = pending[1:]
}
}
}
// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
subs []Subscription
updates chan Item
}
// STARTNAIVEMERGE OMIT
func NaiveMerge(subs ...Subscription) Subscription {
m := &naiveMerge{
subs: subs,
updates: make(chan Item),
}
// STARTNAIVEMERGELOOP OMIT
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Updates() {
m.updates <- it // HL
}
}(sub)
}
// STOPNAIVEMERGELOOP OMIT
return m
}
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates) // HL
return
}
// STOPNAIVEMERGECLOSE OMIT
func (m *naiveMerge) Updates() <-chan Item {
return m.updates
}
type merge struct {
subs []Subscription
updates chan Item
quit chan struct{}
errs chan error
}
// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
func Merge(subs ...Subscription) Subscription {
// STOPMERGESIG OMIT
m := &merge{
subs: subs,
updates: make(chan Item),
quit: make(chan struct{}),
errs: make(chan error),
}
// STARTMERGE OMIT
for _, sub := range subs {
go func(s Subscription) {
for {
var it Item
select {
case it = <-s.Updates():
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
select {
case m.updates <- it:
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
}
}(sub)
}
// STOPMERGE OMIT
return m
}
func (m *merge) Updates() <-chan Item {
return m.updates
}
// STARTMERGECLOSE OMIT
func (m *merge) Close() (err error) {
close(m.quit) // HL
for range m.subs {
if e := <-m.errs; e != nil { // HL
err = e
}
}
close(m.updates) // HL
return
}
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't.
func NaiveDedupe(in <-chan Item) <-chan Item {
out := make(chan Item)
go func() {
seen := make(map[string]bool)
for it := range in {
if !seen[it.GUID] {
// BUG: this send blocks if the
// receiver closes the Subscription
// and stops receiving.
out <- it // HL
seen[it.GUID] = true
}
}
close(out)
}()
return out
}
type deduper struct {
s Subscription
updates chan Item
closing chan chan error
}
// Dedupe converts a Subscription that may send duplicate Items into
// one that doesn't.
func Dedupe(s Subscription) Subscription {
d := &deduper{
s: s,
updates: make(chan Item),
closing: make(chan chan error),
}
go d.loop()
return d
}
func (d *deduper) loop() {
in := d.s.Updates() // enable receive
var pending Item
var out chan Item // disable send
seen := make(map[string]bool)
for {
select {
case it := <-in:
if !seen[it.GUID] {
pending = it
in = nil // disable receive
out = d.updates // enable send
seen[it.GUID] = true
}
case out <- pending:
in = d.s.Updates() // enable receive
out = nil // disable send
case errc := <-d.closing:
err := d.s.Close()
errc <- err
close(d.updates)
return
}
}
}
func (d *deduper) Close() error {
errc := make(chan error)
d.closing <- errc
return <-errc
}
func (d *deduper) Updates() <-chan Item {
return d.updates
}
// Fetch returns a Fetcher for Items from domain.
func Fetch(domain string) Fetcher {
return fakeFetch(domain)
}
func fakeFetch(domain string) Fetcher {
return &fakeFetcher{channel: domain}
}
type fakeFetcher struct {
channel string
items []Item
}
// FakeDuplicates causes the fake fetcher to return duplicate items.
var FakeDuplicates bool
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
now := time.Now()
next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
item := Item{
Channel: f.channel,
Title: fmt.Sprintf("Item %d", len(f.items)),
}
item.GUID = item.Channel + "/" + item.Title
f.items = append(f.items, item)
if FakeDuplicates {
items = f.items
} else {
items = []Item{item}
}
return
}
func NaiveSubscribe(fetcher Fetcher) Subscription {
s := &naiveSub{
fetcher: fetcher,
updates: make(chan Item),
}
go s.loop()
return s
}
type naiveSub struct {
fetcher Fetcher
updates chan Item
closed bool
err error
}
func (s *naiveSub) Updates() <-chan Item {
return s.updates
}
func (s *naiveSub) loop() {
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second) // HLsleep
continue
}
for _, item := range items {
s.updates <- item // HLsend
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now)) // HLsleep
}
}
// STOPNAIVE OMIT
}
func (s *naiveSub) Close() error {
s.closed = true // HLsync
return s.err // HLsync
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Subscribe to some feeds, and create a merged update stream.
merged := Merge(
NaiveSubscribe(Fetch("blog.golang.org")),
NaiveSubscribe(Fetch("googleblog.blogspot.com")),
NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
// The loops are still running. Let the race detector notice.
time.Sleep(1 * time.Second)
panic("show me the stacks")
}
func (s *naiveSub) Close() error {
	s.closed = true
	return s.err
}
for {
	if s.closed {
		close(s.updates)
		return
	}
	items, next, err := s.fetcher.Fetch()
	if err != nil {
		s.err = err
		time.Sleep(10 * time.Second)
		continue
	}
	for _, item := range items {
		s.updates <- item
	}
	if now := time.Now(); next.After(now) {
		time.Sleep(next.Sub(now))
	}
}
Change the body of loop to a select with three cases: Close was called, it is time to call Fetch, and an item is ready to send on s.updates.
loop runs in its own goroutine.
select lets loop avoid blocking indefinitely in any one state.
func (s *sub) loop() {
	... declare mutable state ...
	for {
		... set up channels for cases ...
		select {
		case <-c1:
			... read/write state ...
		case c2 <- x:
			... read/write state ...
		case y := <-c3:
			... read/write state ...
		}
	}
}
The cases interact via local state in loop.
Close communicates with loop via s.closing.
type sub struct { closing chan chan error }
The service (loop) listens for requests on its channel (s.closing).
The client (Close) sends a request on s.closing: exit and reply with the error.
In this case, the only thing in the request is the reply channel.
Close asks loop to exit and waits for a response.
func (s *sub) Close() error {
	errc := make(chan error)
	s.closing <- errc
	return <-errc
}
loop handles Close by replying with the Fetch error and exiting.
var err error // set when Fetch fails
for {
	select {
	case errc := <-s.closing:
		errc <- err
		close(s.updates) // tells receiver we're done
		return
	}
}
Schedule the next Fetch after some delay.
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially the zero Time: January 1, year 1
var err error
for {
	var fetchDelay time.Duration // initially 0 (no delay)
	if now := time.Now(); next.After(now) {
		fetchDelay = next.Sub(now)
	}
	startFetch := time.After(fetchDelay)
	select {
	case <-startFetch:
		var fetched []Item
		fetched, next, err = s.fetcher.Fetch()
		if err != nil {
			next = time.Now().Add(10 * time.Second)
			break
		}
		pending = append(pending, fetched...)
	}
}
Send the fetched items, one at a time.
var pending []Item // appended by fetch; consumed by send
for {
	select {
	case s.updates <- pending[0]:
		pending = pending[1:]
	}
}
Whoops. This crashes: pending[0] panics when pending is empty.
Sends and receives on nil channels block.
Select never selects a blocking case.
// +build ignore,OMIT
package main
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
	a, b := make(chan string), make(chan string)
	go func() { a <- "a" }()
	go func() { b <- "b" }()
	if rand.Intn(2) == 0 {
		a = nil
		fmt.Println("nil a")
	} else {
		b = nil
		fmt.Println("nil b")
	}
	select {
	case s := <-a:
		fmt.Println("got", s)
	case s := <-b:
		fmt.Println("got", s)
	}
}
Enable send only when pending is non-empty.
var pending []Item // appended by fetch; consumed by send
for {
	var first Item
	var updates chan Item
	if len(pending) > 0 {
		first = pending[0]
		updates = s.updates // enable send case
	}
	select {
	case updates <- first:
		pending = pending[1:]
	}
}
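To see the nil-channel guard run end to end, here is a self-contained sketch. `forward` is a hypothetical helper, not from the talk: it queues integers from one channel and sends them on another, enabling the send case only while the queue is non-empty and disabling the receive case once the input is closed.

```go
package main

import "fmt"

// forward (hypothetical helper) buffers values from in and sends them
// on out in order. It closes out once in is closed and the queue has
// been drained.
func forward(in <-chan int, out chan<- int) {
	var pending []int
	for in != nil || len(pending) > 0 {
		var first int
		var send chan<- int // nil: send case disabled
		if len(pending) > 0 {
			first = pending[0]
			send = out // enable send case
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: disable receive case
				continue
			}
			pending = append(pending, v)
		case send <- first:
			pending = pending[1:]
		}
	}
	close(out)
}

func main() {
	in, out := make(chan int), make(chan int)
	go forward(in, out)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()
	for v := range out {
		fmt.Println(v)
	}
}
```

The same variable-per-iteration idiom works for any case that should be available only in some states: leave the channel variable nil to switch the case off.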
Put the three cases together:
select {
case errc := <-s.closing:
	errc <- err
	close(s.updates)
	return
case <-startFetch:
	var fetched []Item
	fetched, next, err = s.fetcher.Fetch()
	if err != nil {
		next = time.Now().Add(10 * time.Second)
		break
	}
	pending = append(pending, fetched...)
case updates <- first:
	pending = pending[1:]
}
The cases interact via err, next, and pending.
No locks, no condition variables, no callbacks.
The select loop above fixes three bugs in the naive version:
unsynchronized access to s.closed and s.err
time.Sleep may keep loop running
loop may block forever sending on s.updates
var pending []Item
var next time.Time
var err error
case <-startFetch:
	var fetched []Item
	fetched, next, err = s.fetcher.Fetch()
	if err != nil {
		next = time.Now().Add(10 * time.Second)
		break
	}
	pending = append(pending, fetched...)
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool) // set of item.GUIDs
case <-startFetch:
	var fetched []Item
	fetched, next, err = s.fetcher.Fetch()
	if err != nil {
		next = time.Now().Add(10 * time.Second)
		break
	}
	for _, item := range fetched {
		if !seen[item.GUID] {
			pending = append(pending, item)
			seen[item.GUID] = true
		}
	}
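The filtering step can be exercised outside the loop. This sketch pulls it into a function; `appendNew` is a hypothetical name, and the `Item` type here is a minimal assumption that carries only the fields the listing uses.

```go
package main

import "fmt"

// Item is a minimal stand-in with the fields used in the talk's code.
type Item struct{ Title, Channel, GUID string }

// appendNew (hypothetical helper) appends to pending each fetched item
// whose GUID is not yet in seen, and records it in seen.
func appendNew(pending, fetched []Item, seen map[string]bool) []Item {
	for _, item := range fetched {
		if !seen[item.GUID] {
			pending = append(pending, item)
			seen[item.GUID] = true
		}
	}
	return pending
}

func main() {
	seen := make(map[string]bool) // set of item.GUIDs
	var pending []Item
	pending = appendNew(pending, []Item{{GUID: "a"}, {GUID: "b"}}, seen)
	pending = appendNew(pending, []Item{{GUID: "b"}, {GUID: "c"}}, seen)
	for _, it := range pending {
		fmt.Println(it.GUID)
	}
}
```

Note that seen only grows in this sketch, as it does in the listing; a long-running subscription would presumably need some way to expire old GUIDs.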
const maxPending = 10
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
	fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if len(pending) < maxPending {
	startFetch = time.After(fetchDelay) // enable fetch case
}
Could instead drop older items from the head of pending.
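The alternative mentioned here, dropping older items rather than pausing fetches, might look like the following sketch. `dropOldest` is a hypothetical helper and integers stand in for Items; the talk itself does not show this variant.

```go
package main

import "fmt"

// dropOldest (hypothetical helper) keeps at most limit items, discarding
// from the head of pending so that the newest items survive.
func dropOldest(pending []int, limit int) []int {
	if n := len(pending) - limit; n > 0 {
		pending = pending[n:]
	}
	return pending
}

func main() {
	fmt.Println(dropOldest([]int{1, 2, 3, 4, 5}, 3)) // [3 4 5]
	fmt.Println(dropOldest([]int{1, 2}, 3))          // [1 2]
}
```

The trade-off is between the two policies: capping fetches never loses items but lets the feed fall behind, while dropping from the head keeps the stream fresh at the cost of skipped items.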
Add a new select case for fetchDone.
type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // if non-nil, Fetch is running
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
	startFetch = time.After(fetchDelay) // enable fetch case
}
select {
case <-startFetch:
	fetchDone = make(chan fetchResult, 1)
	go func() {
		fetched, next, err := s.fetcher.Fetch()
		fetchDone <- fetchResult{fetched, next, err}
	}()
case result := <-fetchDone:
	fetchDone = nil
	// Use result.fetched, result.next, result.err
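A runnable sketch of the asynchronous fetch follows. It is an illustration under stated assumptions, not the talk's loop: `slowFetch` is a hypothetical stand-in for `s.fetcher.Fetch`, `runOnce` performs a single fetch, and a ticker case stands in for the send and close cases to show that the loop keeps servicing other events while the fetch is in flight.

```go
package main

import (
	"fmt"
	"time"
)

// fetchResult mirrors the talk's struct, trimmed to two fields.
type fetchResult struct {
	fetched []string
	err     error
}

// slowFetch is a hypothetical stand-in for s.fetcher.Fetch.
func slowFetch() ([]string, error) {
	time.Sleep(50 * time.Millisecond)
	return []string{"item 0"}, nil
}

// runOnce starts one asynchronous fetch and reports how many ticks the
// loop handled before the fetch result arrived.
func runOnce() (ticks int, fetched []string, err error) {
	startFetch := time.After(0)
	var fetchDone chan fetchResult // if non-nil, a fetch is running
	ticker := time.NewTicker(5 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-startFetch:
			startFetch = nil // disable this case while the fetch runs
			fetchDone = make(chan fetchResult, 1)
			go func(done chan<- fetchResult) {
				fetched, err := slowFetch()
				done <- fetchResult{fetched, err}
			}(fetchDone)
		case result := <-fetchDone:
			return ticks, result.fetched, result.err
		case <-ticker.C:
			ticks++ // other cases keep running during the fetch
		}
	}
}

func main() {
	ticks, fetched, err := runOnce()
	fmt.Println(fetched, err, ticks > 0)
}
```

The buffer of size 1 on fetchDone matters: it lets the fetching goroutine deliver its result and exit even if the loop has already returned and nobody is receiving.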
Responsive. Cleans up. Easy to read and change.
Three techniques:
for-select loop
service channel with reply channels (chan chan error)
nil channels in select cases
More details online, including Merge.
Concurrent programming can be tricky.
Go makes it easier:
Go Concurrency Patterns (2012)
go.dev/talks/2012/concurrency.slide
Concurrency is not parallelism
go.dev/s/concurrency-is-not-parallelism
Share memory by communicating
Go Tour (learn Go in your browser)