7

archer-rpc: 一个基于 Scala 宏的异步 RPC 服务器

 4 years ago
source link: https://blog.henix.info/blog/archer-rpc-scala/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

最后更新日期:2014-12-16

  archer-rpc 是一个简单的 RPC 服务器。架构如下:

  • 协议:面向行的 TCP + json ,用换行符分隔请求和响应。json 库采用 spray-json
  • 异步:基于 netty 和 Scala 的 Future
  • boilerplate-free:使用 Scala 宏自动生成代码

  由于协议是自定义的,别人恐怕很难拿去直接用。所以我这里开源其实主要是为了学习交流。

  编写这个 RPC 的主要动机是:更容易地实现跨语言 RPC ,特别是动态和静态语言之间的。先说一下我们公司使用跨语言 RPC 的历史吧:

HTTP + JSON

  相关的库最成熟,容易编写。但问题是:http 的很多东西我们是用不到的。比如各种状态码、Cookie、Cache、各种 headers 。所有数据在处理的时候都用过一次 HTTP 协议栈,效率较低。

  archer-rpc 中规定的返回值类型只有如下几种:

  • Success - 成功
  • InputError - 输入错误(相当于 4xx)
  • UpstreamError - 上游错误(相当于 502 504)
  • MethodCallError - 内部错误(相当于 500)

Thrift

  选择这个库的基本理由是原生就支持 Java 和 php ——两种我们主要使用的语言,而且性能看起来也不错。但问题是:太复杂。

  1. 使用一套自己的数据结构描述语言,双方都要适配到上面。对比前面的 json ,不够灵活
  2. Java 还需要生成代码,增加构建环节

archer-rpc

  最终我选择了自己实现,因为我发现用 Scala 的宏可以实现 Thrift-Java 中必须用工具生成代码的部分,也就是静态转换成动态的部分。

  下面就来看一下如何用 archer-rpc 编写编写 rpc 服务器吧,例子是加法函数(一个同步和一个异步版本):

  MyMath 模块,通过 @RpcExport 标记要 export 的函数:

import henix.archer.RpcExport

import scala.concurrent.Future

object MyMath {

    @RpcExport
    def add(a: Int, b: Int): Int = a + b

    @RpcExport
    def addAsync(a: Int, b: Int): Future[Int] = Future { a + b }
}

  Main.scala 启动 rpc 服务器,按 Ctrl+C 退出:

import henix.archer.{RpcServer, Rpc}
import spray.json._
import spray.json.DefaultJsonProtocol._
import sun.misc.{SignalHandler, Signal}

object Main extends App {

  val mods = Map(
    "MyMath" -> Rpc.genModule[MyMath.type]
  )

  val rpcServer = new RpcServer(mods, "localhost", 4600)

  // unix only
  val exitHandler = new SignalHandler {
    override def handle(sig: Signal): Unit = {
      rpcServer.stop()
    }
  }
  Signal.handle(new Signal("INT"), exitHandler)
  Signal.handle(new Signal("TERM"), exitHandler)

  rpcServer.join()
}

  这里的 Rpc.genModule[T] 是一个宏,它会在 编译 的时候找出 T(现在只能用 object)的所有带有 @RpcExport 注解的方法,然后为这些方法生成 json -> 方法参数的代码。参见 https://github.com/henix/archer-rpc/blob/master/macros/src/main/scala/RpcMacros.scala

  如何测试:因为 archer-rpc 是直接使用纯文本流的,这意味着可以使用普通的 tcp 工具与其交互(就像可以用 Firefox 跟 HTTP API 交互),比如 netcat 或 socat(–> 表示发送给服务器的内容,<– 表示服务器返回的内容):

nc localhost 4700
--> {"mod":"MyMath","func":"add","params":{"a":1,"b":2}}
<-- {"type":"Success","value":3}

  nc 只支持按 Ctrl+C 结束。socat 则支持更强大的 readline 行编辑,可以 Ctrl+D 结束:

socat readline tcp4:localhost:4600

  socat 也可以写脚本执行:

echo '{"mod":"MyMath","func":"add","params":{"a":1,"b":2}}' | socat -t 15 - tcp4:localhost:4600

  如何为所有调用加上日志和统计功能?因为 Rpc.genModule 返回的是一个 Map[String, RpcMethodCall => Future[JsValue]] ,我们只需要包装一下这个 RpcMethodCall => Future[JsValue] 即可。这里用 Metrics 来统计成功/失败计数、测量函数运行时间:

import com.codahale.metrics.MetricRegistry
import com.typesafe.scalalogging.LazyLogging
import henix.archer.{UpstreamException, InputException, RpcMethodCall}
import Global.execctx
import spray.json.JsValue

import scala.concurrent.Future
import scala.concurrent.duration._

object RpcUtils extends LazyLogging {

  private val metrics: MetricRegistry = Global.metrics

  def withLogAndMetrics(func: RpcMethodCall => Future[JsValue])(methodCall: RpcMethodCall): Future[JsValue] = {
    val metricName = methodCall.mod + "." + methodCall.func

    val successEvent = metrics.meter(metricName + ".Success")
    val inputErrorEvent = metrics.meter(metricName + ".InputError")
    val upstreamErrorEvent = metrics.meter(metricName + ".UpstreamError")
    val methodCallErrorEvent = metrics.meter(metricName + ".MethodCallError")
    val timer = metrics.timer(metricName)
    val activeRequests = metrics.counter(metricName + ".activeRequests")

    val timerContext = timer.time()
    activeRequests.inc()

    val f = Future(methodCall).flatMap(func)

    f.onSuccess { case _ => successEvent.mark() }
    f.onFailure {
      case e: InputException =>
        inputErrorEvent.mark()
        logger.warn("input.error: {} => {}", methodCall, e.getMessage)
      case e: UpstreamException =>
        upstreamErrorEvent.mark()
        logger.warn("upstream.error: {} => {}", methodCall, e.getMessage)
      case e: Exception =>
        methodCallErrorEvent.mark()
        logger.error(methodCall.toString, e)
    }
    f.onComplete { _ =>
      val elapsed = timerContext.stop()
      activeRequests.dec()
      logger.info("{} complete in {}ms", methodCall, elapsed.nanoseconds.toMillis.toString)
    }

    f
  }
}

  然后在 Main 里用 val mods = Map(...).mapValues(_.mapValues(RpcUtils.withLogAndMetrics))

JSON-RPC

  后来我在网上闲逛的时候发现原来还有 JSON-RPC 这种协议,真是于我心有戚戚焉!如果让我再做一次设计,我可能会直接用它。

  不过仔细思考了 JSON-RPC 跟上述我们的自定义协议的不同之后,我认为:

  1. JSON-RPC 跟 archer-rpc 的自定义协议的最大不同是:它不是面向行的!

    archer-rpc 的每个请求和响应都是按照换行符分割的:每个请求中间必须没有换行符,最后必须有一个换行符。每个响应也一样。这是为了方便解析。而 JSON-RPC 允许在中间随意插换行符,这导致无法简单地实现。

  2. JSON-RPC 实现 batch query 要用特殊语法。而我的自定义协议只需要一次发送多行就可以了。

P.S. 为何要用这个名字呢?随便取的,出自 Fate 系列的七阶职之一。

评论邮箱 评论帮助

请按照如下格式发邮件:
[email protected]
[复制]
评论 / 回复内容,只支持纯文本


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK