Go语言中HTTP1.1请求流程是如何一步步展开的?

2026-04-01 22:151阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计5790个文字,预计阅读时间需要24分钟。

Go语言中HTTP1.1请求流程是如何一步步展开的?

HTTP1.1流程+今日内容较多,废话不多说,直接上干货。接下来,笔者将根据流程图,对除NewRequest以外的函数进行逐步展开和分析。以(Client).do(Client).do方法的核心代码是一个没有结束的循环。

HTTP1.1流程

今天内容较多, 废话不多说, 直接上干货。

接下来, 笔者将根据流程图,对除了NewRequest以外的函数进行逐步的展开和分析

(*Client).do

(*Client).do方法的核心代码是一个没有结束条件的for循环。

for { // For all but the first request, create the next // request hop and replace req. if len(reqs) > 0 { loc := resp.Header.Get("Location") // ...此处省略代码... err = c.checkRedirect(req, reqs) // ...此处省略很多代码... } reqs = append(reqs, req) var err error var didTimeout func() bool if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true // ...此处省略代码... return nil, uerr(err) } var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil } req.closeBody() }

上面的代码中, 请求第一次进入会调用c.send, 得到响应后会判断请求是否需要重定向, 如果需要重定向则继续循环, 否则返回响应。

进入重定向流程后, 这里笔者简单介绍一下checkRedirect函数:

func defaultCheckRedirect(req *Request, via []*Request) error { if len(via) >= 10 { return errors.New("stopped after 10 redirects") } return nil } // ... func (c *Client) checkRedirect(req *Request, via []*Request) error { fn := c.CheckRedirect if fn == nil { fn = defaultCheckRedirect } return fn(req, via) }

由上可知, 用户可以自己定义重定向的检查规则。如果用户没有自定义检查规则, 则重定向次数不能超过10次。

(*Client).send

(*Client).send方法逻辑较为简单, 主要看用户有没有为insouciant.org/tech/connection-management-in-chromium/.) key := pconn.cacheKey if q, ok := t.idleConnWait[key]; ok { done := false if pconn.alt == nil { // HTTP/1. // Loop over the waiting list until we find a w that isn't done already, and hand it pconn. for q.len() > 0 { w := q.popFront() if w.tryDeliver(pconn, nil) { done = true break } } } else { // HTTP/2. // Can hand the same pconn to everyone in the waiting list, // and we still won't be done: we want to put it in the idle // list unconditionally, for any future clients too. for q.len() > 0 { w := q.popFront() w.tryDeliver(pconn, nil) } } if q.len() == 0 { delete(t.idleConnWait, key) } else { t.idleConnWait[key] = q } if done { return nil } } if t.closeIdle { return errCloseIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } idles := t.idleConn[key] if len(idles) >= t.maxIdleConnsPerHost() { return errTooManyIdleHost } // ...此处省略代码... t.idleConn[key] = append(idles, pconn) t.idleLRU.add(pconn) // ...此处省略代码... // Set idle timer, but only for HTTP/1 (pconn.alt == nil). // The HTTP/2 implementation manages the idle timer itself // (see idleConnTimeout in h2_bundle.go). if t.IdleConnTimeout > 0 && pconn.alt == nil { if pconn.idleTimer != nil { pconn.idleTimer.Reset(t.IdleConnTimeout) } else { pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) } } pconn.idleAt = time.Now() return nil } func (t *Transport) maxIdleConnsPerHost() int { if v := t.MaxIdleConnsPerHost; v != 0 { return v } return DefaultMaxIdleConnsPerHost // 2 }

由上可知,将连接放入t.idleConn前,先检查t.idleConnWait的数量。如果有请求在等待空闲连接, 则将连接复用,没有空闲连接时,才将连接放入t.idleConn。连接放入t.idleConn后,还会重置连接的可空闲时间。

另外在t.putOrCloseIdleConn函数中还需要注意两点:

  1. 如果用户自定义了http.client,且将DisableKeepAlives设置为true,或者将MaxIdleConnsPerHost设置为负数,则连接不会放入t.idleConn即连接不能复用。

  2. 在判断已有空闲连接数量时, 如果MaxIdleConnsPerHost 不等于0, 则返回用户设置的数量,否则返回默认值2,详见上面的(*Transport).maxIdleConnsPerHost 函数。

综上, 我们知道对于部分有连接数限制的业务, 我们可以为http.Client自定义一个Transport, 并设置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives从而达到即限制连接数量,又能保证一定的并发。

  • (*Transport).decConnsPerHost方法

func (t *Transport) decConnsPerHost(key connectMethodKey) { // ...此处省略代码... t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() n := t.connsPerHost[key] // ...此处省略代码... // Can we hand this count to a goroutine still waiting to dial? // (Some goroutines on the wait list may have timed out or // gotten a connection another way. If they're all gone, // we don't want to kick off any spurious dial operations.) if q := t.connsPerHostWait[key]; q.len() > 0 { done := false for q.len() > 0 { w := q.popFront() if w.waiting() { go t.dialConnFor(w) done = true break } } if q.len() == 0 { delete(t.connsPerHostWait, key) } else { // q is a value (like a slice), so we have to store // the updated q back into the map. t.connsPerHostWait[key] = q } if done { return } } // Otherwise, decrement the recorded count. if n--; n == 0 { delete(t.connsPerHost, key) } else { t.connsPerHost[key] = n } }

由上可知, decConnsPerHost方法主要干了两件事:

  1. 判断是否有请求在等待拨号, 如果有则执行go t.dialConnFor(w)

  2. 如果没有请求在等待拨号, 则减少当前host的连接数量。

(*Transport).dialConn

根据http.Client的默认配置和实际的debug结果,(*Transport).dialConn方法主要逻辑如下:

  1. 调用t.dial(ctx, "tcp", cm.addr())创建TCP连接。

  2. 如果是https的请求, 则对请求建立安全的tls传输通道。

  3. 为persistConn创建读写buffer, 如果用户没有自定义读写buffer的大小, 根据writeBufferSize和readBufferSize方法可知, 读写bufffer的大小默认为4096。

  4. 执行go pconn.readLoop()go pconn.writeLoop()开启读写循环然后返回连接。

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } // ...此处省略代码... if cm.scheme() == "https" && t.hasCustomTLSDialer() { // ...此处省略代码... } else { conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // Proxy setup. switch { // ...此处省略代码... } if cm.proxyURL != nil && cm.targetScheme == "https" { // ...此处省略代码... } if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { // ...此处省略代码... } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) go pconn.readLoop() go pconn.writeLoop() return pconn, nil } func (t *Transport) writeBufferSize() int { if t.WriteBufferSize > 0 { return t.WriteBufferSize } return 4 << 10 } func (t *Transport) readBufferSize() int { if t.ReadBufferSize > 0 { return t.ReadBufferSize } return 4 << 10 }(*persistConn).roundTrip

(*persistConn).roundTrip方法是http1.1请求的核心之一,该方法在这里获取真实的Response并返回给上层。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { // ...此处省略代码... gone := make(chan struct{}) defer close(gone) // ...此处省略代码... const debugRoundTrip = false // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. startBytesWritten := pc.nwrite writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() for { testHookWaitResLoop() select { case err := <-writeErrCh: // ...此处省略代码... if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } // ...此处省略代码... case <-pc.closech: // ...此处省略代码... return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) case <-respHeaderTimer: // ...此处省略代码... return nil, errTimeout case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

由上可知, (*persistConn).roundTrip方法可以分为三步:

  1. 向连接的writech写入writeRequest: pc.writech <- writeRequest{req, writeErrCh, continueCh}, 参考(*Transport).dialConn可知pc.writech是一个缓冲大小为1的管道,所以会立马写入成功。

  2. 向连接的reqch写入requestAndChan: pc.reqch <- requestAndChan, pc.reqch和pc.writech一样都是缓冲大小为1的管道。其中requestAndChan.ch是一个无缓冲的responseAndError管道,(*persistConn).roundTrip就通过这个管道读取到真实的响应。

  3. 开启for循环select, 等待响应或者超时等信息。

  • (*persistConn).writeLoop 写循环

(*persistConn).writeLoop方法主体逻辑相对简单,把用户的请求写入连接的写缓存buffer, 最后再flush就可以了。

func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { pc.close(err) return } case <-pc.closech: return } } }

  • (*persistConn).readLoop 读循环

(*persistConn).readLoop有较多的细节, 我们先看代码, 然后再逐步分析。

func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { // ...此处省略代码... } // ...此处省略代码... return true } // ...此处省略代码... alive := true for alive { // ...此处省略代码... rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } // ...此处省略代码... bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { // ...此处省略代码... continue } waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body // ...此处省略代码... select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }

由上可知, 只要连接处于活跃状态, 则这个读循环会一直开启, 直到 连接不活跃或者产生其他错误才会结束读循环。

Go语言中HTTP1.1请求流程是如何一步步展开的?

在上述源码中,pc.readResponse(rc,trace)会从连接的读buffer中获取一个请求对应的Response。

读到响应之后判断请求是否是HEAD请求或者响应内容为空,如果是HEAD请求或者响应内容为空则将响应写入rc.ch,并将连接放入idleConn(此处因为篇幅的原因省略了源码内容, 正常请求的逻辑也有写响应和将连接放入idleConn两个步骤)。

如果不是HEAD请求并且响应内容不为空即!hasBody || bodyWritable为false:

  1. 创建一个缓冲大小为2的等待响应被读取的管道waitForBodyRead: waitForBodyRead := make(chan bool, 2)

  2. 将响应的Body修改为bodyEOFSignal结构体。通过上面的源码我们可以知道,此时的resp.Body中有earlyCloseFnfn两个函数。earlyCloseFn函数会向waitForBodyRead管道写入false, fn函数会判断响应是否读完, 如果已经读完则向waitForBodyRead写入true否则写入false

  3. 将修改后的响应写入rc.ch。其中rc.chrc := <-pc.reqch获取,而pc.reqch正是前面(*persistConn).roundTrip函数写入的requestAndChanrequestAndChan.ch是一个无缓冲的responseAndError管道,(*persistConn).roundTrip通过这个管道读取到真实的响应。

  4. select 读取 waitForBodyRead被写入的值。如果读到到的是true则可以调用tryPutIdleConn(此方法会调用前面提到的(*Transport).tryPutIdleConn方法)将连接放入idleConn从而复用连接。

waitForBodyRead写入true的原因我们已经知道了,但是被写入true的时机我们尚不明确。

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { // ...此处省略代码... n, err = es.body.Read(p) if err != nil { es.mu.Lock() defer es.mu.Unlock() if es.rerr == nil { es.rerr = err } err = es.condfn(err) } return } func (es *bodyEOFSignal) Close() error { es.mu.Lock() defer es.mu.Unlock() if es.closed { return nil } es.closed = true if es.earlyCloseFn != nil && es.rerr != io.EOF { return es.earlyCloseFn() } err := es.body.Close() return es.condfn(err) } // caller must hold es.mu. func (es *bodyEOFSignal) condfn(err error) error { if es.fn == nil { return err } err = es.fn(err) es.fn = nil return err }

由上述源码可知, 只有当调用方完整的读取了响应,该连接才能够被复用。因此在http1.1中,一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。

根据上面的逻辑, 我们GoPher在平时的开发中如果遇到了不关心响应的请求, 也一定要记得把响应body读完以保证连接的复用性。笔者在这里给出一个demo:

io.CopyN(ioutil.Discard, resp.Body, 2 << 10) resp.Body.Close()

以上,就是笔者整理的HTTP1.1的请求流程。

注意

笔者本着严谨的态度, 特此提醒:

上述流程中笔者对很多细节并未详细提及或者仅一笔带过,希望读者酌情参考。

总结
  1. 在go中发起http1.1的请求时, 如果遇到不关心响应的请求,请务必完整读取响应内容以保证连接的复用性。

  2. 如果遇到对连接数有限制的业务,可以通过自定义http.Client的Transport, 并设置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives的值,来控制连接数。

  3. 如果对于重定向业务逻辑有需求,可以自定义http.Client的CheckRedirect

  4. 在http1.1,中一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。

本文共计5790个文字,预计阅读时间需要24分钟。

Go语言中HTTP1.1请求流程是如何一步步展开的?

HTTP1.1流程+今日内容较多,废话不多说,直接上干货。接下来,笔者将根据流程图,对除NewRequest以外的函数进行逐步展开和分析。以(Client).do(Client).do方法的核心代码是一个没有结束的循环。

HTTP1.1流程

今天内容较多, 废话不多说, 直接上干货。

接下来, 笔者将根据流程图,对除了NewRequest以外的函数进行逐步的展开和分析

(*Client).do

(*Client).do方法的核心代码是一个没有结束条件的for循环。

for { // For all but the first request, create the next // request hop and replace req. if len(reqs) > 0 { loc := resp.Header.Get("Location") // ...此处省略代码... err = c.checkRedirect(req, reqs) // ...此处省略很多代码... } reqs = append(reqs, req) var err error var didTimeout func() bool if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true // ...此处省略代码... return nil, uerr(err) } var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil } req.closeBody() }

上面的代码中, 请求第一次进入会调用c.send, 得到响应后会判断请求是否需要重定向, 如果需要重定向则继续循环, 否则返回响应。

进入重定向流程后, 这里笔者简单介绍一下checkRedirect函数:

func defaultCheckRedirect(req *Request, via []*Request) error { if len(via) >= 10 { return errors.New("stopped after 10 redirects") } return nil } // ... func (c *Client) checkRedirect(req *Request, via []*Request) error { fn := c.CheckRedirect if fn == nil { fn = defaultCheckRedirect } return fn(req, via) }

由上可知, 用户可以自己定义重定向的检查规则。如果用户没有自定义检查规则, 则重定向次数不能超过10次。

(*Client).send

(*Client).send方法逻辑较为简单, 主要看用户有没有为insouciant.org/tech/connection-management-in-chromium/.) key := pconn.cacheKey if q, ok := t.idleConnWait[key]; ok { done := false if pconn.alt == nil { // HTTP/1. // Loop over the waiting list until we find a w that isn't done already, and hand it pconn. for q.len() > 0 { w := q.popFront() if w.tryDeliver(pconn, nil) { done = true break } } } else { // HTTP/2. // Can hand the same pconn to everyone in the waiting list, // and we still won't be done: we want to put it in the idle // list unconditionally, for any future clients too. for q.len() > 0 { w := q.popFront() w.tryDeliver(pconn, nil) } } if q.len() == 0 { delete(t.idleConnWait, key) } else { t.idleConnWait[key] = q } if done { return nil } } if t.closeIdle { return errCloseIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } idles := t.idleConn[key] if len(idles) >= t.maxIdleConnsPerHost() { return errTooManyIdleHost } // ...此处省略代码... t.idleConn[key] = append(idles, pconn) t.idleLRU.add(pconn) // ...此处省略代码... // Set idle timer, but only for HTTP/1 (pconn.alt == nil). // The HTTP/2 implementation manages the idle timer itself // (see idleConnTimeout in h2_bundle.go). if t.IdleConnTimeout > 0 && pconn.alt == nil { if pconn.idleTimer != nil { pconn.idleTimer.Reset(t.IdleConnTimeout) } else { pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) } } pconn.idleAt = time.Now() return nil } func (t *Transport) maxIdleConnsPerHost() int { if v := t.MaxIdleConnsPerHost; v != 0 { return v } return DefaultMaxIdleConnsPerHost // 2 }

由上可知,将连接放入t.idleConn前,先检查t.idleConnWait的数量。如果有请求在等待空闲连接, 则将连接复用,没有空闲连接时,才将连接放入t.idleConn。连接放入t.idleConn后,还会重置连接的可空闲时间。

另外在t.putOrCloseIdleConn函数中还需要注意两点:

  1. 如果用户自定义了http.client,且将DisableKeepAlives设置为true,或者将MaxIdleConnsPerHost设置为负数,则连接不会放入t.idleConn即连接不能复用。

  2. 在判断已有空闲连接数量时, 如果MaxIdleConnsPerHost 不等于0, 则返回用户设置的数量,否则返回默认值2,详见上面的(*Transport).maxIdleConnsPerHost 函数。

综上, 我们知道对于部分有连接数限制的业务, 我们可以为http.Client自定义一个Transport, 并设置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives从而达到即限制连接数量,又能保证一定的并发。

  • (*Transport).decConnsPerHost方法

func (t *Transport) decConnsPerHost(key connectMethodKey) { // ...此处省略代码... t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() n := t.connsPerHost[key] // ...此处省略代码... // Can we hand this count to a goroutine still waiting to dial? // (Some goroutines on the wait list may have timed out or // gotten a connection another way. If they're all gone, // we don't want to kick off any spurious dial operations.) if q := t.connsPerHostWait[key]; q.len() > 0 { done := false for q.len() > 0 { w := q.popFront() if w.waiting() { go t.dialConnFor(w) done = true break } } if q.len() == 0 { delete(t.connsPerHostWait, key) } else { // q is a value (like a slice), so we have to store // the updated q back into the map. t.connsPerHostWait[key] = q } if done { return } } // Otherwise, decrement the recorded count. if n--; n == 0 { delete(t.connsPerHost, key) } else { t.connsPerHost[key] = n } }

由上可知, decConnsPerHost方法主要干了两件事:

  1. 判断是否有请求在等待拨号, 如果有则执行go t.dialConnFor(w)

  2. 如果没有请求在等待拨号, 则减少当前host的连接数量。

(*Transport).dialConn

根据http.Client的默认配置和实际的debug结果,(*Transport).dialConn方法主要逻辑如下:

  1. 调用t.dial(ctx, "tcp", cm.addr())创建TCP连接。

  2. 如果是https的请求, 则对请求建立安全的tls传输通道。

  3. 为persistConn创建读写buffer, 如果用户没有自定义读写buffer的大小, 根据writeBufferSize和readBufferSize方法可知, 读写bufffer的大小默认为4096。

  4. 执行go pconn.readLoop()go pconn.writeLoop()开启读写循环然后返回连接。

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } // ...此处省略代码... if cm.scheme() == "https" && t.hasCustomTLSDialer() { // ...此处省略代码... } else { conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // Proxy setup. switch { // ...此处省略代码... } if cm.proxyURL != nil && cm.targetScheme == "https" { // ...此处省略代码... } if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { // ...此处省略代码... } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) go pconn.readLoop() go pconn.writeLoop() return pconn, nil } func (t *Transport) writeBufferSize() int { if t.WriteBufferSize > 0 { return t.WriteBufferSize } return 4 << 10 } func (t *Transport) readBufferSize() int { if t.ReadBufferSize > 0 { return t.ReadBufferSize } return 4 << 10 }(*persistConn).roundTrip

(*persistConn).roundTrip方法是http1.1请求的核心之一,该方法在这里获取真实的Response并返回给上层。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { // ...此处省略代码... gone := make(chan struct{}) defer close(gone) // ...此处省略代码... const debugRoundTrip = false // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. startBytesWritten := pc.nwrite writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() for { testHookWaitResLoop() select { case err := <-writeErrCh: // ...此处省略代码... if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } // ...此处省略代码... case <-pc.closech: // ...此处省略代码... return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) case <-respHeaderTimer: // ...此处省略代码... return nil, errTimeout case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

由上可知, (*persistConn).roundTrip方法可以分为三步:

  1. 向连接的writech写入writeRequest: pc.writech <- writeRequest{req, writeErrCh, continueCh}, 参考(*Transport).dialConn可知pc.writech是一个缓冲大小为1的管道,所以会立马写入成功。

  2. 向连接的reqch写入requestAndChan: pc.reqch <- requestAndChan, pc.reqch和pc.writech一样都是缓冲大小为1的管道。其中requestAndChan.ch是一个无缓冲的responseAndError管道,(*persistConn).roundTrip就通过这个管道读取到真实的响应。

  3. 开启for循环select, 等待响应或者超时等信息。

  • (*persistConn).writeLoop 写循环

(*persistConn).writeLoop方法主体逻辑相对简单,把用户的请求写入连接的写缓存buffer, 最后再flush就可以了。

func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { pc.close(err) return } case <-pc.closech: return } } }

  • (*persistConn).readLoop 读循环

(*persistConn).readLoop有较多的细节, 我们先看代码, 然后再逐步分析。

func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { // ...此处省略代码... } // ...此处省略代码... return true } // ...此处省略代码... alive := true for alive { // ...此处省略代码... rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } // ...此处省略代码... bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { // ...此处省略代码... continue } waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body // ...此处省略代码... select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }

由上可知, 只要连接处于活跃状态, 则这个读循环会一直开启, 直到 连接不活跃或者产生其他错误才会结束读循环。

Go语言中HTTP1.1请求流程是如何一步步展开的?

在上述源码中,pc.readResponse(rc,trace)会从连接的读buffer中获取一个请求对应的Response。

读到响应之后判断请求是否是HEAD请求或者响应内容为空,如果是HEAD请求或者响应内容为空则将响应写入rc.ch,并将连接放入idleConn(此处因为篇幅的原因省略了源码内容, 正常请求的逻辑也有写响应和将连接放入idleConn两个步骤)。

如果不是HEAD请求并且响应内容不为空即!hasBody || bodyWritable为false:

  1. 创建一个缓冲大小为2的等待响应被读取的管道waitForBodyRead: waitForBodyRead := make(chan bool, 2)

  2. 将响应的Body修改为bodyEOFSignal结构体。通过上面的源码我们可以知道,此时的resp.Body中有earlyCloseFnfn两个函数。earlyCloseFn函数会向waitForBodyRead管道写入false, fn函数会判断响应是否读完, 如果已经读完则向waitForBodyRead写入true否则写入false

  3. 将修改后的响应写入rc.ch。其中rc.chrc := <-pc.reqch获取,而pc.reqch正是前面(*persistConn).roundTrip函数写入的requestAndChanrequestAndChan.ch是一个无缓冲的responseAndError管道,(*persistConn).roundTrip通过这个管道读取到真实的响应。

  4. select 读取 waitForBodyRead被写入的值。如果读到到的是true则可以调用tryPutIdleConn(此方法会调用前面提到的(*Transport).tryPutIdleConn方法)将连接放入idleConn从而复用连接。

waitForBodyRead写入true的原因我们已经知道了,但是被写入true的时机我们尚不明确。

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { // ...此处省略代码... n, err = es.body.Read(p) if err != nil { es.mu.Lock() defer es.mu.Unlock() if es.rerr == nil { es.rerr = err } err = es.condfn(err) } return } func (es *bodyEOFSignal) Close() error { es.mu.Lock() defer es.mu.Unlock() if es.closed { return nil } es.closed = true if es.earlyCloseFn != nil && es.rerr != io.EOF { return es.earlyCloseFn() } err := es.body.Close() return es.condfn(err) } // caller must hold es.mu. func (es *bodyEOFSignal) condfn(err error) error { if es.fn == nil { return err } err = es.fn(err) es.fn = nil return err }

由上述源码可知, 只有当调用方完整的读取了响应,该连接才能够被复用。因此在http1.1中,一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。

根据上面的逻辑, 我们GoPher在平时的开发中如果遇到了不关心响应的请求, 也一定要记得把响应body读完以保证连接的复用性。笔者在这里给出一个demo:

io.CopyN(ioutil.Discard, resp.Body, 2 << 10) resp.Body.Close()

以上,就是笔者整理的HTTP1.1的请求流程。

注意

笔者本着严谨的态度, 特此提醒:

上述流程中笔者对很多细节并未详细提及或者仅一笔带过,希望读者酌情参考。

总结
  1. 在go中发起http1.1的请求时, 如果遇到不关心响应的请求,请务必完整读取响应内容以保证连接的复用性。

  2. 如果遇到对连接数有限制的业务,可以通过自定义http.Client的Transport, 并设置Transport的MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives的值,来控制连接数。

  3. 如果对于重定向业务逻辑有需求,可以自定义http.Client的CheckRedirect

  4. 在http1.1,中一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。