Categories
程式開發

Go中的HTTP請求之——HTTP1.1請求流程分析


來自公眾號:新世界雜貨舖

前言

http是目前應用最為廣泛, 也是程序員接觸最多的協議之一。今天筆者站在GoPher的角度對http1.1的請求流程進行全面的分析。希望讀者讀完此文後, 能夠有以下幾個收穫:

對http1.1的請求流程有一個大概的了解在平時的開發中能夠更好地重用底層TCP連接對http1.1的線頭阻塞能有一個更清楚的認識

HTTP1.1流程

今天內容較多, 廢話不多說, 直接上乾貨。

Go中的HTTP請求之——HTTP1.1請求流程分析 1

接下來, 筆者將根據流程圖,對除了NewRequest以外的函數進行逐步的展開和分析

(*客戶).do

(*Client).do方法的核心代碼是一個沒有結束條件的for循環。

for {
// For all but the first request, create the next
// request hop and replace req.
if len(reqs) > 0 {
loc := resp.Header.Get("Location")
// ...此处省略代码...
err = c.checkRedirect(req, reqs)
// ...此处省略很多代码...
}

reqs = append(reqs, req)
var err error
var didTimeout func() bool
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
// ...此处省略代码...
return nil, uerr(err)
}

var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}

req.closeBody()
}

上面的代碼中, 請求第一次進入會調用c.send, 得到響應後會判斷請求是否需要重定向, 如果需要重定向則繼續循環, 否則返迴響應。

進入重定向流程後, 這裡筆者簡單介紹一下checkRedirect函數:

func defaultCheckRedirect(req *Request, via []*Request) error {
if len(via) >= 10 {
return errors.New("stopped after 10 redirects")
}
return nil
}
// ...
func (c *Client) checkRedirect(req *Request, via []*Request) error {
fn := c.CheckRedirect
if fn == nil {
fn = defaultCheckRedirect
}
return fn(req, via)
}

由上可知, 用戶可以自己定義重定向的檢查規則。如果用戶沒有自定義檢查規則, 則重定向次數不能超過10次。

(*客戶)。發送

(*Client).send方法邏輯較為簡單, 主要看用戶有沒有為http.Client的Jar字段實現CookieJar接口。主要流程如下:

如果實現了CookieJar接口, 為Request添加保存的cookie信息。調用send函數。如果實現了CookieJar接口, 將Response中的cookie信息保存下來。

// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}

另外, 我們還需要關注c.transport()的調用。如果用戶未對http.Client指定Transport則會使用go默認的DefaultTransport。

該Transport實現RoundTripper接口。在go中RoundTripper的定義為“執行單個HTTP事務的能力,獲取給定請求的響應”。

func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}

發送

send函數會檢查request的URL,以及參數的rt, 和header值。如果URL和rt為nil則直接返回錯誤。同時, 如果請求中設置了用戶信息, 還會檢查並設置basic的驗證頭信息,最後調用rt.RoundTrip得到請求的響應。

func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
req := ireq // req is either the original request, or a modified fork
// ...此处省略代码...
if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
username := u.Username()
password, _ := u.Password()
forkReq()
req.Header = cloneOrMakeHeader(ireq.Header)
req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
}

if !deadline.IsZero() {
forkReq()
}
stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

resp, err = rt.RoundTrip(req)
if err != nil {
// ...此处省略代码...
return nil, didTimeout, err
}
// ...此处省略代码...
return resp, nil, nil
}

(*運輸).RoundTrip

(Transport).RoundTrip的邏輯很簡單,它會調用(Transport).roundTrip方法,因此本節實際上是對(*Transport).roundTrip方法的分析。

func (t *Transport) RoundTrip(req *Request) (*Response, error) {
return t.roundTrip(req)
}
func (t *Transport) roundTrip(req *Request) (*Response, error) {
// ...此处省略校验header头和headervalue的代码以及其他代码...

for {
select {
case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace} cm, err := t.connectMethodForRequest(treq) // ...此处省略代码... pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil } // ...此处省略判断是否重试请求的代码逻辑... } }

由上可知, 每次for循環, 會判斷請求上下文是否已經取消, 如果沒有取消則繼續進行後續的流程。

先調用t.getConn方法獲取一個persistConn。

因為本篇主旨是http1.1,所以我們直接看http1.1的執行分支。根據源碼中的註釋和實際的debug結果,獲取到連接後, 會繼續調用pconn.roundTrip。

(*運輸).getConn

筆者認為這一步在http請求中是非常核心的一個步驟,因為只有和server端建立連接後才能進行後續的通信。

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
// ...此处省略代码...
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
// ...此处省略代码...
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered {
pc := w.pc
// ...此处省略代码...
return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(req, func(err error) { cancelc <- err }) // Queue for permission to dial. t.queueForDial(w) // Wait for completion or cancellation. select { case <-w.ready: // Trace success but only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) } // ...此处省略代码... return w.pc, w.err case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } }

由上能夠清楚的知道, 獲取連接分為以下幾個步驟:

調用t.queueForIdleConn獲取一個空閒且可複用的連接,如果獲取成功則直接返回該連接。如果未獲取到空閒連接則調用t.queueForDial開始新建一個連接。等待w.ready關閉,則可以返回新的連接。

(*運輸).queueForIdleConn

(*Transport).queueForIdleConn方法會根據請求的connectMethodKey從t.idleConn獲取一個[]*persistConn切片, 並從切片中,根據算法獲取一個有效的空閒連接。如果未獲取到空閒連接,則將wantConn結構體變量放入t.idleConnWait[w.key]等待隊列,此處wantConn結構體變量就是前面提到的w。

connectMethodKey定義和queueForIdleConn部分關鍵代碼如下:

type connectMethodKey struct {
proxy, scheme, addr string
onlyH1 bool
}

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
// ...此处省略代码...
// Look for most recently-used idle connection.
if list, ok := t.idleConn[w.key]; ok {
stop := false
delivered := false
for len(list) > 0 && !stop {
pconn := list[len(list)-1]

// See whether this connection has been idle too long, considering
// only the wall time (the Round(0)), in case this is a laptop or VM
// coming out of suspend with previously cached idle connections.
tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
// ...此处省略代码...
delivered = w.tryDeliver(pconn, nil)
if delivered {
// ...此处省略代码...
}
stop = true
}
if len(list) > 0 {
t.idleConn[w.key] = list
} else {
delete(t.idleConn, w.key)
}
if stop {
return delivered
}
}

// Register to receive next connection that becomes idle.
if t.idleConnWait == nil {
t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
}
q := t.idleConnWait[w.key]
q.cleanFront()
q.pushBack(w)
t.idleConnWait[w.key] = q
return false
}

其中w.tryDeliver方法主要作用是將連接協程安全的賦值給w.pc,並關閉w.ready管道。此時我們便可以和(*Transport).getConn中調用queueForIdleConn成功後的返回值對應上。

(*運輸).queueForDial

(*Transport).queueForDial方法包含三個步驟:

如果t.MaxConnsPerHost小於等於0,執行go t.dialConnFor(w)並返回。其中MaxConnsPerHost代表著每個host的最大連接數,小於等於0表示不限制。如果當前host的連接數不超過t.MaxConnsPerHost,對當前host的連接數+1,然後執行go t.dialConnFor(w)並返回。如果當前host的連接數等於t.MaxConnsPerHost,則將wantConn結構體變量放入t.connsPerHostWait[w.key]等待隊列,此處wantConn結構體變量就是前面提到的w。另外在放入等待隊列前會先清除隊列中已經失效或者不再等待的變量。

func (t *Transport) queueForDial(w *wantConn) {
w.beforeDial()
if t.MaxConnsPerHost <= 0 { go t.dialConnFor(w) return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) t.connsPerHostWait[w.key] = q }

(*運輸).dialConnFor

(*Transport).dialConnFor方法調用t.dialConn獲取一個真正的*persistConn。並將這個連接傳遞給w, 如果w已經獲取到了連接,則會傳遞失敗,此時調用t.putOrCloseIdleConn將連接放回空閒連接池。

如果連接獲取錯誤則會調用t.decConnsPerHost減少當前host的連接數。

func (t *Transport) dialConnFor(w *wantConn) {
defer w.afterDial()

pc, err := t.dialConn(w.ctx, w.cm)
delivered := w.tryDeliver(pc, err)
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
// or it is HTTP/2 and can be shared.
// Add to the idle connection pool.
t.putOrCloseIdleConn(pc)
}
if err != nil {
t.decConnsPerHost(w.key)
}
}

(* Transport).putOrCloseIdleConn方法

func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
if err := t.tryPutIdleConn(pconn); err != nil {
pconn.close(err)
}
}
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
if t.DisableKeepAlives || t.MaxIdleConnsPerHost 0 {
w := q.popFront()
if w.tryDeliver(pconn, nil) {
done = true
break
}
}
} else {
// HTTP/2.
// Can hand the same pconn to everyone in the waiting list,
// and we still won't be done: we want to put it in the idle
// list unconditionally, for any future clients too.
for q.len() > 0 {
w := q.popFront()
w.tryDeliver(pconn, nil)
}
}
if q.len() == 0 {
delete(t.idleConnWait, key)
} else {
t.idleConnWait[key] = q
}
if done {
return nil
}
}

if t.closeIdle {
return errCloseIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
idles := t.idleConn[key]
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
// ...此处省略代码...
t.idleConn[key] = append(idles, pconn)
t.idleLRU.add(pconn)
// ...此处省略代码...
// Set idle timer, but only for HTTP/1 (pconn.alt == nil).
// The HTTP/2 implementation manages the idle timer itself
// (see idleConnTimeout in h2_bundle.go).
if t.IdleConnTimeout > 0 && pconn.alt == nil {
if pconn.idleTimer != nil {
pconn.idleTimer.Reset(t.IdleConnTimeout)
} else {
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
}
pconn.idleAt = time.Now()
return nil
}
func (t *Transport) maxIdleConnsPerHost() int {
if v := t.MaxIdleConnsPerHost; v != 0 {
return v
}
return DefaultMaxIdleConnsPerHost // 2
}

由上可知,將連接放入t.idleConn前,先檢查t.idleConnWait的數量。如果有請求在等待空閒連接, 則將連接復用,沒有空閒連接時,才將連接放入t.idleConn。連接放入t.idleConn後,還會重置連接的可空閒時間。

另外在t.putOrCloseIdleConn函數中還需要注意兩點:

如果用戶自定義了http.client,且將DisableKeepAlives設置為true,或者將MaxIdleConnsPerHost設置為負數,則連接不會放入t.idleConn即連接不能複用。在判斷已有空閒連接數量時, 如果MaxIdleConnsPerHost 不等於0, 則返回用戶設置的數量,否則返回默認值2,詳見上面的(*Transport).maxIdleConnsPerHost 函數。

綜上, 我們知道對於部分有連接數限制的業務, 我們可以為http.Client自定義一個Transport, 並設置Transport的MaxConnsPerHost,MaxIdleConnsPerHost,IdleConnTimeout和DisableKeepAlives從而達到即限制連接數量,又能保證一定的並發。

(*Transport).decConnsPerHost方法

func (t *Transport) decConnsPerHost(key connectMethodKey) {
// ...此处省略代码...
t.connsPerHostMu.Lock()
defer t.connsPerHostMu.Unlock()
n := t.connsPerHost[key]
// ...此处省略代码...

// Can we hand this count to a goroutine still waiting to dial?
// (Some goroutines on the wait list may have timed out or
// gotten a connection another way. If they're all gone,
// we don't want to kick off any spurious dial operations.)
if q := t.connsPerHostWait[key]; q.len() > 0 {
done := false
for q.len() > 0 {
w := q.popFront()
if w.waiting() {
go t.dialConnFor(w)
done = true
break
}
}
if q.len() == 0 {
delete(t.connsPerHostWait, key)
} else {
// q is a value (like a slice), so we have to store
// the updated q back into the map.
t.connsPerHostWait[key] = q
}
if done {
return
}
}

// Otherwise, decrement the recorded count.
if n--; n == 0 {
delete(t.connsPerHost, key)
} else {
t.connsPerHost[key] = n
}
}

由上可知, decConnsPerHost方法主要乾了兩件事:

判斷是否有請求在等待撥號, 如果有則執行go t.dialConnFor(w)。如果沒有請求在等待撥號, 則減少當前host的連接數量。

######(* Transport).dialConn

根據http.Client的默認配置和實際的debug結果,(*Transport).dialConn方法主要邏輯如下:

調用t.dial(ctx, "tcp", cm.addr())創建TCP連接。如果是https的請求, 則對請求建立安全的tls傳輸通道。為persistConn創建讀寫buffer, 如果用戶沒有自定義讀寫buffer的大小, 根據writeBufferSize和readBufferSize方法可知, 讀寫bufffer的大小默認為4096。執行go pconn.readLoop()和go pconn.writeLoop()開啟讀寫循環然後返回連接。

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
// ...此处省略代码...
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
// ...此处省略代码...
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}

// Proxy setup.
switch { // ...此处省略代码... }

if cm.proxyURL != nil && cm.targetScheme == "https" {
// ...此处省略代码...
}

if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
// ...此处省略代码...
}

pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
func (t *Transport) writeBufferSize() int {
if t.WriteBufferSize > 0 {
return t.WriteBufferSize
}
return 4 < 0 { return t.ReadBufferSize } return 4 << 10 }

(* persistConn).roundTrip

(*persistConn).roundTrip方法是http1.1請求的核心之一,該方法在這裡獲取真實的Response並返回給上層。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
// ...此处省略代码...

gone := make(chan struct{})
defer close(gone)
// ...此处省略代码...
const debugRoundTrip = false

// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
startBytesWritten := pc.nwrite
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() for { testHookWaitResLoop() select { case err := <-writeErrCh: // ...此处省略代码... if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } // ...此处省略代码... case <-pc.closech: // ...此处省略代码... return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) case <-respHeaderTimer: // ...此处省略代码... return nil, errTimeout case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

由上可知, (*persistConn).roundTrip方法可以分為三步:

向連接的writech寫入writeRequest: pc.writech <- writeRequest{req, writeErrCh, continueCh}, 參考(*Transport).dialConn可知pc.writech是一個緩衝大小為1的管道,所以會立馬寫入成功。向連接的reqch寫入requestAndChan: pc.reqch <- requestAndChan, pc.reqch和pc.writech一樣都是緩衝大小為1的管道。其中requestAndChan.ch是一個無緩衝的responseAndError管道,(*persistConn).roundTrip就通過這個管道讀取到真實的響應。開啟for循環select, 等待響應或者超時等信息。

(*persistConn).writeLoop 寫循環

(*persistConn).writeLoop方法主體邏輯相對簡單,把用戶的請求寫入連接的寫緩存buffer, 最後再flush就可以了。

func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
select {
case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { pc.close(err) return } case <-pc.closech: return } } }

(*persistConn).readLoop 讀循環

(*persistConn).readLoop有較多的細節, 我們先看代碼, 然後再逐步分析。

func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()

tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
// ...此处省略代码...
}
// ...此处省略代码...
return true
}
// ...此处省略代码...
alive := true
for alive {
// ...此处省略代码...
rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } // ...此处省略代码... bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { // ...此处省略代码... continue } waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body // ...此处省略代码... select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }

由上可知, 只要連接處於活躍狀態, 則這個讀循環會一直開啟, 直到

連接不活躍或者產生其​​他錯誤才會結束讀循環。

在上述源碼中,pc.readResponse(rc,trace)會從連接的讀buffer中獲取一個請求對應的Response。

讀到響應之後判斷請求是否是HEAD請求或者響應內容為空,如果是HEAD請求或者響應內容為空則將響應寫入rc.ch,並將連接放入idleConn(此處因為篇幅的原因省略了源碼內容, 正常請求的邏輯也有寫響應和將連接放入idleConn兩個步驟)。

如果不是HEAD請求並且響應內容不為空即!hasBody || bodyWritable為false:

創建一個緩衝大小為2的等待響應被讀取的管道waitForBodyRead: waitForBodyRead := make(chan bool, 2)將響應的Body修改為bodyEOFSignal結構體。通過上面的源碼我們可以知道,此時的resp.Body中有earlyCloseFn和fn兩個函數。 earlyCloseFn函數會向waitForBodyRead管道寫入false, fn函數會判斷響應是否讀完, 如果已經讀完則向waitForBodyRead寫入true否則寫入false。將修改後的響應寫入rc.ch。其中rc.ch從rc := <-pc.reqch獲取,而pc.reqch正是前面(*persistConn).roundTrip函數寫入的requestAndChan。 requestAndChan.ch是一個無緩衝的responseAndError管道,(*persistConn).roundTrip通過這個管道讀取到真實的響應。 select 讀取waitForBodyRead被寫入的值。如果讀到到的是true則可以調用tryPutIdleConn(此方法會調用前面提到的(Transport).tryPutIdleConn方法*)將連接放入idleConn從而復用連接。

waitForBodyRead寫入true的原因我們已​​經知道了,但是被寫入true的時機我們尚不明確。

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
// ...此处省略代码...
n, err = es.body.Read(p)
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
}

func (es *bodyEOFSignal) Close() error {
es.mu.Lock()
defer es.mu.Unlock()
if es.closed {
return nil
}
es.closed = true
if es.earlyCloseFn != nil && es.rerr != io.EOF {
return es.earlyCloseFn()
}
err := es.body.Close()
return es.condfn(err)
}

// caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
if es.fn == nil {
return err
}
err = es.fn(err)
es.fn = nil
return err
}

由上述源碼可知, 只有當調用方完整的讀取了響應,該連接才能夠被復用。因此在http1.1中,一個連接上的請求,只有等前一個請求處理完之後才能繼續下一個請求。如果前面的請求處理較慢, 則後面的請求必須等待, 這就是http1.1中的線頭阻塞。

根據上面的邏輯, 我們GoPher在平時的開發中如果遇到了不關心響應的請求, 也一定要記得把響應body讀完以保證連接的複用性。筆者在這裡給出一個demo:

io.CopyN(ioutil.Discard, resp.Body, 2 << 10) resp.Body.Close()

以上,就是筆者整理的HTTP1.1的請求流程。

注意

筆者本著嚴謹的態度, 特此提醒:

>

>上述流程中筆者對很多細節並未詳細提及或者僅一筆帶過,希望讀者酌情參考。

總結

在go中發起http1.1的請求時, 如果遇到不關心響應的請求,請務必完整讀取響應內容以保證連接的複用性。如果遇到對連接數有限制的業務,可以通過自定義http.Client的Transport, 並設置Transport的MaxConnsPerHost,MaxIdleConnsPerHost,IdleConnTimeout和DisableKeepAlives的值,來控制連接數。如果對於重定向業務邏輯有需求,可以自定義http.Client的CheckRedirect。在http1.1,中一個連接上的請求,只有等前一個請求處理完之後才能繼續下一個請求。如果前面的請求處理較慢, 則後面的請求必須等待, 這就是http1.1中的線頭阻塞。

注: 寫本文時, 筆者所用go版本為: go1.14.2

生命不息, 探索不止, 後續將持續更新有關於go的技術探索

原創不易, 卑微求關注收藏二連.