22

Go中的HTTP请求之——HTTP1.1请求流程分析

 3 years ago
source link: https://studygolang.com/articles/30868
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

来自公众号:新世界杂货铺

前言

http是目前应用最为广泛, 也是程序员接触最多的协议之一。今天笔者站在GoPher的角度对http1.1的请求流程进行全面的分析。希望读者读完此文后, 能够有以下几个收获:

  1. 对http1.1的请求流程有一个大概的了解
  2. 在平时的开发中能够更好地重用底层TCP连接
  3. 对http1.1的线头阻塞能有一个更清楚的认识

HTTP1.1流程

今天内容较多, 废话不多说, 直接上干货。

7fyUZzM.png!mobile

接下来, 笔者将根据流程图,对除了NewRequest以外的函数进行逐步的展开和分析

(*Client).do

(*Client).do方法的核心代码是一个没有结束条件的for循环。

for {
    // For all but the first request, create the next
    // request hop and replace req.
    if len(reqs) > 0 {
        loc := resp.Header.Get("Location")
        // ...此处省略代码...
        err = c.checkRedirect(req, reqs)
        // ...此处省略很多代码...
    }

    reqs = append(reqs, req)
    var err error
    var didTimeout func() bool
    if resp, didTimeout, err = c.send(req, deadline); err != nil {
        // c.send() always closes req.Body
        reqBodyClosed = true
        // ...此处省略代码...
        return nil, uerr(err)
    }

    var shouldRedirect bool
    redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
    if !shouldRedirect {
        return resp, nil
    }

    req.closeBody()
}

上面的代码中, 请求第一次进入会调用 c.send , 得到响应后会判断请求是否需要重定向, 如果需要重定向则继续循环, 否则返回响应。

进入重定向流程后, 这里笔者简单介绍一下 checkRedirect 函数:

func defaultCheckRedirect(req *Request, via []*Request) error {
    if len(via) >= 10 {
        return errors.New("stopped after 10 redirects")
    }
    return nil
}
// ...
func (c *Client) checkRedirect(req *Request, via []*Request) error {
    fn := c.CheckRedirect
    if fn == nil {
        fn = defaultCheckRedirect
    }
    return fn(req, via)
}

由上可知, 用户可以自己定义重定向的检查规则。如果用户没有自定义检查规则, 则 重定向次数不能超过10次

(*Client).send

(*Client).send方法逻辑较为简单, 主要看用户有没有为 http.Client的Jar 字段实现 CookieJar 接口。主要流程如下:

send
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    if c.Jar != nil {
        for _, cookie := range c.Jar.Cookies(req.URL) {
            req.AddCookie(cookie)
        }
    }
    resp, didTimeout, err = send(req, c.transport(), deadline)
    if err != nil {
        return nil, didTimeout, err
    }
    if c.Jar != nil {
        if rc := resp.Cookies(); len(rc) > 0 {
            c.Jar.SetCookies(req.URL, rc)
        }
    }
    return resp, nil, nil
}

另外, 我们还需要关注 c.transport() 的调用。如果用户未对http.Client指定Transport则会使用go默认的DefaultTransport。

该Transport实现RoundTripper接口。在go中RoundTripper的定义为“ 执行单个HTTP事务的能力,获取给定请求的响应 ”。

func (c *Client) transport() RoundTripper {
    if c.Transport != nil {
        return c.Transport
    }
    return DefaultTransport
}

send

send函数会检查request的URL,以及参数的rt, 和header值。如果URL和rt为nil则直接返回错误。同时, 如果请求中设置了用户信息, 还会检查并设置basic的验证头信息,最后调用 rt.RoundTrip 得到请求的响应。

func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    req := ireq // req is either the original request, or a modified fork
    // ...此处省略代码...
    if u := req.URL.User; u != nil && req.Header.Get("Authorization") == "" {
        username := u.Username()
        password, _ := u.Password()
        forkReq()
        req.Header = cloneOrMakeHeader(ireq.Header)
        req.Header.Set("Authorization", "Basic "+basicAuth(username, password))
    }

    if !deadline.IsZero() {
        forkReq()
    }
    stopTimer, didTimeout := setRequestCancel(req, rt, deadline)

    resp, err = rt.RoundTrip(req)
    if err != nil {
        // ...此处省略代码...
        return nil, didTimeout, err
    }
    // ...此处省略代码...
    return resp, nil, nil
}

(*Transport).RoundTrip

(*Transport).RoundTrip的逻辑很简单,它会调用(*Transport).roundTrip方法,因此本节实际上是对(*Transport).roundTrip方法的分析。

func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    return t.roundTrip(req)
}
func (t *Transport) roundTrip(req *Request) (*Response, error) {
    // ...此处省略校验header头和headervalue的代码以及其他代码...

    for {
        select {
        case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // treq gets modified by roundTrip, so we need to recreate for each retry.
        treq := &transportRequest{Request: req, trace: trace}
        cm, err := t.connectMethodForRequest(treq)
        // ...此处省略代码...
        pconn, err := t.getConn(treq, cm)
        if err != nil {
            t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {
            resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
            return resp, nil
        }

        // ...此处省略判断是否重试请求的代码逻辑...
    }
}

由上可知, 每次for循环, 会判断请求上下文是否已经取消, 如果没有取消则继续进行后续的流程。

t.getConn
pconn.roundTrip

(*Transport).getConn

笔者认为这一步在http请求中是非常核心的一个步骤,因为只有和server端建立连接后才能进行后续的通信。

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()
    // ...此处省略代码...
    w := &wantConn{
        cm:         cm,
        key:        cm.key(),
        ctx:        ctx,
        ready:      make(chan struct{}, 1),
        beforeDial: testHookPrePendingDial,
        afterDial:  testHookPostPendingDial,
    }
    // ...此处省略代码...
    // Queue for idle connection.
    if delivered := t.queueForIdleConn(w); delivered {
        pc := w.pc
        // ...此处省略代码...
        return pc, nil
    }

    cancelc := make(chan error, 1)
    t.setReqCanceler(req, func(err error) { cancelc <- err })

    // Queue for permission to dial.
    t.queueForDial(w)

    // Wait for completion or cancellation.
    select {
    case <-w.ready:
        // Trace success but only for HTTP/1.
        // HTTP/2 calls trace.GotConn itself.
        if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
            trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
        }
        // ...此处省略代码...
        return w.pc, w.err
    case <-req.Cancel:
        return nil, errRequestCanceledConn
    case <-req.Context().Done():
        return nil, req.Context().Err()
    case err := <-cancelc:
        if err == errRequestCanceled {
            err = errRequestCanceledConn
        }
        return nil, err
    }
}

由上能够清楚的知道, 获取连接分为以下几个步骤:

t.queueForIdleConn
t.queueForDial

(*Transport).queueForIdleConn

(*Transport).queueForIdleConn方法会根据请求的 connectMethodKey 从t.idleConn获取一个[]*persistConn切片, 并从切片中,根据算法获取一个有效的空闲连接。如果未获取到空闲连接,则将 wantConn 结构体变量放入 t.idleConnWait[w.key] 等待队列,此处wantConn结构体变量就是前面提到的 w

connectMethodKey定义和queueForIdleConn部分关键代码如下:

type connectMethodKey struct {
    proxy, scheme, addr string
    onlyH1              bool
}

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
    // ...此处省略代码...
    // Look for most recently-used idle connection.
    if list, ok := t.idleConn[w.key]; ok {
        stop := false
        delivered := false
        for len(list) > 0 && !stop {
            pconn := list[len(list)-1]

            // See whether this connection has been idle too long, considering
            // only the wall time (the Round(0)), in case this is a laptop or VM
            // coming out of suspend with previously cached idle connections.
            tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
            // ...此处省略代码...
            delivered = w.tryDeliver(pconn, nil)
            if delivered {
                // ...此处省略代码...
            }
            stop = true
        }
        if len(list) > 0 {
            t.idleConn[w.key] = list
        } else {
            delete(t.idleConn, w.key)
        }
        if stop {
            return delivered
        }
    }

    // Register to receive next connection that becomes idle.
    if t.idleConnWait == nil {
        t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.idleConnWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.idleConnWait[w.key] = q
    return false
}

其中 w.tryDeliver 方法主要作用是将连接协程安全的赋值给 w.pc ,并关闭 w.ready 管道。此时我们便可以和(*Transport).getConn中调用queueForIdleConn成功后的返回值对应上。

(*Transport).queueForDial

(*Transport).queueForDial方法包含三个步骤:

  1. 如果t.MaxConnsPerHost小于等于0,执行 go t.dialConnFor(w) 并返回。其中MaxConnsPerHost代表着每个host的最大连接数,小于等于0表示不限制。
  2. 如果当前host的连接数不超过t.MaxConnsPerHost,对当前host的连接数+1,然后执行 go t.dialConnFor(w) 并返回。
  3. 如果当前host的连接数等于t.MaxConnsPerHost,则将 wantConn 结构体变量放入 t.connsPerHostWait[w.key] 等待队列,此处wantConn结构体变量就是前面提到的 w 。另外在放入等待队列前会先清除队列中已经失效或者不再等待的变量。
func (t *Transport) queueForDial(w *wantConn) {
    w.beforeDial()
    if t.MaxConnsPerHost <= 0 {
        go t.dialConnFor(w)
        return
    }

    t.connsPerHostMu.Lock()
    defer t.connsPerHostMu.Unlock()

    if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
        if t.connsPerHost == nil {
            t.connsPerHost = make(map[connectMethodKey]int)
        }
        t.connsPerHost[w.key] = n + 1
        go t.dialConnFor(w)
        return
    }

    if t.connsPerHostWait == nil {
        t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.connsPerHostWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.connsPerHostWait[w.key] = q
}

(*Transport).dialConnFor

(*Transport).dialConnFor方法调用 t.dialConn 获取一个真正的 *persistConn 。并将这个连接传递给w, 如果w已经获取到了连接,则会传递失败,此时调用 t.putOrCloseIdleConn 将连接放回空闲连接池。

如果连接获取错误则会调用 t.decConnsPerHost 减少当前host的连接数。

func (t *Transport) dialConnFor(w *wantConn) {
    defer w.afterDial()

    pc, err := t.dialConn(w.ctx, w.cm)
    delivered := w.tryDeliver(pc, err)
    if err == nil && (!delivered || pc.alt != nil) {
        // pconn was not passed to w,
        // or it is HTTP/2 and can be shared.
        // Add to the idle connection pool.
        t.putOrCloseIdleConn(pc)
    }
    if err != nil {
        t.decConnsPerHost(w.key)
    }
}
  • (*Transport).putOrCloseIdleConn方法
func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
    if err := t.tryPutIdleConn(pconn); err != nil {
        pconn.close(err)
    }
}
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
    if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
        return errKeepAlivesDisabled
    }
    // ...此处省略代码...
    t.idleMu.Lock()
    defer t.idleMu.Unlock()
    // ...此处省略代码...
    
    // Deliver pconn to goroutine waiting for idle connection, if any.
    // (They may be actively dialing, but this conn is ready first.
    // Chrome calls this socket late binding.
    // See https://insouciant.org/tech/connection-management-in-chromium/.)
    key := pconn.cacheKey
    if q, ok := t.idleConnWait[key]; ok {
        done := false
        if pconn.alt == nil {
            // HTTP/1.
            // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
            for q.len() > 0 {
                w := q.popFront()
                if w.tryDeliver(pconn, nil) {
                    done = true
                    break
                }
            }
        } else {
            // HTTP/2.
            // Can hand the same pconn to everyone in the waiting list,
            // and we still won't be done: we want to put it in the idle
            // list unconditionally, for any future clients too.
            for q.len() > 0 {
                w := q.popFront()
                w.tryDeliver(pconn, nil)
            }
        }
        if q.len() == 0 {
            delete(t.idleConnWait, key)
        } else {
            t.idleConnWait[key] = q
        }
        if done {
            return nil
        }
    }

    if t.closeIdle {
        return errCloseIdle
    }
    if t.idleConn == nil {
        t.idleConn = make(map[connectMethodKey][]*persistConn)
    }
    idles := t.idleConn[key]
    if len(idles) >= t.maxIdleConnsPerHost() {
        return errTooManyIdleHost
    }
    // ...此处省略代码...
    t.idleConn[key] = append(idles, pconn)
    t.idleLRU.add(pconn)
    // ...此处省略代码...
    // Set idle timer, but only for HTTP/1 (pconn.alt == nil).
    // The HTTP/2 implementation manages the idle timer itself
    // (see idleConnTimeout in h2_bundle.go).
    if t.IdleConnTimeout > 0 && pconn.alt == nil {
        if pconn.idleTimer != nil {
            pconn.idleTimer.Reset(t.IdleConnTimeout)
        } else {
            pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
        }
    }
    pconn.idleAt = time.Now()
    return nil
}
func (t *Transport) maxIdleConnsPerHost() int {
    if v := t.MaxIdleConnsPerHost; v != 0 {
        return v
    }
    return DefaultMaxIdleConnsPerHost // 2
}

由上可知,将连接放入t.idleConn前,先检查t.idleConnWait的数量。如果有请求在等待空闲连接, 则将连接复用,没有空闲连接时,才将连接放入t.idleConn。连接放入t.idleConn后,还会重置连接的可空闲时间。

另外在t.putOrCloseIdleConn函数中还需要注意两点:

(*Transport).maxIdleConnsPerHost

综上, 我们知道对于部分有连接数限制的业务, 我们可以为http.Client自定义一个Transport, 并设置Transport的 MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives 从而达到即限制连接数量,又能保证一定的并发。

  • (*Transport).decConnsPerHost方法
func (t *Transport) decConnsPerHost(key connectMethodKey) {
    // ...此处省略代码...
    t.connsPerHostMu.Lock()
    defer t.connsPerHostMu.Unlock()
    n := t.connsPerHost[key]
    // ...此处省略代码...

    // Can we hand this count to a goroutine still waiting to dial?
    // (Some goroutines on the wait list may have timed out or
    // gotten a connection another way. If they're all gone,
    // we don't want to kick off any spurious dial operations.)
    if q := t.connsPerHostWait[key]; q.len() > 0 {
        done := false
        for q.len() > 0 {
            w := q.popFront()
            if w.waiting() {
                go t.dialConnFor(w)
                done = true
                break
            }
        }
        if q.len() == 0 {
            delete(t.connsPerHostWait, key)
        } else {
            // q is a value (like a slice), so we have to store
            // the updated q back into the map.
            t.connsPerHostWait[key] = q
        }
        if done {
            return
        }
    }

    // Otherwise, decrement the recorded count.
    if n--; n == 0 {
        delete(t.connsPerHost, key)
    } else {
        t.connsPerHost[key] = n
    }
}

由上可知, decConnsPerHost方法主要干了两件事:

go t.dialConnFor(w)

(*Transport).dialConn

根据http.Client的默认配置和实际的debug结果,(*Transport).dialConn方法主要逻辑如下:

  1. 调用 t.dial(ctx, "tcp", cm.addr()) 创建TCP连接。
  2. 如果是https的请求, 则对请求建立安全的tls传输通道。
  3. 为persistConn创建读写buffer, 如果用户没有自定义读写buffer的大小, 根据writeBufferSize和readBufferSize方法可知, 读写bufffer的大小默认为4096。
  4. 执行 go pconn.readLoop()go pconn.writeLoop() 开启读写循环然后返回连接。
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    pconn = &persistConn{
        t:             t,
        cacheKey:      cm.key(),
        reqch:         make(chan requestAndChan, 1),
        writech:       make(chan writeRequest, 1),
        closech:       make(chan struct{}),
        writeErrCh:    make(chan error, 1),
        writeLoopDone: make(chan struct{}),
    }
    // ...此处省略代码...
    if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        // ...此处省略代码...
    } else {
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                return nil, wrapErr(err)
            }
        }
    }

    // Proxy setup.
    switch { // ...此处省略代码... }

    if cm.proxyURL != nil && cm.targetScheme == "https" {
        // ...此处省略代码...
    }

    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
        // ...此处省略代码...
    }

    pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
    pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

    go pconn.readLoop()
    go pconn.writeLoop()
    return pconn, nil
}
func (t *Transport) writeBufferSize() int {
    if t.WriteBufferSize > 0 {
        return t.WriteBufferSize
    }
    return 4 << 10
}

func (t *Transport) readBufferSize() int {
    if t.ReadBufferSize > 0 {
        return t.ReadBufferSize
    }
    return 4 << 10
}

(*persistConn).roundTrip

(*persistConn).roundTrip方法是http1.1请求的核心之一,该方法在这里获取真实的Response并返回给上层。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    // ...此处省略代码...

    gone := make(chan struct{})
    defer close(gone)
    // ...此处省略代码...
    const debugRoundTrip = false

    // Write the request concurrently with waiting for a response,
    // in case the server decides to reply before reading our full
    // request body.
    startBytesWritten := pc.nwrite
    writeErrCh := make(chan error, 1)
    pc.writech <- writeRequest{req, writeErrCh, continueCh}

    resc := make(chan responseAndError)
    pc.reqch <- requestAndChan{
        req:        req.Request,
        ch:         resc,
        addedGzip:  requestedGzip,
        continueCh: continueCh,
        callerGone: gone,
    }

    var respHeaderTimer <-chan time.Time
    cancelChan := req.Request.Cancel
    ctxDoneChan := req.Context().Done()
    for {
        testHookWaitResLoop()
        select {
        case err := <-writeErrCh:
            // ...此处省略代码...
            if err != nil {
                pc.close(fmt.Errorf("write error: %v", err))
                return nil, pc.mapRoundTripError(req, startBytesWritten, err)
            }
            // ...此处省略代码...
        case <-pc.closech:
            // ...此处省略代码...
            return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
        case <-respHeaderTimer:
            // ...此处省略代码...
            return nil, errTimeout
        case re := <-resc:
            if (re.res == nil) == (re.err == nil) {
                panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
            }
            if debugRoundTrip {
                req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
            }
            if re.err != nil {
                return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
            }
            return re.res, nil
        case <-cancelChan:
            pc.t.CancelRequest(req.Request)
            cancelChan = nil
        case <-ctxDoneChan:
            pc.t.cancelRequest(req.Request, req.Context().Err())
            cancelChan = nil
            ctxDoneChan = nil
        }
    }
}

由上可知, (*persistConn).roundTrip方法可以分为三步:

  1. 向连接的writech写入 writeRequest : pc.writech <- writeRequest{req, writeErrCh, continueCh} , 参考(*Transport).dialConn可知pc.writech是一个缓冲大小为1的管道,所以会立马写入成功。
  2. 向连接的reqch写入 requestAndChan : pc.reqch <- requestAndChan , pc.reqch和pc.writech一样都是缓冲大小为1的管道。其中 requestAndChan.ch 是一个无缓冲的 responseAndError 管道,(*persistConn).roundTrip就通过这个管道读取到真实的响应。
  3. 开启for循环select, 等待响应或者超时等信息。
  • (*persistConn).writeLoop 写循环

