Source file
src/net/pipe.go
1
2
3
4
5 package net
6
7 import (
8 "io"
9 "os"
10 "sync"
11 "time"
12 )
13
14
15 type pipeDeadline struct {
16 mu sync.Mutex
17 timer *time.Timer
18 cancel chan struct{}
19 }
20
21 func makePipeDeadline() pipeDeadline {
22 return pipeDeadline{cancel: make(chan struct{})}
23 }
24
25
26
27
28
29
30
31 func (d *pipeDeadline) set(t time.Time) {
32 d.mu.Lock()
33 defer d.mu.Unlock()
34
35 if d.timer != nil && !d.timer.Stop() {
36 <-d.cancel
37 }
38 d.timer = nil
39
40
41 closed := isClosedChan(d.cancel)
42 if t.IsZero() {
43 if closed {
44 d.cancel = make(chan struct{})
45 }
46 return
47 }
48
49
50 if dur := time.Until(t); dur > 0 {
51 if closed {
52 d.cancel = make(chan struct{})
53 }
54 d.timer = time.AfterFunc(dur, func() {
55 close(d.cancel)
56 })
57 return
58 }
59
60
61 if !closed {
62 close(d.cancel)
63 }
64 }
65
66
67 func (d *pipeDeadline) wait() chan struct{} {
68 d.mu.Lock()
69 defer d.mu.Unlock()
70 return d.cancel
71 }
72
73 func isClosedChan(c <-chan struct{}) bool {
74 select {
75 case <-c:
76 return true
77 default:
78 return false
79 }
80 }
81
82 type pipeAddr struct{}
83
84 func (pipeAddr) Network() string { return "pipe" }
85 func (pipeAddr) String() string { return "pipe" }
86
87 type pipe struct {
88 wrMu sync.Mutex
89
90
91
92 rdRx <-chan []byte
93 rdTx chan<- int
94
95
96
97 wrTx chan<- []byte
98 wrRx <-chan int
99
100 once sync.Once
101 localDone chan struct{}
102 remoteDone <-chan struct{}
103
104 readDeadline pipeDeadline
105 writeDeadline pipeDeadline
106 }
107
108
109
110
111
112
113 func Pipe() (Conn, Conn) {
114 cb1 := make(chan []byte)
115 cb2 := make(chan []byte)
116 cn1 := make(chan int)
117 cn2 := make(chan int)
118 done1 := make(chan struct{})
119 done2 := make(chan struct{})
120
121 p1 := &pipe{
122 rdRx: cb1, rdTx: cn1,
123 wrTx: cb2, wrRx: cn2,
124 localDone: done1, remoteDone: done2,
125 readDeadline: makePipeDeadline(),
126 writeDeadline: makePipeDeadline(),
127 }
128 p2 := &pipe{
129 rdRx: cb2, rdTx: cn2,
130 wrTx: cb1, wrRx: cn1,
131 localDone: done2, remoteDone: done1,
132 readDeadline: makePipeDeadline(),
133 writeDeadline: makePipeDeadline(),
134 }
135 return p1, p2
136 }
137
138 func (*pipe) LocalAddr() Addr { return pipeAddr{} }
139 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
140
141 func (p *pipe) Read(b []byte) (int, error) {
142 n, err := p.read(b)
143 if err != nil && err != io.EOF && err != io.ErrClosedPipe {
144 err = &OpError{Op: "read", Net: "pipe", Err: err}
145 }
146 return n, err
147 }
148
149 func (p *pipe) read(b []byte) (n int, err error) {
150 switch {
151 case isClosedChan(p.localDone):
152 return 0, io.ErrClosedPipe
153 case isClosedChan(p.remoteDone):
154 return 0, io.EOF
155 case isClosedChan(p.readDeadline.wait()):
156 return 0, os.ErrDeadlineExceeded
157 }
158
159 select {
160 case bw := <-p.rdRx:
161 nr := copy(b, bw)
162 p.rdTx <- nr
163 return nr, nil
164 case <-p.localDone:
165 return 0, io.ErrClosedPipe
166 case <-p.remoteDone:
167 return 0, io.EOF
168 case <-p.readDeadline.wait():
169 return 0, os.ErrDeadlineExceeded
170 }
171 }
172
173 func (p *pipe) Write(b []byte) (int, error) {
174 n, err := p.write(b)
175 if err != nil && err != io.ErrClosedPipe {
176 err = &OpError{Op: "write", Net: "pipe", Err: err}
177 }
178 return n, err
179 }
180
181 func (p *pipe) write(b []byte) (n int, err error) {
182 switch {
183 case isClosedChan(p.localDone):
184 return 0, io.ErrClosedPipe
185 case isClosedChan(p.remoteDone):
186 return 0, io.ErrClosedPipe
187 case isClosedChan(p.writeDeadline.wait()):
188 return 0, os.ErrDeadlineExceeded
189 }
190
191 p.wrMu.Lock()
192 defer p.wrMu.Unlock()
193 for once := true; once || len(b) > 0; once = false {
194 select {
195 case p.wrTx <- b:
196 nw := <-p.wrRx
197 b = b[nw:]
198 n += nw
199 case <-p.localDone:
200 return n, io.ErrClosedPipe
201 case <-p.remoteDone:
202 return n, io.ErrClosedPipe
203 case <-p.writeDeadline.wait():
204 return n, os.ErrDeadlineExceeded
205 }
206 }
207 return n, nil
208 }
209
210 func (p *pipe) SetDeadline(t time.Time) error {
211 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
212 return io.ErrClosedPipe
213 }
214 p.readDeadline.set(t)
215 p.writeDeadline.set(t)
216 return nil
217 }
218
219 func (p *pipe) SetReadDeadline(t time.Time) error {
220 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
221 return io.ErrClosedPipe
222 }
223 p.readDeadline.set(t)
224 return nil
225 }
226
227 func (p *pipe) SetWriteDeadline(t time.Time) error {
228 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
229 return io.ErrClosedPipe
230 }
231 p.writeDeadline.set(t)
232 return nil
233 }
234
235 func (p *pipe) Close() error {
236 p.once.Do(func() { close(p.localDone) })
237 return nil
238 }
239
View as plain text