1
2
3
4
5 package httputil
6
7 import (
8 "bufio"
9 "errors"
10 "io"
11 "net"
12 "net/http"
13 "net/textproto"
14 "sync"
15 )
16
17 var (
18
19 ErrPersistEOF = &http.ProtocolError{ErrorString: "persistent connection closed"}
20
21
22 ErrClosed = &http.ProtocolError{ErrorString: "connection closed by user"}
23
24
25 ErrPipeline = &http.ProtocolError{ErrorString: "pipeline error"}
26 )
27
28
29
30 var errClosed = errors.New("i/o operation on closed connection")
31
32
33
34
35
36
37 type ServerConn struct {
38 mu sync.Mutex
39 c net.Conn
40 r *bufio.Reader
41 re, we error
42 lastbody io.ReadCloser
43 nread, nwritten int
44 pipereq map[*http.Request]uint
45
46 pipe textproto.Pipeline
47 }
48
49
50
51
52
53
54 func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
55 if r == nil {
56 r = bufio.NewReader(c)
57 }
58 return &ServerConn{c: c, r: r, pipereq: make(map[*http.Request]uint)}
59 }
60
61
62
63
64
65 func (sc *ServerConn) Hijack() (net.Conn, *bufio.Reader) {
66 sc.mu.Lock()
67 defer sc.mu.Unlock()
68 c := sc.c
69 r := sc.r
70 sc.c = nil
71 sc.r = nil
72 return c, r
73 }
74
75
76 func (sc *ServerConn) Close() error {
77 c, _ := sc.Hijack()
78 if c != nil {
79 return c.Close()
80 }
81 return nil
82 }
83
84
85
86
87
88 func (sc *ServerConn) Read() (*http.Request, error) {
89 var req *http.Request
90 var err error
91
92
93 id := sc.pipe.Next()
94 sc.pipe.StartRequest(id)
95 defer func() {
96 sc.pipe.EndRequest(id)
97 if req == nil {
98 sc.pipe.StartResponse(id)
99 sc.pipe.EndResponse(id)
100 } else {
101
102 sc.mu.Lock()
103 sc.pipereq[req] = id
104 sc.mu.Unlock()
105 }
106 }()
107
108 sc.mu.Lock()
109 if sc.we != nil {
110 defer sc.mu.Unlock()
111 return nil, sc.we
112 }
113 if sc.re != nil {
114 defer sc.mu.Unlock()
115 return nil, sc.re
116 }
117 if sc.r == nil {
118 defer sc.mu.Unlock()
119 return nil, errClosed
120 }
121 r := sc.r
122 lastbody := sc.lastbody
123 sc.lastbody = nil
124 sc.mu.Unlock()
125
126
127 if lastbody != nil {
128
129
130
131 err = lastbody.Close()
132 if err != nil {
133 sc.mu.Lock()
134 defer sc.mu.Unlock()
135 sc.re = err
136 return nil, err
137 }
138 }
139
140 req, err = http.ReadRequest(r)
141 sc.mu.Lock()
142 defer sc.mu.Unlock()
143 if err != nil {
144 if err == io.ErrUnexpectedEOF {
145
146
147
148 sc.re = ErrPersistEOF
149 return nil, sc.re
150 } else {
151 sc.re = err
152 return req, err
153 }
154 }
155 sc.lastbody = req.Body
156 sc.nread++
157 if req.Close {
158 sc.re = ErrPersistEOF
159 return req, sc.re
160 }
161 return req, err
162 }
163
164
165
166 func (sc *ServerConn) Pending() int {
167 sc.mu.Lock()
168 defer sc.mu.Unlock()
169 return sc.nread - sc.nwritten
170 }
171
172
173
174
175 func (sc *ServerConn) Write(req *http.Request, resp *http.Response) error {
176
177
178 sc.mu.Lock()
179 id, ok := sc.pipereq[req]
180 delete(sc.pipereq, req)
181 if !ok {
182 sc.mu.Unlock()
183 return ErrPipeline
184 }
185 sc.mu.Unlock()
186
187
188 sc.pipe.StartResponse(id)
189 defer sc.pipe.EndResponse(id)
190
191 sc.mu.Lock()
192 if sc.we != nil {
193 defer sc.mu.Unlock()
194 return sc.we
195 }
196 if sc.c == nil {
197 defer sc.mu.Unlock()
198 return ErrClosed
199 }
200 c := sc.c
201 if sc.nread <= sc.nwritten {
202 defer sc.mu.Unlock()
203 return errors.New("persist server pipe count")
204 }
205 if resp.Close {
206
207
208
209 sc.re = ErrPersistEOF
210 }
211 sc.mu.Unlock()
212
213 err := resp.Write(c)
214 sc.mu.Lock()
215 defer sc.mu.Unlock()
216 if err != nil {
217 sc.we = err
218 return err
219 }
220 sc.nwritten++
221
222 return nil
223 }
224
225
226
227
228
229
230 type ClientConn struct {
231 mu sync.Mutex
232 c net.Conn
233 r *bufio.Reader
234 re, we error
235 lastbody io.ReadCloser
236 nread, nwritten int
237 pipereq map[*http.Request]uint
238
239 pipe textproto.Pipeline
240 writeReq func(*http.Request, io.Writer) error
241 }
242
243
244
245
246
247
248 func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
249 if r == nil {
250 r = bufio.NewReader(c)
251 }
252 return &ClientConn{
253 c: c,
254 r: r,
255 pipereq: make(map[*http.Request]uint),
256 writeReq: (*http.Request).Write,
257 }
258 }
259
260
261
262
263
264
265 func NewProxyClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
266 cc := NewClientConn(c, r)
267 cc.writeReq = (*http.Request).WriteProxy
268 return cc
269 }
270
271
272
273
274
275 func (cc *ClientConn) Hijack() (c net.Conn, r *bufio.Reader) {
276 cc.mu.Lock()
277 defer cc.mu.Unlock()
278 c = cc.c
279 r = cc.r
280 cc.c = nil
281 cc.r = nil
282 return
283 }
284
285
286 func (cc *ClientConn) Close() error {
287 c, _ := cc.Hijack()
288 if c != nil {
289 return c.Close()
290 }
291 return nil
292 }
293
294
295
296
297
298
299 func (cc *ClientConn) Write(req *http.Request) error {
300 var err error
301
302
303 id := cc.pipe.Next()
304 cc.pipe.StartRequest(id)
305 defer func() {
306 cc.pipe.EndRequest(id)
307 if err != nil {
308 cc.pipe.StartResponse(id)
309 cc.pipe.EndResponse(id)
310 } else {
311
312 cc.mu.Lock()
313 cc.pipereq[req] = id
314 cc.mu.Unlock()
315 }
316 }()
317
318 cc.mu.Lock()
319 if cc.re != nil {
320 defer cc.mu.Unlock()
321 return cc.re
322 }
323 if cc.we != nil {
324 defer cc.mu.Unlock()
325 return cc.we
326 }
327 if cc.c == nil {
328 defer cc.mu.Unlock()
329 return errClosed
330 }
331 c := cc.c
332 if req.Close {
333
334
335 cc.we = ErrPersistEOF
336 }
337 cc.mu.Unlock()
338
339 err = cc.writeReq(req, c)
340 cc.mu.Lock()
341 defer cc.mu.Unlock()
342 if err != nil {
343 cc.we = err
344 return err
345 }
346 cc.nwritten++
347
348 return nil
349 }
350
351
352
353 func (cc *ClientConn) Pending() int {
354 cc.mu.Lock()
355 defer cc.mu.Unlock()
356 return cc.nwritten - cc.nread
357 }
358
359
360
361
362
363 func (cc *ClientConn) Read(req *http.Request) (resp *http.Response, err error) {
364
365 cc.mu.Lock()
366 id, ok := cc.pipereq[req]
367 delete(cc.pipereq, req)
368 if !ok {
369 cc.mu.Unlock()
370 return nil, ErrPipeline
371 }
372 cc.mu.Unlock()
373
374
375 cc.pipe.StartResponse(id)
376 defer cc.pipe.EndResponse(id)
377
378 cc.mu.Lock()
379 if cc.re != nil {
380 defer cc.mu.Unlock()
381 return nil, cc.re
382 }
383 if cc.r == nil {
384 defer cc.mu.Unlock()
385 return nil, errClosed
386 }
387 r := cc.r
388 lastbody := cc.lastbody
389 cc.lastbody = nil
390 cc.mu.Unlock()
391
392
393 if lastbody != nil {
394
395
396
397 err = lastbody.Close()
398 if err != nil {
399 cc.mu.Lock()
400 defer cc.mu.Unlock()
401 cc.re = err
402 return nil, err
403 }
404 }
405
406 resp, err = http.ReadResponse(r, req)
407 cc.mu.Lock()
408 defer cc.mu.Unlock()
409 if err != nil {
410 cc.re = err
411 return resp, err
412 }
413 cc.lastbody = resp.Body
414
415 cc.nread++
416
417 if resp.Close {
418 cc.re = ErrPersistEOF
419 return resp, cc.re
420 }
421 return resp, err
422 }
423
424
425 func (cc *ClientConn) Do(req *http.Request) (*http.Response, error) {
426 err := cc.Write(req)
427 if err != nil {
428 return nil, err
429 }
430 return cc.Read(req)
431 }
432
View as plain text