(*persistConn).writeLoop方法主体逻辑相对简单,把用户的请求写入连接的写缓存buffer, 最后再flush就可以了。

func (pc *persistConn) writeLoop() {
    defer close(pc.writeLoopDone)
    for {
        select {
        case wr := <-pc.writech:
            startBytesWritten := pc.nwrite
            err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
            if bre, ok := err.(requestBodyReadError); ok {
                err = bre.error
                wr.req.setError(err)
            }
            if err == nil {
                err = pc.bw.Flush()
            }
            if err != nil {
                wr.req.Request.closeBody()
                if pc.nwrite == startBytesWritten {
                    err = nothingWrittenError{err}
                }
            }
            pc.writeErrCh <- err // to the body reader, which might recycle us
            wr.ch <- err         // to the roundTrip function
            if err != nil {
                pc.close(err)
                return
            }
        case <-pc.closech:
            return
        }
    }
}
  • (*persistConn).readLoop 读循环

(*persistConn).readLoop有较多的细节, 我们先看代码, 然后再逐步分析。

func (pc *persistConn) readLoop() {
    closeErr := errReadLoopExiting // default value, if not changed below
    defer func() {
        pc.close(closeErr)
        pc.t.removeIdleConn(pc)
    }()

    tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
        if err := pc.t.tryPutIdleConn(pc); err != nil {
            // ...此处省略代码...
        }
        // ...此处省略代码...
        return true
    }
    // ...此处省略代码...
    alive := true
    for alive {
        // ...此处省略代码...
        rc := <-pc.reqch
        trace := httptrace.ContextClientTrace(rc.req.Context())

        var resp *Response
        if err == nil {
            resp, err = pc.readResponse(rc, trace)
        } else {
            err = transportReadFromServerError{err}
            closeErr = err
        }

        // ...此处省略代码...
        bodyWritable := resp.bodyIsWritable()
        hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

        if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
            // Don't do keep-alive on error if either party requested a close
            // or we get an unexpected informational (1xx) response.
            // StatusCode 100 is already handled above.
            alive = false
        }

        if !hasBody || bodyWritable {
            // ...此处省略代码...
            continue
        }

        waitForBodyRead := make(chan bool, 2)
        body := &bodyEOFSignal{
            body: resp.Body,
            earlyCloseFn: func() error {
                waitForBodyRead <- false
                <-eofc // will be closed by deferred call at the end of the function
                return nil

            },
            fn: func(err error) error {
                isEOF := err == io.EOF
                waitForBodyRead <- isEOF
                if isEOF {
                    <-eofc // see comment above eofc declaration
                } else if err != nil {
                    if cerr := pc.canceled(); cerr != nil {
                        return cerr
                    }
                }
                return err
            },
        }

        resp.Body = body
        // ...此处省略代码...

        select {
        case rc.ch <- responseAndError{res: resp}:
        case <-rc.callerGone:
            return
        }

        // Before looping back to the top of this function and peeking on
        // the bufio.Reader, wait for the caller goroutine to finish
        // reading the response body. (or for cancellation or death)
        select {
        case bodyEOF := <-waitForBodyRead:
            pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
            alive = alive &&
                bodyEOF &&
                !pc.sawEOF &&
                pc.wroteRequest() &&
                tryPutIdleConn(trace)
            if bodyEOF {
                eofc <- struct{}{}
            }
        case <-rc.req.Cancel:
            alive = false
            pc.t.CancelRequest(rc.req)
        case <-rc.req.Context().Done():
            alive = false
            pc.t.cancelRequest(rc.req, rc.req.Context().Err())
        case <-pc.closech:
            alive = false
        }

        testHookReadLoopBeforeNextRead()
    }
}

由上可知, 只要连接处于活跃状态, 则这个读循环会一直开启, 直到

连接不活跃或者产生其他错误才会结束读循环。

在上述源码中, pc.readResponse(rc,trace) 会从连接的读buffer中获取一个请求对应的Response。

读到响应之后判断请求是否是HEAD请求或者响应内容为空,如果是HEAD请求或者响应内容为空则将响应写入 rc.ch ,并将连接放入idleConn(此处因为篇幅的原因省略了源码内容, 正常请求的逻辑也有写响应和将连接放入idleConn两个步骤)。

如果不是HEAD请求并且响应内容不为空即 !hasBody || bodyWritable 为false:

  1. 创建一个缓冲大小为2的等待响应被读取的管道 waitForBodyRead : waitForBodyRead := make(chan bool, 2)
  2. 将响应的Body修改为 bodyEOFSignal 结构体。通过上面的源码我们可以知道,此时的resp.Body中有 earlyCloseFnfn 两个函数。earlyCloseFn函数会向waitForBodyRead管道写入 false , fn函数会判断响应是否读完, 如果已经读完则向waitForBodyRead写入 true 否则写入 false
  3. 将修改后的响应写入 rc.ch 。其中 rc.chrc := <-pc.reqch 获取,而pc.reqch正是前面(*persistConn).roundTrip函数写入的 requestAndChanrequestAndChan.ch 是一个无缓冲的 responseAndError 管道,(*persistConn).roundTrip通过这个管道读取到真实的响应。
  4. select 读取 waitForBodyRead被写入的值。如果读到到的是true则可以调用tryPutIdleConn( 此方法会调用前面提到的(*Transport).tryPutIdleConn方法 )将连接放入idleConn从而复用连接。

waitForBodyRead写入true的原因我们已经知道了,但是被写入true的时机我们尚不明确。

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
    // ...此处省略代码...
    n, err = es.body.Read(p)
    if err != nil {
        es.mu.Lock()
        defer es.mu.Unlock()
        if es.rerr == nil {
            es.rerr = err
        }
        err = es.condfn(err)
    }
    return
}

func (es *bodyEOFSignal) Close() error {
    es.mu.Lock()
    defer es.mu.Unlock()
    if es.closed {
        return nil
    }
    es.closed = true
    if es.earlyCloseFn != nil && es.rerr != io.EOF {
        return es.earlyCloseFn()
    }
    err := es.body.Close()
    return es.condfn(err)
}

// caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
    if es.fn == nil {
        return err
    }
    err = es.fn(err)
    es.fn = nil
    return err
}

由上述源码可知, 只有当调用方完整的读取了响应,该连接才能够被复用。因此在http1.1中,一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。

根据上面的逻辑, 我们GoPher在平时的开发中如果遇到了不关心响应的请求, 也一定要记得把响应body读完以保证连接的复用性。笔者在这里给出一个demo:

io.CopyN(ioutil.Discard, resp.Body, 2 << 10)
resp.Body.Close()

以上,就是笔者整理的HTTP1.1的请求流程。

注意

上述流程中笔者对很多细节并未详细提及或者仅一笔带过,希望读者酌情参考。

总结

  1. 在go中发起http1.1的请求时, 如果遇到不关心响应的请求,请务必完整读取响应内容以保证连接的复用性。
  2. 如果遇到对连接数有限制的业务,可以通过自定义http.Client的Transport, 并设置Transport的 MaxConnsPerHostMaxIdleConnsPerHostIdleConnTimeoutDisableKeepAlives 的值,来控制连接数。
  3. 如果对于重定向业务逻辑有需求,可以自定义http.Client的 CheckRedirect
  4. 在http1.1,中一个连接上的请求,只有等前一个请求处理完之后才能继续下一个请求。如果前面的请求处理较慢, 则后面的请求必须等待, 这就是http1.1中的线头阻塞。

注: 写本文时, 笔者所用go版本为: go1.14.2

生命不息, 探索不止, 后续将持续更新有关于go的技术探索

原创不易, 卑微求关注收藏二连.

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK