162

Guzzle 源码分析 - 程小白

 6 years ago
source link: https://www.chengxiaobai.cn/record/guzzle-source-code-analysis.html?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Guzzle 源码分析

2017-10-222021-02-08PHP 29k 26 分钟

Guzzle 是一个非常强大而且稳定的 HTTP client。不同于一般的 cURL 封装组件, Guzzle 内部使用了多种请求方式,来实现 HTTP 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 HTTP 请求十分容易而且易拓展。现在 Guzzle 已经被 Drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 PSR-7 规范,拓展性和兼容性也更加优秀了。之前在一次重构记录中提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。

注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。

本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为

{
    "require": {
        "guzzlehttp/guzzle": "^6.3"
    }
}

Guzzle 的各种配置都和 HTTP 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 SSL、超时等等。

配置项是以数组形式在创建 client 对象的时候传入的,所有的配置都在这里。Guzzle 会提供一个默认配置,会和自定义配置进行合并,并优先自定义配置

public function __construct(array $config = [])
{
  $this->configureDefaults($config);
}

private function configureDefaults(array $config)
{
  // 自定义配置和默认配置,在这里合并,并赋值给了成员变量
  $this->config = $config + $defaults;
}

比如这样:

$config = [
  'allow_redirects' => [
    'max'     => 5,
    'referer' => true,
  ],
  'http_errors'     => false,
  'decode_content'  => true,
  'cookies'         => true,
  'connect_timeout' => 1.5,
  'timeout'         => 2.5,
  'headers'         => [
    'User-Agent' => 'test client for chengxiaobai.com',
  ],
];

$client = new \GuzzleHttp\Client($config);

你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并,并且只对当前请求有效

private function prepareDefaults($options)
{
  $defaults = $this->config;
  // 这里这是赋值给了局部变量,所以只对当前请求有效
  $result = $options + $defaults;
  return $result;
}

比如这样:

$client = new \GuzzleHttp\Client($config);

$client->request('GET', 'https://www.chengxiaobai.com/',
         [
           'allow_redirects' => [
             'max'     => 1,
             'referer' => false,
           ],
         ]);

特殊的 handler 参数

handler 参数比较特殊,它必须是闭包,并且参数为 Psr7\Http\Message\RequestInterface 和一个 array 类型的参数,并且必须返回 GuzzleHttp\Promise\PromiseInterface 或者在成功时满足 Psr7\Http\Message\ResponseInterface

如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口,Chengxiaobai\handler

interface Chengxiaobai
{
    /**
     * handler interface
     *
     * @param RequestInterface $request
     * @param array            $options
     *
     * @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface
     */
    public function handler(Psr\Http\Message\RequestInterface $request,array $options);
}

这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。

public function __construct(array $config = [])
{
  if (!isset($config['handler'])) {
    // 创建一个默认的 handler 栈
    $config['handler'] = HandlerStack::create();
  } elseif (!is_callable($config['handler'])) {
    throw new \InvalidArgumentException('handler must be a callable');
  }
}

很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack 。除非你有足够的把握,请不要随意操作 。

举个自定义 handler 操作的例子,比如对任意一个请求都返回 404。

$client = new \GuzzleHttp\Client($config);

$response = $client->request('GET', 'www.chengxiaobai.com/archives/',
               [
                 'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {
                   return new \GuzzleHttp\Psr7\Response(404);
                 },
               ]);

echo $response->getStatusCode();// 404

上面我们说 Guzzle 本身自带了一些 handler ,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。

public static function create(callable $handler = null)
{
  // 这里定义了底层请求实现方法
  $stack = new self($handler ?: choose_handler());
  // 下面都会添加一些 Middleware 中间件
  $stack->push(Middleware::httpErrors(), 'http_errors');
  $stack->push(Middleware::redirect(), 'allow_redirects');
  $stack->push(Middleware::cookies(), 'cookies');
  $stack->push(Middleware::prepareBody(), 'prepare_body');

  return $stack;
}

注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。

function choose_handler()
{
    $handler = null;
    // 判定 curl 方法,如果并发和常规 curl 同时存在
    if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {
        // 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式
        $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
    } elseif (function_exists('curl_exec')) {
        // 如果两种 curl 方法同时只有一个存在,则优先常规 curl
        $handler = new CurlHandler();
    } elseif (function_exists('curl_multi_exec')) {
        $handler = new CurlMultiHandler();
    }
  // 如果 allow_url_fopen 开启
    if (ini_get('allow_url_fopen')) {
        $handler = $handler
            // 已有 handler ? 再注册一个流处理 handler 
            ? Proxy::wrapStreaming($handler, new StreamHandler())
            // 否则只有流处理 handler
            : new StreamHandler();
    } elseif (!$handler) {
        throw new \RuntimeException('GuzzleHttp requires cURL, the '
            . 'allow_url_fopen ini setting, or a custom HTTP handler.');
    }

    return $handler;
}

创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和 Chengxiaobai\handler 一样。

需要深入了解 Handlers 和 Middleware 的可以点击这里看官方文档,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。

根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面” 处理请求 “章节会详细介绍。

其实所有的请求在处理上都是异步的,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。

public function request($method, $uri = '', array $options = [])
{
  $options[RequestOptions::SYNCHRONOUS] = true;
  // requestAsync 就是异步请求,不过直接调用了 wait 转同步
  return $this->requestAsync($method, $uri, $options)->wait();
}

请求的 uri 参数

如果你在配置中定义了 base_uri 参数,这个时候可以使用相对地址,如果没有,则不支持相对地址,Guzzle 并没有帮你校验最终 uri 参数是否正确,只有等到请求发出去了,才知道 uri 是否正确。

private function buildUri($uri, array $config)
{
  // for BC we accept null which would otherwise fail in uri_for
  $uri = Psr7\uri_for($uri === null ? '' : $uri);

  if (isset($config['base_uri'])) {
    $uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);
  }

  // 这里使用了 PSR-7 规范,返回的是一个实现了 UriInterface 的对象
  return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;
}

比如这样就会报错

$client = new \GuzzleHttp\Client();
$response = $client->request('GET', '/history.html');
/**
 * ountput :
 * Fatal error: Uncaught GuzzleHttp\Exception\RequestException:
 * cURL error 3: <url> malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)
 * in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187
 */

详细的规则可以参考 RFC 3986, section 2官方帮我们整理了一些快速了解的例子。我这里梳理出 4 种情况。

base_uriuriresult
chengxiaobai.com/first//secondchengxiaobai.com/second
chengxiaobai.com/first/secondchengxiaobai.com/first/second
chengxiaobai.com/first/secondchengxiaobai.com/second
chengxiaobai.com/firstsecondchengxiaobai.com/second

保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。

构建 reqsest

Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface 的实现,这样只要你能按照 PSR-7 的规范来就很容易拓展 Guzzle。

这里再次提醒大家,modern PHP 开发,应该遵循 PSR 规范,有利于社区更好的协作和稳健发展。

public function requestAsync($method, $uri = '', array $options = [])
{
  $request = new Psr7\Request($method, $uri, $headers, $body, $version); 
  return $this->transfer($request, $options);
}

丰富 request

transfer 的结构类型和 Chengxiaobai\handler 一样。

private function transfer(RequestInterface $request, array $options)
{
  // 这个方法会根据你的请求类型,构建更具体的请求对象
  $request = $this->applyOptions($request, $options);
  $handler = $options['handler'];
}

applyOptions 从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。

注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。

private function applyOptions(RequestInterface $request, array &$options)
{
    // 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request
    
    // 构建新的对象方法
    $request = Psr7\modify_request($request, $modify);
      
  return $request;
}

如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。

注意需要的参数,有些构建参数是从 $changes 取的,但是有些是从原本的 $request 对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。

function modify_request(RequestInterface $request, array $changes)
{
    if (!$changes) {
        return $request;
    }

  return new Request(
    isset($changes['method']) ? $changes['method'] : $request->getMethod(),
    $uri,
    $headers,
    isset($changes['body']) ? $changes['body'] : $request->getBody(),
    isset($changes['version'])
    ? $changes['version']
    : $request->getProtocolVersion()
  );
}

promise 简介

关于 promise ,属于 guzzlehttp/promises 类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。

通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。

下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A + 规范,Guzzle 使用的 promise 也是该规范的一个实现。

$promise = new Promise(
    function () {
        echo 'wait';
    },
    function () {
        echo 'cancle';
    }
);
$promise->then(
    function () {
        echo 'onFulfilled';
    },
    function () {
        echo 'onRejected';
    }
)->then(
    function () {
        echo 'onFulfilled';
    },
    function () {
        echo 'onRejected';
    }
);

从等待状态开始执行,满足了就执行 onFulfilled ,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 HTTP 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。

private function transfer(RequestInterface $request, array $options)
{
  $handler = $options['handler'];
  try {
    return Promise\promise_for($handler($request, $options));
  } catch (\Exception $e) {
    return Promise\rejection_for($e);
  }
}

成功就是 promise_for 失败就是 rejection_for。

promise_for

这个方法主要是用来保证返回的是一个 promise 对象,因为经过 $handler 处理后的值可能是一个 promise 对象 ($handler 如何处理紧接着会说),也能是一个 response 对象,也可能是一个异常,所以需要对对数据做一个 “清洗转换”,并返回一个满足状态的 promise。

function promise_for($value)
{
    // 如果是一个 promise 对象就直接返回
    if ($value instanceof PromiseInterface) {
        return $value;
    }
    // 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象
    if (method_exists($value, 'then')) {
        // 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null
        $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
        $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
        $promise = new Promise($wfn, $cfn);
        $value->then([$promise, 'resolve'], [$promise, 'reject']);
        return $promise;
    }
  // 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。
    return new FulfilledPromise($value);
}

rejection_for

异常情况会走入到 rejection_for 方法。同理进行 “数据清洗”,并返回一个拒绝状态的 promise 。

function rejection_for($reason)
{
    if ($reason instanceof PromiseInterface) {
        return $reason;
    }

    return new RejectedPromise($reason);
}

handler 处理

还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 promise 对象,用于外层异步调用。

private function transfer(RequestInterface $request, array $options)
{
  $handler = $options['handler'];

  try {
    // 这里会先调用配置中的 handler 方法
    return Promise\promise_for($handler($request, $options));
  } catch (\Exception $e) {
    return Promise\rejection_for($e);
  }
}

对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?

我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?

当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。

有了这个前提,我们看下 handlerStack 源码。

从 handlerStack 的名字上,我们就能知道它是一个” 栈 “数据结构,其满足” 后进先出 “的特性。

public function __invoke(RequestInterface $request, array $options)
{
  // 这个函数主要是实现 Middleware 中间件操作
  $handler = $this->resolve();

  //这里下面紧接着会有分析
  return $handler($request, $options);
}

public function resolve()
{
  // 变量缓存,能优化部分性能
  if (!$this->cached) {
    // 这个 handler 就是之前选择的 实现请求的底层方法
    // 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧
    if (!($prev = $this->handler)) {
      throw new \LogicException('No handler has been specified');
    }
    // 反转顺序,实现”后进先出“特性,调用每个中间件
    foreach (array_reverse($this->stack) as $fn) {
      // 中间件的注册是 [$middleware, $name] 形式的
      // 所以取第一个元素是其具体实现,第二个参数只是名字
      // 调用第一次传入的是 handler,后续传入的就是上一次处理的结果
      $prev = $fn[0]($prev);
    }
    // 所有的都处理完毕,缓存起来
    $this->cached = $prev;
  }
  return $this->cached;
}

上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce 这个函数,但是原理上大同小异,知道其原理一通百通。

我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。

根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke() 魔术方法而已。

Middleware 里面闭包结构复杂,好好理解下。

public function __invoke(RequestInterface $request, array $options)
{
  // 这个函数主要是实现 Middleware 中间件操作
  $handler = $this->resolve();
  // 现在我们分析这个
  return $handler($request, $options);
}

public static function httpErrors()
{
  // 第一次调用返回!传入一个 闭包-A
  return function (callable $handler) {
    // 第二次调用返回!传入 $request,$options
    return function ($request, array $options) use ($handler) {
      // Middleware 自己的逻辑判定返回什么样的闭包
      if (empty($options['http_errors'])) {
        // 第三次调用返回!返回 闭包-A 的处理结果
        // 这里根据配置 没有注册 then 函数,直接进行下一步处理
        return $handler($request, $options);
      }
      // 第三次调用返回!返回 闭包-A,附加 promise 
      // 根据上面我们说到的 promise 特性,这里用 then 
      // 附加了 闭包-A 处理完毕之后要调用的逻辑
      return $handler($request, $options)->then(
        function (ResponseInterface $response) use ($request, $handler) {
          $code = $response->getStatusCode();
          if ($code < 400) {
            return $response;
          }
          throw RequestException::create($request, $response);
        }
      );
    };
  };
}

关于返回层数,可以根据 return 来迅速定位,一个 return 就对应一次调用返回。

现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。

// 第一次
public static function create(callable $handler = null)
{
  $stack->push(Middleware::httpErrors(), 'http_errors');
}
// 第二次
public function resolve()
{
  $prev = $fn[0]($prev);
}
// 第三次
public function __invoke(RequestInterface $request, array $options)
{
  $handler($request, $options);
}

最后的结果应该是,如果按照 Middleware 结构应该是这样的:

$handler($request, $options)->then('http_errors')
                ->then('allow_redirects')
                ->then('cookies')
                ->then('prepare_body')

这个 $handler 就是最开始传入的请求实现底层方法。

整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。

注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。

这个 $handler 具体是哪个请求方法呢?还记得 choose_handler() 方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。

再回顾下 choose_handler() 方法。

function choose_handler()
{
  $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
    $handler = new CurlHandler();
    $handler = new CurlHandler();
    $handler = $handler
        ? Proxy::wrapStreaming($handler, new StreamHandler())
        : new StreamHandler();
    return $handler;
}

这两个方法都有源码分析,没有印象的可以再回去看看。

不同 $handler 都是在 __invoke() 方法上做文章。

我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());

public static function wrapSync(
  callable $default,
  callable $sync
) {
  return function (RequestInterface $request, array $options) use ($default, $sync) {   // 注意这里的三目运算符,判定同步请求选项是否为空
    return empty($options[RequestOptions::SYNCHRONOUS])
      // 默认是并发请求 new CurlMultiHandler()
      ? $default($request, $options)
      // 这里是同步请求 new CurlHandler()
      : $sync($request, $options);
  };
}

现在,异步和同步请求终于出现了区别。我们先看同步请求。

我们回顾下 request() 方法,注意到一个步骤。

public function request($method, $uri = '', array $options = [])
{
  // 这里往配置中添加了一个选项,设置该请求为同步的
  $options[RequestOptions::SYNCHRONOUS] = true;
}

所以,这里走的是同步请求,我们来分析 CurlHandler()

public function __invoke(RequestInterface $request, array $options)
{
  // 如果设置了延迟请求,会在这里阻塞一会
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 创建一个 handler 抽象对象
  $easy = $this->factory->create($request, $options);
  // 执行
  curl_exec($easy->handle);
  $easy->errno = curl_errno($easy->handle);
  // 请求处理结束
  return CurlFactory::finish($this, $easy, $this->factory);
}

这里需要分析一下工厂类 CurlFactory,里面大都涉及到 cURL 的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。

public function create(RequestInterface $request, array $options)
{
  if (isset($options['curl']['body_as_string'])) {
    $options['_body_as_string'] = $options['curl']['body_as_string'];
    unset($options['curl']['body_as_string']);
  }
  // handle 的一个抽象对象
  $easy = new EasyHandle;
  $easy->request = $request;
  $easy->options = $options;
  // 获取默认配置
  $conf = $this->getDefaultConf($easy);
  // 解析请求方法
  $this->applyMethod($easy, $conf);
  // 解析配置
  $this->applyHandlerOptions($easy, $conf);
  // 解析头部
  $this->applyHeaders($easy, $conf);
  unset($conf['_headers']);
  // 解析自定义 curl 配置
  if (isset($options['curl'])) {
    $conf = array_replace($conf, $options['curl']);
  }
  // 设置回调函数用于处理返回头
  $conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);
  // 从 handle池 获取一个 handle,没有就新建一个
  $easy->handle = $this->handles
    ? array_pop($this->handles)
    : curl_init();
  curl_setopt_array($easy->handle, $conf);

  return $easy;
}

public static function finish(
  callable $handler,
  EasyHandle $easy,
  CurlFactoryInterface $factory
) {
  // 这里会调用配置用设置的 on_stats 函数
  if (isset($easy->options['on_stats'])) {
    self::invokeStats($easy);
  }
  // 有错误的话走错误处理流程
  if (!$easy->response || $easy->errno) {
    return self::finishError($handler, $easy, $factory);
  }
  // 释放资源,还到 handle池
  $factory->release($easy);
  // 处理 流数据
  $body = $easy->response->getBody();
  if ($body->isSeekable()) {
    $body->rewind();
  }
  // 返回一个满足状态的 promise
  return new FulfilledPromise($easy->response);
}

根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats 函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler 池,也就是复用的请求句柄为 3 个,这个没有办法修改,写死在代码中的。

public function __invoke(RequestInterface $request, array $options)
{
    $easy = $this->factory->create($request, $options);
    // 为每个请求生成一个 ID
    $id = (int) $easy->handle;
// 注册一个 promise,分别是调用执行和关闭方法
    $promise = new Promise(
        [$this, 'execute'],
        // 依据 ID 来关闭请求
        function () use ($id) { return $this->cancel($id); }
    );
// 添加请求 底层是 curl_multi_add_handle 方法
    $this->addRequest(['easy' => $easy, 'deferred' => $promise]);
    return $promise;
}

工厂类 CurlFactory 在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄 (curl_multi_add_handle) 中,最后返回了一个 promise 对象,里面注册了 execute 函数和 cancel 函数,用于后面发起和关闭请求。

需要注意的就是设定了延迟执行的请求,是在 addRequest() 方法中处理的。后面在” 返回结果 “章节会讲到延迟请求处理。

配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。

public function __invoke(RequestInterface $request, array $options)
{
  // 如果设置了延迟请求,会在这里阻塞一会
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间
  $startTime = isset($options['on_stats']) ? microtime(true) : null;
  try {
    // 不支持 expect header.
    $request = $request->withoutHeader('Expect');
    // 当内容为0的时候,依然添加一个头信息
    if (0 === $request->getBody()->getSize()) {
      $request = $request->withHeader('Content-Length', 0);
    }
    // 发起请求,然后回调 on_stats 函数
    // 解析结果,同样返回一个满足状态的 promise
    return $this->createResponse(
      $request,
      $options,
      $this->createStream($request, $options),
      $startTime
    );
  } catch (\InvalidArgumentException $e) {
    throw $e;
  } catch (\Exception $e) {
    // Determine if the error was a networking error.
    $message = $e->getMessage();
    // This list can probably get more comprehensive.
    if (strpos($message, 'getaddrinfo') // DNS lookup failed
        || strpos($message, 'Connection refused')
        || strpos($message, "couldn't connect to host") // error on HHVM
       ) {
      $e = new ConnectException($e->getMessage(), $request, $e);
    }
    $e = RequestException::wrapException($request, $e);
    $this->invokeStats($options, $request, $startTime, null, $e);

    return \GuzzleHttp\Promise\rejection_for($e);
  }
}

关于流处理,因为其底层实现是 fopen() 函数,其支持的协议比较多,不止有 HTTP,它支持的协议和封装协议在这里可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。

根据上面的分析我们已经知道 transfer 方法返回的结果是什么了,然后就是获取返回结果。

同步请求因为在 transfer 方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。

public function send(RequestInterface $request, array $options = [])
{ 
  // 我们注意到最后调用的 wait 方法
  return $this->sendAsync($request, $options)->wait();
}

在同步请求方法中,直接调用了 wait() 方法,所以直接走 promise 对象的 wait() 方法及注册的 then() 方法。还记得之前的 Middleware 里面注册了一些 then() 方法吗?这里主要就是调用它们了,完成中间件 “处理返回结果” 的这一步骤,当然还有一些在逻辑处理中注册的 then() 方法,在此不再举例。

异步请求在 transfer 方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。

$promise = $client->requestAsync('GET', 'https://www.chengxiaobai.com');
$promise->then(
    function (ResponseInterface $res) {
        echo $res->getStatusCode() . "\n";
    },
    function (RequestException $e) {
        echo $e->getMessage() . "\n";
        echo $e->getRequest()->getMethod();
    }
);

这种方式是对每个异步请求单独注册 then() 方法,说明这个请求成功了怎么处理,失败了怎么处理。

$client = new Client(['base_uri' => 'https://www.chengxiaobai.com']);
// 注册多个异步请求,实现并发
$promises = [
    'image' => $client->getAsync('/image'),
    'png'   => $client->getAsync('/image/png'),
    'jpeg'  => $client->getAsync('/image/jpeg'),
    'webp'  => $client->getAsync('/image/webp')
];
// 有一个失败就终止
$results = Promise\unwrap($promises);
// 忽略某些请求的异常,保证所有请求都发送出去
$results = Promise\settle($promises)->wait();

这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。

$client = new Client();
$requests = function ($total) use ($client) {
    for ($i = 1; $i < $total; $i++) {
      $uri = 'https://www.chengxiaobai.com/page/' . $i;
        // 这里用到了协程
        yield function() use ($client, $uri) {
            return $client->getAsync($uri.$i);
        };
    }
};
$pool = new Pool($client, $requests(10), [
    // 并发数
    'concurrency' => 5,
    'fulfilled' => function ($response, $index) {
        echo $res->getStatusCode() . "\n";
    },
    'rejected' => function ($reason, $index) {
        echo $e->getMessage() . "\n";
    },
]);
// 初始化 Promise
$promise = $pool->promise();
// 发起请求处理
$promise->wait();

这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率 (concurrency),使用统一的处理逻辑,处理请求池当中的数据。

我们来分析下 Pool 的源码,主要是构造函数。

public function __construct(
  ClientInterface $client,
  $requests,
  array $config = []
) {
  // 设定请求池大小
  if (isset($config['pool_size'])) {
    $config['concurrency'] = $config['pool_size'];
  } elseif (!isset($config['concurrency'])) {
    // 默认并发数 25
    $config['concurrency'] = 25;
  }
  if (isset($config['options'])) {
    $opts = $config['options'];
    unset($config['options']);
  } else {
    $opts = [];
  }
  // 将请求列表转换为一个迭代器
  $iterable = \GuzzleHttp\Promise\iter_for($requests);
  $requests = function () use ($iterable, $client, $opts) {
    // 遍历请求列表
    foreach ($iterable as $key => $rfn) {
      // 如果是一个 request 的实现,转换为一个异步请求
      if ($rfn instanceof RequestInterface) {
        yield $key => $client->sendAsync($rfn, $opts);
      } elseif (is_callable($rfn)) {
        // 如过是一个闭包,直接调用 
        yield $key => $rfn($opts);
      } else {
        throw new \InvalidArgumentException('...');
      }
    }
  };
  // 支持迭代的 promise 对象
  $this->each = new EachPromise($requests(), $config);
}

我们可以看到,Pool 模式下,所有的请求配置 $opts 都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。

不管哪种形式,都可以发现触发最终调用的都是 wait() 方法。这个和 promise 的规范有关。

我们看下异步如何处理请求的。

还记得异步请求返回的 promise 吗?

$promise = new Promise(
            [$this, 'execute'],
            // 依据 ID 来关闭请求
            function () use ($id) { return $this->cancel($id); }
        );

wait() 方法调用的就是 [$this, 'execute'],我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。

对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是 20 个异步请求中包含 10 个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。

在” 请求处理 “章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays 队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。

public function execute()
{
  $queue = P\queue();
  while ($this->handles || !$queue->isEmpty()) {
    // 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞
    if (!$this->active && $this->delays) {
      usleep($this->timeToNext());
    }
    $this->tick();
  }
}
private function timeToNext()
{
  $currentTime = microtime(true);
  $nextTime = PHP_INT_MAX;
  // 找出现有延迟请求队列中最小的延迟时间
  foreach ($this->delays as $time) {
    if ($time < $nextTime) {
      $nextTime = $time;
    }
  }
  return max(0, $nextTime - $currentTime) * 1000000;
}

execute 主要是调用了 tick() 这个方法。

public function tick()
{
  // 如果延迟请求队列不为空,处理延迟请求
  if ($this->delays) {
    $currentTime = microtime(true);
    // $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
    foreach ($this->delays as $id => $delay) {
      // 延迟任务已经达到延迟预期时间,开始处理
      if ($currentTime >= $delay) {
        // 将它从延迟任务队列中删除
        unset($this->delays[$id]);
        // 添加到批量请求句柄中
        curl_multi_add_handle(
          $this->_mh,
          $this->handles[$id]['easy']->handle
        );
      }
    }
  }
  // 执行队列中的任务
  P\queue()->run();
  // 执行请求
  if ($this->active &&
      curl_multi_select($this->_mh, $this->selectTimeout) === -1
     ) {
    // See: https://bugs.php.net/bug.php?id=61141
    usleep(250);
  }
  while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
  // 获取请求结果信息,移除请求成功的请求
  $this->processMessages();
}

然后异步处理流程就很清晰了:

  1. 如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行 2。
  2. 发起一次批量请求。
  3. 获取请求信息,移除成功的请求。
  4. 如果请求队列不为空,执行 1-3。

从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。

和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then() 方法完成返回结果处理。

因为流请求本质上是基于 fopen 的,发起请求逻辑比较简单。

public function __invoke(RequestInterface $request, array $options)
{
  // 延迟请求直接 delay 操作
  if (isset($options['delay'])) {
    usleep($options['delay'] * 1000);
  }
  // 重点1:解析返回值
  return $this->createResponse(
    $request,
    $options,
    // 重点2:发起请求
    $this->createStream($request, $options),
    $startTime
  );
}

先看如何发起请求的,重点在配置项的处理。

private function createStream(RequestInterface $request, array $options)
{
    $params = [];
    // 这里设置了默认请求参数
    $context = $this->getDefaultContext($request);
  // 这里方法主要是依据配置项调用了
    // add_proxy,add_timeout,add_verify,add_cert,add_progress,add_debug
    // 其实本质上就是用自定义配置覆盖默认请求参数
    if (!empty($options)) {
        foreach ($options as $key => $value) {
            $method = "add_{$key}";
            if (isset($methods[$method])) {
                $this->{$method}($request, $context, $value, $params);
            }
        }
    }
  // 这里也是用自定义配置覆盖默认请求参数
    if (isset($options['stream_context'])) {
        if (!is_array($options['stream_context'])) {
            throw new \InvalidArgumentException('stream_context must be an array');
        }
        $context = array_replace_recursive(
            $context,
            $options['stream_context']
        );
    }
    // 解析 host ,支持强制 IP 解析,v4 和 v6 都支持
    $uri = $this->resolveHost($request, $options);
    $context = $this->createResource(
        function () use ($context, $params) {
            // 这里创建资源流
            return stream_context_create($context, $params);
        }
    );

    return $this->createResource(
        function () use ($uri, &$http_response_header, $context, $options) {
            // 这里发起请求
            $resource = fopen((string) $uri, 'r', null, $context);
            $this->lastHeaders = $http_response_header;
      // 设置超时时间
            if (isset($options['read_timeout'])) {
                $readTimeout = $options['read_timeout'];
                $sec = (int) $readTimeout;
                $usec = ($readTimeout - $sec) * 100000;
                stream_set_timeout($resource, $sec, $usec);
            }
            return $resource;
        }
    );
}

从代码看,默认启用了 HTTPS 。

这里的自定义配置和默认配置合并,不再是之前简单的数组合并操作了,因为某个配置的修改,可能会涉及到其他配置项的变动,所以对几个主要选项 (proxy,timeout,verify,cert,progress,debug) 做了封装。

毕竟 fopen 这个功能强大的函数在设计之初其目标就是操作 resource ,所以它的配置项也根据 resource 的不同而有差异,其针对 HTTP 的配置项在这里可以看到。

然后就是处理返回值,如果使用 cURL 这些都能方便的处理,但是在流处理中,就得自己去解析它,相当于要自己要完成一部分 cURL 的工作。

private function createResponse(
        RequestInterface $request,
        array $options,
        $stream,
        $startTime
    ) {
        $hdrs = $this->lastHeaders;
        $this->lastHeaders = [];
        $parts = explode(' ', array_shift($hdrs), 3);
        $ver = explode('/', $parts[0])[1];
        $status = $parts[1];
        $reason = isset($parts[2]) ? $parts[2] : null;
      // 解析 header 
        $headers = \GuzzleHttp\headers_from_lines($hdrs);
      // 解析返回类型
        list ($stream, $headers) = $this->checkDecode($options, $headers, $stream);
      // 构建一个 Psr7\StreamInterface 的 Stream 对象
        $stream = Psr7\stream_for($stream);
        $sink = $stream;
        $response = new Psr7\Response($status, $headers, $sink, $ver, $reason);
    // 回调 on_headers 函数
        if (isset($options['on_headers'])) {
            try {
                $options['on_headers']($response);
            } catch (\Exception $e) {
                $msg = 'An error was encountered during the on_headers event';
                $ex = new RequestException($msg, $request, $response, $e);
                return \GuzzleHttp\Promise\rejection_for($ex);
            }
        }
    // 回调 on_stats 函数
        $this->invokeStats($options, $request, $startTime, $response, null);
  
        return new FulfilledPromise($response);
    }

整个过程都是在解析数据,响应内容是通过 stream_get_contents 拿到的,在 Psr7\StreamInterface 实例中有体现。

这里单独说下 on_headers 这个函数。这个函数是在拿到返回头之后,依据返回头里面的信息,来判定如何响应后面的操作,在返回数据比较大的时候可以做到提前拦截,避免浪费资源。

这个设置在所有请求方式中都有效,只是用在流处理中意义更大。

$client->request('GET', 'http://httpbin.org/stream/1024', [
    'on_headers' => function (ResponseInterface $response) {
        if ($response->getHeaderLine('Content-Length') > 1024) {
            throw new \Exception('The file is too big!');
        }
    }
]);

在分析源码的过程中,发现了一个没有被使用的类 GuzzleHttp\Promise\Coroutine ,它也是对 promise 的一个实现,但是其是通过迭代器来实现的,会不会有协程版的 promise 呢?我们拭目以待。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK