0

OkHttp科普篇

 2 years ago
source link: http://www.androidchina.net/11903.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

OkHttp科普篇 – Android开发中文站

1.梳理okhttp的整体流程

2.Java和kotlin版本的对比 (Java版本为3.14.x)

3.流程梳理都在Java版本中,kotlin作为一个对比

OkHttp works on Android 5.0+ (API level 21+) and Java 8+.

要求在Android5.0以上的版本上且jdk版本为jdk8

  • 引入依赖
    //新版库采用kotlin语言编写
    
    implementation("com.squareup.okhttp3:okhttp:4.9.1")
    
  • 发送请求

    以get请求为例

    1. 创建一个OkhttpClient客户端对象
      OkHttpClient client = new OkHttpClient();
      
    2. 创建一个Request对象(异步时使用)
      Request request = new Request.Builder()
            .url(url)
            .build();
      
    3. 执行请求
      //同步请求
      Response response = client.newCall(request).execute();
      
      //异步请求
      client.newCall(request).enqueue(new Callback() {
          @Override 
          public void onFailure(Call call, IOException e) {
          }
          @Override 
          public void onResponse(Call call, Response response) throws IOException {
      
          }
      

源码梳理(java & kotlin)

关于拦截器(Intercept)

认识拦截器

下面的拦截器顺序都是递增的

  1. 用户自定义拦截器:(继承Interceptor接口,实现intercept方法)
  2. RetryAndFollowUpInterceptor:失败重试以及重定向拦截器
  3. BridgeInterceterptor:桥接拦截器 (请求时,对必要的Header进行一些添加,接收响应时,移除必要的Header)
  4. CacheInterceptor:缓存拦截

    1.根据request得到cache中缓存的response

    2.确认 request判断缓存的策略,是否要使用了网络,缓存或两者都使用

    3.调用下一个拦截器,决定从网络上来得到response

    4.如果本地已经存在cacheResponse,那么让它和网络得到的networkResponse做比较,决定是否来更新缓存的cacheResponse

    5.缓存未缓存过的response

    缓存拦截器会根据请求的信息和缓存的响应的信息来判断是否存在缓存可用,如果有可以使用的缓存,那么就返回该缓存给用户,否则就继续使用责任链模式来从服务器中获取响应。当获取到响应的时候,又会把响应缓存到磁盘上面

  5. ConnectionInterceptor:连接拦截器

    1.判断当前的连接是否可以使用:流是否已经被关闭,并且已经被限制创建新的流;

    2.如果当前的连接无法使用,就从连接池中获取一个连接;

    3.连接池中也没有发现可用的连接,创建一个新的连接,并进行握手,然后将其放到连接池中

  6. NetworkInterceptors:网络拦截器(配置OkHttpClient时设置的 NetworkInterceptors)
  7. CallServerInterceptor:请求拦截器(负责向服务器发送请求数据、从服务器读取响应数据)
// Call the next interceptor in the chain.
//调用链的下一个拦截器
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);    //(1)
    Interceptor interceptor = interceptors.get(index);     //(2)
    Response response = interceptor.intercept(next);    //(3)

1.实例化下一个拦截器对应的RealIterceptorChain对象,这个对象会在传递给当前的拦截器

  1. 得到当前的拦截器:interceptors是存放拦截器的ArryList
  2. 调用当前拦截器的intercept()方法,并将下一个拦截器的RealIterceptorChain对象传递下去
  3. 除了client中用户设置的interceptor,第一个调用的就是retryAndFollowUpInterceptor

1.拦截器用了责任链设计模式,它将请求一层一层向下传,直到有一层能够得到Response就停止向下传递

2.然后将response向上面的拦截器传递,然后各个拦截器会对respone进行一些处理,最后会传到RealCall类中通过execute来得到response

简而言之:每一个拦截器都对应一个 RealInterceptorChain ,然后每一个interceptor 再产生下一个RealInterceptorChain,直到 List 迭代完成,如下图所示

image.png

关于调度Dispatch

在开始下面的内容前,我们先简单的对Dispatcher有个认识

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

/** Ready async calls in the order they'll be run. */
   //正在准备中的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //运行中的异步请求
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); 

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //同步请求
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); 
  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  ......
}

1.有一个最大请求次数:64

2.有一个最大请求主机数:5

3.有一个懒加载的线程池,当执行 executorService() 方式时才创建

4.有三个队列(准备中的异步请求 | 运行中的异步请求 | 同步请求)

1.创建OkhttpClient

OkHttpClient client = new OkHttpClient()

这部分中Java和kotlin中没有什么区别,都用了 建造者模式 ,Builder里面的可配置参数也是一样的

public OkHttpClient() {
    this(new Builder());
}

OkHttpClient(Builder builder) {
    ....
}

public static final class Builder {
    Dispatcher dispatcher;// 分发器
    @Nullable Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;// 传输层版本和连接协议
    final List<Interceptor> interceptors = new ArrayList<>();// 拦截器
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable Cache cache;
    @Nullable InternalCache internalCache;// 内部缓存
    SocketFactory socketFactory;
    @Nullable SSLSocketFactory sslSocketFactory;// 安全套接层socket 工厂,用于HTTPS
    @Nullable CertificateChainCleaner certificateChainCleaner;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。
    HostnameVerifier hostnameVerifier;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。  
    CertificatePinner certificatePinner;// 证书锁定,使用CertificatePinner来约束哪些认证机构被信任。
    Authenticator proxyAuthenticator;// 代理身份验证
    Authenticator authenticator;// 身份验证
    ConnectionPool connectionPool;// 连接池
    Dns dns;
    boolean followSslRedirects; // 安全套接层重定向
    boolean followRedirects;// 本地重定向
    boolean retryOnConnectionFailure;// 重试连接失败
    int callTimeout;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;

    // 这里是默认配置的构建参数
    public Builder() {
        dispatcher = new Dispatcher();
        protocols = DEFAULT_PROTOCOLS;
        connectionSpecs = DEFAULT_CONNECTION_SPECS;
        ...
    }

    // 这里传入自己配置的构建参数
    Builder(OkHttpClient okHttpClient) {
        this.dispatcher = okHttpClient.dispatcher;
        this.proxy = okHttpClient.proxy;
        this.protocols = okHttpClient.protocols;
        this.connectionSpecs = okHttpClient.connectionSpecs;
        this.interceptors.addAll(okHttpClient.interceptors);
        this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
        ...
    }

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  //...
  constructor() : this(Builder())

 //...
  internal constructor(okHttpClient: OkHttpClient) : this() {
      this.dispatcher = okHttpClient.dispatcher
      this.connectionPool = okHttpClient.connectionPool
      this.interceptors += okHttpClient.interceptors
      this.networkInterceptors += okHttpClient.networkInterceptors
      this.eventListenerFactory = okHttpClient.eventListenerFactory
      this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure
      this.authenticator = okHttpClient.authenticator
      this.followRedirects = okHttpClient.followRedirects
      this.followSslRedirects = okHttpClient.followSslRedirects
      this.cookieJar = okHttpClient.cookieJar
      this.cache = okHttpClient.cache
      this.dns = okHttpClient.dns
      this.proxy = okHttpClient.proxy
      this.proxySelector = okHttpClient.proxySelector
      this.proxyAuthenticator = okHttpClient.proxyAuthenticator
      this.socketFactory = okHttpClient.socketFactory
      this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull
      this.x509TrustManagerOrNull = okHttpClient.x509TrustManager
      this.connectionSpecs = okHttpClient.connectionSpecs
      this.protocols = okHttpClient.protocols
      this.hostnameVerifier = okHttpClient.hostnameVerifier
      this.certificatePinner = okHttpClient.certificatePinner
      this.certificateChainCleaner = okHttpClient.certificateChainCleaner
      this.callTimeout = okHttpClient.callTimeoutMillis
      this.connectTimeout = okHttpClient.connectTimeoutMillis
      this.readTimeout = okHttpClient.readTimeoutMillis
      this.writeTimeout = okHttpClient.writeTimeoutMillis
      this.pingInterval = okHttpClient.pingIntervalMillis
      this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress
      this.routeDatabase = okHttpClient.routeDatabase
    }
}

2.执行请求

OkHttpClient client = new OkHttpClient();
//同步请求
Response response = client.newCall(request).execute();

通过创建完OkHttpClient对象,调用内部的 newCall() 方法,将最终的请求交给RealCall的 execute() 方法,在该方法内部处理

1.确保Call方法只执行一次(有版本区别,请看下文)

2.通知dispatcher进入执行状态

3.通过一系列的拦截器的请求处理和响应处理得到最终的结果

4告诉dispatcher已经执行完毕

java版本
/**
* Prepares the {@code request} to be executed at   some point in the future.
*/
@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

// RealCall为真正的请求执行者
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
}

 private boolean executed;
@Override public Response execute() throws IOException {
    synchronized (this) {
        // 每个Call只能执行一次
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
        // 通知dispatcher已经进入执行状态
        client.dispatcher().executed(this);
        // 通过一系列的拦截器请求处理和响应处理得到最终的返回结果
        Response result = getResponseWithInterceptorChain();
        if (result == null) throw new IOException("Canceled");
        return result;
    } catch (IOException e) {
        e = timeoutExit(e);
        eventListener.callFailed(this, e);
        throw e;
    } finally {
        // 通知 dispatcher 自己已经执行完毕
        client.dispatcher().finished(this);
    }
}

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    // 在配置 OkHttpClient 时设置的 interceptors;
    interceptors.addAll(client.interceptors());
    // 负责失败重试以及重定向
    interceptors.add(retryAndFollowUpInterceptor);
    // 请求时,对必要的Header进行一些添加,接收响应时,移除必要的Header
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // 负责读取缓存直接返回、更新缓存
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // 负责和服务器建立连接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        // 配置 OkHttpClient 时设置的 networkInterceptors
        interceptors.addAll(client.networkInterceptors());
    }
    // 负责向服务器发送请求数据、从服务器读取响应数据
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    // 使用责任链模式开启链式调用
    return chain.proceed(originalRequest);
}

// StreamAllocation 对象,它相当于一个管理类,维护了服务器连接、并发流
// 和请求之间的关系,该类还会初始化一个 Socket 连接对象,获取输入/输出流对象。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
  RealConnection connection) throws IOException {
    ...

    // Call the next interceptor in the chain.
    // 实例化下一个拦截器对应的RealIterceptorChain对象
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    // 得到当前的拦截器
    Interceptor interceptor = interceptors.get(index);
    // 调用当前拦截器的intercept()方法,并将下一个拦截器的RealIterceptorChain对象传递下去,最后得到响应
    Response response = interceptor.intercept(next);

    ...

    return response;
Kotlin版本
//newcall真正的执行在RealCall类
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCall.kt

private val executed = AtomicBoolean()

override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

在Java版本中通过使用 synchronized 关键字来保证保证线程安全,并且确保executed只会被执行一次

kotlin版本中直接移除了 synchronized 关键字,并且将executed字段设置为具有原子性特征的boolean值,且通过CAS操作去确保是否已经执行了

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url(url)
    .build();
client.newCall(request).enqueue(new Callback() {
    @Override 
    public void onFailure(Call call, IOException e) {
    }

    @Override 
    public void onResponse(Call call, Response response) throws IOException {
    }
  • 通过Request的构建者模式创建一个Request对象,然后调用OkHttpClient内部的 newCall() 方法,将最终的请求交给RealCall的 enqueue() 方法,在方法内部进行逻辑处理

​ 直接进入RealCall代码

java版本
public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

从上面看到,通过加锁后确保只会执行一次后,将当前的请求交给dispatcher拦截器中的enqueue()方法执行,那么我们来看看这个代码

Dispatcher.java

void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

在拦截器中的方法中,发现它首先将这个请求添加到了readyAsyncCalls这个队列中,你问我怎么就知道它是队列了?你还记得上面我说的吗?

添加到队列后接着执行**promoteAndExecute()**方法

 private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; //判断是否大于最大的请求数
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // 判断主机数是否已经达到最大.

        // 如果其中的runningAsynCalls不满,且call占用的host小于最大数量,则将call加入到runningAsyncCalls中执行,
    //利用线程池执行call否者将call加入到readyAsyncCalls中。
        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);

      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

call加入到线程池中执行了。现在再看AsynCall的代码,它是RealCall中的内部类

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    /**
     * Attempt to enqueue this async call on {@code    executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting    the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        eventListener.callFailed(RealCall.this, ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      timeout.enter();
      try {
        // 跟同步执行一样,最后都会调用到这里
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new   IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this,   response);
        }
      } catch (IOException e) {
        e = timeoutExit(e);
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure   for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}
kotlin版本
override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

 internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

 private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

 inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    val host: String
      get() = originalRequest.url.host

    val request: Request
        get() = originalRequest

    val call: RealCall
        get() = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

AysncCall中的execute()中的方法,同样是通过Response response = getResponseWithInterceptorChain();来获得response,这样异步任务也同样通过了

首先,请求的时候初始化一个Call的实例,然后执行它的**execute()方法或enqueue()方法,内部最后都会执行到getResponseWithInterceptorChain()**方法,

这个方法通过拦截器组成的责任链模式,依次经过用户自定义普通拦截器、重试拦截器(RetryAndFollowUpInterceptor)、桥接拦截器(BridgeInterceptor)、缓存拦截器(BridgeInterceptor)、连接拦截器(CallServerInterceptor)和用户自定义网络拦截器以及访问服务器拦截器等拦截处理过程,最终将获取到的响应结果交给用户

image.png

连接拦截器

在Okhttp整个过程中,比较重要的两个拦截器,缓存拦截器和连接拦截器,关于缓存拦截器在文一开始的时候就简单的说了下

现在说下另一个比较重要的拦截器

Java版本
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request.     Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // HttpCodec是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。在这个方法的内部实现连接池的复用处理
    HttpCodec httpCodec = streamAllocation.newStream(client, chain,     doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

通过调用了 streamAllocation 的 newStream() 方法的时候,经过一系列判断到达 StreamAllocation类 中的 findConnection() 方法

private RealConnection findConnection(int   connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
      ...

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      // 尝试使用已分配的连接,已经分配的连接可能已经被限制创建新的流
      releasedConnection = this.connection;
      // 释放当前连接的资源,如果该连接已经被限制创建新的流,就返回一个Socket以关闭连接
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        // 如果该连接从未被标记为获得,不要标记为发布状态,reportedAcquired 通过 acquire()   方法修改
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        // 尝试供连接池中获取一个连接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    // 关闭连接
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      // 如果已经从连接池中获取到了一个连接,就将其返回
      return result;
    }

    // If we need a route selection, make one. This   is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses,   make another attempt at getting a   connection from
        // the pool. This could match due to   connection coalescing.
         // 根据一系列的 IP地址从连接池中获取一个链接
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size;i++) {
          Route route = routes.get(i);
          // 从连接池中获取一个连接
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it   possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        // 在连接池中如果没有该连接,则创建一个新的连接,并将其分配,这样我们就可以在握手之前进行终端
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }
    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
    // 如果我们在第二次的时候发现了一个池连接,那么我们就将其返回
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking     operation.
     // 进行 TCP 和 TLS 握手
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      // 将该连接放进连接池中
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same   address was created concurrently, then
      // release this connection and acquire that one.
      // 如果同时创建了另一个到同一地址的多路复用连接,释放这个连接并获取那个连接
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
}

通过上面的源码也可以证明我们开头的结论是正确的,添加连接池里面去了,简单来说,连接复用省去了TCP和TLS握手的过程,因为建立连接本身也是需要消耗时间的,连接复用后就可以提升网络访问效率

最后说下ConnectionPool的作用

public final class ConnectionPool {
private final Deque<RealConnection> connections = new ArrayDeque<>();
  //......
   void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

  private final Runnable cleanupRunnable = () -> {
    while (true) {
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (ConnectionPool.this) {
          try {
            ConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
}

创建一个新的连接的时候,一方面需要它放进缓存里面另一边,另一方面对缓存进行清理。在 ConnectionPool 中,当我们向连接池中缓存一个连接的时候,只要调用双端队列的 add() 方法,将其加入到双端队列即可,而清理连接缓存的操作则交给线程池来定时执行

kotlin版本
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

1.为什么在Java版本的时候用锁,kotlin中用的是带有原子性的属性值并且是通过CAS操作呢?

2.为什么要用队列的形式存储数据?用链表可以吗?

3.拦截器是怎么工作的,怎么进行传递和响应数据的?

4.如何自定义拦截器?怎么添加配置?


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK