68

第四届阿里中间件性能大赛初赛top8思路解析

 5 years ago
source link: http://www.10tiao.com/html/681/201807/2651030018/1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

编者按:本文为朋友圈数云-王石冲投稿,王同学在第一赛季排名第10,去掉第一名及小号排名第8。

赛题背景分析及理解

初赛题目是吸引我参加比赛的最大原因。其中一段描述了Service Mesh的作用:

作为 Service Mesh 的核心组件之一,高性能的服务间代理(Agent)是不可或缺的,其可以做到请求转发、协议转换、服务注册与发现、动态路由、负载均衡、流量控制、降级和熔断等诸多功能,也是区别于传统微服务架构的重要特征。

而这种思想与《反应式设计模式》不约而同。在反应式系统设计的过程中,很重要的一块就是如何与现存的非反应式系统进行交互。非反应式系统典型地都具有同步阻塞调用者、无界输入队列、不遵循有界响应延迟的原则等缺点,这使得流量控制、资源高效利用以及降级、熔断等功能都比较难以实现。《反应式设计模式》一书中专门推荐了要使用单独的资源来与这些系统整合,并赋予他们“反应式”的假象。而Service Mesh中的Agent,则可以看作成专门用来与非反应式系统进行整合的组件。在第14章的资源管理模式中,描述了如何使用这样的资源与之进行交互的方法,尤其是托管阻塞模式;而第16章的流量控制模式,则指导了我们如何在调用过程中行之有效进行流量控制。当然,对于比赛来说,这些设计相对来说过于概括。不过我们可以先基于这种概括性地原则构建出体系架构来,之后我们再具体优化相关的细节,提高成绩。而基于之前描述的原因,我的第一版使用了Akka来进行开发。接下来我们先分析一下具体的题目。

题目分析

题目的要求是:

实现一个高性能的 Service Mesh Agent 组件,并包含如下一些功能:

  1. 服务注册与发现

  2. 协议转换

  3. 负载均衡

服务注册与发现是为了获得资源的访问方式。这个过程最好不要与正式的调用过程耦合。所以我们用一个单独的Actor来做服务发现。如果是在Consumer中,这个Actor会去监听ETCD的变更,如果发现Endpoints发生了变化,则将信息发布到ActorSystem的事件流中。之后关注EndpointsUpdated事件的Actor就会收到此消息,并根据它来更新自己的端点列表,进行负载的动态变更。

协议转换相对来说是一个打铁的活,根据Dubbo协议一点点写好就行了。

重要的则在于负载均衡。进一步回到题目描述中:

  1. 每轮评测在一台服务器中启动五个服务(以 Docker 实例的形式启动),一个 etcd 服务作为注册表、一个 Consumer 服务和三个 Provider 服务;

  2. 使用一台独立的施压服务器,分不同压力场景对 Consumer 服务进行压力测试,得到 QPS;

总共有3轮压力测试,分别是128、256、512个连接。由于每次请求的往返时间最少也是50ms,那么每秒钟,按照512连接的最大速度,则是1000 / 50 * 512 = 10240的最大QPS。

其中,三台Provider的负载能力有所不同,按照CPU的quota分配以及内存的大小分配,正常情况下应该是1比2比3。只是由于Provider的dubbo端最多同时只能处理200个请求,多出来的直接被reject掉。那么最好的分配比例在512条件下则是 112 : 200 : 200

当然,反应式系统的设计原则并不是固定分配比例的。它希望的理想情形是你先告诉我你能处理多少任务,一旦任务来了,我就尽量按照这个数量发给你。不要Consumer去强行推,不要Provider一直来拉。而这种模式最好的实现方式,就是利用Akka Stream啦。


核心思路

按照前面的分析,核心思路就是将每个Provider的处理过程看作是一条流。来自调用端的所有请求先汇聚到一个队列里面,之后根据后端Provider的处理能力,分别分配到三个不同的流中。而如果汇聚队列的长度达到了界限值,则降级服务,对外部请求进行按比例丢弃,直到与系统的处理能力重新匹配(详情参见《反应式设计模式》第十六章丢弃模式)。这样整个系统就又健壮又迅速。

关键代码

下面一段是用来抽象Consumer的Actor里面的代码,所有连接的请求都被注册到RequestHandler这个Actor了。

  val requestHandler: ActorRef = context.actorOf(Props(new RequestHandler).withDispatcher("mpsc"), "request-handler")  def receive: Receive = {    case Bound(localAddress) 
      etcdManager ! "consumer"
      log.info(s"service started at ${localAddress.getHostString}:${localAddress.getPort}")    case CommandFailed(_: Bind) 
      context stop self
    case Connected(_, _) 
      val connection = sender()
      connection ! Register(requestHandler) 
      //将Connection全部注册到RequestHandler,就是说所有连接发过来的数据都回转发到这个Actor
      //注意这个是Hack写法。正统的还是应该一个Actor一个连接,这样逻辑才会清晰。
  }


然后在Requesthandler里面,接收到的ByteString直接作为元素提供给后面的处理流代码里面。

 var source: SourceQueueWithComplete[(Long, ByteString)] = _  
  override def receive: Receive = {    case Received(bs) =>
      source.offer(sender().path.name.toLong, bs) //这里的sender是处理连接的actor,它们的名字刚刚好是ID,所以直接复用。
    case EndpointsUpdate(newEndpoints) =>
      log.info(s"start new source for endpoints $newEndpoints")
      source.complete()
      source = getSourceByEndpoints(newEndpoints)


这里的source是一个可完成SourceSourceFlowSink是Akka Stream里面的基本构建块。其大体意义如下:

  1. Source: 只有一个输出流的构件块;

  2. Sink: 只接收一个输入流的构件块;

  3. Flow: 接收一个输入流,并拥有一个输出流的构件块。

  4. Graph: 一个打包好的流处理拓扑,它可以拥有一组输入端口或者输出端口。

我们这里是一个可完成Source,它由Source.queue声明并物化后产生:

  def getSourceByEndpoints(endpoints: Set[Endpoint]): SourceQueueWithComplete[(Long, ByteString)] = {    val handleFlow = Flow[(Long, ByteString)]
      .via(DubboFlow.connectionIdFlow)
      .via(endpointsFlow(endpoints))
      .to(DubboFlow.decoder)      
    Source.queue[(Long, ByteString)](512, OverflowStrategy.backpressure)
      .to(handleFlow).run()
  }


这里是由这个函数基于Endpoint的个数构建。第一段handleFlow是构建了一个Flow,这个Flow可以接收一个二元组(Long, ByteString),并将其交给DubboFlow.connectionIdFlow来encode成自定义协议,之后将其发送到endpointsFlow进行对Provider的调用,并得到结果。得到结果之后,经由DubboFlow.decoder来decode,并发送回给各个连接Actor,由其返回给客户端。

上面的内容里,DubboFlow.connectionIdFlowDubboFlow.decoder不多说,都是打铁代码。核心逻辑endpointsFlow(endPoints)贴出如下:

def endpointsFlow(endpoints: List[Endpoint]) = {    
    val tcpFlows = endpoints.map { endpoint =>
      Tcp().outgoingConnection(endpoint.host, endpoint.port).async
    }    
    val framing = Framing.lengthField(4, 12, Int.MaxValue, ByteOrder.BIG_ENDIAN)    Flow.fromGraph(GraphDSL.create(tcpFlows) { implicit builder =>
      tcpFlows =>
        import GraphDSL.Implicits._
        
        val balance = builder.add(Balance[ByteString](tcpFlows.size))        val bigMerge = builder.add(Merge[(Long, ByteString)](tcpFlows.size))
        tcpFlows.foreach { tcp =>
          balance  ~> tcp ~> framing ~> bigMerge
        }        
        FlowShape(balance.in, bigMerge.out)
    })
  }


每一个endpoint都被映射成为一个TcpFlow,通往Provider端。之后使用Akka Stream的DSL方法,构建了一个Graph。这个Graph用图形表示,其拓扑结果则是如下:

                      +------> Small Provider +--------> Framing +-------+
                      |                                                  |
    Input             |                                                  |                Output
+--------> Balancer ---------> Medium Provider+--------> Framing +----------------> Merge +-------->
                      |                                                  |
                      |                                                  |
                      +------> Large Provider +--------> Framing +-------+

数据由左边输入,经过Balancer,这个Balancer是由Akka Stream提供的现成组件,它可以将上游的元素路由到下游,其特性如下:

  1. 一个Balance由一个in端口和2到多个out端口,

  2. 当任意下游端口停止回压之后,它输出元素到下游输出端口;

  3. 当下游所有端口都在回压的时候,它就回压上游;

  4. 当上游完成时,它也完成;

  5. 当其eagerCancel参数设置为true时,任意下游取消,则其也取消;设置为false的时候,当所有下游取消,它才取消。

由上面的拓扑结构可以看到,当任意Provider向上游表示可以处理请求的时候,Balancer就会在有请求到来的时候,向其输出;Provider处理完的请求,经过TCP拆包过程之后,就合并到一起,交由下游的流继续处理。如此,只要连接有请求过来,那么整个流就能一直运转。这个过程中,即使某个通往provider的连接断掉了,Balancer也能继续将请求路由到其他两个连接上。而这个时候,负责服务发现的Actor就会发出EndpointsUpdated的消息,此时RequestHandler会进入第二个匹配,用新的Endpoint来更新我们的处理流:

    case EndpointsUpdate(newEndpoints) =>
      log.info(s"start new source for endpoints $newEndpoints")
      source.complete()
      source = getSourceByEndpoints(newEndpoints)

注意这里的complete是表示流不再接收新的请求,这之前已经入队的请求仍然会继续完成,直到全部处理完毕。

Provider的代码相对Consumer就简单很多:

val handleFlow = Tcp().outgoingConnection(host, dubboPort).async  def startService: Future[Done] = {    Tcp().bind(host, port).runForeach { conn =>
      conn.handleWith(handleFlow)
    }
  }


它只需要将Consumer过来的连接转发给后端的Dubbo,或者为了性能原因,它需要将自定义协议包装成Dubbo协议,然后发过去,再将结果转回,即可。

到这里,我们用了大约不到300行代码,就完成了初赛题目的所有要求。并且代码的普适性和健壮性都很不错,后续还能依据需求,快速地实现任意一端的限流要求(Flow[Request].throttle(...)),或者加入断路器,进行快速失败。

这套代码在CPU资源充足的时候,例如在我本地(注意,已经按照docker参数限定了CPU quota和内存),256连接的时候可以跑4960, 512的时候可以跑9500。

然而线上则表现不好,分别最多4500和6400。这是为什么呢?

经过查询源码以后发现,问题出现在这一段:

@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
        if (remainingLimit > 0) {          // never read more than the configured limit
          buffer.clear()          val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
          buffer.limit(maxBufferSpace)          val readBytes = channel.read(buffer)
          buffer.flip()          if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)          if (readBytes > 0) info.handler ! Received(ByteString(buffer)) //这一段

          readBytes match {            case `maxBufferSpace`  if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace)            case x if x >= 0       AllRead
            case -1                EndOfStream
            case _ 
              throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
          }
        } else MoreDataWaiting


其中info.handler ! Received(ByteString(buffer))是将SocketChannel接收到的数据复制成ByteString类型之后,再发送出去的,所以相当于是从堆外把数据复制了出去,于是导致整个流程都是非zero copy的。本来在正常的逻辑下,不ZC是必然的,因为肯定要把数据读出来进行处理。但是在本次比赛的场景里,这种复制就是非常昂贵的操作了,直接导致Akka版本的代码无法和各位竞争,即使代码再精简,思想再先进,也无法取得好的成绩。所以在第二个版本中,我换用了netty来跑分。

Netty版本下的核心代码,分ConsumerAgent和调用PrivderAgent的NettyClient列出如下:

ConsumerAgent:

  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    msg match {      case in: ByteBuf =>
        val meshRequest = MeshRequest(cid) //cid是connectionId,在连接建立的时候获取
        val maybeClient = ClientChannelHolder.clientChannelCache.get() //client的channel存在了ThreadLocal里面,直接通过ThreadLocal获取到channelHandler
        maybeClient.writeAndFlush(meshRequest.toCustomProtocol(in), maybeClient.voidPromise()) //将流入的bytebuffer转变成自定义协议的格式,并直接向client的channel刷入数据
        meshRequest.recycle //回收MeshRequest对象
    }
  }


NettyClient:

override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
          msg match {            case bs: ByteBuf =>
              val cid = bs.getLong(4) //从ByteBuffer中获取connectionId
              val resp = MeshResponse(cid) //包装成MeshResponse
              val ch = ServerChannelHolder.serverChannelMap.get().get(cid) //根据connectionId获取这个连接的SocketChannel
              if(ch != null) { //如果存在的话,则刷入响应
                ch.writeAndFlush(resp.toHttpResponse(bs), ch.voidPromise())
              }
              resp.recycle //回收MeshResponse对象
          }
        }


这个是我所发现的最短的路线。其中省略了路由的过程。整体的线程设置如下:

val acceptorGroup = new EpollEventLoopGroup(1) 
val threadFactory = new DefaultThreadFactory("atf_wrk")
val workerNumber = 3val workerGroup = new EpollEventLoopGroup(workerNumber, threadFactory)


一个负责IO的线程,三个负责处理请求的线程。三个NettyClient分别使用三个worker中的一个就好了:

  val eventLoop = workerGroup.next() 
new NettyClient(ed.host, ed.port, eventLoop, 1, ed.scale)


主要的trick就是我只起了4个线程,1个负责IO,3个负责请求处理。通过连接绑定的线程来进行路由,所以少了很多人加权轮询的步骤,而且每个连接只通过同一个线程进行流转,所以也少了context switch的过程。情况好的话,4个线程应该pin到它们的cpu上,没有任何的上下文切换。

至于其他就是一些打铁的小细节,比如使用Recycler生成对象池来回收对象,使用池化的ByteBuf来避免堆外内存分配的开销,预先定义好一些要用来包装请求和回复的对象,使用Unpooled.unreleasableBuffer(buffer)来反复利用。如此,整个过程下来,不会有FGC,而YGC最多也就两三次而已。Recycler的代码列出如下:


class MeshRequest private(handle: Handle[MeshRequest]) {  private var cid: Long = _  private var buffer: ByteBuf = _  private var composite: CompositeByteBuf = _  def recycle = {
    cid = 0l
    buffer = null
    composite = null
    handle.recycle(this)
  }  def toCustomProtocol(bb: ByteBuf) = {    val n = bb.indexOf(280, bb.readableBytes(), '='.toByte)    val parameter = bb.skipBytes(n + 1)
    buffer.writeLong(cid)
    buffer.writeInt(parameter.readableBytes())
    composite.addComponents(true, buffer, parameter)
  }

}object MeshRequest {  private val RECYCLER = new Recycler[MeshRequest]() {    override def newObject(handle: Recycler.Handle[MeshRequest]): MeshRequest = {      new MeshRequest(handle)
    }
  }  def apply(id: Long): MeshRequest = {    val request = RECYCLER.get()
    request.cid = id
    request.buffer = ConsumerAgent.allocator.directBuffer(12)
    request.composite = ConsumerAgent.allocator.compositeBuffer()
    request
  }
}

最终,Netty版本的代码停留在6894,而Akka版本我没记错的话,应该是6400左右。

比赛经验总结和感想

其实是第一次参加这种编程的比赛,开始的时候看得蛮轻,因为按照实际生产的场景来说,我的第一种方案肯定是非常好的,编码简单、健壮、可扩展性强,应该是能够出彩的。但是因为比赛是唯成绩论的,或者说至少在初赛和复赛的时候是唯成绩论的,所以后续不得已,只能放弃我对Akka的信仰,使用Netty写了一个版本的打铁代码,以往前冲击一个比较好的名次,然后来向大家吹嘘Akka。事实证明,限定场景来做极致优化的话,Netty确实好很多,不过,在通用场景下,用Akka stream的思想,则可以迅速构建出一个集各种流控功能于一体,也非常好扩展,并且性能也不会相差太多的组件。所以,不管怎样,到最终的总结还是,如果是我来开发这个Service Mesh组件,Akka和Akka Stream绝对会是主力,而Netty则可以被应用在不需要将数据读出内存的场景(如只负责转发或者解析自定义协议的Provider端)。两者相结合,应该可以达到比较好的平衡。

· END ·


 往期推荐:


关注本公众号,欢迎订阅。


技术琐话 



以分布式设计、架构、体系思想为基础,兼论研发相关的点点滴滴,不限于代码、质量体系和研发管理。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK