36

Dubbo-go 源码笔记(二)客户端调用过程

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzUzNzYxNjAzMg%3D%3D&%3Bmid=2247496103&%3Bidx=1&%3Bsn=68aeef14759b4d82dc58d9ff0abc8a70
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

rAVfYby.jpg!mobile

作者 | 李志信

导读: 有了上一篇文章 《Dubbo-go 源码笔记(一)Server 端开启服务过程》 的铺垫, 可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,发布自己的ivkURL并订阅事件开启监听;而客户应该是通过zk注册组件,拿到需要调用的serviceURL,更新invoker并重写用户的RPCService,从而实现对远程过程调用细节的封装。

配置文件和客户端源代码

1. client 配置文件

helloworld 提供的 demo:profiles/client.yaml。

registries :
"demoZk":
protocol: "zookeeper"
timeout : "3s"
address: "127.0.0.1:2181"
username: ""
password: ""
references:
"UserProvider":
# 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册
registry: "demoZk"
protocol : "dubbo"
interface : "com.ikurento.user.UserProvider"
cluster: "failover"
methods :
- name: "GetUser"
retries: 3

可看到配置文件与之前讨论过的 Server 端非常类似,其 refrences 部分字段就是对当前服务要主调的服务的配置,其中详细说明了调用协议、注册协议、接口 id、调用方法、集群策略等,这些配置都会在之后与注册组件交互、重写 ivk、调用的过程中使用到。

2. 客户端使用框架源码

user.go:

func init() {
config.SetConsumerService(userProvider)
hessian.RegisterPOJO(&User{})
}

main.go:

func main() {
hessian.RegisterPOJO(&User{})
config.Load()
time.Sleep(3e9)
println("\n\n\nstart to test dubbo")
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)
if err != nil {
panic(err)
}
println("response result: %v\n", user)
initSignal()
}

在官网提供的 helloworld demo 的源码中,可看到与服务端类似,在 user.go 内注册了 rpc-service,以及需要 rpc 传输的结构体 user。

在 main 函数中,同样调用了 config.Load() 函数,之后就可以通过实现好的 rpc-service:userProvider 直接调用对应的功能函数,即可实现 rpc 调用。

可以猜到,从 hessian 注册结构、SetConsumerService,到调用函数 .GetUser() 期间,用户定义的 rpc-service 也就是 userProvider 对应的函数被重写,重写后的 GetUser 函数已经包含实现了远程调用逻辑的 invoker。

接下来,就要通过阅读源码,看看 dubbo-go 是如何做到的。

实现远程过程调用

1. 加载配置文件

// file: config/config_loader.go :Load()


// Load Dubbo Init
func Load() {
// init router
initRouter()
// init the global event dispatcher
extension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)
// start the metadata report if config set
if err := startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err != nil {
logger.Errorf("Provider starts metadata report error, and the error is {%#v}", err)
return
}
// reference config
loadConsumerConfig()

在 main 函数中调用了 config.Load() 函数,进而调用了 loadConsumerConfig,类似于之前讲到的 server 端配置读入函数。

在 loadConsumerConfig 函数中,进行了三步操作:

// config/config_loader.go
func loadConsumerConfig() {
// 1 init other consumer config
conConfigType := consumerConfig.ConfigType
for key, value := range extension.GetDefaultConfigReader() {}
checkApplicationName(consumerConfig.ApplicationConfig)
configCenterRefreshConsumer()
checkRegistries(consumerConfig.Registries, consumerConfig.Registry)

// 2 refer-implement-reference
for key, ref := range consumerConfig.References {
if ref.Generic {
genericService := NewGenericService(key)
SetConsumerService(genericService)
}
rpcService := GetConsumerService(key)
ref.id = key
ref.Refer(rpcService)
ref.Implement(rpcService)
}
// 3 wait for invoker is available, if wait over default 3s, then panic
for {}
}
  1. 检查配置文件并将配置写入内存

  2. 在 for 循环内部 ,依次引用(refer)并且实例化(implement)每个被调 reference

  3. 等待三秒钟所有 invoker 就绪

其中重要的就是 for 循环里面的引用和实例化,两步操作,会在接下来展开讨论。

至此,配置已经被写入了框架。

2. 获取远程 Service URL,实现可供调用的 invoker

上述的 ref.Refer 完成的就是这部分的操作。

ZRBzi2J.png!mobile

(图一)

1)构造注册 url

和 server 端类似,存在注册 url 和服务 url,dubbo 习惯将服务 url 作为注册 url 的 sub。

// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {
//(一)配置url参数(serviceUrl),将会作为sub
cfgURL := common.NewURLWithOptions(
common.WithPath(c.id),
common.WithProtocol(c.Protocol),
common.WithParams(c.getUrlMap()),
common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),
)
...
// (二)注册地址可以通过url格式给定,也可以通过配置格式给定
// 这一步的意义就是配置->提取信息生成URL
if c.Url != "" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceUrl, err := common.NewURL(urlStr)
...
}
} else {// 配置读入注册中心的信息
// assemble SubURL from register center's configuration mode
// 这是注册url,protocol = registry,包含了zk的用户名、密码、ip等等
c.urls = loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)
...
// set url to regUrls
for _, regUrl := range c.urls {
regUrl.SubURL = cfgURL// regUrl的subURl存当前配置url
}
}
//至此,无论通过什么形式,已经拿到了全部的regURL
// (三)获取registryProtocol实例,调用其Refer方法,传入新构建好的regURL
if len(c.urls) == 1 {
// 这一步访问到registry/protocol/protocol.go registryProtocol.Refer
// 这里是registry
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
} else {
// 如果有多个注册中心,即有多个invoker,则采取集群策略
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
}

这个函数中,已经处理完从 Register 配置到 RegisterURL 的转换,即图(一)中部分:

RvI7Zrm.png!mobile

接下来,已经拿到的 url 将被传递给 RegistryProtocol,进一步 refer。

2)registryProtocol 获取到 zkRegistry 实例,进一步 Refer

// file: registry/protocol/protocol.go: Refer


// Refer provider service from registry center
// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {
var registryUrl = url
// 这里拿到的是referenceConfig,serviceUrl里面包含了Reference的所有信息,包含interfaceName、method等等
var serviceUrl = registryUrl.SubURL
if registryUrl.Protocol == constant.REGISTRY_PROTOCOL {// registryUrl.Proto = "registry"
protocol := registryUrl.GetParam(constant.REGISTRY_KEY, "")
registryUrl.Protocol = protocol//替换成了具体的值,比如"zookeeper"
}
// 接口对象
var reg registry.Registry
// (一)实例化接口对象,缓存策略
if regI, loaded := proto.registries.Load(registryUrl.Key()); !loaded {
// 缓存中不存在当前registry,新建一个reg
reg = getRegistry(&registryUrl)
// 缓存起来
proto.registries.Store(registryUrl.Key(), reg)
} else {
reg = regI.(registry.Registry)
}
// 到这里,获取到了reg实例 zookeeper的registry
//(二)根据Register的实例zkRegistry和传入的regURL新建一个directory
// 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,
// 这一步将在下面详细给出步骤
// new registry directory for store service url from registry
directory, err := extension.GetDefaultRegistryDirectory(&registryUrl, reg)
if err != nil {
logger.Errorf("consumer service %v create registry directory error, error message is %s, and will return nil invoker!",
serviceUrl.String(), err.Error())
return nil
}
// (三)DoRegister 在zk上注册当前client service
err = reg.Register(*serviceUrl)
if err != nil {
logger.Errorf("consumer service %v register registry %v error, error message is %s",
serviceUrl.String(), registryUrl.String(), err.Error())
}
// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
// invoker保存
proto.invokers = append(proto.invokers, invoker)
return invoker
}

可详细阅读上述注释,这个函数完成了从 url 到 invoker 的全部过程:

(一)首先获得 Registry 对象,默认是之前实例化的 zkRegistry,和之前 server 获取 Registry 的处理很类似。

(二)通过构造一个新的 directory,异步拿到之前在 zk 上注册的 server 端信息,生成 invoker。

(三)在 zk 上注册当前 service。

(四)集群策略,获得最终 invoker。

这一步完成了图(一)中所有余下的绝大多数操作,接下来就需要详细地查看 directory 的构造过程。

3)构造 directory(包含较复杂的异步操作)

rI7Vjum.png!mobile

图(二)

上述的 extension.GetDefaultRegistryDirectory(&registryUrl, reg) 函数,本质上调用了已经注册好的 NewRegistryDirectory 函数:

// file: registry/directory/directory.go: NewRegistryDirectory()


// NewRegistryDirectory will create a new RegistryDirectory
// 这个函数作为default注册在extension上面
// url为注册url,reg为zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {
if url.SubURL == nil {
return nil, perrors.Errorf("url is invalid, suburl can not be nil")
}
dir := &RegistryDirectory{
BaseDirectory: directory.NewBaseDirectory(url),
cacheInvokers: []protocol.Invoker{},
cacheInvokersMap: &sync.Map{},
serviceType: url.SubURL.Service(),
registry: registry,
}
dir.consumerConfigurationListener = newConsumerConfigurationListener(dir)
go dir.subscribe(url.SubURL)
return dir, nil
}

首先构造了一个注册 directory,开启协程调用其 subscribe 函数,传入 serviceURL。

这个 directory 目前包含了对应的 zkRegistry,以及传入的 URL,它的 cacheInvokers 部分是空的。

进入 dir.subscribe(url.SubURL) 这个异步函数:

// file: registry/directory/directory.go: subscribe()


// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {
// 增加两个监听,
dir.consumerConfigurationListener.addNotifyListener(dir)
dir.referenceConfigurationListener = newReferenceConfigurationListener(dir, url)
// subscribe调用
dir.registry.Subscribe(url, dir)
}

重点来了,它调用了 zkRegistry 的 Subscribe 方法,与此同时将自己作为 ConfigListener 传入。

我认为这种传入 listener 的设计模式非常值得学习,而且很有 java 的味道。

针对等待 zk 返回订阅信息这样的异步操作,需要传入一个 Listener,这个 Listener 需要实现 Notify 方法,进而在作为参数传入内部之后,可以被异步地调用 Notify,将内部触发的异步事件“传递出来”,再进一步处理加工。

层层的 Listener 事件链,能将传入的原始 serviceURL 通过 zkConn 发送给 zk 服务,获取到服务端注册好的 url 对应的二进制信息。

而 Notify 回调链,则将这串 byte[] 一步一步解析、加工;以事件的形式向外传递,最终落到 directory 上的时候,已经是成型的 newInvokers 了。

具体细节不再以源码形式展示,可参照上图查阅源码。

至此已经拿到了 server 端注册好的真实 invoker。

完成了图(一)中的部分:

u22iIjM.png!mobile

4)构造带有集群策略的 clusterinvoker

经过上述操作,已经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组里面缓存。

后续的操作对应本文从 url 到 invoker 的过程的最后一步,由 directory 生成带有特性集群策略的 invoker。

// (四)new cluster invoker,将directory写入集群,获得具有集群策略的invoker
cluster := extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))
invoker := cluster.Join(directory)
123

Join 函数的实现就是如下函数:

// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()


func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {
return &failoverClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
}
12345

dubbo-go 框架默认选择 failover 策略,既然返回了一个 invoker,我们查看一下 failoverClusterInvoker 的 Invoker 方法,看它是如何将集群策略封装到 Invoker 函数内部的:

// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()


// Invoker 函数
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
...
//调用List方法拿到directory缓存的所有invokers
invokers := invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {// 检查是否可以实现调用
return &protocol.RPCResult{Err: err}
}
// 获取来自用户方向传入的
methodName := invocation.MethodName()
retries := getRetries(invokers, methodName)
loadBalance := getLoadBalance(invokers[0], invocation)
for i := 0; i <= retries; i++ {
// 重要!这里是集群策略的体现,失败后重试!
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if i > 0 {
if err := invoker.checkWhetherDestroyed(); err != nil {
return &protocol.RPCResult{Err: err}
}
invokers = invoker.directory.List(invocation)
if err := invoker.checkInvokers(invokers, invocation); err != nil {
return &protocol.RPCResult{Err: err}
}
}
// 这里是负载均衡策略的体现!选择特定ivk进行调用。
ivk := invoker.doSelect(loadBalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
providers = append(providers, ivk.GetUrl().Key())
continue
}
return result
}
...
}

看了很多 Invoke 函数的实现,所有类似的 Invoker 函数都包含两个方向:一个是用户方向的 invcation;一个是函数方向的底层 invokers。

而集群策略的 invoke 函数本身作为接线员,把 invocation 一步步解析,根据调用需求和集群策略,选择特定的 invoker 来执行。

proxy 函数也是这样,一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。

proxy 函数负责将 ins 转换为 invocation,调用对应 invoker 的 invoker 函数,实现连通。

而出于这样的设计,可以在一步步 Invoker 封装的过程中,每个 Invoker 只关心自己负责操作的部分,从而使整个调用栈解耦。

妙啊!!!

至此,我们理解了 failoverClusterInvoker 的 Invoke 函数实现,也正是和这个集群策略 Invoker 被返回,接受来自上方的调用。

已完成图(一)中的:

iAjiqai.png!mobile

5)在 zookeeper 上注册当前 client

拿到 invokers 后,可以回到这个函数了:

  // file: config/refrence_config.go: Refer()

if len(c.urls) == 1 {
// 这一步访问到registry/protocol/protocol.go registryProtocol.Refer
c.invoker = extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])
// (一)拿到了真实的invokers
} else {
// 如果有多个注册中心,即有多个invoker,则采取集群策略
invokers := make([]protocol.Invoker, 0, len(c.urls))
...
cluster := extension.GetCluster(hitClu)
// If 'zone-aware' policy select, the invoker wrap sequence would be:
// ZoneAwareClusterInvoker(StaticDirectory) ->
// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
c.invoker = cluster.Join(directory.NewStaticDirectory(invokers))
}
// (二)create proxy,为函数配置代理
if c.Async {
callback := GetCallback(c.id)
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)
} else {
// 这里c.invoker已经是目的addr了
c.pxy = extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)
}

我们有了可以打通的 invokers,但还不能直接调用,因为 invoker 的入参是 invocation,而调用函数使用的是具体的参数列表,需要通过一层 proxy 来规范入参和出参。

接下来新建一个默认 proxy,放置在 c.proxy 内,以供后续使用。

至此,完成了图(一)中最后的操作:

niiErif.png!mobile

3. 将调用逻辑以代理函数的形式写入 rpc-service

上面完成了 config.Refer 操作,回到:

config/config_loader.go: loadConsumerConfig()

qaeieae.png!mobile

下一个重要的函数是 Implement,它的操作较为简单:旨在使用上面生成的 c.proxy 代理,链接用户自己定义的 rpcService 到 clusterInvoker 的信息传输。

函数较长,只选取了重要的部分:

// file: common/proxy/proxy.go: Implement()


// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
// Implement 实现的过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数
// 将当前rpc所有可供调用的函数注册到proxy.rpc内
func (p *Proxy) Implement(v common.RPCService) {
// makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数
// 这个被返回的函数是请求实现的载体,由他来发起调用获取结果
makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
// 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value
// 这个函数具体的实现如下:
...
// 目前拿到了 methodName、所有入参的interface和value,出参数reply
// (一)根据这些生成一个 rpcinvocation
inv = invocation_impl.NewRPCInvocationWithOptions(
invocation_impl.WithMethodName(methodName),
invocation_impl.WithArguments(inIArr),
invocation_impl.WithReply(reply.Interface()),
invocation_impl.WithCallBack(p.callBack),
invocation_impl.WithParameterValues(inVArr))
for k, value := range p.attachments {
inv.SetAttachments(k, value)
}
// add user setAttachment
atm := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment,也要写入inv
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
}
}
// 至此构造inv完毕
// (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用
result := p.invoke.Invoke(invCtx, inv)
// 如果有attachment,则加入
if len(result.Attachments()) > 0 {
invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())
}
...
}
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
methodName := t.Tag.Get("dubbo")
if methodName == "" {
methodName = t.Name
}
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() { // 针对于每个函数
outNum := t.Type.NumOut()
// 规定函数输出只能有1/2个
if outNum != 1 && outNum != 2 {
logger.Warnf("method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2",
t.Name, t.Type.String(), outNum)
continue
}
// The latest return type of the method must be error.
// 规定最后一个返回值一定是error
if returnType := t.Type.Out(outNum - 1); returnType != typError {
logger.Warnf("the latest return type %s of method %q is not error", returnType, t.Name)
continue
}
// 获取到所有的出参类型,放到数组里
var funcOuts = make([]reflect.Type, outNum)
for i := 0; i < outNum; i++ {
funcOuts[i] = t.Type.Out(i)
}
// do method proxy here:
// (三)调用make函数,传入函数名和返回值,获得能调用远程的proxy,将这个proxy替换掉原来的函数位置
f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))
logger.Debugf("set method [%s]", methodName)
}
}
...
}

正如之前所说,proxy 的作用是将用户定义的函数参数列表,转化为抽象的 invocation 传入 Invoker,进行调用。

其中已标明有三处较为重要的地方:

  1. 在代理函数中实现由参数列表生成 Invocation 的逻辑

  2. 在代理函数实现调用 Invoker 的逻辑

  3. 将代理函数替换为原始 rpc-service 对应函数

至此,也就解决了一开始的问题:

  // file: client.go: main()

config.Load()
user := &User{}
err := userProvider.GetUser(context.TODO(), []interface{}{"A001"}, user)

这里直接调用用户定义的 rpcService 的函数 GetUser,此处实际调用的是经过重写入的函数代理,所以就能实现远程调用了。

从 client 到 server 的 invoker 嵌套链- 小结

在阅读 dubbo-go 源码的过程中,我们能够发现一条清晰的 invoker-proxy 嵌套链,希望能够通过图的形式来展现:

a6ni2aB.png!mobile

如果你有任何疑问,欢迎钉钉扫码加入交流群【钉钉群号 23331795】:

作者简介

李志信  (GitHubID LaurenceLiZhixin),中山大学软件工程专业在校学生,擅长使用 Java/Go 语言,专注于云原生和微服务等技术方向。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK