60

OkHttp源码解析

 4 years ago
source link: https://www.tuicool.com/articles/yAVJbuf
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Github: okhttp 分析版本: 930d4d0

An HTTP client for Android, Kotlin, and Java.

OkHttp is an HTTP client that’s efficient by default:

  • HTTP/2 support allows all requests to the same host to share a socket.
  • Connection pooling reduces request latency (if HTTP/2 isn’t available).
  • Transparent GZIP shrinks download sizes.
  • Response caching avoids the network completely for repeat requests.

使用

Get a URL

public class GetExample {
  OkHttpClient client = new OkHttpClient();

  String run(String url) throws IOException {
    Request request = new Request.Builder()
        .url(url)
        .build();

    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  public static void main(String[] args) throws IOException {
    GetExample example = new GetExample();
    String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
    System.out.println(response);
  }
}

Post to a Server

public class PostExample {
  public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

  OkHttpClient client = new OkHttpClient();

  String post(String url, String json) throws IOException {
    RequestBody body = RequestBody.create(json, JSON);
    Request request = new Request.Builder()
        .url(url)
        .post(body)
        .build();
    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  String bowlingJson(String player1, String player2) {
    return "{'winCondition':'HIGH_SCORE',"
        + "'name':'Bowling',"
        + "'round':4,"
        + "'lastSaved':1367702411696,"
        + "'dateStarted':1367702378785,"
        + "'players':["
        + "{'name':'" + player1 + "','history':[10,8,6,7,8],'color':-13388315,'total':39},"
        + "{'name':'" + player2 + "','history':[6,10,5,10,10],'color':-48060,'total':41}"
        + "]}";
  }

  public static void main(String[] args) throws IOException {
    PostExample example = new PostExample();
    String json = example.bowlingJson("Jesse", "Jake");
    String response = example.post("http://www.roundsapp.com/post", json);
    System.out.println(response);
  }
}

更多

okhttp

源码

OkHttpClient

OkHttpClient client = new OkHttpClient();

根据名字我们就能看出,OkHttpClient 为 OkHttp 的客户端,在使用的时候首先要做的就是要创建这样一个客户端

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  constructor() : this(Builder())
}

默认构造方法使用的是默认配置的 Builder:

class Builder constructor() {
  internal var dispatcher: Dispatcher = Dispatcher() // 调度器
  internal var proxy: Proxy? = null // 代理
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS // 协议
  internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS // 传输层版本和连接协议
  internal val interceptors: MutableList<Interceptor> = mutableListOf() // 拦截器
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() // 网络拦截器
  internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
  internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() // 代理选择器
  internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookie
  internal var cache: Cache? = null // cache 缓存
  internal var internalCache: InternalCache? = null // 内部缓存
  internal var socketFactory: SocketFactory = SocketFactory.getDefault() // socket 工厂
  internal var sslSocketFactory: SSLSocketFactory? = null // socket工厂 用于https
  internal var certificateChainCleaner: CertificateChainCleaner? = null // 验证确认响应书,适用HTTPS 请求连接的主机名
  internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier // 主机名字确认
  internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT // 证书链
  internal var proxyAuthenticator: Authenticator = Authenticator.NONE // 代理身份验证
  internal var authenticator: Authenticator = Authenticator.NONE // 身份验证
  internal var connectionPool: ConnectionPool = ConnectionPool() //链接复用池
  internal var dns: Dns = Dns.SYSTEM // DNS
  internal var followSslRedirects: Boolean = true // 重定向
  internal var followRedirects: Boolean = true // 本地重定向
  internal var retryOnConnectionFailure: Boolean = true // 重试连接失败
  internal var callTimeout: Int = 0 // 请求超时
  internal var connectTimeout: Int = 10000 // 连接超时
  internal var readTimeout: Int = 10000 // 读取超时
  internal var writeTimeout: Int = 10000 // 写入超时
  internal var pingInterval: Int = 0 // Web socket and HTTP/2 ping interval
// ...
  fun build(): OkHttpClient = OkHttpClient(this)
}

okhttp 的最佳表现就是创建一个 OkHttpClient 实例,并将其重用到所有的 http 请求调用上之所以所有请求公用一个 OkHttpClient,因为每个 OkHttpClient 都有自己的的连接池和线程池,这样的话可以重用连接和线程可减少延迟并节省内存

Request

Request request = new Request.Builder()
    .url(url)
    .build();

发送一个 HTTP 请求类要构建一个 Request 对象

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl, // 请求地址
  @get:JvmName("method") val method: String, // 请求方法[GET/POST/PUT/PATCH/...]
  @get:JvmName("headers") val headers: Headers, // 请求头
  @get:JvmName("body") val body: RequestBody?, // 请求体
  internal val tags: Map<Class<*>, Any> // 请求标签
) {
  // ...
  open class Builder {
    internal var url: HttpUrl? = null
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null
    
    //...
    
    constructor() {
      this.method = "GET"
      this.headers = Headers.Builder()
    }
    
    /**
     * Sets the URL target of this request.
     *
     * @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
     *     exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
     */
    open fun url(url: String): Builder {
      // Silently replace web socket URLs with HTTP URLs.
      val finalUrl: String = when {
        url.startsWith("ws:", ignoreCase = true) -> {
          "http:${url.substring(3)}"
        }
        url.startsWith("wss:", ignoreCase = true) -> {
          "https:${url.substring(4)}"
        }
        else -> url
      }

      return url(finalUrl.toHttpUrl())
    }
    
    // ...
    
    open fun build(): Request {
      return Request(
          checkNotNull(url) { "url == null" },
          method,
          headers.build(),
          body,
          tags.toImmutableMap()
      )
    }
}

Request 也是通过 Builder 形式来创建的

Call

Call call = client.newCall(request);

Call 即调用是一个准备好去执行的请求 Request

interface Call : Cloneable {

  fun request(): Request

  @Throws(IOException::class)
  fun execute(): Response

  fun enqueue(responseCallback: Callback)

  fun cancel()

  fun isExecuted(): Boolean

  fun isCanceled(): Boolean

  fun timeout(): Timeout

  override fun clone(): Call

  interface Factory {
    fun newCall(request: Request): Call
  }
}
request()
execute()
enqueue(responseCallback: Callback)
cancel()
isExecuted()
isCanceled()
timeout()

OkHttpClient 实现了 Call.Factory,使用工厂模式将构建的细节交给具体实现,顶层只需要拿到 Call 对象即可

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  // ...
  
  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }
  
  // ...
}

继续看 RealCall 中的 newRealCall 方法:

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  /**
   * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
   * This is set after immediately after creating the call instance.
   */
  private lateinit var transmitter: Transmitter
  
  // ...
  
  companion object {
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }
}

RealCall 为具体产品,实现了 Call 接口;其中 Transmitter 是 OkHttp 的应用层和网络层的一个桥梁类,包含了连接,请求,响应和流

Transmitter

class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
  private val connectionPool: RealConnectionPool = client.connectionPool().delegate
  private val eventListener: EventListener = client.eventListenerFactory().create(call)
  private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      cancel()
    }
  }.apply {
    timeout(client.callTimeoutMillis().toLong(), MILLISECONDS)
  }
  // ...
}

在创建 Transmitter 对象的时候设置了相关指标的监听器和 ConnectionPool

execute()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      client.dispatcher().executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher().finished(this)
    }
  }
  
  // ...
}
  • 同步代码块,内部是做一个判断,判断是否已经执行execute方法,如果执行了抛出异常
  • 超时计时,最终调用的是 AsyncTimeout 类中的 enter() 方法
  • 请求开始的相关操作
    getStackTraceForCloseable()
    
  • 将请求 call 添加到调度器中的同步双端队列中
  • 通过拦截器链获取响应并返回
  • 请求结束时候回收移除同步请求

getResponseWithInterceptorChain()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  @Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = ArrayList<Interceptor>()
    interceptors.addAll(client.interceptors())
    interceptors.add(RetryAndFollowUpInterceptor(client)) // 失败重试以及重定向
    interceptors.add(BridgeInterceptor(client.cookieJar())) // 用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应
    interceptors.add(CacheInterceptor(client.internalCache())) // 读取缓存直接返回、更新缓存
    interceptors.add(ConnectInterceptor(client)) // 和服务器建立连接
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors())
    }
    interceptors.add(CallServerInterceptor(forWebSocket)) // 向服务器发送请求数据、从服务器读取响应数据

    val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis())

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }
  
  // ...
}
  • 将自定义的拦截器和 okhttp 本身存在的拦截器添加到拦截器的集合
  • 创建一个拦截器链对象 Interceptor.Chain
  • 调用拦截器链对象的 proceed() ,开启链式调用请求,并最终返回响应 response
  • 结束请求,调用 Transmitter 对象的 noMoreExchanges() ,释放请求连接

Interceptor 使用的是责任链模式

enqueue()

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  // ...
  
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher().enqueue(AsyncCall(responseCallback))
  }
  
  // ...
}

最终调用的是 Dispatcher 中的 enqueue()

Dispatcher

executed(call: RealCall)

class Dispatcher constructor() {
  // ...
  
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  
  // ...
  
  /** Used by `Call#execute` to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
  
  // ...
}

直接将将 call 添加到正在执行的请求队列中去,runningSyncCalls 为正在请求的同步队列

enqueue(call: AsyncCall)

class Dispatcher constructor() {
  // ...
  
  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  
  // ...
  
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  
  // ...
}

封装到一个 AsyncCall 中传递进来,添加到正在等待的异步队列 readyAsyncCalls 中去,接着继续调用 promoteAndExecute() 方法执行相关操作

promoteAndExecute()

class Dispatcher constructor() {
  // ...
  
  /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = ArrayList<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService())
    }

    return isRunning
  }
  
  @Synchronized fun executorService(): ExecutorService {
    if (executorService == null) {
      executorService = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
    }
    return executorService!!
  }
  
  // ...
}
maxRequests
maxRequestsPerHost

AsyncCall

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }
    
        override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } finally {
          client.dispatcher().finished(this)
        }
      }
    }
  }
}

又回到了 getResponseWithInterceptorChain()

RealInterceptorChain

所有的 interceptor 都整合到了 RealInterceptorChain 中,执行拦截器链方法 proceed()

class RealInterceptorChain(
  private val interceptors: List<Interceptor>,
  private val transmitter: Transmitter,
  private val exchange: Exchange?,
  private val index: Int,
  private val request: Request,
  private val call: Call,
  private val connectTimeout: Int,
  private val readTimeout: Int,
  private val writeTimeout: Int
) : Interceptor.Chain {
  
  // ...
  
  override fun proceed(request: Request): Response {
    return proceed(request, transmitter, exchange)
  }

  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()

    calls++

    // If we already have a stream, confirm that the incoming request will use it.
    check(this.exchange == null || this.exchange.connection()!!.supportsUrl(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Call the next interceptor in the chain.
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }

    check(response.body() != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
}

在初始化的时候,会将所有拦截器组成的集合传递过来,同时将请求 RequestCall 也会传递过来, index 参数,最开始传入的是 0, exchange 参数,如果是应用拦截器,connection 必须是 null;如果是网络拦截器,connection 必须不为 null

  • 首先判断 index 是否大于总的拦截器个数,大于抛出 AssertionError()
  • 对 calls 进行 +1 操作
  • 判断是否是一个网络拦截器,并且判断其 host 和 port 是否一致
  • 判断是否是一个网络拦截器,如果是判断是否该网络拦截器的 proceed 方法调用次数是否超过一次
  • 继续创建一个新的拦截器链对象,此时传入的 index 会进行 index+1 操作,表示开始真正的调用相关拦截器操作
  • 调用拦截器的 intercept 方法,将新的拦截器链对象塞进去
  • 返回 response 后进行校验

nQNzInU.jpg!web


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK