17

为 skynet 增加并行多请求的功能

 3 years ago
source link: https://blog.codingnow.com/2020/09/skynet_select.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

skynet 在设计时,就拒绝使用 callback 的形式来实现请求回应模式。我认为,callback 会导致业务中回应的流程和请求的流程割裂到两个执行序上,这不利于实现复杂的业务逻辑。尤其是对异常的支持不好。

所以,在 skynet 中发起请求时,当前执行序是阻塞的,一直等到远端回应,再继续在同一个执行序上延续。

如果依次发起请求,会有不该有的高延迟。因为在同一个执行序上,你必须等待前一个请求回应后,才可以提起下一个请求。而原本这个过程完全可以同时进行。

但是,如果我们想同时发起多个不相关的请求就会比较麻烦。为每个请求安排一个执行序的话,最后汇总所有请求回到一个执行序上又是一个问题。目前,只能用 fork/wait/wakeup 去实现,非常繁琐。

这类需求一直都存在 。我一直想找到一个合适的方法来实现这样一类功能:同时对外发起 n 个请求,依回应的次序来处理这些请求,直到所有的请求都回应后,再继续向后延续业务。实现这样的功能在目前的 skynet 框架下并不复杂,难点在于提供怎样的 api 形式给用户使用。

一开始,我想到的是利用一个 for 循环来处理多次回应。(以下不是合法的 lua 语法,只为了表达意思)

for index, ok, a,b,c,... in skynet.request(...)(...)(...) do
   if index == 1 then
       -- 处理第一个请求
  elseif index == 2 then
       -- 处理第二个请求
  else
    assert(index == 3)
       -- 处理第三个请求
  end
end

这段示意代码中,同时提起了三个不同的请求,它预期会收到三个回应。但回应的次序显然无法确定,所以我用了 index 这个序号来标识后面的参数是第几个回应。btw, 这里 ok 表示了请求成功还是失败(发生 error )。

请求和回应在这里被割裂开了,所以出现了怎样把请求和回应对应起来的问题。为此我想了好几个办法。其中一个是这样的形式:

for req, ok, a, b, ... in skynet.select() do
  if req(...) then
     -- 处理第一个请求
  elseif req(...) then
     -- 处理第二个请求
  elseif req(...) then
     -- 处理第三个请求
  end
end

如果设计一个 skynet.select() 返回一个迭代器,然后把请求本身放在循环内,似乎可以解决割裂的问题。用一定的实现技巧,让请求只在第一次调用的时候真正发出,而循环本身不停的匹配回应。

随之我意识到,这样过于技巧化。只是为了形式上好看一点,其实让语义更模糊了。就不再在这条路上走下去。

我发现,其实最简单的方案是用一个字符串标识每个请求,让业务编写的人可以用一个更明确的词区分不同的并发请求。再进一步,如果请求本身可以直接是一个对象(table) ,用请求本身就可以区分自己。

这样就是最终的方案:

local reqs = skynet.request()
local req1 = reqs:add { ... }
local req2 = reqs:add { ... }
for req, resp in reqs:select() do
    if req == req1 then
       -- 处理请求 1
    elseif req == req2 then
       -- 处理请求 2
    end
end

也可以简化成这样:

for req, resp in skynet.request 
    { ... , token = "req1" }
    { ... , token = "req2" }
     :select() do
    if req.token == "req1" then
       -- 处理请求 1
    elseif req.token == "req2" then
       -- 处理请求 2
    end
end

这里,我让 request/select 返回的迭代器每收到一个回应都返回请求对象和回应对象,用户可以自己识别。因为可以直接在请求对象中插入任意的标识符,所以很容易区分开不同的请求;或是把类似的请求分类处理。回应信息 resp 也用 table 承载,这样,如果 resp 为 nil ,则表示请求出错。

在一开始的实现中,这套 select 机制有一些限制。因为从调度器看来,这段业务的阻塞点在 select 上。也就是在这里,等待多个请求的回应。而回应处理的代码是在循环体内的,如果在处理回应的过程中又发起了新的请求(调用了阻塞函数),调度器就无法正确处理了。因为在同一个执行序列上,无法区分即将到回应到底是之前发起的多个请求之一,还是在处理某个回应时发起的新请求的回应。

不过这个限制是很容易消除的。只需要把 select 本身的执行序放在独立的 coroutine 里就可以了。相当于发起多个请求后,用一个独立的执行序去等待多个回应,每收到一个回应就转发回当初的位置,让用户可以在原有的执行序上依次处理。

有了这个基础结构之后,也就很容易增加超时的处理。超时相当于同时提交一个定时器请求,收到定时器请求的同时,放弃还没有收到的起他回应即可;或是等所有请求都处理完毕,放弃定时器的处理。

我们需要一个明确的处理流程来结束这个 select 循环:把多个请求的处理合流到后续的处理流程上,尚未处理的回应需要忽略掉。如果一切正常,select 的迭代器结束就可以做这些收尾工作。但如果发生异常,例如,在循环中 break 、产生 error 等等,我们需要主动调用关闭流程。

好在 lua 5.4 提供了 to be closed 方法来做这件事。所以这个新特性直接放在 lua 5.4 的基础上来实现。感兴趣的同学可以提前看看仓库中 select 这个分支 。目前最直接的好处是,使用新 api 重新实现的 debug console 在调用 stat 或 mem 这样需要轮询所有服务状态的功能时,默认加上了超时设计,不再会因为单个服务出了问题,而无法返回了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK