Source file src/net/textproto/pipeline.go
1 // Copyright 2010 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package textproto 6 7 import ( 8 "sync" 9 ) 10 11 // A Pipeline manages a pipelined in-order request/response sequence. 12 // 13 // To use a Pipeline p to manage multiple clients on a connection, 14 // each client should run: 15 // 16 // id := p.Next() // take a number 17 // 18 // p.StartRequest(id) // wait for turn to send request 19 // «send request» 20 // p.EndRequest(id) // notify Pipeline that request is sent 21 // 22 // p.StartResponse(id) // wait for turn to read response 23 // «read response» 24 // p.EndResponse(id) // notify Pipeline that response is read 25 // 26 // A pipelined server can use the same calls to ensure that 27 // responses computed in parallel are written in the correct order. 28 type Pipeline struct { 29 mu sync.Mutex 30 id uint 31 request sequencer 32 response sequencer 33 } 34 35 // Next returns the next id for a request/response pair. 36 func (p *Pipeline) Next() uint { 37 p.mu.Lock() 38 id := p.id 39 p.id++ 40 p.mu.Unlock() 41 return id 42 } 43 44 // StartRequest blocks until it is time to send (or, if this is a server, receive) 45 // the request with the given id. 46 func (p *Pipeline) StartRequest(id uint) { 47 p.request.Start(id) 48 } 49 50 // EndRequest notifies p that the request with the given id has been sent 51 // (or, if this is a server, received). 52 func (p *Pipeline) EndRequest(id uint) { 53 p.request.End(id) 54 } 55 56 // StartResponse blocks until it is time to receive (or, if this is a server, send) 57 // the request with the given id. 58 func (p *Pipeline) StartResponse(id uint) { 59 p.response.Start(id) 60 } 61 62 // EndResponse notifies p that the response with the given id has been received 63 // (or, if this is a server, sent). 64 func (p *Pipeline) EndResponse(id uint) { 65 p.response.End(id) 66 } 67 68 // A sequencer schedules a sequence of numbered events that must 69 // happen in order, one after the other. The event numbering must start 70 // at 0 and increment without skipping. The event number wraps around 71 // safely as long as there are not 2^32 simultaneous events pending. 72 type sequencer struct { 73 mu sync.Mutex 74 id uint 75 wait map[uint]chan struct{} 76 } 77 78 // Start waits until it is time for the event numbered id to begin. 79 // That is, except for the first event, it waits until End(id-1) has 80 // been called. 81 func (s *sequencer) Start(id uint) { 82 s.mu.Lock() 83 if s.id == id { 84 s.mu.Unlock() 85 return 86 } 87 c := make(chan struct{}) 88 if s.wait == nil { 89 s.wait = make(map[uint]chan struct{}) 90 } 91 s.wait[id] = c 92 s.mu.Unlock() 93 <-c 94 } 95 96 // End notifies the sequencer that the event numbered id has completed, 97 // allowing it to schedule the event numbered id+1. It is a run-time error 98 // to call End with an id that is not the number of the active event. 99 func (s *sequencer) End(id uint) { 100 s.mu.Lock() 101 if s.id != id { 102 s.mu.Unlock() 103 panic("out of sync") 104 } 105 id++ 106 s.id = id 107 if s.wait == nil { 108 s.wait = make(map[uint]chan struct{}) 109 } 110 c, ok := s.wait[id] 111 if ok { 112 delete(s.wait, id) 113 } 114 s.mu.Unlock() 115 if ok { 116 close(c) 117 } 118 } 119