8

最近对rpcx做的一些优化以及一些优化尝试

 1 year ago
source link: https://colobu.com/2022/08/25/some-small-optimizations-of-rpcx/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

最近在做2022 Go生态圈 rpc 框架 Benchmark之前,专门花了一星期时间,对rpcx进行了重点的优化,这篇文章专门记录一下几个重要的优化点,供大家参考。

增加handler方式,避免服务端使用reflect

在之前的rpcx实现中,参考的是标准库rpc的注册方式,一个服务的注册如下:

rpcxserver.RegisterName("Hello", new(Hello), "")

它实际是通过反射的方式遍历这个rcvr,找到它的服务方法和参数类型,并缓存下来:

func (s *Server) register(rcvr interface{}, name string) (string, error) {
s.serviceMapMu.Lock()
defer s.serviceMapMu.Unlock()
service := new(service)
service.typ = reflect.TypeOf(rcvr)
service.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(service.rcvr).Type().Name() // Type
service.name = sname
// Install the methods
service.method = suitableMethods(service.typ, true)
if len(service.method) == 0 {
......
s.serviceMap[service.name] = service
return sname, nil

然后在处理请求的时候,根据调用的服务和方法,找到相应的类型,对请求类型和返回类型利用reflect产生相应的值,再利用reflect的function.Call执行方法调用,虽然中间使用了池化的技术,但是内部还是大量使用的反射的功能,性能损失不少。

Go标准库http router的实现给我了灵感,而lesismal/arpc的应用和性能优异的表现,促使我决定在rpcx增加一个更有效的服务处理方式,类型配置http handler一样的方式:

func hello(ctx *server.Context) error {
msg := &proto.BenchmarkMessage{}
err := ctx.Bind(msg)
if err != nil {
return err
msg.Field1 = "OK"
msg.Field2 = 100
if *delay > 0 {
time.Sleep(*delay)
} else {
runtime.Gosched()
return ctx.Write(msg)
host = flag.String("s", "127.0.0.1:8972", "listened ip and port")
delay = flag.Duration("delay", 0, "delay to mock business processing by sleep")
func main() {
flag.Parse()
rpcxserver := server.NewServer()
rpcxserver.AddHandler("Hello", "Say", hello)
rpcxserver.Serve("tcp", *host)

服务方法签名如func Xxx(ctx *server.Context) error,它会传入一个特制的Context,通过这个Context,能获得请求参数,也可以写回response。

通过ctx.Bind解码出请求参数:

msg := &proto.BenchmarkMessage{}
err := ctx.Bind(msg)
if err != nil {
return err

通过ctx.Write(msg)把response写回。

这里不会用到反射。如果你的编解码器使用的不是反射的方式的话,会更高效。

使用goroutine pool

rpcx默认采用每个request至少一个goroutine处理方式。这样如果在高并发的情况下,服务端会有非常巨大的goroutine存在,虽然说Go支持上万的goroutine,但是在goroutine非常多的情况下,(内存)资源占用非常多,对Go的调度和垃圾回收也会有一定的影响,所以在高并发的场景下,采用goroutine 池在一定程度上会提升性能。

说起线程池,网上已经有不下十个goroutine pool (或者叫做worker pool)的实现。rpcx使用的是alitto/pond, 并没有太多的性能的考虑,而是从使用方便性方面去考虑。

如果想使用goroutine池,你可以使用server.WithPool(100, 1000000):

rpcxserver := server.NewServer(server.WithPool(100, 1000000))

其中第一个参数是goroutine(worker)的数量,第二个参数capacity是最大的待处理的请求。超过capacity,新的请求就会被阻塞。

解析后的请求会被提交的goroutine pool中。

if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn, writeCh)
} else {
go s.processOneRequest(ctx, req, conn, writeCh)

调整进程优先级

有时候,你的程序可能会和其它程序混跑,即使没有其它的业务程序,也可能有系统自带的一些应用,如果我们把rpcx程序的优先级设置的高一些,使它有更多的可能被Linux系统程序调度,也会带来更多的优先级。

Linux每个进程都有一个介于 -20 到 19 之间的 nice 值, 值越低会有更多的机会获得Linux的调度。默认情况下,进程的 nice 值为 0。

进程的 nice 值,可以通过 nice 命令和 renice 命令修改,进而调整进程的运行顺序。

nice [-n NI值] 程序 按照指定的优先级启动程序。

renice 命令可以在进程运行时修改其 NI 值,从而调整优先级。renice [优先级] PID

当然你也可以通过程序动态的设置进程的优先级:syscall.Setpriority(syscall.PRIO_PROCESS, 0, -20),这条命令设置本进程的优先级为-20

这个带来10% ~ 20%性能的提升。

我还想尝试只设置listener.Accept的goroutine,让有更多的机会获得处理的机会。不过没有可调用的系统调用去设置。尝试了runtime.LockOSThread()listener.Accept锁定一个线程去去处理,没啥鸟用。

进一步优化想法

当然,还有一些优化的方法,不过还没有验证。

比如redis,采用单线程(主逻辑)的方式处理请求、还有比如nginx,采用多个worker独立的去处理连接。这样可以减少并发锁的请求。
rpcx可以改成单worker的方式,然后启动多个worker去监听和处理请求,可能也会提升性能。不过这个没有进一步的测试。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK