

Go 每日一库之 rpcx
source link: https://studygolang.com/articles/28889
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

简介
在之前的两篇文章 rpc
和 json-rpc
中,我们介绍了 Go 标准库提供的 rpc
实现。在实际开发中, rpc
库的功能还是有所欠缺。今天我们介绍一个非常优秀的 Go RPC 库—— rpcx
。 rpcx
是一位国人大牛开发的,详细开发历程可以在 rpcx
官方博客 了解。 rpcx
拥有媲美,甚至某种程度上超越 gRPC
的性能,有完善的中文文档,提供服务发现和治理的插件。
快速使用
本文示例使用 go modules
。
首先是安装:
$ go get -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
可以看出 rpcx
的安装有点特殊。使用 go get -v github.com/smallnest/rpcx/...
命令只会安装 rpcx
的基础功能。扩展功能都是通过 build tags
指定。为了使用方便,一般安装所有的 tags
,如上面命令所示。这也是官方推荐的安装方式。
我们先编写服务端程序,实际上这个程序与用 rpc
标准库编写的程序几乎一模一样:
package main import ( "context" "errors" "github.com/smallnest/rpcx/server" ) type Args struct { A, B int } type Quotient struct { Quo, Rem int } type Arith int func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error { *reply = args.A * args.B return nil } func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by 0") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { s := server.NewServer() s.RegisterName("Arith", new(Arith), "") s.Serve("tcp", ":8972") }
首先创建一个 Server
对象,调用它的 RegisterName()
方法在服务路径 Arith
下注册 Mul
和 Div
方法。与标准库相比, rpcx
要求注册方法的第一个参数必须为 context.Context
类型。最后调用 s.Serve("tcp", ":8972")
监听 TCP 端口 8972。是不是很简单?启动服务器:
$ go run main.go
然后是客户端程序:
package main import ( "context" "flag" "log" "github.com/smallnest/rpcx/client" ) var ( addr = flag.String("addr", ":8972", "service address") ) func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption) defer xclient.Close() args := &Args{A:10, B:20} var reply int err :=xclient.Call(context.Background(), "Mul", args, &reply) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d\n", args.A, args.B, reply) args = &Args{50, 20} var quo Quotient err = xclient.Call(context.Background(), "Div", args, &quo) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem) }
rpcx
支持多种服务发现的方式让客户端找到服务器。上面代码中我们使用的是最简单的点到点的方式,也就是 直连 。要调用服务端的方法,必须先创建一个 Client
对象。使用 Client
对象来调用远程方法。运行客户端:
$ go run main.go 10 * 20 = 200 50 * 20 = 2...10
注意到,创建 Client
对象的参数有 client.Failtry
和 client.RandomSelect
。这两个参数分别为 失败模式 和 如何选择服务器 。
传输
rpcx
支持多种传输协议:
-
TCP
:TCP 协议,网络名称为tcp
; -
HTTP
:HTTP 协议,网络名称为http
; -
UnixDomain
:unix 域协议,网络名称为unix
; -
QUIC
:是 Quick UDP Internet Connections 的缩写,意为 快速UDP网络连接 。HTTP/3 底层就是 QUIC 协议,Google 出品。网络名称为quic
; -
KCP
:快速并且可靠的 ARQ 协议,网络名称为kcp
。
rpcx
对这些协议做了非常好的封装。除了在创建服务器和客户端连接时需要指定协议名称,其它时候的使用基本是透明的。我们将上面的例子改装成使用 http
协议的:
服务端改动:
s.Serve("http", ":8972")
客户端改动:
d := client.NewPeer2PeerDiscovery("http@"+*addr, "")
QUIC
和 KCP
的使用有点特殊, QUIC
必须与 TLS 一起使用, KCP
也需要做传输加密。使用 Go 语言我们能很方便地生成一个证书和私钥:
package main import ( "crypto/rand" "crypto/rsa" "crypto/x509" "crypto/x509/pkix" "encoding/pem" "math/big" "net" "os" "time" ) func main() { max := new(big.Int).Lsh(big.NewInt(1), 128) serialNumber, _ := rand.Int(rand.Reader, max) subject := pkix.Name{ Organization: []string{"Go Daily Lib"}, OrganizationalUnit: []string{"TechBlog"}, CommonName: "go daily lib", } template := x509.Certificate{ SerialNumber: serialNumber, Subject: subject, NotBefore: time.Now(), NotAfter: time.Now().Add(365 * 24 * time.Hour), KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, IPAddresses: []net.IP{net.ParseIP("127.0.0.1")}, } pk, _ := rsa.GenerateKey(rand.Reader, 2048) derBytes, _ := x509.CreateCertificate(rand.Reader, &template, &template, &pk.PublicKey, pk) certOut, _ := os.Create("server.pem") pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}) certOut.Close() keyOut, _ := os.Create("server.key") pem.Encode(keyOut, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(pk)}) keyOut.Close() }
上面代码生成了一个证书和私钥,有效期为 1 年。运行程序,得到两个文件 server.pem
和 server.key
。然后我们就可以编写使用 QUIC
协议的程序了。服务端:
func main() { cert, _ := tls.LoadX509KeyPair("server.pem", "server.key") config := &tls.Config{Certificates: []tls.Certificate{cert}} s := server.NewServer(server.WithTLSConfig(config)) s.RegisterName("Arith", new(Arith), "") s.Serve("quic", "localhost:8972") }
实际上就是加载证书和密钥,然后在创建 Server
对象时作为选项传入。客户端改动:
conf := &tls.Config{ InsecureSkipVerify: true, } option := client.DefaultOption option.TLSConfig = conf d := client.NewPeer2PeerDiscovery("quic@"+*addr, "") xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option) defer xclient.Close()
客户端也需要配置 TLS。
有一点需要注意, rpcx
对 quic/kcp
这些协议的支持是通过 build tags
实现的。默认不会编译 quic/kcp
相关文件。如果要使用,必须自己手动指定 tags
。先启动服务端程序:
$ go run -tags quic main.go
然后切换到客户端程序目录,执行下面命令:
$ go run -tags quic main.go
还有一点需要注意,在使用 tcp
和 http
(底层也是 tcp
)协议的时候,我们可以将地址简写为 :8972
,因为默认就是本地地址。但是 quic
不行,必须把地址写完整:
// 服务端 s.Serve("quic", "localhost:8972") // 客户端 addr = flag.String("addr", "localhost:8972", "service address")
注册函数
上面的例子都是调用对象的方法,我们也可以调用函数。函数的类型与对象方法相比只是没有 接收者 。注册函数需要指定一个服务路径。服务端:
type Args struct { A, B int } type Quotient struct { Quo, Rem int } func Mul(cxt context.Context, args *Args, reply *int) error { *reply = args.A * args.B return nil } func Div(cxt context.Context, args *Args, quo *Quotient) error { if args.B == 0 { return errors.New("divide by 0") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { s := server.NewServer() s.RegisterFunction("function", Mul, "") s.RegisterFunction("function", Div, "") s.Serve("tcp", ":8972") }
只是注册方法由 RegisterName
变为了 RegisterFunction
,参数由一个对象变为一个函数。我们需要为注册的函数指定一个服务路径,客户端调用时会根据这个路径查找对应方法。客户端:
func main() { flag.Parse() d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "") xclient := client.NewXClient("function", client.Failtry, client.RandomSelect, d, client.DefaultOption) defer xclient.Close() args := &Args{A: 10, B: 20} var reply int err := xclient.Call(context.Background(), "Mul", args, &reply) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d\n", args.A, args.B, reply) args = &Args{50, 20} var quo Quotient err = xclient.Call(context.Background(), "Div", args, &quo) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem) }
注册中心
rpcx
支持多种注册中心:
zookeeper Etcd Consul/mDNS
我们之前演示的都是 点对点 的连接,接下来我们介绍如何使用 zookeeper
作为注册中心。在 rpcx
中,注册中心是通过插件的方式集成的。使用 ZooKeeperRegisterPlugin
这个插件来集成 Zookeeper
。服务端代码:
type Args struct { A, B int } type Quotient struct { Quo, Rem int } var ( addr = flag.String("addr", ":8972", "service address") zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address") basePath = flag.String("basePath", "/services/math", "service base path") ) type Arith int func (t *Arith) Mul(cxt context.Context, args *Args, reply *int) error { fmt.Println("Mul on", *addr) *reply = args.A * args.B return nil } func (t *Arith) Div(cxt context.Context, args *Args, quo *Quotient) error { fmt.Println("Div on", *addr) if args.B == 0 { return errors.New("divide by 0") } quo.Quo = args.A / args.B quo.Rem = args.A % args.B return nil } func main() { flag.Parse() p := &serverplugin.ZooKeeperRegisterPlugin{ ServiceAddress: "tcp@" + *addr, ZooKeeperServers: []string{*zkAddr}, BasePath: *basePath, Metrics: metrics.NewRegistry(), UpdateInterval: time.Minute, } if err := p.Start(); err != nil { log.Fatal(err) } s := server.NewServer() s.Plugins.Add(p) s.RegisterName("Arith", new(Arith), "") s.Serve("tcp", *addr) }
在 ZooKeeperRegisterPlugin
中,我们指定了本服务地址,zookeeper 集群地址(可以是多个),起始路径等。服务器启动时自动向 zookeeper 注册本服务的信息,客户端可直接从 zookeeper 拉取可用的服务列表。
首先启动 zookeeper 服务器,zookeeper 的安装与启动可以参考我的上一篇文章。分别在 3 个控制台中启动 3 个服务器,指定不同的端口(注意需要指定 -tags zookeeper
):
// 控制台1 $ go run -tags zookeeper main.go -addr 127.0.0.1:8971 // 控制台2 $ go run -tags zookeeper main.go -addr 127.0.0.1:8972 // 控制台3 $ go run -tags zookeeper main.go -addr 127.0.0.1:8973
启动之后,我们观察 zookeeper 路径 /services/math
中的内容:
非常棒,可用的服务地址不用我们手动维护了!
接下来是客户端:
var ( zkAddr = flag.String("zkAddr", "127.0.0.1:2181", "zookeeper address") basePath = flag.String("basePath", "/services/math", "service base path") ) func main() { flag.Parse() d := client.NewZookeeperDiscovery(*basePath, "Arith", []string{*zkAddr}, nil) xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption) defer xclient.Close() args := &Args{A: 10, B: 20} var reply int err := xclient.Call(context.Background(), "Mul", args, &reply) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d\n", args.A, args.B, reply) args = &Args{50, 20} var quo Quotient err = xclient.Call(context.Background(), "Div", args, &quo) if err != nil { log.Fatalf("failed to call: %v", err) } fmt.Printf("%d * %d = %d...%d\n", args.A, args.B, quo.Quo, quo.Rem) }
我们通过 zookeeper 读取可用的 Arith
服务列表,然后随机选择一个服务发送请求:
$ go run -tags zookeeper main.go 2020/05/26 23:03:40 Connected to 127.0.0.1:2181 2020/05/26 23:03:40 authenticated: id=72057658440744975, timeout=10000 2020/05/26 23:03:40 re-submitting `0` credentials after reconnect 10 * 20 = 200 50 * 20 = 2...10
我们的客户端发送了两条请求。由于使用了 client.RandomSelect
策略,所以这两个请求随机发送到某个服务端。我在 Mul
和 Div
方法中增加了一个打印,可以观察一下各个控制台的输出!
如果我们关闭了某个服务器,对应的服务地址会从 zookeeper 中移除。我关闭了服务器 1,zookeeper 服务列表变为:
相比上一篇文章中需要手动维护 zookeeper 的内容, rpcx
的自动注册和维护明显要方便太多了!
总结
rpcx
是 Go 语言中首屈一指的 rpc 库,功能丰富,性能出众,文档丰富,已经被不少公司和个人采用。本文介绍的只是最基础的功能, rpcx
支持各种路由选择策略、分组、限流、身份认证等高级功能,推荐深入学习!
大家如果发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue:smile:
参考
- rpcx GitHub: https://github.com/smallnest/rpcx
- rpcx 博客: https://blog.rpcx.io/
- rpcx 官网: https://rpcx.io/
- rpcx 文档: https://doc.rpcx.io/
- Go 每日一库 GitHub: https://github.com/darjun/go-daily-lib
我
我的博客: https://darjun.github.io
欢迎关注我的微信公众号【GoUpUp】,共同学习,一起进步~
欢迎关注我们的微信公众号,每天学习Go知识

Recommend
-
33
简介 今天我们来看一个很小,很实用的库 go-homedir 。顾名思义, go-homedir 用来获取用户的主目录。 实际上,使用标准库 os/user
-
46
简介 ini 是 Windows 上常用的配置文件格式。MySQL 的 Windows 版就是使用 ini 格式存储配置的。 go-ini 是 Go 语言中用于操作 ini 文件的第三方库。 本文...
-
23
简介 cobra 是一个命令行程序库,可以用来编写命令行程序。同时,它也提供了一个脚手架, 用于生成基于 cobra 的应用程序框架。非常多知名的开源项目使用了 cobra 库...
-
34
简介 上一篇文章介绍 cobra 的时候提到了 viper ,今天我们就来介绍一下这个库。 viper 是一...
-
29
简介 上一篇文章 Go 每日一库之 viper 中,我们介绍了 viper 可以监听文件修改进而自动重新加载。 其内部使用的就是 fsnotify 这...
-
33
简介 今天我们再来介绍 spf13 大神的另一个库 cast 。 cast 是一个小巧、实用的类型转换库,用于将一个类型转为另一个类型。 最初开发 cast
-
39
简介 在日常开发中,日志是必不可少的功能。虽然有时可以用 fmt 库输出一些信息,但是灵活性不够。Go 标准库提供了一个日志库 log 。本文介绍 log 库的使用。 快速使用
-
23
简介 前一篇文章 介绍了 Go 标准库中的日志库 log 。最后我们也提到, log 库只提供了三组接口,功能过于简单了。 ...
-
42
简介 一线开发人员每天都要使用日期和时间相关的功能,各种定时器,活动时间处理等。标准库 time 使用起来不太灵活,特别是日期时间的创建和运算。
-
32
简介 程序中时常有发送邮件的需求。有异常情况了需要通知管理员和负责人,用户下单后可能需要通知订单信息,电商平台、中国移动和联通都有每月账单,这些都可以通过邮件来推送。还有我们平时收到的垃圾邮件大都也是通过这种方...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK