5

跟着gRPC源码简单学习RPC原理

 4 years ago
source link: https://studygolang.com/articles/26241
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RPC(Remote Procedure Call/远程过程调用)是一种服务间交互的方式,达到 像本地方法一样调用远程方法 的目的。

看看源码,简单了解gRPC是怎么实现RPC的,以gRPC官方代码示例 helloworld 为例。

服务注册

先看一下服务端怎么注册服务。

helloworld.pb.go 文件中,会有 RegisterGreeterServer 方法以及 _Greeter_serviceDesc 变量,

_Greeter_serviceDesc 描述了服务的属性。 RegisterGreeterServer 方法会向gRPC服务端 s 注册服务 srv

//...
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}
//...
var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter", // ServiceName包括`.proto`文件中的package名称
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

server.go 中的 RegisterService 方法,会判断ServiceServer是否实现sd中描述的HandlerType;如果实现了则调用 s.register 方法注册。

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    ht := reflect.TypeOf(sd.HandlerType).Elem()
    st := reflect.TypeOf(ss)
    if !st.Implements(ht) { // 判断ServiceServer是否实现sd中描述的HandlerType
        grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
    }
    s.register(sd, ss)
}

register 根据 Method 创建对应的map,并将名称作为键,方法描述(指针)作为值,添加到相应的map中。

最后将{服务名称:服务}添加到服务端。

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {// 服务端是否启动
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {//服务是否已经注册
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc), // map of Methods
        sd:     make(map[string]*StreamDesc), // map of Stream
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv // 添加服务到服务端
}

服务调用

服务端注册了服务之后,接下来就是调用服务。

客户端调用服务

helloworld.pb.go 中的 SayHello 方法中使用 c.cc.Invoke 远程调用服务端:

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    // 远程调用服务端的方法
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Invoke 的定义如下, "/helloworld.Greeter/SayHello" 对应的是 method 参数。

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error

method 被传入后,依次被 invokenewClientStream 方法调用,保存在 cs.callHdr.Method ;后续在 newAttemptLocked 方法中生成 cs.attempt.t

cs.SendMsg 最终通过 csAttempt.sendMsg 方法中调用 a.t.Write 方法写出请求 req

最后 cs.RecvMsg 轮询获取返回结果。

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

服务端执行调用

handleStream 方法会处理请求中的 Method 字段,获得服务名与方法名。调用 s.processUnaryRPC , 调用 md.Handler 方法(即 _Greeter_SayHello_Handler 方法)。

最终调用 s.sendResponse 方法返回结果。

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK