Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "maps"
24 "net"
25 "net/http/httptrace"
26 "net/http/internal/ascii"
27 "net/textproto"
28 "net/url"
29 "reflect"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34 _ "unsafe"
35
36 "golang.org/x/net/http/httpguts"
37 "golang.org/x/net/http/httpproxy"
38 )
39
40
41
42
43
44
45 var DefaultTransport RoundTripper = &Transport{
46 Proxy: ProxyFromEnvironment,
47 DialContext: defaultTransportDialContext(&net.Dialer{
48 Timeout: 30 * time.Second,
49 KeepAlive: 30 * time.Second,
50 }),
51 ForceAttemptHTTP2: true,
52 MaxIdleConns: 100,
53 IdleConnTimeout: 90 * time.Second,
54 TLSHandshakeTimeout: 10 * time.Second,
55 ExpectContinueTimeout: 1 * time.Second,
56 }
57
58
59
60 const DefaultMaxIdleConnsPerHost = 2
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 type Transport struct {
97 idleMu sync.Mutex
98 closeIdle bool
99 idleConn map[connectMethodKey][]*persistConn
100 idleConnWait map[connectMethodKey]wantConnQueue
101 idleLRU connLRU
102
103 reqMu sync.Mutex
104 reqCanceler map[*Request]context.CancelCauseFunc
105
106 altMu sync.Mutex
107 altProto atomic.Value
108
109 connsPerHostMu sync.Mutex
110 connsPerHost map[connectMethodKey]int
111 connsPerHostWait map[connectMethodKey]wantConnQueue
112 dialsInProgress wantConnQueue
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 Proxy func(*Request) (*url.URL, error)
129
130
131
132
133 OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error
134
135
136
137
138
139
140
141
142
143 DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
144
145
146
147
148
149
150
151
152
153
154
155 Dial func(network, addr string) (net.Conn, error)
156
157
158
159
160
161
162
163
164
165
166
167 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
168
169
170
171
172
173
174
175 DialTLS func(network, addr string) (net.Conn, error)
176
177
178
179
180
181 TLSClientConfig *tls.Config
182
183
184
185 TLSHandshakeTimeout time.Duration
186
187
188
189
190
191
192 DisableKeepAlives bool
193
194
195
196
197
198
199
200
201
202 DisableCompression bool
203
204
205
206 MaxIdleConns int
207
208
209
210
211 MaxIdleConnsPerHost int
212
213
214
215
216
217
218 MaxConnsPerHost int
219
220
221
222
223
224 IdleConnTimeout time.Duration
225
226
227
228
229
230 ResponseHeaderTimeout time.Duration
231
232
233
234
235
236
237
238
239 ExpectContinueTimeout time.Duration
240
241
242
243
244
245
246
247
248
249
250
251 TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
252
253
254
255
256 ProxyConnectHeader Header
257
258
259
260
261
262
263
264
265 GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)
266
267
268
269
270
271
272 MaxResponseHeaderBytes int64
273
274
275
276
277 WriteBufferSize int
278
279
280
281
282 ReadBufferSize int
283
284
285
286 nextProtoOnce sync.Once
287 h2transport h2Transport
288 tlsNextProtoWasNil bool
289
290
291
292
293
294
295 ForceAttemptHTTP2 bool
296
297
298
299
300
301 HTTP2 *HTTP2Config
302
303
304
305
306
307
308
309
310
311 Protocols *Protocols
312 }
313
314 func (t *Transport) writeBufferSize() int {
315 if t.WriteBufferSize > 0 {
316 return t.WriteBufferSize
317 }
318 return 4 << 10
319 }
320
321 func (t *Transport) readBufferSize() int {
322 if t.ReadBufferSize > 0 {
323 return t.ReadBufferSize
324 }
325 return 4 << 10
326 }
327
328
329 func (t *Transport) Clone() *Transport {
330 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
331 t2 := &Transport{
332 Proxy: t.Proxy,
333 OnProxyConnectResponse: t.OnProxyConnectResponse,
334 DialContext: t.DialContext,
335 Dial: t.Dial,
336 DialTLS: t.DialTLS,
337 DialTLSContext: t.DialTLSContext,
338 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
339 DisableKeepAlives: t.DisableKeepAlives,
340 DisableCompression: t.DisableCompression,
341 MaxIdleConns: t.MaxIdleConns,
342 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
343 MaxConnsPerHost: t.MaxConnsPerHost,
344 IdleConnTimeout: t.IdleConnTimeout,
345 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
346 ExpectContinueTimeout: t.ExpectContinueTimeout,
347 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
348 GetProxyConnectHeader: t.GetProxyConnectHeader,
349 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
350 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
351 WriteBufferSize: t.WriteBufferSize,
352 ReadBufferSize: t.ReadBufferSize,
353 }
354 if t.TLSClientConfig != nil {
355 t2.TLSClientConfig = t.TLSClientConfig.Clone()
356 }
357 if t.HTTP2 != nil {
358 t2.HTTP2 = &HTTP2Config{}
359 *t2.HTTP2 = *t.HTTP2
360 }
361 if t.Protocols != nil {
362 t2.Protocols = &Protocols{}
363 *t2.Protocols = *t.Protocols
364 }
365 if !t.tlsNextProtoWasNil {
366 npm := maps.Clone(t.TLSNextProto)
367 if npm == nil {
368 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
369 }
370 t2.TLSNextProto = npm
371 }
372 return t2
373 }
374
375
376
377
378
379
380
381 type h2Transport interface {
382 CloseIdleConnections()
383 }
384
385 func (t *Transport) hasCustomTLSDialer() bool {
386 return t.DialTLS != nil || t.DialTLSContext != nil
387 }
388
389 var http2client = godebug.New("http2client")
390
391
392
393 func (t *Transport) onceSetNextProtoDefaults() {
394 t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
395 if http2client.Value() == "0" {
396 http2client.IncNonDefault()
397 return
398 }
399
400
401
402
403
404
405 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
406 if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
407 if v := rv.Field(0); v.CanInterface() {
408 if h2i, ok := v.Interface().(h2Transport); ok {
409 t.h2transport = h2i
410 return
411 }
412 }
413 }
414
415 if _, ok := t.TLSNextProto["h2"]; ok {
416
417 return
418 }
419 protocols := t.protocols()
420 if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
421 return
422 }
423 if omitBundledHTTP2 {
424 return
425 }
426 t2, err := http2configureTransports(t)
427 if err != nil {
428 log.Printf("Error enabling Transport HTTP/2 support: %v", err)
429 return
430 }
431 t.h2transport = t2
432
433
434
435
436
437
438
439 if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
440 const h2max = 1<<32 - 1
441 if limit1 >= h2max {
442 t2.MaxHeaderListSize = h2max
443 } else {
444 t2.MaxHeaderListSize = uint32(limit1)
445 }
446 }
447
448
449
450
451
452
453 t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
454 }
455
456 func (t *Transport) protocols() Protocols {
457 if t.Protocols != nil {
458 return *t.Protocols
459 }
460 var p Protocols
461 p.SetHTTP1(true)
462 switch {
463 case t.TLSNextProto != nil:
464
465
466 if t.TLSNextProto["h2"] != nil {
467 p.SetHTTP2(true)
468 }
469 case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
470
471
472
473
474
475
476 case http2client.Value() == "0":
477 default:
478 p.SetHTTP2(true)
479 }
480 return p
481 }
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
500 return envProxyFunc()(req.URL)
501 }
502
503
504
505 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
506 return func(*Request) (*url.URL, error) {
507 return fixedURL, nil
508 }
509 }
510
511
512
513
514 type transportRequest struct {
515 *Request
516 extra Header
517 trace *httptrace.ClientTrace
518
519 ctx context.Context
520 cancel context.CancelCauseFunc
521
522 mu sync.Mutex
523 err error
524 }
525
526 func (tr *transportRequest) extraHeaders() Header {
527 if tr.extra == nil {
528 tr.extra = make(Header)
529 }
530 return tr.extra
531 }
532
533 func (tr *transportRequest) setError(err error) {
534 tr.mu.Lock()
535 if tr.err == nil {
536 tr.err = err
537 }
538 tr.mu.Unlock()
539 }
540
541
542
543 func (t *Transport) useRegisteredProtocol(req *Request) bool {
544 if req.URL.Scheme == "https" && req.requiresHTTP1() {
545
546
547
548
549 return false
550 }
551 return true
552 }
553
554
555
556
557 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
558 if !t.useRegisteredProtocol(req) {
559 return nil
560 }
561 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
562 return altProto[req.URL.Scheme]
563 }
564
565 func validateHeaders(hdrs Header) string {
566 for k, vv := range hdrs {
567 if !httpguts.ValidHeaderFieldName(k) {
568 return fmt.Sprintf("field name %q", k)
569 }
570 for _, v := range vv {
571 if !httpguts.ValidHeaderFieldValue(v) {
572
573
574 return fmt.Sprintf("field value for %q", k)
575 }
576 }
577 }
578 return ""
579 }
580
581
582 func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
583 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
584 ctx := req.Context()
585 trace := httptrace.ContextClientTrace(ctx)
586
587 if req.URL == nil {
588 req.closeBody()
589 return nil, errors.New("http: nil Request.URL")
590 }
591 if req.Header == nil {
592 req.closeBody()
593 return nil, errors.New("http: nil Request.Header")
594 }
595 scheme := req.URL.Scheme
596 isHTTP := scheme == "http" || scheme == "https"
597 if isHTTP {
598
599 if err := validateHeaders(req.Header); err != "" {
600 req.closeBody()
601 return nil, fmt.Errorf("net/http: invalid header %s", err)
602 }
603
604
605 if err := validateHeaders(req.Trailer); err != "" {
606 req.closeBody()
607 return nil, fmt.Errorf("net/http: invalid trailer %s", err)
608 }
609 }
610
611 origReq := req
612 req = setupRewindBody(req)
613
614 if altRT := t.alternateRoundTripper(req); altRT != nil {
615 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
616 return resp, err
617 }
618 var err error
619 req, err = rewindBody(req)
620 if err != nil {
621 return nil, err
622 }
623 }
624 if !isHTTP {
625 req.closeBody()
626 return nil, badStringError("unsupported protocol scheme", scheme)
627 }
628 if req.Method != "" && !validMethod(req.Method) {
629 req.closeBody()
630 return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
631 }
632 if req.URL.Host == "" {
633 req.closeBody()
634 return nil, errors.New("http: no Host in request URL")
635 }
636
637
638
639
640
641
642
643
644
645
646 ctx, cancel := context.WithCancelCause(req.Context())
647
648
649 if origReq.Cancel != nil {
650 go awaitLegacyCancel(ctx, cancel, origReq)
651 }
652
653
654
655
656
657 cancel = t.prepareTransportCancel(origReq, cancel)
658
659 defer func() {
660 if err != nil {
661 cancel(err)
662 }
663 }()
664
665 for {
666 select {
667 case <-ctx.Done():
668 req.closeBody()
669 return nil, context.Cause(ctx)
670 default:
671 }
672
673
674 treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
675 cm, err := t.connectMethodForRequest(treq)
676 if err != nil {
677 req.closeBody()
678 return nil, err
679 }
680
681
682
683
684
685 pconn, err := t.getConn(treq, cm)
686 if err != nil {
687 req.closeBody()
688 return nil, err
689 }
690
691 var resp *Response
692 if pconn.alt != nil {
693
694 resp, err = pconn.alt.RoundTrip(req)
695 } else {
696 resp, err = pconn.roundTrip(treq)
697 }
698 if err == nil {
699 if pconn.alt != nil {
700
701
702
703
704
705 cancel(errRequestDone)
706 }
707 resp.Request = origReq
708 return resp, nil
709 }
710
711
712 if http2isNoCachedConnError(err) {
713 if t.removeIdleConn(pconn) {
714 t.decConnsPerHost(pconn.cacheKey)
715 }
716 } else if !pconn.shouldRetryRequest(req, err) {
717
718
719 if e, ok := err.(nothingWrittenError); ok {
720 err = e.error
721 }
722 if e, ok := err.(transportReadFromServerError); ok {
723 err = e.err
724 }
725 if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {
726
727
728
729 req.closeBody()
730 }
731 return nil, err
732 }
733 testHookRoundTripRetried()
734
735
736 req, err = rewindBody(req)
737 if err != nil {
738 return nil, err
739 }
740 }
741 }
742
743 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
744 select {
745 case <-req.Cancel:
746 cancel(errRequestCanceled)
747 case <-ctx.Done():
748 }
749 }
750
751 var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
752
753 type readTrackingBody struct {
754 io.ReadCloser
755 didRead bool
756 didClose bool
757 }
758
759 func (r *readTrackingBody) Read(data []byte) (int, error) {
760 r.didRead = true
761 return r.ReadCloser.Read(data)
762 }
763
764 func (r *readTrackingBody) Close() error {
765 r.didClose = true
766 return r.ReadCloser.Close()
767 }
768
769
770
771
772
773 func setupRewindBody(req *Request) *Request {
774 if req.Body == nil || req.Body == NoBody {
775 return req
776 }
777 newReq := *req
778 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
779 return &newReq
780 }
781
782
783
784
785
786 func rewindBody(req *Request) (rewound *Request, err error) {
787 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
788 return req, nil
789 }
790 if !req.Body.(*readTrackingBody).didClose {
791 req.closeBody()
792 }
793 if req.GetBody == nil {
794 return nil, errCannotRewind
795 }
796 body, err := req.GetBody()
797 if err != nil {
798 return nil, err
799 }
800 newReq := *req
801 newReq.Body = &readTrackingBody{ReadCloser: body}
802 return &newReq, nil
803 }
804
805
806
807
808 func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
809 if http2isNoCachedConnError(err) {
810
811
812
813
814
815
816 return true
817 }
818 if err == errMissingHost {
819
820 return false
821 }
822 if !pc.isReused() {
823
824
825
826
827
828
829
830 return false
831 }
832 if _, ok := err.(nothingWrittenError); ok {
833
834
835 return req.outgoingLength() == 0 || req.GetBody != nil
836 }
837 if !req.isReplayable() {
838
839 return false
840 }
841 if _, ok := err.(transportReadFromServerError); ok {
842
843
844 return true
845 }
846 if err == errServerClosedIdle {
847
848
849
850 return true
851 }
852 return false
853 }
854
855
856 var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
857
858
859
860
861
862
863
864
865
866
867
868 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
869 t.altMu.Lock()
870 defer t.altMu.Unlock()
871 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
872 if _, exists := oldMap[scheme]; exists {
873 panic("protocol " + scheme + " already registered")
874 }
875 newMap := maps.Clone(oldMap)
876 if newMap == nil {
877 newMap = make(map[string]RoundTripper)
878 }
879 newMap[scheme] = rt
880 t.altProto.Store(newMap)
881 }
882
883
884
885
886
887 func (t *Transport) CloseIdleConnections() {
888 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
889 t.idleMu.Lock()
890 m := t.idleConn
891 t.idleConn = nil
892 t.closeIdle = true
893 t.idleLRU = connLRU{}
894 t.idleMu.Unlock()
895 for _, conns := range m {
896 for _, pconn := range conns {
897 pconn.close(errCloseIdleConns)
898 }
899 }
900 t.connsPerHostMu.Lock()
901 t.dialsInProgress.all(func(w *wantConn) {
902 if w.cancelCtx != nil && !w.waiting() {
903 w.cancelCtx()
904 }
905 })
906 t.connsPerHostMu.Unlock()
907 if t2 := t.h2transport; t2 != nil {
908 t2.CloseIdleConnections()
909 }
910 }
911
912
913 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
914
915
916
917
918
919
920 cancel := func(err error) {
921 origCancel(err)
922 t.reqMu.Lock()
923 delete(t.reqCanceler, req)
924 t.reqMu.Unlock()
925 }
926 t.reqMu.Lock()
927 if t.reqCanceler == nil {
928 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
929 }
930 t.reqCanceler[req] = cancel
931 t.reqMu.Unlock()
932 return cancel
933 }
934
935
936
937
938
939
940
941 func (t *Transport) CancelRequest(req *Request) {
942 t.reqMu.Lock()
943 cancel := t.reqCanceler[req]
944 t.reqMu.Unlock()
945 if cancel != nil {
946 cancel(errRequestCanceled)
947 }
948 }
949
950
951
952
953
954 var (
955 envProxyOnce sync.Once
956 envProxyFuncValue func(*url.URL) (*url.URL, error)
957 )
958
959
960
961 func envProxyFunc() func(*url.URL) (*url.URL, error) {
962 envProxyOnce.Do(func() {
963 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
964 })
965 return envProxyFuncValue
966 }
967
968
969 func resetProxyConfig() {
970 envProxyOnce = sync.Once{}
971 envProxyFuncValue = nil
972 }
973
974 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
975 cm.targetScheme = treq.URL.Scheme
976 cm.targetAddr = canonicalAddr(treq.URL)
977 if t.Proxy != nil {
978 cm.proxyURL, err = t.Proxy(treq.Request)
979 }
980 cm.onlyH1 = treq.requiresHTTP1()
981 return cm, err
982 }
983
984
985
986 func (cm *connectMethod) proxyAuth() string {
987 if cm.proxyURL == nil {
988 return ""
989 }
990 if u := cm.proxyURL.User; u != nil {
991 username := u.Username()
992 password, _ := u.Password()
993 return "Basic " + basicAuth(username, password)
994 }
995 return ""
996 }
997
998
999 var (
1000 errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
1001 errConnBroken = errors.New("http: putIdleConn: connection is in bad state")
1002 errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called")
1003 errTooManyIdle = errors.New("http: putIdleConn: too many idle connections")
1004 errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host")
1005 errCloseIdleConns = errors.New("http: CloseIdleConnections called")
1006 errReadLoopExiting = errors.New("http: persistConn.readLoop exiting")
1007 errIdleConnTimeout = errors.New("http: idle connection timeout")
1008
1009
1010
1011
1012
1013 errServerClosedIdle = errors.New("http: server closed idle connection")
1014 )
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 type transportReadFromServerError struct {
1025 err error
1026 }
1027
1028 func (e transportReadFromServerError) Unwrap() error { return e.err }
1029
1030 func (e transportReadFromServerError) Error() string {
1031 return fmt.Sprintf("net/http: Transport failed to read from server: %v", e.err)
1032 }
1033
1034 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1035 if err := t.tryPutIdleConn(pconn); err != nil {
1036 pconn.close(err)
1037 }
1038 }
1039
1040 func (t *Transport) maxIdleConnsPerHost() int {
1041 if v := t.MaxIdleConnsPerHost; v != 0 {
1042 return v
1043 }
1044 return DefaultMaxIdleConnsPerHost
1045 }
1046
1047
1048
1049
1050
1051
1052 func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
1053 if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
1054 return errKeepAlivesDisabled
1055 }
1056 if pconn.isBroken() {
1057 return errConnBroken
1058 }
1059 pconn.markReused()
1060
1061 t.idleMu.Lock()
1062 defer t.idleMu.Unlock()
1063
1064
1065
1066
1067 if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
1068 return nil
1069 }
1070
1071
1072
1073
1074
1075 key := pconn.cacheKey
1076 if q, ok := t.idleConnWait[key]; ok {
1077 done := false
1078 if pconn.alt == nil {
1079
1080
1081 for q.len() > 0 {
1082 w := q.popFront()
1083 if w.tryDeliver(pconn, nil, time.Time{}) {
1084 done = true
1085 break
1086 }
1087 }
1088 } else {
1089
1090
1091
1092
1093 for q.len() > 0 {
1094 w := q.popFront()
1095 w.tryDeliver(pconn, nil, time.Time{})
1096 }
1097 }
1098 if q.len() == 0 {
1099 delete(t.idleConnWait, key)
1100 } else {
1101 t.idleConnWait[key] = q
1102 }
1103 if done {
1104 return nil
1105 }
1106 }
1107
1108 if t.closeIdle {
1109 return errCloseIdle
1110 }
1111 if t.idleConn == nil {
1112 t.idleConn = make(map[connectMethodKey][]*persistConn)
1113 }
1114 idles := t.idleConn[key]
1115 if len(idles) >= t.maxIdleConnsPerHost() {
1116 return errTooManyIdleHost
1117 }
1118 for _, exist := range idles {
1119 if exist == pconn {
1120 log.Fatalf("dup idle pconn %p in freelist", pconn)
1121 }
1122 }
1123 t.idleConn[key] = append(idles, pconn)
1124 t.idleLRU.add(pconn)
1125 if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
1126 oldest := t.idleLRU.removeOldest()
1127 oldest.close(errTooManyIdle)
1128 t.removeIdleConnLocked(oldest)
1129 }
1130
1131
1132
1133
1134 if t.IdleConnTimeout > 0 && pconn.alt == nil {
1135 if pconn.idleTimer != nil {
1136 pconn.idleTimer.Reset(t.IdleConnTimeout)
1137 } else {
1138 pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
1139 }
1140 }
1141 pconn.idleAt = time.Now()
1142 return nil
1143 }
1144
1145
1146
1147
1148 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
1149 if t.DisableKeepAlives {
1150 return false
1151 }
1152
1153 t.idleMu.Lock()
1154 defer t.idleMu.Unlock()
1155
1156
1157
1158 t.closeIdle = false
1159
1160 if w == nil {
1161
1162 return false
1163 }
1164
1165
1166
1167
1168 var oldTime time.Time
1169 if t.IdleConnTimeout > 0 {
1170 oldTime = time.Now().Add(-t.IdleConnTimeout)
1171 }
1172
1173
1174 if list, ok := t.idleConn[w.key]; ok {
1175 stop := false
1176 delivered := false
1177 for len(list) > 0 && !stop {
1178 pconn := list[len(list)-1]
1179
1180
1181
1182
1183 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
1184 if tooOld {
1185
1186
1187
1188 go pconn.closeConnIfStillIdle()
1189 }
1190 if pconn.isBroken() || tooOld {
1191
1192
1193
1194
1195
1196 list = list[:len(list)-1]
1197 continue
1198 }
1199 delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
1200 if delivered {
1201 if pconn.alt != nil {
1202
1203
1204 } else {
1205
1206
1207 t.idleLRU.remove(pconn)
1208 list = list[:len(list)-1]
1209 }
1210 }
1211 stop = true
1212 }
1213 if len(list) > 0 {
1214 t.idleConn[w.key] = list
1215 } else {
1216 delete(t.idleConn, w.key)
1217 }
1218 if stop {
1219 return delivered
1220 }
1221 }
1222
1223
1224 if t.idleConnWait == nil {
1225 t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
1226 }
1227 q := t.idleConnWait[w.key]
1228 q.cleanFrontNotWaiting()
1229 q.pushBack(w)
1230 t.idleConnWait[w.key] = q
1231 return false
1232 }
1233
1234
1235 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1236 t.idleMu.Lock()
1237 defer t.idleMu.Unlock()
1238 return t.removeIdleConnLocked(pconn)
1239 }
1240
1241
1242 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1243 if pconn.idleTimer != nil {
1244 pconn.idleTimer.Stop()
1245 }
1246 t.idleLRU.remove(pconn)
1247 key := pconn.cacheKey
1248 pconns := t.idleConn[key]
1249 var removed bool
1250 switch len(pconns) {
1251 case 0:
1252
1253 case 1:
1254 if pconns[0] == pconn {
1255 delete(t.idleConn, key)
1256 removed = true
1257 }
1258 default:
1259 for i, v := range pconns {
1260 if v != pconn {
1261 continue
1262 }
1263
1264
1265 copy(pconns[i:], pconns[i+1:])
1266 t.idleConn[key] = pconns[:len(pconns)-1]
1267 removed = true
1268 break
1269 }
1270 }
1271 return removed
1272 }
1273
1274 var zeroDialer net.Dialer
1275
1276 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1277 if t.DialContext != nil {
1278 c, err := t.DialContext(ctx, network, addr)
1279 if c == nil && err == nil {
1280 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1281 }
1282 return c, err
1283 }
1284 if t.Dial != nil {
1285 c, err := t.Dial(network, addr)
1286 if c == nil && err == nil {
1287 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1288 }
1289 return c, err
1290 }
1291 return zeroDialer.DialContext(ctx, network, addr)
1292 }
1293
1294
1295
1296
1297
1298
1299
1300 type wantConn struct {
1301 cm connectMethod
1302 key connectMethodKey
1303
1304
1305
1306
1307 beforeDial func()
1308 afterDial func()
1309
1310 mu sync.Mutex
1311 ctx context.Context
1312 cancelCtx context.CancelFunc
1313 done bool
1314 result chan connOrError
1315 }
1316
1317 type connOrError struct {
1318 pc *persistConn
1319 err error
1320 idleAt time.Time
1321 }
1322
1323
1324 func (w *wantConn) waiting() bool {
1325 w.mu.Lock()
1326 defer w.mu.Unlock()
1327
1328 return !w.done
1329 }
1330
1331
1332 func (w *wantConn) getCtxForDial() context.Context {
1333 w.mu.Lock()
1334 defer w.mu.Unlock()
1335
1336 return w.ctx
1337 }
1338
1339
1340 func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
1341 w.mu.Lock()
1342 defer w.mu.Unlock()
1343
1344 if w.done {
1345 return false
1346 }
1347 if (pc == nil) == (err == nil) {
1348 panic("net/http: internal error: misuse of tryDeliver")
1349 }
1350 w.ctx = nil
1351 w.done = true
1352
1353 w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
1354 close(w.result)
1355
1356 return true
1357 }
1358
1359
1360
1361 func (w *wantConn) cancel(t *Transport) {
1362 w.mu.Lock()
1363 var pc *persistConn
1364 if w.done {
1365 if r, ok := <-w.result; ok {
1366 pc = r.pc
1367 }
1368 } else {
1369 close(w.result)
1370 }
1371 w.ctx = nil
1372 w.done = true
1373 w.mu.Unlock()
1374
1375
1376
1377
1378 if pc != nil && pc.alt == nil {
1379 t.putOrCloseIdleConn(pc)
1380 }
1381 }
1382
1383
1384 type wantConnQueue struct {
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395 head []*wantConn
1396 headPos int
1397 tail []*wantConn
1398 }
1399
1400
1401 func (q *wantConnQueue) len() int {
1402 return len(q.head) - q.headPos + len(q.tail)
1403 }
1404
1405
1406 func (q *wantConnQueue) pushBack(w *wantConn) {
1407 q.tail = append(q.tail, w)
1408 }
1409
1410
1411 func (q *wantConnQueue) popFront() *wantConn {
1412 if q.headPos >= len(q.head) {
1413 if len(q.tail) == 0 {
1414 return nil
1415 }
1416
1417 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1418 }
1419 w := q.head[q.headPos]
1420 q.head[q.headPos] = nil
1421 q.headPos++
1422 return w
1423 }
1424
1425
1426 func (q *wantConnQueue) peekFront() *wantConn {
1427 if q.headPos < len(q.head) {
1428 return q.head[q.headPos]
1429 }
1430 if len(q.tail) > 0 {
1431 return q.tail[0]
1432 }
1433 return nil
1434 }
1435
1436
1437
1438 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1439 for {
1440 w := q.peekFront()
1441 if w == nil || w.waiting() {
1442 return cleaned
1443 }
1444 q.popFront()
1445 cleaned = true
1446 }
1447 }
1448
1449
1450 func (q *wantConnQueue) cleanFrontCanceled() {
1451 for {
1452 w := q.peekFront()
1453 if w == nil || w.cancelCtx != nil {
1454 return
1455 }
1456 q.popFront()
1457 }
1458 }
1459
1460
1461
1462 func (q *wantConnQueue) all(f func(*wantConn)) {
1463 for _, w := range q.head[q.headPos:] {
1464 f(w)
1465 }
1466 for _, w := range q.tail {
1467 f(w)
1468 }
1469 }
1470
1471 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1472 if t.DialTLSContext != nil {
1473 conn, err = t.DialTLSContext(ctx, network, addr)
1474 } else {
1475 conn, err = t.DialTLS(network, addr)
1476 }
1477 if conn == nil && err == nil {
1478 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1479 }
1480 return
1481 }
1482
1483
1484
1485
1486
1487 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
1488 req := treq.Request
1489 trace := treq.trace
1490 ctx := req.Context()
1491 if trace != nil && trace.GetConn != nil {
1492 trace.GetConn(cm.addr())
1493 }
1494
1495
1496
1497
1498
1499
1500 dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))
1501
1502 w := &wantConn{
1503 cm: cm,
1504 key: cm.key(),
1505 ctx: dialCtx,
1506 cancelCtx: dialCancel,
1507 result: make(chan connOrError, 1),
1508 beforeDial: testHookPrePendingDial,
1509 afterDial: testHookPostPendingDial,
1510 }
1511 defer func() {
1512 if err != nil {
1513 w.cancel(t)
1514 }
1515 }()
1516
1517
1518 if delivered := t.queueForIdleConn(w); !delivered {
1519 t.queueForDial(w)
1520 }
1521
1522
1523 select {
1524 case r := <-w.result:
1525
1526
1527 if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
1528 info := httptrace.GotConnInfo{
1529 Conn: r.pc.conn,
1530 Reused: r.pc.isReused(),
1531 }
1532 if !r.idleAt.IsZero() {
1533 info.WasIdle = true
1534 info.IdleTime = time.Since(r.idleAt)
1535 }
1536 trace.GotConn(info)
1537 }
1538 if r.err != nil {
1539
1540
1541
1542 select {
1543 case <-treq.ctx.Done():
1544 err := context.Cause(treq.ctx)
1545 if err == errRequestCanceled {
1546 err = errRequestCanceledConn
1547 }
1548 return nil, err
1549 default:
1550
1551 }
1552 }
1553 return r.pc, r.err
1554 case <-treq.ctx.Done():
1555 err := context.Cause(treq.ctx)
1556 if err == errRequestCanceled {
1557 err = errRequestCanceledConn
1558 }
1559 return nil, err
1560 }
1561 }
1562
1563
1564
1565 func (t *Transport) queueForDial(w *wantConn) {
1566 w.beforeDial()
1567
1568 t.connsPerHostMu.Lock()
1569 defer t.connsPerHostMu.Unlock()
1570
1571 if t.MaxConnsPerHost <= 0 {
1572 t.startDialConnForLocked(w)
1573 return
1574 }
1575
1576 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1577 if t.connsPerHost == nil {
1578 t.connsPerHost = make(map[connectMethodKey]int)
1579 }
1580 t.connsPerHost[w.key] = n + 1
1581 t.startDialConnForLocked(w)
1582 return
1583 }
1584
1585 if t.connsPerHostWait == nil {
1586 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1587 }
1588 q := t.connsPerHostWait[w.key]
1589 q.cleanFrontNotWaiting()
1590 q.pushBack(w)
1591 t.connsPerHostWait[w.key] = q
1592 }
1593
1594
1595
1596 func (t *Transport) startDialConnForLocked(w *wantConn) {
1597 t.dialsInProgress.cleanFrontCanceled()
1598 t.dialsInProgress.pushBack(w)
1599 go func() {
1600 t.dialConnFor(w)
1601 t.connsPerHostMu.Lock()
1602 defer t.connsPerHostMu.Unlock()
1603 w.cancelCtx = nil
1604 }()
1605 }
1606
1607
1608
1609
1610 func (t *Transport) dialConnFor(w *wantConn) {
1611 defer w.afterDial()
1612 ctx := w.getCtxForDial()
1613 if ctx == nil {
1614 t.decConnsPerHost(w.key)
1615 return
1616 }
1617
1618 pc, err := t.dialConn(ctx, w.cm)
1619 delivered := w.tryDeliver(pc, err, time.Time{})
1620 if err == nil && (!delivered || pc.alt != nil) {
1621
1622
1623
1624 t.putOrCloseIdleConn(pc)
1625 }
1626 if err != nil {
1627 t.decConnsPerHost(w.key)
1628 }
1629 }
1630
1631
1632
1633 func (t *Transport) decConnsPerHost(key connectMethodKey) {
1634 if t.MaxConnsPerHost <= 0 {
1635 return
1636 }
1637
1638 t.connsPerHostMu.Lock()
1639 defer t.connsPerHostMu.Unlock()
1640 n := t.connsPerHost[key]
1641 if n == 0 {
1642
1643
1644 panic("net/http: internal error: connCount underflow")
1645 }
1646
1647
1648
1649
1650
1651 if q := t.connsPerHostWait[key]; q.len() > 0 {
1652 done := false
1653 for q.len() > 0 {
1654 w := q.popFront()
1655 if w.waiting() {
1656 t.startDialConnForLocked(w)
1657 done = true
1658 break
1659 }
1660 }
1661 if q.len() == 0 {
1662 delete(t.connsPerHostWait, key)
1663 } else {
1664
1665
1666 t.connsPerHostWait[key] = q
1667 }
1668 if done {
1669 return
1670 }
1671 }
1672
1673
1674 if n--; n == 0 {
1675 delete(t.connsPerHost, key)
1676 } else {
1677 t.connsPerHost[key] = n
1678 }
1679 }
1680
1681
1682
1683
1684 func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
1685
1686 cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
1687 if cfg.ServerName == "" {
1688 cfg.ServerName = name
1689 }
1690 if pconn.cacheKey.onlyH1 {
1691 cfg.NextProtos = nil
1692 }
1693 plainConn := pconn.conn
1694 tlsConn := tls.Client(plainConn, cfg)
1695 errc := make(chan error, 2)
1696 var timer *time.Timer
1697 if d := pconn.t.TLSHandshakeTimeout; d != 0 {
1698 timer = time.AfterFunc(d, func() {
1699 errc <- tlsHandshakeTimeoutError{}
1700 })
1701 }
1702 go func() {
1703 if trace != nil && trace.TLSHandshakeStart != nil {
1704 trace.TLSHandshakeStart()
1705 }
1706 err := tlsConn.HandshakeContext(ctx)
1707 if timer != nil {
1708 timer.Stop()
1709 }
1710 errc <- err
1711 }()
1712 if err := <-errc; err != nil {
1713 plainConn.Close()
1714 if err == (tlsHandshakeTimeoutError{}) {
1715
1716
1717 <-errc
1718 }
1719 if trace != nil && trace.TLSHandshakeDone != nil {
1720 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1721 }
1722 return err
1723 }
1724 cs := tlsConn.ConnectionState()
1725 if trace != nil && trace.TLSHandshakeDone != nil {
1726 trace.TLSHandshakeDone(cs, nil)
1727 }
1728 pconn.tlsState = &cs
1729 pconn.conn = tlsConn
1730 return nil
1731 }
1732
1733 type erringRoundTripper interface {
1734 RoundTripErr() error
1735 }
1736
1737 var testHookProxyConnectTimeout = context.WithTimeout
1738
1739 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1740 pconn = &persistConn{
1741 t: t,
1742 cacheKey: cm.key(),
1743 reqch: make(chan requestAndChan, 1),
1744 writech: make(chan writeRequest, 1),
1745 closech: make(chan struct{}),
1746 writeErrCh: make(chan error, 1),
1747 writeLoopDone: make(chan struct{}),
1748 }
1749 trace := httptrace.ContextClientTrace(ctx)
1750 wrapErr := func(err error) error {
1751 if cm.proxyURL != nil {
1752
1753 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1754 }
1755 return err
1756 }
1757 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1758 var err error
1759 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1760 if err != nil {
1761 return nil, wrapErr(err)
1762 }
1763 if tc, ok := pconn.conn.(*tls.Conn); ok {
1764
1765
1766 if trace != nil && trace.TLSHandshakeStart != nil {
1767 trace.TLSHandshakeStart()
1768 }
1769 if err := tc.HandshakeContext(ctx); err != nil {
1770 go pconn.conn.Close()
1771 if trace != nil && trace.TLSHandshakeDone != nil {
1772 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1773 }
1774 return nil, err
1775 }
1776 cs := tc.ConnectionState()
1777 if trace != nil && trace.TLSHandshakeDone != nil {
1778 trace.TLSHandshakeDone(cs, nil)
1779 }
1780 pconn.tlsState = &cs
1781 }
1782 } else {
1783 conn, err := t.dial(ctx, "tcp", cm.addr())
1784 if err != nil {
1785 return nil, wrapErr(err)
1786 }
1787 pconn.conn = conn
1788 if cm.scheme() == "https" {
1789 var firstTLSHost string
1790 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1791 return nil, wrapErr(err)
1792 }
1793 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1794 return nil, wrapErr(err)
1795 }
1796 }
1797 }
1798
1799
1800 switch {
1801 case cm.proxyURL == nil:
1802
1803 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1804 conn := pconn.conn
1805 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1806 if u := cm.proxyURL.User; u != nil {
1807 auth := &socksUsernamePassword{
1808 Username: u.Username(),
1809 }
1810 auth.Password, _ = u.Password()
1811 d.AuthMethods = []socksAuthMethod{
1812 socksAuthMethodNotRequired,
1813 socksAuthMethodUsernamePassword,
1814 }
1815 d.Authenticate = auth.Authenticate
1816 }
1817 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1818 conn.Close()
1819 return nil, err
1820 }
1821 case cm.targetScheme == "http":
1822 pconn.isProxy = true
1823 if pa := cm.proxyAuth(); pa != "" {
1824 pconn.mutateHeaderFunc = func(h Header) {
1825 h.Set("Proxy-Authorization", pa)
1826 }
1827 }
1828 case cm.targetScheme == "https":
1829 conn := pconn.conn
1830 var hdr Header
1831 if t.GetProxyConnectHeader != nil {
1832 var err error
1833 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1834 if err != nil {
1835 conn.Close()
1836 return nil, err
1837 }
1838 } else {
1839 hdr = t.ProxyConnectHeader
1840 }
1841 if hdr == nil {
1842 hdr = make(Header)
1843 }
1844 if pa := cm.proxyAuth(); pa != "" {
1845 hdr = hdr.Clone()
1846 hdr.Set("Proxy-Authorization", pa)
1847 }
1848 connectReq := &Request{
1849 Method: "CONNECT",
1850 URL: &url.URL{Opaque: cm.targetAddr},
1851 Host: cm.targetAddr,
1852 Header: hdr,
1853 }
1854
1855
1856
1857
1858 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1859 defer cancel()
1860
1861 didReadResponse := make(chan struct{})
1862 var (
1863 resp *Response
1864 err error
1865 )
1866
1867 go func() {
1868 defer close(didReadResponse)
1869 err = connectReq.Write(conn)
1870 if err != nil {
1871 return
1872 }
1873
1874
1875 br := bufio.NewReader(conn)
1876 resp, err = ReadResponse(br, connectReq)
1877 }()
1878 select {
1879 case <-connectCtx.Done():
1880 conn.Close()
1881 <-didReadResponse
1882 return nil, connectCtx.Err()
1883 case <-didReadResponse:
1884
1885 }
1886 if err != nil {
1887 conn.Close()
1888 return nil, err
1889 }
1890
1891 if t.OnProxyConnectResponse != nil {
1892 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1893 if err != nil {
1894 conn.Close()
1895 return nil, err
1896 }
1897 }
1898
1899 if resp.StatusCode != 200 {
1900 _, text, ok := strings.Cut(resp.Status, " ")
1901 conn.Close()
1902 if !ok {
1903 return nil, errors.New("unknown status code")
1904 }
1905 return nil, errors.New(text)
1906 }
1907 }
1908
1909 if cm.proxyURL != nil && cm.targetScheme == "https" {
1910 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1911 return nil, err
1912 }
1913 }
1914
1915
1916 unencryptedHTTP2 := pconn.tlsState == nil &&
1917 t.Protocols != nil &&
1918 t.Protocols.UnencryptedHTTP2() &&
1919 !t.Protocols.HTTP1()
1920 if unencryptedHTTP2 {
1921 next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
1922 if !ok {
1923 return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
1924 }
1925 alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
1926 if e, ok := alt.(erringRoundTripper); ok {
1927
1928 return nil, e.RoundTripErr()
1929 }
1930 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1931 }
1932
1933 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1934 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1935 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1936 if e, ok := alt.(erringRoundTripper); ok {
1937
1938 return nil, e.RoundTripErr()
1939 }
1940 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1941 }
1942 }
1943
1944 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1945 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1946
1947 go pconn.readLoop()
1948 go pconn.writeLoop()
1949 return pconn, nil
1950 }
1951
1952
1953
1954
1955
1956
1957
1958 type persistConnWriter struct {
1959 pc *persistConn
1960 }
1961
1962 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1963 n, err = w.pc.conn.Write(p)
1964 w.pc.nwrite += int64(n)
1965 return
1966 }
1967
1968
1969
1970
1971 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1972 n, err = io.Copy(w.pc.conn, r)
1973 w.pc.nwrite += n
1974 return
1975 }
1976
1977 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995 type connectMethod struct {
1996 _ incomparable
1997 proxyURL *url.URL
1998 targetScheme string
1999
2000
2001
2002 targetAddr string
2003 onlyH1 bool
2004 }
2005
2006 func (cm *connectMethod) key() connectMethodKey {
2007 proxyStr := ""
2008 targetAddr := cm.targetAddr
2009 if cm.proxyURL != nil {
2010 proxyStr = cm.proxyURL.String()
2011 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2012 targetAddr = ""
2013 }
2014 }
2015 return connectMethodKey{
2016 proxy: proxyStr,
2017 scheme: cm.targetScheme,
2018 addr: targetAddr,
2019 onlyH1: cm.onlyH1,
2020 }
2021 }
2022
2023
2024 func (cm *connectMethod) scheme() string {
2025 if cm.proxyURL != nil {
2026 return cm.proxyURL.Scheme
2027 }
2028 return cm.targetScheme
2029 }
2030
2031
2032 func (cm *connectMethod) addr() string {
2033 if cm.proxyURL != nil {
2034 return canonicalAddr(cm.proxyURL)
2035 }
2036 return cm.targetAddr
2037 }
2038
2039
2040
2041 func (cm *connectMethod) tlsHost() string {
2042 h := cm.targetAddr
2043 if hasPort(h) {
2044 h = h[:strings.LastIndex(h, ":")]
2045 }
2046 return h
2047 }
2048
2049
2050
2051
2052 type connectMethodKey struct {
2053 proxy, scheme, addr string
2054 onlyH1 bool
2055 }
2056
2057 func (k connectMethodKey) String() string {
2058
2059 var h1 string
2060 if k.onlyH1 {
2061 h1 = ",h1"
2062 }
2063 return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
2064 }
2065
2066
2067
2068 type persistConn struct {
2069
2070
2071
2072 alt RoundTripper
2073
2074 t *Transport
2075 cacheKey connectMethodKey
2076 conn net.Conn
2077 tlsState *tls.ConnectionState
2078 br *bufio.Reader
2079 bw *bufio.Writer
2080 nwrite int64
2081 reqch chan requestAndChan
2082 writech chan writeRequest
2083 closech chan struct{}
2084 isProxy bool
2085 sawEOF bool
2086 readLimit int64
2087
2088
2089
2090
2091 writeErrCh chan error
2092
2093 writeLoopDone chan struct{}
2094
2095
2096 idleAt time.Time
2097 idleTimer *time.Timer
2098
2099 mu sync.Mutex
2100 numExpectedResponses int
2101 closed error
2102 canceledErr error
2103 broken bool
2104 reused bool
2105
2106
2107
2108 mutateHeaderFunc func(Header)
2109 }
2110
2111 func (pc *persistConn) maxHeaderResponseSize() int64 {
2112 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2113 return v
2114 }
2115 return 10 << 20
2116 }
2117
2118 func (pc *persistConn) Read(p []byte) (n int, err error) {
2119 if pc.readLimit <= 0 {
2120 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2121 }
2122 if int64(len(p)) > pc.readLimit {
2123 p = p[:pc.readLimit]
2124 }
2125 n, err = pc.conn.Read(p)
2126 if err == io.EOF {
2127 pc.sawEOF = true
2128 }
2129 pc.readLimit -= int64(n)
2130 return
2131 }
2132
2133
2134 func (pc *persistConn) isBroken() bool {
2135 pc.mu.Lock()
2136 b := pc.closed != nil
2137 pc.mu.Unlock()
2138 return b
2139 }
2140
2141
2142
2143 func (pc *persistConn) canceled() error {
2144 pc.mu.Lock()
2145 defer pc.mu.Unlock()
2146 return pc.canceledErr
2147 }
2148
2149
2150 func (pc *persistConn) isReused() bool {
2151 pc.mu.Lock()
2152 r := pc.reused
2153 pc.mu.Unlock()
2154 return r
2155 }
2156
2157 func (pc *persistConn) cancelRequest(err error) {
2158 pc.mu.Lock()
2159 defer pc.mu.Unlock()
2160 pc.canceledErr = err
2161 pc.closeLocked(errRequestCanceled)
2162 }
2163
2164
2165
2166
2167 func (pc *persistConn) closeConnIfStillIdle() {
2168 t := pc.t
2169 t.idleMu.Lock()
2170 defer t.idleMu.Unlock()
2171 if _, ok := t.idleLRU.m[pc]; !ok {
2172
2173 return
2174 }
2175 t.removeIdleConnLocked(pc)
2176 pc.close(errIdleConnTimeout)
2177 }
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2188 if err == nil {
2189 return nil
2190 }
2191
2192
2193
2194
2195
2196
2197
2198
2199 <-pc.writeLoopDone
2200
2201
2202
2203
2204 if cerr := pc.canceled(); cerr != nil {
2205 return cerr
2206 }
2207
2208
2209 req.mu.Lock()
2210 reqErr := req.err
2211 req.mu.Unlock()
2212 if reqErr != nil {
2213 return reqErr
2214 }
2215
2216 if err == errServerClosedIdle {
2217
2218 return err
2219 }
2220
2221 if _, ok := err.(transportReadFromServerError); ok {
2222 if pc.nwrite == startBytesWritten {
2223 return nothingWrittenError{err}
2224 }
2225
2226 return err
2227 }
2228 if pc.isBroken() {
2229 if pc.nwrite == startBytesWritten {
2230 return nothingWrittenError{err}
2231 }
2232 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2233 }
2234 return err
2235 }
2236
2237
2238
2239
2240 var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2241
2242 func (pc *persistConn) readLoop() {
2243 closeErr := errReadLoopExiting
2244 defer func() {
2245 pc.close(closeErr)
2246 pc.t.removeIdleConn(pc)
2247 }()
2248
2249 tryPutIdleConn := func(treq *transportRequest) bool {
2250 trace := treq.trace
2251 if err := pc.t.tryPutIdleConn(pc); err != nil {
2252 closeErr = err
2253 if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
2254 trace.PutIdleConn(err)
2255 }
2256 return false
2257 }
2258 if trace != nil && trace.PutIdleConn != nil {
2259 trace.PutIdleConn(nil)
2260 }
2261 return true
2262 }
2263
2264
2265
2266
2267 eofc := make(chan struct{})
2268 defer close(eofc)
2269
2270
2271 testHookMu.Lock()
2272 testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
2273 testHookMu.Unlock()
2274
2275 alive := true
2276 for alive {
2277 pc.readLimit = pc.maxHeaderResponseSize()
2278 _, err := pc.br.Peek(1)
2279
2280 pc.mu.Lock()
2281 if pc.numExpectedResponses == 0 {
2282 pc.readLoopPeekFailLocked(err)
2283 pc.mu.Unlock()
2284 return
2285 }
2286 pc.mu.Unlock()
2287
2288 rc := <-pc.reqch
2289 trace := rc.treq.trace
2290
2291 var resp *Response
2292 if err == nil {
2293 resp, err = pc.readResponse(rc, trace)
2294 } else {
2295 err = transportReadFromServerError{err}
2296 closeErr = err
2297 }
2298
2299 if err != nil {
2300 if pc.readLimit <= 0 {
2301 err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
2302 }
2303
2304 select {
2305 case rc.ch <- responseAndError{err: err}:
2306 case <-rc.callerGone:
2307 return
2308 }
2309 return
2310 }
2311 pc.readLimit = maxInt64
2312
2313 pc.mu.Lock()
2314 pc.numExpectedResponses--
2315 pc.mu.Unlock()
2316
2317 bodyWritable := resp.bodyIsWritable()
2318 hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0
2319
2320 if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
2321
2322
2323
2324 alive = false
2325 }
2326
2327 if !hasBody || bodyWritable {
2328
2329
2330
2331
2332
2333 alive = alive &&
2334 !pc.sawEOF &&
2335 pc.wroteRequest() &&
2336 tryPutIdleConn(rc.treq)
2337
2338 if bodyWritable {
2339 closeErr = errCallerOwnsConn
2340 }
2341
2342 select {
2343 case rc.ch <- responseAndError{res: resp}:
2344 case <-rc.callerGone:
2345 return
2346 }
2347
2348 rc.treq.cancel(errRequestDone)
2349
2350
2351
2352
2353 testHookReadLoopBeforeNextRead()
2354 continue
2355 }
2356
2357 waitForBodyRead := make(chan bool, 2)
2358 body := &bodyEOFSignal{
2359 body: resp.Body,
2360 earlyCloseFn: func() error {
2361 waitForBodyRead <- false
2362 <-eofc
2363 return nil
2364
2365 },
2366 fn: func(err error) error {
2367 isEOF := err == io.EOF
2368 waitForBodyRead <- isEOF
2369 if isEOF {
2370 <-eofc
2371 } else if err != nil {
2372 if cerr := pc.canceled(); cerr != nil {
2373 return cerr
2374 }
2375 }
2376 return err
2377 },
2378 }
2379
2380 resp.Body = body
2381 if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
2382 resp.Body = &gzipReader{body: body}
2383 resp.Header.Del("Content-Encoding")
2384 resp.Header.Del("Content-Length")
2385 resp.ContentLength = -1
2386 resp.Uncompressed = true
2387 }
2388
2389 select {
2390 case rc.ch <- responseAndError{res: resp}:
2391 case <-rc.callerGone:
2392 return
2393 }
2394
2395
2396
2397
2398 select {
2399 case bodyEOF := <-waitForBodyRead:
2400 alive = alive &&
2401 bodyEOF &&
2402 !pc.sawEOF &&
2403 pc.wroteRequest() &&
2404 tryPutIdleConn(rc.treq)
2405 if bodyEOF {
2406 eofc <- struct{}{}
2407 }
2408 case <-rc.treq.ctx.Done():
2409 alive = false
2410 pc.cancelRequest(context.Cause(rc.treq.ctx))
2411 case <-pc.closech:
2412 alive = false
2413 }
2414
2415 rc.treq.cancel(errRequestDone)
2416 testHookReadLoopBeforeNextRead()
2417 }
2418 }
2419
2420 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2421 if pc.closed != nil {
2422 return
2423 }
2424 if n := pc.br.Buffered(); n > 0 {
2425 buf, _ := pc.br.Peek(n)
2426 if is408Message(buf) {
2427 pc.closeLocked(errServerClosedIdle)
2428 return
2429 } else {
2430 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2431 }
2432 }
2433 if peekErr == io.EOF {
2434
2435 pc.closeLocked(errServerClosedIdle)
2436 } else {
2437 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2438 }
2439 }
2440
2441
2442
2443
2444 func is408Message(buf []byte) bool {
2445 if len(buf) < len("HTTP/1.x 408") {
2446 return false
2447 }
2448 if string(buf[:7]) != "HTTP/1." {
2449 return false
2450 }
2451 return string(buf[8:12]) == " 408"
2452 }
2453
2454
2455
2456
2457 func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
2458 if trace != nil && trace.GotFirstResponseByte != nil {
2459 if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
2460 trace.GotFirstResponseByte()
2461 }
2462 }
2463
2464 continueCh := rc.continueCh
2465 for {
2466 resp, err = ReadResponse(pc.br, rc.treq.Request)
2467 if err != nil {
2468 return
2469 }
2470 resCode := resp.StatusCode
2471 if continueCh != nil && resCode == StatusContinue {
2472 if trace != nil && trace.Got100Continue != nil {
2473 trace.Got100Continue()
2474 }
2475 continueCh <- struct{}{}
2476 continueCh = nil
2477 }
2478 is1xx := 100 <= resCode && resCode <= 199
2479
2480 is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
2481 if is1xxNonTerminal {
2482 if trace != nil && trace.Got1xxResponse != nil {
2483 if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
2484 return nil, err
2485 }
2486
2487
2488
2489
2490
2491
2492
2493 pc.readLimit = pc.maxHeaderResponseSize()
2494 }
2495 continue
2496 }
2497 break
2498 }
2499 if resp.isProtocolSwitch() {
2500 resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
2501 }
2502 if continueCh != nil {
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515 if resp.Close || rc.treq.Request.Close {
2516 close(continueCh)
2517 } else {
2518 continueCh <- struct{}{}
2519 }
2520 }
2521
2522 resp.TLS = pc.tlsState
2523 return
2524 }
2525
2526
2527
2528
2529 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2530 if continueCh == nil {
2531 return nil
2532 }
2533 return func() bool {
2534 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2535 defer timer.Stop()
2536
2537 select {
2538 case _, ok := <-continueCh:
2539 return ok
2540 case <-timer.C:
2541 return true
2542 case <-pc.closech:
2543 return false
2544 }
2545 }
2546 }
2547
2548 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2549 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2550 if br.Buffered() != 0 {
2551 body.br = br
2552 }
2553 return body
2554 }
2555
2556
2557
2558
2559
2560
2561 type readWriteCloserBody struct {
2562 _ incomparable
2563 br *bufio.Reader
2564 io.ReadWriteCloser
2565 }
2566
2567 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2568 if b.br != nil {
2569 if n := b.br.Buffered(); len(p) > n {
2570 p = p[:n]
2571 }
2572 n, err = b.br.Read(p)
2573 if b.br.Buffered() == 0 {
2574 b.br = nil
2575 }
2576 return n, err
2577 }
2578 return b.ReadWriteCloser.Read(p)
2579 }
2580
2581 func (b *readWriteCloserBody) CloseWrite() error {
2582 if cw, ok := b.ReadWriteCloser.(interface{ CloseWrite() error }); ok {
2583 return cw.CloseWrite()
2584 }
2585 return fmt.Errorf("CloseWrite: %w", ErrNotSupported)
2586 }
2587
2588
2589 type nothingWrittenError struct {
2590 error
2591 }
2592
2593 func (nwe nothingWrittenError) Unwrap() error {
2594 return nwe.error
2595 }
2596
2597 func (pc *persistConn) writeLoop() {
2598 defer close(pc.writeLoopDone)
2599 for {
2600 select {
2601 case wr := <-pc.writech:
2602 startBytesWritten := pc.nwrite
2603 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
2604 if bre, ok := err.(requestBodyReadError); ok {
2605 err = bre.error
2606
2607
2608
2609
2610
2611
2612
2613 wr.req.setError(err)
2614 }
2615 if err == nil {
2616 err = pc.bw.Flush()
2617 }
2618 if err != nil {
2619 if pc.nwrite == startBytesWritten {
2620 err = nothingWrittenError{err}
2621 }
2622 }
2623 pc.writeErrCh <- err
2624 wr.ch <- err
2625 if err != nil {
2626 pc.close(err)
2627 return
2628 }
2629 case <-pc.closech:
2630 return
2631 }
2632 }
2633 }
2634
2635
2636
2637
2638
2639
2640
2641 var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2642
2643
2644
2645 func (pc *persistConn) wroteRequest() bool {
2646 select {
2647 case err := <-pc.writeErrCh:
2648
2649
2650 return err == nil
2651 default:
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661
2662 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2663 defer t.Stop()
2664 select {
2665 case err := <-pc.writeErrCh:
2666 return err == nil
2667 case <-t.C:
2668 return false
2669 }
2670 }
2671 }
2672
2673
2674
2675 type responseAndError struct {
2676 _ incomparable
2677 res *Response
2678 err error
2679 }
2680
2681 type requestAndChan struct {
2682 _ incomparable
2683 treq *transportRequest
2684 ch chan responseAndError
2685
2686
2687
2688
2689 addedGzip bool
2690
2691
2692
2693
2694
2695 continueCh chan<- struct{}
2696
2697 callerGone <-chan struct{}
2698 }
2699
2700
2701
2702
2703
2704 type writeRequest struct {
2705 req *transportRequest
2706 ch chan<- error
2707
2708
2709
2710
2711 continueCh <-chan struct{}
2712 }
2713
2714
2715
2716 type timeoutError struct {
2717 err string
2718 }
2719
2720 func (e *timeoutError) Error() string { return e.err }
2721 func (e *timeoutError) Timeout() bool { return true }
2722 func (e *timeoutError) Temporary() bool { return true }
2723 func (e *timeoutError) Is(err error) bool { return err == context.DeadlineExceeded }
2724
2725 var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}
2726
2727
2728
2729 var errRequestCanceled = http2errRequestCanceled
2730 var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")
2731
2732
2733
2734 var errRequestDone = errors.New("net/http: request completed")
2735
2736 func nop() {}
2737
2738
2739 var (
2740 testHookEnterRoundTrip = nop
2741 testHookWaitResLoop = nop
2742 testHookRoundTripRetried = nop
2743 testHookPrePendingDial = nop
2744 testHookPostPendingDial = nop
2745
2746 testHookMu sync.Locker = fakeLocker{}
2747 testHookReadLoopBeforeNextRead = nop
2748 )
2749
2750 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
2751 testHookEnterRoundTrip()
2752 pc.mu.Lock()
2753 pc.numExpectedResponses++
2754 headerFn := pc.mutateHeaderFunc
2755 pc.mu.Unlock()
2756
2757 if headerFn != nil {
2758 headerFn(req.extraHeaders())
2759 }
2760
2761
2762
2763
2764
2765 requestedGzip := false
2766 if !pc.t.DisableCompression &&
2767 req.Header.Get("Accept-Encoding") == "" &&
2768 req.Header.Get("Range") == "" &&
2769 req.Method != "HEAD" {
2770
2771
2772
2773
2774
2775
2776
2777
2778
2779
2780
2781
2782 requestedGzip = true
2783 req.extraHeaders().Set("Accept-Encoding", "gzip")
2784 }
2785
2786 var continueCh chan struct{}
2787 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
2788 continueCh = make(chan struct{}, 1)
2789 }
2790
2791 if pc.t.DisableKeepAlives &&
2792 !req.wantsClose() &&
2793 !isProtocolSwitchHeader(req.Header) {
2794 req.extraHeaders().Set("Connection", "close")
2795 }
2796
2797 gone := make(chan struct{})
2798 defer close(gone)
2799
2800 const debugRoundTrip = false
2801
2802
2803
2804
2805 startBytesWritten := pc.nwrite
2806 writeErrCh := make(chan error, 1)
2807 pc.writech <- writeRequest{req, writeErrCh, continueCh}
2808
2809 resc := make(chan responseAndError)
2810 pc.reqch <- requestAndChan{
2811 treq: req,
2812 ch: resc,
2813 addedGzip: requestedGzip,
2814 continueCh: continueCh,
2815 callerGone: gone,
2816 }
2817
2818 handleResponse := func(re responseAndError) (*Response, error) {
2819 if (re.res == nil) == (re.err == nil) {
2820 panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
2821 }
2822 if debugRoundTrip {
2823 req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
2824 }
2825 if re.err != nil {
2826 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
2827 }
2828 return re.res, nil
2829 }
2830
2831 var respHeaderTimer <-chan time.Time
2832 ctxDoneChan := req.ctx.Done()
2833 pcClosed := pc.closech
2834 for {
2835 testHookWaitResLoop()
2836 select {
2837 case err := <-writeErrCh:
2838 if debugRoundTrip {
2839 req.logf("writeErrCh recv: %T/%#v", err, err)
2840 }
2841 if err != nil {
2842 pc.close(fmt.Errorf("write error: %w", err))
2843 return nil, pc.mapRoundTripError(req, startBytesWritten, err)
2844 }
2845 if d := pc.t.ResponseHeaderTimeout; d > 0 {
2846 if debugRoundTrip {
2847 req.logf("starting timer for %v", d)
2848 }
2849 timer := time.NewTimer(d)
2850 defer timer.Stop()
2851 respHeaderTimer = timer.C
2852 }
2853 case <-pcClosed:
2854 select {
2855 case re := <-resc:
2856
2857
2858
2859 return handleResponse(re)
2860 default:
2861 }
2862 if debugRoundTrip {
2863 req.logf("closech recv: %T %#v", pc.closed, pc.closed)
2864 }
2865 return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
2866 case <-respHeaderTimer:
2867 if debugRoundTrip {
2868 req.logf("timeout waiting for response headers.")
2869 }
2870 pc.close(errTimeout)
2871 return nil, errTimeout
2872 case re := <-resc:
2873 return handleResponse(re)
2874 case <-ctxDoneChan:
2875 select {
2876 case re := <-resc:
2877
2878
2879
2880 return handleResponse(re)
2881 default:
2882 }
2883 pc.cancelRequest(context.Cause(req.ctx))
2884 }
2885 }
2886 }
2887
2888
2889
2890 type tLogKey struct{}
2891
2892 func (tr *transportRequest) logf(format string, args ...any) {
2893 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2894 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2895 }
2896 }
2897
2898
2899
2900 func (pc *persistConn) markReused() {
2901 pc.mu.Lock()
2902 pc.reused = true
2903 pc.mu.Unlock()
2904 }
2905
2906
2907
2908
2909
2910
2911 func (pc *persistConn) close(err error) {
2912 pc.mu.Lock()
2913 defer pc.mu.Unlock()
2914 pc.closeLocked(err)
2915 }
2916
2917 func (pc *persistConn) closeLocked(err error) {
2918 if err == nil {
2919 panic("nil error")
2920 }
2921 pc.broken = true
2922 if pc.closed == nil {
2923 pc.closed = err
2924 pc.t.decConnsPerHost(pc.cacheKey)
2925
2926
2927 if pc.alt == nil {
2928 if err != errCallerOwnsConn {
2929 pc.conn.Close()
2930 }
2931 close(pc.closech)
2932 }
2933 }
2934 pc.mutateHeaderFunc = nil
2935 }
2936
2937 func schemePort(scheme string) string {
2938 switch scheme {
2939 case "http":
2940 return "80"
2941 case "https":
2942 return "443"
2943 case "socks5", "socks5h":
2944 return "1080"
2945 default:
2946 return ""
2947 }
2948 }
2949
2950 func idnaASCIIFromURL(url *url.URL) string {
2951 addr := url.Hostname()
2952 if v, err := idnaASCII(addr); err == nil {
2953 addr = v
2954 }
2955 return addr
2956 }
2957
2958
2959 func canonicalAddr(url *url.URL) string {
2960 port := url.Port()
2961 if port == "" {
2962 port = schemePort(url.Scheme)
2963 }
2964 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2965 }
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978 type bodyEOFSignal struct {
2979 body io.ReadCloser
2980 mu sync.Mutex
2981 closed bool
2982 rerr error
2983 fn func(error) error
2984 earlyCloseFn func() error
2985 }
2986
2987 var errReadOnClosedResBody = errors.New("http: read on closed response body")
2988
2989 func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
2990 es.mu.Lock()
2991 closed, rerr := es.closed, es.rerr
2992 es.mu.Unlock()
2993 if closed {
2994 return 0, errReadOnClosedResBody
2995 }
2996 if rerr != nil {
2997 return 0, rerr
2998 }
2999
3000 n, err = es.body.Read(p)
3001 if err != nil {
3002 es.mu.Lock()
3003 defer es.mu.Unlock()
3004 if es.rerr == nil {
3005 es.rerr = err
3006 }
3007 err = es.condfn(err)
3008 }
3009 return
3010 }
3011
3012 func (es *bodyEOFSignal) Close() error {
3013 es.mu.Lock()
3014 defer es.mu.Unlock()
3015 if es.closed {
3016 return nil
3017 }
3018 es.closed = true
3019 if es.earlyCloseFn != nil && es.rerr != io.EOF {
3020 return es.earlyCloseFn()
3021 }
3022 err := es.body.Close()
3023 return es.condfn(err)
3024 }
3025
3026
3027 func (es *bodyEOFSignal) condfn(err error) error {
3028 if es.fn == nil {
3029 return err
3030 }
3031 err = es.fn(err)
3032 es.fn = nil
3033 return err
3034 }
3035
3036
3037
3038 type gzipReader struct {
3039 _ incomparable
3040 body *bodyEOFSignal
3041 zr *gzip.Reader
3042 zerr error
3043 }
3044
3045 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3046 if gz.zr == nil {
3047 if gz.zerr == nil {
3048 gz.zr, gz.zerr = gzip.NewReader(gz.body)
3049 }
3050 if gz.zerr != nil {
3051 return 0, gz.zerr
3052 }
3053 }
3054
3055 gz.body.mu.Lock()
3056 if gz.body.closed {
3057 err = errReadOnClosedResBody
3058 }
3059 gz.body.mu.Unlock()
3060
3061 if err != nil {
3062 return 0, err
3063 }
3064 return gz.zr.Read(p)
3065 }
3066
3067 func (gz *gzipReader) Close() error {
3068 return gz.body.Close()
3069 }
3070
3071 type tlsHandshakeTimeoutError struct{}
3072
3073 func (tlsHandshakeTimeoutError) Timeout() bool { return true }
3074 func (tlsHandshakeTimeoutError) Temporary() bool { return true }
3075 func (tlsHandshakeTimeoutError) Error() string { return "net/http: TLS handshake timeout" }
3076
3077
3078
3079
3080 type fakeLocker struct{}
3081
3082 func (fakeLocker) Lock() {}
3083 func (fakeLocker) Unlock() {}
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
3099 if cfg == nil {
3100 return &tls.Config{}
3101 }
3102 return cfg.Clone()
3103 }
3104
3105 type connLRU struct {
3106 ll *list.List
3107 m map[*persistConn]*list.Element
3108 }
3109
3110
3111 func (cl *connLRU) add(pc *persistConn) {
3112 if cl.ll == nil {
3113 cl.ll = list.New()
3114 cl.m = make(map[*persistConn]*list.Element)
3115 }
3116 ele := cl.ll.PushFront(pc)
3117 if _, ok := cl.m[pc]; ok {
3118 panic("persistConn was already in LRU")
3119 }
3120 cl.m[pc] = ele
3121 }
3122
3123 func (cl *connLRU) removeOldest() *persistConn {
3124 ele := cl.ll.Back()
3125 pc := ele.Value.(*persistConn)
3126 cl.ll.Remove(ele)
3127 delete(cl.m, pc)
3128 return pc
3129 }
3130
3131
3132 func (cl *connLRU) remove(pc *persistConn) {
3133 if ele, ok := cl.m[pc]; ok {
3134 cl.ll.Remove(ele)
3135 delete(cl.m, pc)
3136 }
3137 }
3138
3139
3140 func (cl *connLRU) len() int {
3141 return len(cl.m)
3142 }
3143
View as plain text