Source file
src/net/http/transport.go
1
2
3
4
5
6
7
8
9
10 package http
11
12 import (
13 "bufio"
14 "compress/gzip"
15 "container/list"
16 "context"
17 "crypto/tls"
18 "errors"
19 "fmt"
20 "internal/godebug"
21 "io"
22 "log"
23 "maps"
24 "net"
25 "net/http/httptrace"
26 "net/http/internal/ascii"
27 "net/textproto"
28 "net/url"
29 "reflect"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34 _ "unsafe"
35
36 "golang.org/x/net/http/httpguts"
37 "golang.org/x/net/http/httpproxy"
38 )
39
40
41
42
43
44
// DefaultTransport is a package-level RoundTripper backed by a *Transport.
// It takes its proxy from the environment (ProxyFromEnvironment), dials
// with a net.Dialer using a 30 second timeout and keep-alive, attempts
// HTTP/2, keeps at most 100 idle connections in total, drops idle
// connections after 90 seconds, and uses a 10 second TLS handshake timeout
// and a 1 second expect-continue timeout.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: defaultTransportDialContext(&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}),
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
57
58
59
// DefaultMaxIdleConnsPerHost is the per-host idle connection limit that
// applies when Transport.MaxIdleConnsPerHost is zero.
const DefaultMaxIdleConnsPerHost = 2
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Transport holds the configuration and connection-management state used
// by roundTrip: the idle connection cache, per-host connection accounting,
// registered alternate protocols, and the user-supplied dial, proxy and TLS
// hooks.
type Transport struct {
	idleMu       sync.Mutex                          // guards closeIdle, idleConn, idleConnWait and idleLRU
	closeIdle    bool                                // set by CloseIdleConnections, cleared by queueForIdleConn
	idleConn     map[connectMethodKey][]*persistConn // idle connections per key; the most recently added is last
	idleConnWait map[connectMethodKey]wantConnQueue  // requests waiting for an idle connection, per key
	idleLRU      connLRU                             // idle connections, so the oldest can be evicted

	reqMu       sync.Mutex                           // guards reqCanceler
	reqCanceler map[*Request]context.CancelCauseFunc // cancel funcs of in-flight requests, used by CancelRequest

	altMu    sync.Mutex   // serializes writers of altProto
	altProto atomic.Value // holds a map[string]RoundTripper keyed by scheme; empty until RegisterProtocol is called

	connsPerHostMu   sync.Mutex                         // guards the three fields below
	connsPerHost     map[connectMethodKey]int           // connection count per key; only maintained when MaxConnsPerHost > 0
	connsPerHostWait map[connectMethodKey]wantConnQueue // requests waiting for a per-host connection slot
	dialsInProgress  wantConnQueue                      // wantConns whose dial has been started

	// Proxy returns the proxy URL to use for a request. When the field is
	// nil no proxy function is consulted.
	Proxy func(*Request) (*url.URL, error)

	// OnProxyConnectResponse is a callback receiving the proxy URL, the
	// CONNECT request and its response.
	// NOTE(review): the call site is outside this chunk; confirm when it runs.
	OnProxyConnectResponse func(ctx context.Context, proxyURL *url.URL, connectReq *Request, connectRes *Response) error

	// DialContext, when non-nil, is used to create connections. It takes
	// precedence over Dial; when both are nil a zero net.Dialer is used.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial is the context-less dial hook, used only when DialContext is nil.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext, when non-nil, is used to create connections for
	// https requests. It takes precedence over DialTLS.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS is the context-less TLS dial hook, used for https requests
	// when DialTLSContext is nil.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig is the TLS configuration; addTLS works on a clone of it.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout bounds the TLS handshake performed by addTLS.
	// Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, when true, prevents connections from being placed
	// in or taken from the idle pool.
	DisableKeepAlives bool

	// DisableCompression is a configuration flag copied by Clone.
	// NOTE(review): its consumer is outside this chunk.
	DisableCompression bool

	// MaxIdleConns limits the total number of idle connections across all
	// hosts. Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost limits idle connections per connection key. Zero
	// selects DefaultMaxIdleConnsPerHost; a negative value means
	// connections are never kept idle.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost limits connections per connection key; dials beyond
	// the limit wait for a slot. Zero or negative means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is how long a non-alternate idle connection may stay
	// in the pool before being closed. Zero or negative means no timeout.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout is a timeout setting copied by Clone.
	// NOTE(review): enforcement is outside this chunk.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout is a timeout setting copied by Clone.
	// NOTE(review): enforcement is outside this chunk.
	ExpectContinueTimeout time.Duration

	// TLSNextProto maps a protocol name to a function producing a
	// RoundTripper for a TLS connection. If it already has an "h2" entry,
	// onceSetNextProtoDefaults does not configure HTTP/2 itself.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader is a header set that Clone deep-copies.
	// NOTE(review): presumably sent on proxy CONNECT requests; confirm.
	ProxyConnectHeader Header

	// GetProxyConnectHeader is a hook returning headers for a given proxy
	// URL and target.
	// NOTE(review): the call site is outside this chunk.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes limits response header size. When non-zero it
	// is also propagated (clamped to 1<<32-1) to the auto-configured HTTP/2
	// transport's MaxHeaderListSize if that is still zero.
	MaxResponseHeaderBytes int64

	// WriteBufferSize is the write buffer size in bytes. Values <= 0 select
	// a 4 KiB default.
	WriteBufferSize int

	// ReadBufferSize is the read buffer size in bytes. Values <= 0 select a
	// 4 KiB default.
	ReadBufferSize int

	nextProtoOnce      sync.Once   // runs onceSetNextProtoDefaults exactly once
	h2transport        h2Transport // HTTP/2 transport, if one was configured or detected
	tlsNextProtoWasNil bool        // whether TLSNextProto was nil when onceSetNextProtoDefaults ran

	// ForceAttemptHTTP2 keeps HTTP/2 enabled even when a custom
	// TLSClientConfig or dial hook is set (see protocols).
	ForceAttemptHTTP2 bool

	// HTTP2 is HTTP/2 configuration; Clone copies the pointed-to value.
	HTTP2 *HTTP2Config

	// Protocols, when non-nil, is the set of protocols the transport uses,
	// overriding the heuristics in protocols.
	Protocols *Protocols
}
313
314 func (t *Transport) writeBufferSize() int {
315 if t.WriteBufferSize > 0 {
316 return t.WriteBufferSize
317 }
318 return 4 << 10
319 }
320
321 func (t *Transport) readBufferSize() int {
322 if t.ReadBufferSize > 0 {
323 return t.ReadBufferSize
324 }
325 return 4 << 10
326 }
327
328
329 func (t *Transport) Clone() *Transport {
330 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
331 t2 := &Transport{
332 Proxy: t.Proxy,
333 OnProxyConnectResponse: t.OnProxyConnectResponse,
334 DialContext: t.DialContext,
335 Dial: t.Dial,
336 DialTLS: t.DialTLS,
337 DialTLSContext: t.DialTLSContext,
338 TLSHandshakeTimeout: t.TLSHandshakeTimeout,
339 DisableKeepAlives: t.DisableKeepAlives,
340 DisableCompression: t.DisableCompression,
341 MaxIdleConns: t.MaxIdleConns,
342 MaxIdleConnsPerHost: t.MaxIdleConnsPerHost,
343 MaxConnsPerHost: t.MaxConnsPerHost,
344 IdleConnTimeout: t.IdleConnTimeout,
345 ResponseHeaderTimeout: t.ResponseHeaderTimeout,
346 ExpectContinueTimeout: t.ExpectContinueTimeout,
347 ProxyConnectHeader: t.ProxyConnectHeader.Clone(),
348 GetProxyConnectHeader: t.GetProxyConnectHeader,
349 MaxResponseHeaderBytes: t.MaxResponseHeaderBytes,
350 ForceAttemptHTTP2: t.ForceAttemptHTTP2,
351 WriteBufferSize: t.WriteBufferSize,
352 ReadBufferSize: t.ReadBufferSize,
353 }
354 if t.TLSClientConfig != nil {
355 t2.TLSClientConfig = t.TLSClientConfig.Clone()
356 }
357 if t.HTTP2 != nil {
358 t2.HTTP2 = &HTTP2Config{}
359 *t2.HTTP2 = *t.HTTP2
360 }
361 if t.Protocols != nil {
362 t2.Protocols = &Protocols{}
363 *t2.Protocols = *t.Protocols
364 }
365 if !t.tlsNextProtoWasNil {
366 npm := maps.Clone(t.TLSNextProto)
367 if npm == nil {
368 npm = make(map[string]func(authority string, c *tls.Conn) RoundTripper)
369 }
370 t2.TLSNextProto = npm
371 }
372 return t2
373 }
374
375
376
377
378
379
380
// h2Transport is the only capability Transport requires of its HTTP/2
// transport (stored in Transport.h2transport): closing idle connections.
type h2Transport interface {
	CloseIdleConnections()
}
384
385 func (t *Transport) hasCustomTLSDialer() bool {
386 return t.DialTLS != nil || t.DialTLSContext != nil
387 }
388
// http2client is the "http2client" GODEBUG setting. A value of "0" makes
// the Transport skip its automatic HTTP/2 setup.
var http2client = godebug.New("http2client")
390
391
392
// onceSetNextProtoDefaults performs the one-time HTTP/2 setup for t. It is
// run through t.nextProtoOnce and returns early, leaving HTTP/2
// unconfigured by this function, in each of the cases commented below.
func (t *Transport) onceSetNextProtoDefaults() {
	// Remember whether the user supplied TLSNextProto; Clone depends on it.
	t.tlsNextProtoWasNil = (t.TLSNextProto == nil)
	if http2client.Value() == "0" {
		// HTTP/2 disabled via GODEBUG; record the non-default behavior.
		http2client.IncNonDefault()
		return
	}

	// If an "https" alternate protocol is registered and it is a struct
	// with exactly one field whose value implements h2Transport, adopt that
	// value as the HTTP/2 transport instead of configuring a new one.
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 {
		if v := rv.Field(0); v.CanInterface() {
			if h2i, ok := v.Interface().(h2Transport); ok {
				t.h2transport = h2i
				return
			}
		}
	}

	if _, ok := t.TLSNextProto["h2"]; ok {
		// An "h2" handler is already installed; leave it alone.
		return
	}
	protocols := t.protocols()
	if !protocols.HTTP2() && !protocols.UnencryptedHTTP2() {
		// Neither HTTP/2 variant is enabled.
		return
	}
	if omitBundledHTTP2 {
		return
	}
	t2, err := http2configureTransports(t)
	if err != nil {
		log.Printf("Error enabling Transport HTTP/2 support: %v", err)
		return
	}
	t.h2transport = t2

	// Carry the response header limit over to HTTP/2 when it has none of
	// its own. MaxHeaderListSize is a uint32, so clamp to its maximum.
	if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 {
		const h2max = 1<<32 - 1
		if limit1 >= h2max {
			t2.MaxHeaderListSize = h2max
		} else {
			t2.MaxHeaderListSize = uint32(limit1)
		}
	}

	// Adjust the advertised protocols to the enabled set. This writes to
	// t.TLSClientConfig in place.
	// NOTE(review): assumes http2configureTransports left
	// t.TLSClientConfig non-nil; confirm.
	t.TLSClientConfig.NextProtos = adjustNextProtos(t.TLSClientConfig.NextProtos, protocols)
}
455
// protocols returns the set of protocols t uses. An explicit t.Protocols
// wins. Otherwise HTTP/1 is always enabled and HTTP/2 is enabled according
// to the first matching case below.
func (t *Transport) protocols() Protocols {
	if t.Protocols != nil {
		return *t.Protocols
	}
	var p Protocols
	p.SetHTTP1(true)
	switch {
	case t.TLSNextProto != nil:
		// The user supplied TLSNextProto: HTTP/2 is on only if it has a
		// non-nil "h2" entry.
		if t.TLSNextProto["h2"] != nil {
			p.SetHTTP2(true)
		}
	case !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()):
		// A custom TLS config or dial hook is set and HTTP/2 was not
		// forced: leave HTTP/2 off.
	case http2client.Value() == "0":
		// HTTP/2 disabled through the http2client GODEBUG setting.
	default:
		p.SetHTTP2(true)
	}
	return p
}
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499 func ProxyFromEnvironment(req *Request) (*url.URL, error) {
500 return envProxyFunc()(req.URL)
501 }
502
503
504
505 func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) {
506 return func(*Request) (*url.URL, error) {
507 return fixedURL, nil
508 }
509 }
510
511
512
513
// transportRequest wraps a *Request with the per-attempt state the
// Transport needs. roundTrip creates a fresh one for every attempt.
type transportRequest struct {
	*Request                        // the request being sent
	extra    Header                 // additional headers; allocated lazily by extraHeaders
	trace    *httptrace.ClientTrace // client trace from the request context; may be nil

	ctx    context.Context         // cancelable context created by roundTrip
	cancel context.CancelCauseFunc // cancels ctx with a cause

	mu  sync.Mutex // guards err
	err error      // first error recorded through setError
}
525
526 func (tr *transportRequest) extraHeaders() Header {
527 if tr.extra == nil {
528 tr.extra = make(Header)
529 }
530 return tr.extra
531 }
532
533 func (tr *transportRequest) setError(err error) {
534 tr.mu.Lock()
535 if tr.err == nil {
536 tr.err = err
537 }
538 tr.mu.Unlock()
539 }
540
541
542
543 func (t *Transport) useRegisteredProtocol(req *Request) bool {
544 if req.URL.Scheme == "https" && req.requiresHTTP1() {
545
546
547
548
549 return false
550 }
551 return true
552 }
553
554
555
556
557 func (t *Transport) alternateRoundTripper(req *Request) RoundTripper {
558 if !t.useRegisteredProtocol(req) {
559 return nil
560 }
561 altProto, _ := t.altProto.Load().(map[string]RoundTripper)
562 return altProto[req.URL.Scheme]
563 }
564
565 func validateHeaders(hdrs Header) string {
566 for k, vv := range hdrs {
567 if !httpguts.ValidHeaderFieldName(k) {
568 return fmt.Sprintf("field name %q", k)
569 }
570 for _, v := range vv {
571 if !httpguts.ValidHeaderFieldValue(v) {
572
573
574 return fmt.Sprintf("field value for %q", k)
575 }
576 }
577 }
578 return ""
579 }
580
581
// roundTrip sends req and returns its response. It validates the request,
// gives a registered alternate protocol the first chance to handle it, and
// otherwise obtains a connection and performs the exchange, retrying when
// the failure is judged retryable. On every error path that has not handed
// the body to a connection, the request body is closed before returning.
func (t *Transport) roundTrip(req *Request) (_ *Response, err error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Header and trailer validation applies only to http/https;
		// alternate protocols do their own checking.
		if err := validateHeaders(req.Header); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid header %s", err)
		}

		if err := validateHeaders(req.Trailer); err != "" {
			req.closeBody()
			return nil, fmt.Errorf("net/http: invalid trailer %s", err)
		}
	}

	// Keep the caller's request: it is what cancellation is registered
	// under and what resp.Request is set to. req itself may be replaced by
	// a copy whose body tracks reads and closes.
	origReq := req
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		// ErrSkipAltProtocol means the alternate declined; anything else
		// (including success) is final.
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate may have touched the body; rewind before going on.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	// Derive a context for this round trip that can be canceled with a
	// cause, independently of the caller's context.
	ctx, cancel := context.WithCancelCause(req.Context())

	// Honor the legacy Request.Cancel channel by translating it into a
	// cancellation of ctx.
	if origReq.Cancel != nil {
		go awaitLegacyCancel(ctx, cancel, origReq)
	}

	// Register the cancel func so CancelRequest(origReq) works; the
	// returned func also removes the registration when called.
	cancel = t.prepareTransportCancel(origReq, cancel)

	// Any error return cancels the round-trip context with that error.
	defer func() {
		if err != nil {
			cancel(err)
		}
	}()

	for {
		// Stop before each attempt if the request was already canceled.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, context.Cause(ctx)
		default:
		}

		// A new transportRequest is built for every attempt.
		treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Get a cached idle connection or dial a new one for cm.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// The connection is served by an alternate RoundTripper.
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			if pconn.alt != nil {
				// On the alternate path the round-trip context is
				// released here, with errRequestDone as the cause.
				cancel(errRequestDone)
			}
			resp.Request = origReq
			return resp, nil
		}

		// The attempt failed; decide between cleanup-and-retry and giving up.
		if http2isNoCachedConnError(err) {
			// Drop the connection from the idle pool and release its
			// per-host slot, then retry.
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Not retryable: unwrap the internal marker error types so the
			// caller sees the underlying error.
			if e, ok := err.(nothingWrittenError); ok {
				err = e.error
			}
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			if b, ok := req.Body.(*readTrackingBody); ok && !b.didClose {
				// Make sure the body is closed exactly when nothing else
				// has closed it yet.
				req.closeBody()
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Retrying: obtain a fresh body if the previous one was used.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
742
743 func awaitLegacyCancel(ctx context.Context, cancel context.CancelCauseFunc, req *Request) {
744 select {
745 case <-req.Cancel:
746 cancel(errRequestCanceled)
747 case <-ctx.Done():
748 }
749 }
750
// errCannotRewind is returned by rewindBody when the request body has been
// used and the request has no GetBody function to produce a new one.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
752
// readTrackingBody wraps an io.ReadCloser and remembers whether Read or
// Close has ever been called on it.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool // set by the first call to Read
	didClose bool // set by the first call to Close
}

// Read marks the body as read and forwards to the wrapped reader.
func (b *readTrackingBody) Read(data []byte) (int, error) {
	b.didRead = true
	n, err := b.ReadCloser.Read(data)
	return n, err
}

// Close marks the body as closed and forwards to the wrapped closer.
func (b *readTrackingBody) Close() error {
	b.didClose = true
	err := b.ReadCloser.Close()
	return err
}
768
769
770
771
772
773 func setupRewindBody(req *Request) *Request {
774 if req.Body == nil || req.Body == NoBody {
775 return req
776 }
777 newReq := *req
778 newReq.Body = &readTrackingBody{ReadCloser: req.Body}
779 return &newReq
780 }
781
782
783
784
785
786 func rewindBody(req *Request) (rewound *Request, err error) {
787 if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) {
788 return req, nil
789 }
790 if !req.Body.(*readTrackingBody).didClose {
791 req.closeBody()
792 }
793 if req.GetBody == nil {
794 return nil, errCannotRewind
795 }
796 body, err := req.GetBody()
797 if err != nil {
798 return nil, err
799 }
800 newReq := *req
801 newReq.Body = &readTrackingBody{ReadCloser: body}
802 return &newReq, nil
803 }
804
805
806
807
// shouldRetryRequest reports whether a request that failed on pc with err
// should be sent again on another connection. The checks run in order and
// the first one that applies decides.
func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool {
	if http2isNoCachedConnError(err) {
		// The HTTP/2 layer had no usable cached connection: always retry.
		return true
	}
	if err == errMissingHost {
		// The request itself is malformed; retrying cannot help.
		return false
	}
	if !pc.isReused() {
		// The failure happened on a connection that had not been used
		// before; do not retry.
		return false
	}
	if _, ok := err.(nothingWrittenError); ok {
		// Nothing reached the wire. Retry if there is no body to resend or
		// the body can be recreated through GetBody.
		return req.outgoingLength() == 0 || req.GetBody != nil
	}
	if !req.isReplayable() {
		// The request is not marked safe to send twice.
		return false
	}
	if _, ok := err.(transportReadFromServerError); ok {
		// Reading from the server failed on a reused connection.
		return true
	}
	if err == errServerClosedIdle {
		// The server closed the connection while it was idle.
		return true
	}
	return false
}
854
855
// ErrSkipAltProtocol is a sentinel error. When a registered alternate
// RoundTripper returns it from RoundTrip, the Transport ignores that result
// and handles the request itself.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
857
858
859
860
861
862
863
864
865
866
867
868 func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
869 t.altMu.Lock()
870 defer t.altMu.Unlock()
871 oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
872 if _, exists := oldMap[scheme]; exists {
873 panic("protocol " + scheme + " already registered")
874 }
875 newMap := maps.Clone(oldMap)
876 if newMap == nil {
877 newMap = make(map[string]RoundTripper)
878 }
879 newMap[scheme] = rt
880 t.altProto.Store(newMap)
881 }
882
883
884
885
886
// CloseIdleConnections closes every connection currently in the idle pool,
// cancels in-progress dials that no request is waiting on any more, and
// forwards the call to the HTTP/2 transport if there is one.
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	// Detach the idle set under the lock; close the connections after
	// releasing it.
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.closeIdle = true // tryPutIdleConn refuses new idle conns while this is set
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}
	t.connsPerHostMu.Lock()
	t.dialsInProgress.all(func(w *wantConn) {
		// Cancel only dials whose wantConn is already done, i.e. nobody is
		// waiting for the result any longer.
		if w.cancelCtx != nil && !w.waiting() {
			w.cancelCtx()
		}
	})
	t.connsPerHostMu.Unlock()
	if t2 := t.h2transport; t2 != nil {
		t2.CloseIdleConnections()
	}
}
911
912
913 func (t *Transport) prepareTransportCancel(req *Request, origCancel context.CancelCauseFunc) context.CancelCauseFunc {
914
915
916
917
918
919
920 cancel := func(err error) {
921 origCancel(err)
922 t.reqMu.Lock()
923 delete(t.reqCanceler, req)
924 t.reqMu.Unlock()
925 }
926 t.reqMu.Lock()
927 if t.reqCanceler == nil {
928 t.reqCanceler = make(map[*Request]context.CancelCauseFunc)
929 }
930 t.reqCanceler[req] = cancel
931 t.reqMu.Unlock()
932 return cancel
933 }
934
935
936
937
938
939
940
941 func (t *Transport) CancelRequest(req *Request) {
942 t.reqMu.Lock()
943 cancel := t.reqCanceler[req]
944 t.reqMu.Unlock()
945 if cancel != nil {
946 cancel(errRequestCanceled)
947 }
948 }
949
950
951
952
953
var (
	// envProxyOnce guards the one-time initialization of envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the proxy function built from the environment;
	// it is set by envProxyFunc and cleared by resetProxyConfig.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
958
959
960
961 func envProxyFunc() func(*url.URL) (*url.URL, error) {
962 envProxyOnce.Do(func() {
963 envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc()
964 })
965 return envProxyFuncValue
966 }
967
968
969 func resetProxyConfig() {
970 envProxyOnce = sync.Once{}
971 envProxyFuncValue = nil
972 }
973
974 func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) {
975 cm.targetScheme = treq.URL.Scheme
976 cm.targetAddr = canonicalAddr(treq.URL)
977 if t.Proxy != nil {
978 cm.proxyURL, err = t.Proxy(treq.Request)
979 }
980 cm.onlyH1 = treq.requiresHTTP1()
981 return cm, err
982 }
983
984
985
986 func (cm *connectMethod) proxyAuth() string {
987 if cm.proxyURL == nil {
988 return ""
989 }
990 if u := cm.proxyURL.User; u != nil {
991 username := u.Username()
992 password, _ := u.Password()
993 return "Basic " + basicAuth(username, password)
994 }
995 return ""
996 }
997
998
// Errors describing why a connection was not kept idle or was closed.
var (
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")                  // returned by tryPutIdleConn
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")            // returned by tryPutIdleConn
	errCloseIdle          = errors.New("http: putIdleConn: CloseIdleConnections was called")       // returned by tryPutIdleConn
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")             // used when evicting the oldest idle conn
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")    // returned by tryPutIdleConn
	errCloseIdleConns     = errors.New("http: CloseIdleConnections called")                        // used by CloseIdleConnections
	errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout    = errors.New("http: idle connection timeout")

	// errServerClosedIdle is treated as retryable by shouldRetryRequest for
	// replayable requests on reused connections.
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
1015
1016
1017
1018
1019
1020
1021
1022
1023
// transportReadFromServerError wraps an error that occurred while reading
// from the server. It exists so callers can recognize this failure class
// by type; the underlying error is available through Unwrap.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the wrapped error.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error implements the error interface.
func (e transportReadFromServerError) Error() string {
	const format = "net/http: Transport failed to read from server: %v"
	return fmt.Sprintf(format, e.err)
}
1033
1034 func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
1035 if err := t.tryPutIdleConn(pconn); err != nil {
1036 pconn.close(err)
1037 }
1038 }
1039
1040 func (t *Transport) maxIdleConnsPerHost() int {
1041 if v := t.MaxIdleConnsPerHost; v != 0 {
1042 return v
1043 }
1044 return DefaultMaxIdleConnsPerHost
1045 }
1046
1047
1048
1049
1050
1051
// tryPutIdleConn offers pconn for reuse. It first tries to hand the
// connection to a request already waiting for one with the same key;
// failing that, it adds the connection to the idle pool. A nil return means
// the connection was delivered or pooled; a non-nil error explains why it
// was not, and the caller is responsible for the connection.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	// A negative MaxIdleConnsPerHost is treated like disabled keep-alives.
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// An alternate-protocol connection that is already tracked in the LRU
	// is already in the pool; nothing more to do.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Deliver pconn to waiting requests for the same key, if any.
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// Regular connection: give it to the first waiter that is still
			// able to accept, then stop.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil, time.Time{}) {
					done = true
					break
				}
			}
		} else {
			// Alternate-protocol connection: offer it to every waiter, and
			// still fall through to pool it below (done stays false).
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil, time.Time{})
			}
		}
		// q is a copy of the map value; write it back or drop the entry.
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// Sanity check: the same connection must never be pooled twice.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the global idle limit by evicting the least recently used
	// connection.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Arm (or re-arm) the idle timer. Alternate-protocol connections do not
	// get one here.
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	pconn.idleAt = time.Now()
	return nil
}
1144
1145
1146
1147
// queueForIdleConn tries to satisfy w from the idle pool. It reports true
// if an idle connection was delivered to w. Otherwise w is appended to the
// wait queue for its key so that a later tryPutIdleConn can deliver to it,
// and false is returned. With keep-alives disabled it does nothing and
// returns false.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// A new request re-enables idle pooling after CloseIdleConnections.
	t.closeIdle = false

	if w == nil {
		// Called only for the closeIdle side effect above.
		return false
	}

	// Connections that became idle before oldTime are past IdleConnTimeout.
	// A zero oldTime means no timeout applies.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Scan the idle list for w.key from the most recently added end.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1]

			// Round(0) strips the monotonic clock reading so the comparison
			// uses wall-clock time.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Close it asynchronously; the close path is not run while
				// holding idleMu here.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// Unusable: drop it from the list and look at the next one.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil, pconn.idleAt)
			if delivered {
				if pconn.alt != nil {
					// Alternate-protocol connections stay in the idle list
					// after being handed out.
				} else {
					// Regular connections are removed from the pool once
					// handed out.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			// Whether or not delivery succeeded, the scan ends at the first
			// usable connection.
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// No usable idle connection: register w as a waiter.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFrontNotWaiting()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
1233
1234
1235 func (t *Transport) removeIdleConn(pconn *persistConn) bool {
1236 t.idleMu.Lock()
1237 defer t.idleMu.Unlock()
1238 return t.removeIdleConnLocked(pconn)
1239 }
1240
1241
1242 func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool {
1243 if pconn.idleTimer != nil {
1244 pconn.idleTimer.Stop()
1245 }
1246 t.idleLRU.remove(pconn)
1247 key := pconn.cacheKey
1248 pconns := t.idleConn[key]
1249 var removed bool
1250 switch len(pconns) {
1251 case 0:
1252
1253 case 1:
1254 if pconns[0] == pconn {
1255 delete(t.idleConn, key)
1256 removed = true
1257 }
1258 default:
1259 for i, v := range pconns {
1260 if v != pconn {
1261 continue
1262 }
1263
1264
1265 copy(pconns[i:], pconns[i+1:])
1266 t.idleConn[key] = pconns[:len(pconns)-1]
1267 removed = true
1268 break
1269 }
1270 }
1271 return removed
1272 }
1273
// zeroDialer is the net.Dialer used by Transport.dial when neither
// DialContext nor Dial is set.
var zeroDialer net.Dialer
1275
1276 func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
1277 if t.DialContext != nil {
1278 c, err := t.DialContext(ctx, network, addr)
1279 if c == nil && err == nil {
1280 err = errors.New("net/http: Transport.DialContext hook returned (nil, nil)")
1281 }
1282 return c, err
1283 }
1284 if t.Dial != nil {
1285 c, err := t.Dial(network, addr)
1286 if c == nil && err == nil {
1287 err = errors.New("net/http: Transport.Dial hook returned (nil, nil)")
1288 }
1289 return c, err
1290 }
1291 return zeroDialer.DialContext(ctx, network, addr)
1292 }
1293
1294
1295
1296
1297
1298
1299
// wantConn records one request's wish for a connection. The connection may
// come from the idle pool or from a new dial; whichever is delivered first
// through tryDeliver wins, and the result is read from the result channel.
type wantConn struct {
	cm  connectMethod    // how to reach the target
	key connectMethodKey // cm.key(), the pool and accounting key

	// Hooks run at the start of queueForDial and at the end of
	// dialConnFor respectively; getConn sets them to test hooks.
	beforeDial func()
	afterDial  func()

	mu        sync.Mutex         // guards ctx and done, and the sends on and closing of result
	ctx       context.Context    // context for the dial; set to nil once a result is delivered or the want is canceled
	cancelCtx context.CancelFunc // cancels ctx; cleared under Transport.connsPerHostMu when the dial goroutine finishes
	done      bool               // true once a result was delivered or the want was canceled
	result    chan connOrError   // receives at most one result, then is closed; getConn makes it with capacity 1
}
1316
// connOrError is the value delivered on wantConn.result: either a
// connection or an error, never both (tryDeliver enforces this).
type connOrError struct {
	pc     *persistConn // the delivered connection, or nil on error
	err    error        // the dial error, or nil on success
	idleAt time.Time    // when pc became idle; zero if it did not come from the idle pool
}
1322
1323
1324 func (w *wantConn) waiting() bool {
1325 w.mu.Lock()
1326 defer w.mu.Unlock()
1327
1328 return !w.done
1329 }
1330
1331
1332 func (w *wantConn) getCtxForDial() context.Context {
1333 w.mu.Lock()
1334 defer w.mu.Unlock()
1335
1336 return w.ctx
1337 }
1338
1339
// tryDeliver attempts to hand pc or err to w and reports whether it did.
// It returns false if w already has a result or was canceled. Exactly one
// of pc and err must be non-nil; anything else panics.
func (w *wantConn) tryDeliver(pc *persistConn, err error, idleAt time.Time) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.done {
		return false
	}
	if (pc == nil) == (err == nil) {
		panic("net/http: internal error: misuse of tryDeliver")
	}
	// Mark w finished before publishing; a nil ctx tells dialConnFor that
	// no dial is needed any more.
	w.ctx = nil
	w.done = true

	// Send happens while holding w.mu; with the capacity-1 channel created
	// in getConn it does not block.
	w.result <- connOrError{pc: pc, err: err, idleAt: idleAt}
	close(w.result)

	return true
}
1358
1359
1360
// cancel marks w as no longer waiting. If a connection had already been
// delivered but not yet taken from the result channel, it is reclaimed and
// returned to t's idle pool (or closed). The err argument is not used by
// the body.
func (w *wantConn) cancel(t *Transport, err error) {
	w.mu.Lock()
	var pc *persistConn
	if w.done {
		// A result was delivered. If it is still sitting in the channel,
		// take it back; ok is false when it was already consumed.
		if r, ok := <-w.result; ok {
			pc = r.pc
		}
	} else {
		// Nothing delivered yet: close the channel so it is never sent on.
		close(w.result)
	}
	w.ctx = nil
	w.done = true
	w.mu.Unlock()

	// Done outside w.mu.
	if pc != nil {
		t.putOrCloseIdleConn(pc)
	}
}
1379
1380
// wantConnQueue is a FIFO queue of *wantConn built from two slices.
// pushBack appends to tail; popFront consumes head starting at headPos and,
// once head is used up, swaps tail in as the new head. The zero value is an
// empty queue.
type wantConnQueue struct {
	head    []*wantConn // front part of the queue; entries before headPos are already popped
	headPos int         // index in head of the next element to pop
	tail    []*wantConn // back part of the queue, in push order
}
1396
1397
1398 func (q *wantConnQueue) len() int {
1399 return len(q.head) - q.headPos + len(q.tail)
1400 }
1401
1402
// pushBack adds w to the back of the queue.
func (q *wantConnQueue) pushBack(w *wantConn) {
	q.tail = append(q.tail, w)
}
1406
1407
// popFront removes and returns the item at the front of the queue, or nil
// if the queue is empty.
func (q *wantConnQueue) popFront() *wantConn {
	if q.headPos >= len(q.head) {
		if len(q.tail) == 0 {
			return nil
		}
		// head is exhausted: promote tail to head and reuse the old head's
		// backing array (truncated to length 0) as the new tail.
		q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
	}
	w := q.head[q.headPos]
	q.head[q.headPos] = nil // drop the reference so the popped item can be collected
	q.headPos++
	return w
}
1421
1422
1423 func (q *wantConnQueue) peekFront() *wantConn {
1424 if q.headPos < len(q.head) {
1425 return q.head[q.headPos]
1426 }
1427 if len(q.tail) > 0 {
1428 return q.tail[0]
1429 }
1430 return nil
1431 }
1432
1433
1434
1435 func (q *wantConnQueue) cleanFrontNotWaiting() (cleaned bool) {
1436 for {
1437 w := q.peekFront()
1438 if w == nil || w.waiting() {
1439 return cleaned
1440 }
1441 q.popFront()
1442 cleaned = true
1443 }
1444 }
1445
1446
1447 func (q *wantConnQueue) cleanFrontCanceled() {
1448 for {
1449 w := q.peekFront()
1450 if w == nil || w.cancelCtx != nil {
1451 return
1452 }
1453 q.popFront()
1454 }
1455 }
1456
1457
1458
1459 func (q *wantConnQueue) all(f func(*wantConn)) {
1460 for _, w := range q.head[q.headPos:] {
1461 f(w)
1462 }
1463 for _, w := range q.tail {
1464 f(w)
1465 }
1466 }
1467
1468 func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) {
1469 if t.DialTLSContext != nil {
1470 conn, err = t.DialTLSContext(ctx, network, addr)
1471 } else {
1472 conn, err = t.DialTLS(network, addr)
1473 }
1474 if conn == nil && err == nil {
1475 err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)")
1476 }
1477 return
1478 }
1479
1480
1481
1482
1483
// getConn returns a connection for treq using connect method cm. It first
// looks for an idle connection and, if none is delivered, queues a dial. It
// then waits for either a result or cancellation of the request context.
// On error the wantConn is canceled so that a connection arriving later is
// returned to the idle pool rather than lost.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (_ *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	// The dial gets its own cancelable context that keeps ctx's values but
	// is detached from ctx's cancellation, so canceling the request does
	// not by itself abort the dial.
	dialCtx, dialCancel := context.WithCancel(context.WithoutCancel(ctx))

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        dialCtx,
		cancelCtx:  dialCancel,
		result:     make(chan connOrError, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t, err)
		}
	}()

	// Prefer an idle connection; otherwise start (or queue) a dial.
	if delivered := t.queueForIdleConn(w); !delivered {
		t.queueForDial(w)
	}

	// Wait for a connection, a dial error, or request cancellation.
	select {
	case r := <-w.result:
		// The GotConn trace hook is invoked here only for connections that
		// are not served by an alternate RoundTripper.
		if r.pc != nil && r.pc.alt == nil && trace != nil && trace.GotConn != nil {
			info := httptrace.GotConnInfo{
				Conn:   r.pc.conn,
				Reused: r.pc.isReused(),
			}
			if !r.idleAt.IsZero() {
				info.WasIdle = true
				info.IdleTime = time.Since(r.idleAt)
			}
			trace.GotConn(info)
		}
		if r.err != nil {
			// If the request was canceled at about the same time, report
			// the cancellation cause rather than the dial error.
			select {
			case <-treq.ctx.Done():
				err := context.Cause(treq.ctx)
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// Not canceled: return the dial error below.
			}
		}
		return r.pc, r.err
	case <-treq.ctx.Done():
		err := context.Cause(treq.ctx)
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
1559
1560
1561
1562 func (t *Transport) queueForDial(w *wantConn) {
1563 w.beforeDial()
1564
1565 t.connsPerHostMu.Lock()
1566 defer t.connsPerHostMu.Unlock()
1567
1568 if t.MaxConnsPerHost <= 0 {
1569 t.startDialConnForLocked(w)
1570 return
1571 }
1572
1573 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
1574 if t.connsPerHost == nil {
1575 t.connsPerHost = make(map[connectMethodKey]int)
1576 }
1577 t.connsPerHost[w.key] = n + 1
1578 t.startDialConnForLocked(w)
1579 return
1580 }
1581
1582 if t.connsPerHostWait == nil {
1583 t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
1584 }
1585 q := t.connsPerHostWait[w.key]
1586 q.cleanFrontNotWaiting()
1587 q.pushBack(w)
1588 t.connsPerHostWait[w.key] = q
1589 }
1590
1591
1592
1593 func (t *Transport) startDialConnForLocked(w *wantConn) {
1594 t.dialsInProgress.cleanFrontCanceled()
1595 t.dialsInProgress.pushBack(w)
1596 go func() {
1597 t.dialConnFor(w)
1598 t.connsPerHostMu.Lock()
1599 defer t.connsPerHostMu.Unlock()
1600 w.cancelCtx = nil
1601 }()
1602 }
1603
1604
1605
1606
// dialConnFor dials a connection on behalf of w and tries to deliver the
// outcome to it. A successfully dialed connection that w no longer wants
// is offered to the idle pool instead. A failed or skipped dial releases
// the per-host connection slot.
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()
	ctx := w.getCtxForDial()
	if ctx == nil {
		// w was already satisfied or canceled; no dial needed.
		t.decConnsPerHost(w.key)
		return
	}

	pc, err := t.dialConn(ctx, w.cm)
	delivered := w.tryDeliver(pc, err, time.Time{})
	if err == nil && (!delivered || pc.alt != nil) {
		// Either nobody took the new connection, or it is an
		// alternate-protocol connection; in both cases put it in the idle
		// pool (or close it if the pool refuses).
		t.putOrCloseIdleConn(pc)
	}
	if err != nil {
		t.decConnsPerHost(w.key)
	}
}
1627
1628
1629
// decConnsPerHost releases one per-host connection slot for key. If a
// request is waiting for a slot, the slot is handed straight to it by
// starting its dial and the count stays the same; otherwise the count is
// decremented. It is a no-op when MaxConnsPerHost <= 0, and panics if the
// count for key is already zero.
func (t *Transport) decConnsPerHost(key connectMethodKey) {
	if t.MaxConnsPerHost <= 0 {
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()
	n := t.connsPerHost[key]
	if n == 0 {
		// More releases than acquisitions: an accounting bug.
		panic("net/http: internal error: connCount underflow")
	}

	// Pass the slot to the first waiter that still wants a connection.
	if q := t.connsPerHostWait[key]; q.len() > 0 {
		done := false
		for q.len() > 0 {
			w := q.popFront()
			if w.waiting() {
				t.startDialConnForLocked(w)
				done = true
				break
			}
		}
		if q.len() == 0 {
			delete(t.connsPerHostWait, key)
		} else {
			// q is a copy of the map value; store the advanced queue back.
			t.connsPerHostWait[key] = q
		}
		if done {
			return
		}
	}

	// No waiter took the slot: decrement, dropping the entry at zero.
	if n--; n == 0 {
		delete(t.connsPerHost, key)
	} else {
		t.connsPerHost[key] = n
	}
}
1677
1678
1679
1680
// addTLS performs a TLS client handshake over pconn.conn. On success it
// replaces pconn.conn with the TLS connection and records the connection
// state in pconn.tlsState. On failure it closes the underlying connection
// and returns the error. name is used as the TLS ServerName when the
// transport's TLS config does not set one. The handshake is bounded by
// TLSHandshakeTimeout when that is non-zero.
func (pconn *persistConn) addTLS(ctx context.Context, name string, trace *httptrace.ClientTrace) error {
	// Work on a clone so the transport's config is not modified.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	if pconn.cacheKey.onlyH1 {
		// HTTP/1-only connection: do not offer any protocols.
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// Capacity 2 so that both the timer and the handshake goroutine can
	// send without blocking.
	errc := make(chan error, 2)
	var timer *time.Timer
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.HandshakeContext(ctx)
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	if err := <-errc; err != nil {
		plainConn.Close()
		if err == (tlsHandshakeTimeoutError{}) {
			// The timer fired first. Wait for the handshake goroutine to
			// finish (it sends its own result) before returning.
			<-errc
		}
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
1729
// erringRoundTripper is implemented by alternate-protocol round trippers that
// carry a setup error. dialConn type-asserts the value returned by a
// TLSNextProto function against it and, on a match, returns RoundTripErr()
// instead of a usable connection.
type erringRoundTripper interface {
	RoundTripErr() error
}
1733
// testHookProxyConnectTimeout is the function dialConn calls to bound the
// proxy CONNECT exchange. It defaults to context.WithTimeout; being a
// variable, it can presumably be swapped out by tests.
var testHookProxyConnectTimeout = context.WithTimeout
1735
1736 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
1737 pconn = &persistConn{
1738 t: t,
1739 cacheKey: cm.key(),
1740 reqch: make(chan requestAndChan, 1),
1741 writech: make(chan writeRequest, 1),
1742 closech: make(chan struct{}),
1743 writeErrCh: make(chan error, 1),
1744 writeLoopDone: make(chan struct{}),
1745 }
1746 trace := httptrace.ContextClientTrace(ctx)
1747 wrapErr := func(err error) error {
1748 if cm.proxyURL != nil {
1749
1750 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
1751 }
1752 return err
1753 }
1754 if cm.scheme() == "https" && t.hasCustomTLSDialer() {
1755 var err error
1756 pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
1757 if err != nil {
1758 return nil, wrapErr(err)
1759 }
1760 if tc, ok := pconn.conn.(*tls.Conn); ok {
1761
1762
1763 if trace != nil && trace.TLSHandshakeStart != nil {
1764 trace.TLSHandshakeStart()
1765 }
1766 if err := tc.HandshakeContext(ctx); err != nil {
1767 go pconn.conn.Close()
1768 if trace != nil && trace.TLSHandshakeDone != nil {
1769 trace.TLSHandshakeDone(tls.ConnectionState{}, err)
1770 }
1771 return nil, err
1772 }
1773 cs := tc.ConnectionState()
1774 if trace != nil && trace.TLSHandshakeDone != nil {
1775 trace.TLSHandshakeDone(cs, nil)
1776 }
1777 pconn.tlsState = &cs
1778 }
1779 } else {
1780 conn, err := t.dial(ctx, "tcp", cm.addr())
1781 if err != nil {
1782 return nil, wrapErr(err)
1783 }
1784 pconn.conn = conn
1785 if cm.scheme() == "https" {
1786 var firstTLSHost string
1787 if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
1788 return nil, wrapErr(err)
1789 }
1790 if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil {
1791 return nil, wrapErr(err)
1792 }
1793 }
1794 }
1795
1796
1797 switch {
1798 case cm.proxyURL == nil:
1799
1800 case cm.proxyURL.Scheme == "socks5" || cm.proxyURL.Scheme == "socks5h":
1801 conn := pconn.conn
1802 d := socksNewDialer("tcp", conn.RemoteAddr().String())
1803 if u := cm.proxyURL.User; u != nil {
1804 auth := &socksUsernamePassword{
1805 Username: u.Username(),
1806 }
1807 auth.Password, _ = u.Password()
1808 d.AuthMethods = []socksAuthMethod{
1809 socksAuthMethodNotRequired,
1810 socksAuthMethodUsernamePassword,
1811 }
1812 d.Authenticate = auth.Authenticate
1813 }
1814 if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
1815 conn.Close()
1816 return nil, err
1817 }
1818 case cm.targetScheme == "http":
1819 pconn.isProxy = true
1820 if pa := cm.proxyAuth(); pa != "" {
1821 pconn.mutateHeaderFunc = func(h Header) {
1822 h.Set("Proxy-Authorization", pa)
1823 }
1824 }
1825 case cm.targetScheme == "https":
1826 conn := pconn.conn
1827 var hdr Header
1828 if t.GetProxyConnectHeader != nil {
1829 var err error
1830 hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
1831 if err != nil {
1832 conn.Close()
1833 return nil, err
1834 }
1835 } else {
1836 hdr = t.ProxyConnectHeader
1837 }
1838 if hdr == nil {
1839 hdr = make(Header)
1840 }
1841 if pa := cm.proxyAuth(); pa != "" {
1842 hdr = hdr.Clone()
1843 hdr.Set("Proxy-Authorization", pa)
1844 }
1845 connectReq := &Request{
1846 Method: "CONNECT",
1847 URL: &url.URL{Opaque: cm.targetAddr},
1848 Host: cm.targetAddr,
1849 Header: hdr,
1850 }
1851
1852
1853
1854
1855 connectCtx, cancel := testHookProxyConnectTimeout(ctx, 1*time.Minute)
1856 defer cancel()
1857
1858 didReadResponse := make(chan struct{})
1859 var (
1860 resp *Response
1861 err error
1862 )
1863
1864 go func() {
1865 defer close(didReadResponse)
1866 err = connectReq.Write(conn)
1867 if err != nil {
1868 return
1869 }
1870
1871
1872 br := bufio.NewReader(conn)
1873 resp, err = ReadResponse(br, connectReq)
1874 }()
1875 select {
1876 case <-connectCtx.Done():
1877 conn.Close()
1878 <-didReadResponse
1879 return nil, connectCtx.Err()
1880 case <-didReadResponse:
1881
1882 }
1883 if err != nil {
1884 conn.Close()
1885 return nil, err
1886 }
1887
1888 if t.OnProxyConnectResponse != nil {
1889 err = t.OnProxyConnectResponse(ctx, cm.proxyURL, connectReq, resp)
1890 if err != nil {
1891 conn.Close()
1892 return nil, err
1893 }
1894 }
1895
1896 if resp.StatusCode != 200 {
1897 _, text, ok := strings.Cut(resp.Status, " ")
1898 conn.Close()
1899 if !ok {
1900 return nil, errors.New("unknown status code")
1901 }
1902 return nil, errors.New(text)
1903 }
1904 }
1905
1906 if cm.proxyURL != nil && cm.targetScheme == "https" {
1907 if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil {
1908 return nil, err
1909 }
1910 }
1911
1912
1913 unencryptedHTTP2 := pconn.tlsState == nil &&
1914 t.Protocols != nil &&
1915 t.Protocols.UnencryptedHTTP2() &&
1916 !t.Protocols.HTTP1()
1917 if unencryptedHTTP2 {
1918 next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
1919 if !ok {
1920 return nil, errors.New("http: Transport does not support unencrypted HTTP/2")
1921 }
1922 alt := next(cm.targetAddr, unencryptedTLSConn(pconn.conn))
1923 if e, ok := alt.(erringRoundTripper); ok {
1924
1925 return nil, e.RoundTripErr()
1926 }
1927 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1928 }
1929
1930 if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
1931 if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
1932 alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
1933 if e, ok := alt.(erringRoundTripper); ok {
1934
1935 return nil, e.RoundTripErr()
1936 }
1937 return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
1938 }
1939 }
1940
1941 pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
1942 pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())
1943
1944 go pconn.readLoop()
1945 go pconn.writeLoop()
1946 return pconn, nil
1947 }
1948
1949
1950
1951
1952
1953
1954
1955 type persistConnWriter struct {
1956 pc *persistConn
1957 }
1958
1959 func (w persistConnWriter) Write(p []byte) (n int, err error) {
1960 n, err = w.pc.conn.Write(p)
1961 w.pc.nwrite += int64(n)
1962 return
1963 }
1964
1965
1966
1967
1968 func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) {
1969 n, err = io.Copy(w.pc.conn, r)
1970 w.pc.nwrite += n
1971 return
1972 }
1973
1974 var _ io.ReaderFrom = (*persistConnWriter)(nil)
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
// connectMethod describes how to reach a target: directly, or through the
// proxy given by proxyURL. Its key method derives the connectMethodKey under
// which connections are tracked.
type connectMethod struct {
	_            incomparable
	proxyURL     *url.URL // nil when no proxy is used
	targetScheme string   // compared against "http" and "https" in this file

	// targetAddr is the address of the final destination. It is used as the
	// dial address when there is no proxy, and as the CONNECT / SOCKS target
	// when there is one.
	targetAddr string
	onlyH1     bool // copied into the key; addTLS clears NextProtos when set
}
2002
2003 func (cm *connectMethod) key() connectMethodKey {
2004 proxyStr := ""
2005 targetAddr := cm.targetAddr
2006 if cm.proxyURL != nil {
2007 proxyStr = cm.proxyURL.String()
2008 if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" {
2009 targetAddr = ""
2010 }
2011 }
2012 return connectMethodKey{
2013 proxy: proxyStr,
2014 scheme: cm.targetScheme,
2015 addr: targetAddr,
2016 onlyH1: cm.onlyH1,
2017 }
2018 }
2019
2020
2021 func (cm *connectMethod) scheme() string {
2022 if cm.proxyURL != nil {
2023 return cm.proxyURL.Scheme
2024 }
2025 return cm.targetScheme
2026 }
2027
2028
2029 func (cm *connectMethod) addr() string {
2030 if cm.proxyURL != nil {
2031 return canonicalAddr(cm.proxyURL)
2032 }
2033 return cm.targetAddr
2034 }
2035
2036
2037
2038 func (cm *connectMethod) tlsHost() string {
2039 h := cm.targetAddr
2040 if hasPort(h) {
2041 h = h[:strings.LastIndex(h, ":")]
2042 }
2043 return h
2044 }
2045
2046
2047
2048
// connectMethodKey is the comparable, map-key form of a connectMethod. An
// empty proxy string means no proxy.
type connectMethodKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// String renders the key as "proxy|scheme|addr", with ",h1" appended to the
// scheme when onlyH1 is set.
func (k connectMethodKey) String() string {
	schemePart := k.scheme
	if k.onlyH1 {
		schemePart += ",h1"
	}
	return fmt.Sprintf("%s|%s|%s", k.proxy, schemePart, k.addr)
}
2062
2063
2064
// persistConn wraps a single connection to a host (or proxy), together with
// the state needed to issue HTTP/1 requests on it and reuse it.
type persistConn struct {
	// alt, when non-nil, is an alternate-protocol round tripper produced by a
	// TLSNextProto function in dialConn. closeLocked does not close conn or
	// closech for such connections.
	alt RoundTripper

	t         *Transport
	cacheKey  connectMethodKey
	conn      net.Conn
	tlsState  *tls.ConnectionState
	br        *bufio.Reader       // reads through persistConn.Read
	bw        *bufio.Writer       // writes through persistConnWriter
	nwrite    int64               // bytes written, maintained by persistConnWriter
	reqch     chan requestAndChan // sent by roundTrip, received by readLoop
	writech   chan writeRequest   // sent by roundTrip, received by writeLoop
	closech   chan struct{}       // closed by closeLocked for HTTP/1 connections
	isProxy   bool                // set by dialConn for plain-HTTP targets behind a proxy
	sawEOF    bool                // set by Read once conn.Read returned io.EOF
	readLimit int64               // bytes Read may still consume; reset by readLoop

	// writeErrCh carries the result of each request write from writeLoop to
	// wroteRequest, which readLoop consults before reusing the connection.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when writeLoop exits

	// NOTE(review): idleAt and idleTimer are not touched by the code visible
	// here; presumably they are managed by the idle-connection pool — confirm.
	idleAt    time.Time
	idleTimer *time.Timer

	mu                   sync.Mutex // held around accesses to the fields below in the visible code
	numExpectedResponses int        // incremented by roundTrip, decremented by readLoop
	closed               error      // first error passed to closeLocked; nil while open
	canceledErr          error      // set by cancelRequest
	broken               bool       // set by closeLocked
	reused               bool       // set by markReused

	// mutateHeaderFunc, if non-nil, is applied by roundTrip to the request's
	// extra headers. dialConn uses it to add Proxy-Authorization; closeLocked
	// clears it.
	mutateHeaderFunc func(Header)
}
2107
2108 func (pc *persistConn) maxHeaderResponseSize() int64 {
2109 if v := pc.t.MaxResponseHeaderBytes; v != 0 {
2110 return v
2111 }
2112 return 10 << 20
2113 }
2114
2115 func (pc *persistConn) Read(p []byte) (n int, err error) {
2116 if pc.readLimit <= 0 {
2117 return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize())
2118 }
2119 if int64(len(p)) > pc.readLimit {
2120 p = p[:pc.readLimit]
2121 }
2122 n, err = pc.conn.Read(p)
2123 if err == io.EOF {
2124 pc.sawEOF = true
2125 }
2126 pc.readLimit -= int64(n)
2127 return
2128 }
2129
2130
2131 func (pc *persistConn) isBroken() bool {
2132 pc.mu.Lock()
2133 b := pc.closed != nil
2134 pc.mu.Unlock()
2135 return b
2136 }
2137
2138
2139
2140 func (pc *persistConn) canceled() error {
2141 pc.mu.Lock()
2142 defer pc.mu.Unlock()
2143 return pc.canceledErr
2144 }
2145
2146
2147 func (pc *persistConn) isReused() bool {
2148 pc.mu.Lock()
2149 r := pc.reused
2150 pc.mu.Unlock()
2151 return r
2152 }
2153
2154 func (pc *persistConn) cancelRequest(err error) {
2155 pc.mu.Lock()
2156 defer pc.mu.Unlock()
2157 pc.canceledErr = err
2158 pc.closeLocked(errRequestCanceled)
2159 }
2160
2161
2162
2163
2164 func (pc *persistConn) closeConnIfStillIdle() {
2165 t := pc.t
2166 t.idleMu.Lock()
2167 defer t.idleMu.Unlock()
2168 if _, ok := t.idleLRU.m[pc]; !ok {
2169
2170 return
2171 }
2172 t.removeIdleConnLocked(pc)
2173 pc.close(errIdleConnTimeout)
2174 }
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184 func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
2185 if err == nil {
2186 return nil
2187 }
2188
2189
2190
2191
2192
2193
2194
2195
2196 <-pc.writeLoopDone
2197
2198
2199
2200
2201 if cerr := pc.canceled(); cerr != nil {
2202 return cerr
2203 }
2204
2205
2206 req.mu.Lock()
2207 reqErr := req.err
2208 req.mu.Unlock()
2209 if reqErr != nil {
2210 return reqErr
2211 }
2212
2213 if err == errServerClosedIdle {
2214
2215 return err
2216 }
2217
2218 if _, ok := err.(transportReadFromServerError); ok {
2219 if pc.nwrite == startBytesWritten {
2220 return nothingWrittenError{err}
2221 }
2222
2223 return err
2224 }
2225 if pc.isBroken() {
2226 if pc.nwrite == startBytesWritten {
2227 return nothingWrittenError{err}
2228 }
2229 return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %w", err)
2230 }
2231 return err
2232 }
2233
2234
2235
2236
// errCallerOwnsConn is the close error readLoop uses when the response body
// is writable. closeLocked leaves the underlying net.Conn open when it is
// closed with this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
2238
// readLoop is the connection's reader goroutine, started by dialConn. For
// each request received on pc.reqch it reads the response, delivers it on the
// request's channel and, when possible, returns the connection to the idle
// pool. On exit it closes the connection with closeErr and removes it from the
// idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // overwritten below when a more specific cause is known
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn offers the connection to the idle pool and reports
	// whether it was accepted. A failure becomes the loop's close error.
	tryPutIdleConn := func(treq *transportRequest) bool {
		trace := treq.trace
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is what body callbacks block on after signalling waitForBodyRead:
	// they are released either by the explicit send below (body hit EOF) or
	// by the deferred close when this loop exits.
	eofc := make(chan struct{})
	defer close(eofc)

	// Take a private copy of the test hook under its lock.
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Data or an error arrived while no request was outstanding.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := rc.treq.trace

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // headers are done; lift the limit for the body

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.treq.Request.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.treq.Request.Close || resp.StatusCode <= 199 || bodyWritable {
			// The connection will not be reused after this response.
			alive = false
		}

		if !hasBody || bodyWritable {
			// No body to wait for: decide about reuse right away. The
			// connection goes back to the pool only if it is still alive,
			// has not seen EOF, and the request was fully written.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			rc.treq.cancel(errRequestDone)

			testHookReadLoopBeforeNextRead()
			continue
		}

		// Buffered for 2 so neither callback below can block on the send:
		// fn and earlyCloseFn may each send once.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // released when readLoop exits (deferred close above)
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // released by the send in the select below
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && ascii.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The transport asked for gzip itself, so decompress
			// transparently and drop the headers that no longer apply.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Wait until the caller is done with the body, the request context
		// ends, or the connection is closed, before reading the next
		// response from the wire.
		select {
		case bodyEOF := <-waitForBodyRead:
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(rc.treq)
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.treq.ctx.Done():
			alive = false
			pc.cancelRequest(context.Cause(rc.treq.ctx))
		case <-pc.closech:
			alive = false
		}

		rc.treq.cancel(errRequestDone)
		testHookReadLoopBeforeNextRead()
	}
}
2416
2417 func (pc *persistConn) readLoopPeekFailLocked(peekErr error) {
2418 if pc.closed != nil {
2419 return
2420 }
2421 if n := pc.br.Buffered(); n > 0 {
2422 buf, _ := pc.br.Peek(n)
2423 if is408Message(buf) {
2424 pc.closeLocked(errServerClosedIdle)
2425 return
2426 } else {
2427 log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr)
2428 }
2429 }
2430 if peekErr == io.EOF {
2431
2432 pc.closeLocked(errServerClosedIdle)
2433 } else {
2434 pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %w", peekErr))
2435 }
2436 }
2437
2438
2439
2440
// is408Message reports whether buf begins with an HTTP/1.x status line
// carrying status code 408. The minor version digit is not inspected.
func is408Message(buf []byte) bool {
	const shortest = "HTTP/1.x 408"
	if len(buf) < len(shortest) {
		return false
	}
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
2450
2451
2452
2453
// readResponse reads the next final response for rc from the connection.
//
// Informational 1xx responses other than 101 are consumed and skipped; a
// "100 Continue" additionally signals rc.continueCh once. trace may be nil.
// On success resp.TLS is set to the connection's TLS state.
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.treq.Request)
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil && resCode == StatusContinue {
			if trace != nil && trace.Got100Continue != nil {
				trace.Got100Continue()
			}
			// Tell the writer to go ahead, and make sure it is only told once.
			continueCh <- struct{}{}
			continueCh = nil
		}
		is1xx := 100 <= resCode && resCode <= 199
		// 101 Switching Protocols is treated as a final response.
		is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols
		if is1xxNonTerminal {
			if trace != nil && trace.Got1xxResponse != nil {
				if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil {
					return nil, err
				}
				// The 1xx response was handed to the trace hook, so the
				// header size limit starts over for the next response.
				// Without a hook, the limit spans all 1xx responses plus
				// the final one.
				pc.readLimit = pc.maxHeaderResponseSize()
			}
			continue
		}
		break
	}
	if resp.isProtocolSwitch() {
		resp.Body = newReadWriteCloserBody(pc.br, pc.conn)
	}
	if continueCh != nil {
		// A final response arrived without a preceding 100 Continue. The
		// writer is still waiting on continueCh: closing it makes
		// waitForContinue return false (body not sent) when the connection
		// is going to be closed anyway; sending makes it return true (body
		// sent) otherwise.
		if resp.Close || rc.treq.Request.Close {
			close(continueCh)
		} else {
			continueCh <- struct{}{}
		}
	}

	resp.TLS = pc.tlsState
	return
}
2522
2523
2524
2525
2526 func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
2527 if continueCh == nil {
2528 return nil
2529 }
2530 return func() bool {
2531 timer := time.NewTimer(pc.t.ExpectContinueTimeout)
2532 defer timer.Stop()
2533
2534 select {
2535 case _, ok := <-continueCh:
2536 return ok
2537 case <-timer.C:
2538 return true
2539 case <-pc.closech:
2540 return false
2541 }
2542 }
2543 }
2544
2545 func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser {
2546 body := &readWriteCloserBody{ReadWriteCloser: rwc}
2547 if br.Buffered() != 0 {
2548 body.br = br
2549 }
2550 return body
2551 }
2552
2553
2554
2555
2556
2557
2558 type readWriteCloserBody struct {
2559 _ incomparable
2560 br *bufio.Reader
2561 io.ReadWriteCloser
2562 }
2563
2564 func (b *readWriteCloserBody) Read(p []byte) (n int, err error) {
2565 if b.br != nil {
2566 if n := b.br.Buffered(); len(p) > n {
2567 p = p[:n]
2568 }
2569 n, err = b.br.Read(p)
2570 if b.br.Buffered() == 0 {
2571 b.br = nil
2572 }
2573 return n, err
2574 }
2575 return b.ReadWriteCloser.Read(p)
2576 }
2577
2578
// nothingWrittenError wraps an error that occurred before any bytes of the
// request had been written to the connection.
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error.
func (e nothingWrittenError) Unwrap() error {
	return e.error
}
2586
// writeLoop is the connection's writer goroutine, started by dialConn. It
// writes each request received on pc.writech and reports the outcome on both
// pc.writeErrCh and the request's own channel. It exits after the first
// failed write (closing the connection) or when the connection is closed, and
// closes pc.writeLoopDone on the way out.
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				// The failure came from reading the request body. Record
				// the unwrapped error on the request; mapRoundTripError
				// returns req.err in preference to the connection error.
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				if pc.nwrite == startBytesWritten {
					// Not a single byte of this request reached the wire.
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // consumed by wroteRequest (called from readLoop)
			wr.ch <- err         // consumed by roundTrip
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}
2624
2625
2626
2627
2628
2629
2630
// maxWriteWaitBeforeConnReuse is how long wroteRequest waits for a pending
// request write to report its result before declaring the connection unfit
// for reuse.
var maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
2632
2633
2634
2635 func (pc *persistConn) wroteRequest() bool {
2636 select {
2637 case err := <-pc.writeErrCh:
2638
2639
2640 return err == nil
2641 default:
2642
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652 t := time.NewTimer(maxWriteWaitBeforeConnReuse)
2653 defer t.Stop()
2654 select {
2655 case err := <-pc.writeErrCh:
2656 return err == nil
2657 case <-t.C:
2658 return false
2659 }
2660 }
2661 }
2662
2663
2664
// responseAndError is what readLoop sends back to roundTrip. roundTrip panics
// unless exactly one of res and err is set.
type responseAndError struct {
	_   incomparable
	res *Response // non-nil on success
	err error     // non-nil on failure
}
2670
// requestAndChan is the message roundTrip sends to readLoop on pc.reqch to
// announce a request whose response should be read.
type requestAndChan struct {
	_    incomparable
	treq *transportRequest
	ch   chan responseAndError // unbuffered; readLoop also selects on callerGone

	// addedGzip records that roundTrip itself added "Accept-Encoding: gzip".
	// readLoop then decompresses a gzip-encoded response transparently.
	addedGzip bool

	// continueCh, when non-nil, is used by readResponse to tell the writer
	// whether to send the request body: a value means send, a close means do
	// not send.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip returns
}
2689
2690
2691
2692
2693
// writeRequest is the message roundTrip sends to writeLoop on pc.writech to
// have a request written. roundTrip builds it with an unkeyed literal, so the
// field order matters.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the write result; roundTrip gives it capacity 1

	// continueCh, when non-nil, is the channel waitForContinue waits on
	// before the request body is written.
	continueCh <-chan struct{}
}
2703
2704
2705
// timeoutError is an error that reports itself as a timeout and matches
// context.DeadlineExceeded under errors.Is.
type timeoutError struct {
	err string // the error message
}

// Error returns the stored message.
func (te *timeoutError) Error() string {
	return te.err
}

// Timeout always reports true.
func (te *timeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (te *timeoutError) Temporary() bool {
	return true
}

// Is reports whether target is context.DeadlineExceeded.
func (te *timeoutError) Is(target error) bool {
	return target == context.DeadlineExceeded
}
2714
// errTimeout is returned by roundTrip when the response-header timer fires.
var errTimeout error = &timeoutError{"net/http: timeout awaiting response headers"}

// errRequestCanceled is the error cancelRequest closes the connection with.
// It aliases the HTTP/2 sentinel, so both protocols share one value.
var errRequestCanceled = http2errRequestCanceled

// NOTE(review): errRequestCanceledConn is not used in the code visible here;
// per its message it covers cancellation while waiting for a connection.
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection")

// errRequestDone is the cause readLoop passes to the request's cancel
// function once the request has been fully handled.
var errRequestDone = errors.New("net/http: request completed")
2725
// nop does nothing; it is the default value of the test hooks below.
func nop() {}

// Test hooks. All default to no-ops (and a no-op lock).
var (
	testHookEnterRoundTrip   = nop // called on entry to persistConn.roundTrip
	testHookWaitResLoop      = nop // called at the top of each roundTrip wait iteration
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	testHookMu                     sync.Locker = fakeLocker{} // held by readLoop while copying the hook below
	testHookReadLoopBeforeNextRead             = nop          // called by readLoop before it reads the next response
)
2739
// roundTrip sends req on this connection and waits for the response headers.
//
// It hands the request to the write loop and the read loop, then waits for
// whichever happens first: a write error, the connection closing, the
// response-header timeout, the response itself, or the request context
// ending. Errors are passed through mapRoundTripError, except for the
// response-header timeout, which is returned as errTimeout directly.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for gzip only when compression is enabled and the caller did not
	// set Accept-Encoding or Range, and the method is not HEAD. readLoop then
	// decompresses the response transparently (see addedGzip).
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh coordinates "Expect: 100-continue" between the read loop
	// (sender) and the write loop (receiver).
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when this function returns, so the read loop never
	// blocks trying to deliver a response nobody is waiting for.
	gone := make(chan struct{})
	defer close(gone)

	const debugRoundTrip = false

	// Sample the byte counter before the write starts; mapRoundTripError
	// compares against it to tell whether anything was written.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		treq:       req,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// handleResponse validates a message from the read loop and turns it
	// into this function's return values.
	handleResponse := func(re responseAndError) (*Response, error) {
		if (re.res == nil) == (re.err == nil) {
			panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
		}
		if debugRoundTrip {
			req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
		}
		if re.err != nil {
			return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
		}
		return re.res, nil
	}

	var respHeaderTimer <-chan time.Time // nil (blocks forever) until the write succeeds
	ctxDoneChan := req.ctx.Done()
	pcClosed := pc.closech
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh recv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %w", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request is on the wire: start the response-header timer.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop()
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			// The connection closed, but a response may have been delivered
			// at the same moment; prefer the response if one is ready.
			select {
			case re := <-resc:
				return handleResponse(re)
			default:
			}
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			return handleResponse(re)
		case <-ctxDoneChan:
			// The context ended, but a response may have been delivered at
			// the same moment; prefer the response if one is ready.
			select {
			case re := <-resc:
				return handleResponse(re)
			default:
			}
			// Cancelling closes the connection; the loop then returns via
			// the pcClosed case on a later iteration.
			pc.cancelRequest(context.Cause(req.ctx))
		}
	}
}
2877
2878
2879
2880 type tLogKey struct{}
2881
2882 func (tr *transportRequest) logf(format string, args ...any) {
2883 if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...any)); ok {
2884 logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
2885 }
2886 }
2887
2888
2889
2890 func (pc *persistConn) markReused() {
2891 pc.mu.Lock()
2892 pc.reused = true
2893 pc.mu.Unlock()
2894 }
2895
2896
2897
2898
2899
2900
2901 func (pc *persistConn) close(err error) {
2902 pc.mu.Lock()
2903 defer pc.mu.Unlock()
2904 pc.closeLocked(err)
2905 }
2906
2907 func (pc *persistConn) closeLocked(err error) {
2908 if err == nil {
2909 panic("nil error")
2910 }
2911 pc.broken = true
2912 if pc.closed == nil {
2913 pc.closed = err
2914 pc.t.decConnsPerHost(pc.cacheKey)
2915
2916
2917 if pc.alt == nil {
2918 if err != errCallerOwnsConn {
2919 pc.conn.Close()
2920 }
2921 close(pc.closech)
2922 }
2923 }
2924 pc.mutateHeaderFunc = nil
2925 }
2926
// portMap gives the port canonicalAddr falls back to, by URL scheme, when a
// URL carries no explicit port.
var portMap = map[string]string{
	"http":    "80",
	"https":   "443",
	"socks5":  "1080",
	"socks5h": "1080",
}
2933
2934 func idnaASCIIFromURL(url *url.URL) string {
2935 addr := url.Hostname()
2936 if v, err := idnaASCII(addr); err == nil {
2937 addr = v
2938 }
2939 return addr
2940 }
2941
2942
2943 func canonicalAddr(url *url.URL) string {
2944 port := url.Port()
2945 if port == "" {
2946 port = portMap[url.Scheme]
2947 }
2948 return net.JoinHostPort(idnaASCIIFromURL(url), port)
2949 }
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
// bodyEOFSignal wraps a response body and notifies its owner, at most once
// through fn, when the body is finished: either a Read returned an error
// (including io.EOF) or the body was closed. If the body is closed before a
// Read has seen io.EOF and earlyCloseFn is set, earlyCloseFn is called
// instead and the wrapped body is not closed by Close.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards the fields below
	closed       bool              // whether Close has been called
	rerr         error             // first non-nil error returned by body.Read
	fn           func(error) error // completion callback; cleared after its single call
	earlyCloseFn func() error      // optional; called by Close when rerr is not io.EOF
}

// errReadOnClosedResBody is returned by Read after Close has been called.
var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the wrapped body. After Close it returns
// errReadOnClosedResBody, and after a failed read it keeps returning that
// first error. The first read error is passed through fn (if still set), and
// fn's result is what the caller sees for that call.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	alreadyClosed, prevErr := es.closed, es.rerr
	es.mu.Unlock()

	switch {
	case alreadyClosed:
		return 0, errReadOnClosedResBody
	case prevErr != nil:
		return 0, prevErr
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	// The lock stays held while the callback runs.
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed. The first call either invokes earlyCloseFn
// (when it is set and no Read has returned io.EOF) or closes the wrapped body
// and passes the result through fn. Later calls return nil.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()

	if es.closed {
		return nil
	}
	es.closed = true

	if sawEOF := es.rerr == io.EOF; !sawEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn runs fn on err and clears fn so it runs only once; with no fn set it
// returns err unchanged. The caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	callback := es.fn
	if callback == nil {
		return err
	}
	err = callback(err)
	es.fn = nil
	return err
}
3019
3020
3021
3022 type gzipReader struct {
3023 _ incomparable
3024 body *bodyEOFSignal
3025 zr *gzip.Reader
3026 zerr error
3027 }
3028
3029 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3030 if gz.zr == nil {
3031 if gz.zerr == nil {
3032 gz.zr, gz.zerr = gzip.NewReader(gz.body)
3033 }
3034 if gz.zerr != nil {
3035 return 0, gz.zerr
3036 }
3037 }
3038
3039 gz.body.mu.Lock()
3040 if gz.body.closed {
3041 err = errReadOnClosedResBody
3042 }
3043 gz.body.mu.Unlock()
3044
3045 if err != nil {
3046 return 0, err
3047 }
3048 return gz.zr.Read(p)
3049 }
3050
3051 func (gz *gzipReader) Close() error {
3052 return gz.body.Close()
3053 }
3054
// tlsHandshakeTimeoutError is the error addTLS reports when the TLS handshake
// does not finish within the configured timeout.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
3060
3061
3062
3063
// fakeLocker is a sync.Locker whose methods do nothing. It is the default
// value of testHookMu.
type fakeLocker struct{}

// Lock does nothing.
func (fakeLocker) Lock() {
}

// Unlock does nothing.
func (fakeLocker) Unlock() {
}
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
// cloneTLSConfig returns a copy of cfg that the caller may modify freely, or
// a new zero-valued tls.Config when cfg is nil. The result is never nil.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
3088
3089 type connLRU struct {
3090 ll *list.List
3091 m map[*persistConn]*list.Element
3092 }
3093
3094
3095 func (cl *connLRU) add(pc *persistConn) {
3096 if cl.ll == nil {
3097 cl.ll = list.New()
3098 cl.m = make(map[*persistConn]*list.Element)
3099 }
3100 ele := cl.ll.PushFront(pc)
3101 if _, ok := cl.m[pc]; ok {
3102 panic("persistConn was already in LRU")
3103 }
3104 cl.m[pc] = ele
3105 }
3106
3107 func (cl *connLRU) removeOldest() *persistConn {
3108 ele := cl.ll.Back()
3109 pc := ele.Value.(*persistConn)
3110 cl.ll.Remove(ele)
3111 delete(cl.m, pc)
3112 return pc
3113 }
3114
3115
3116 func (cl *connLRU) remove(pc *persistConn) {
3117 if ele, ok := cl.m[pc]; ok {
3118 cl.ll.Remove(ele)
3119 delete(cl.m, pc)
3120 }
3121 }
3122
3123
3124 func (cl *connLRU) len() int {
3125 return len(cl.m)
3126 }
3127
View as plain text