26

grpc server源码学习

 3 years ago
source link: https://studygolang.com/articles/30792
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

grpc server源码学习

如何实现一个最简单的grpc server

// 创建listener
  lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

  // 创建server示例
    s := grpc.NewServer()
  
  // 注册服务
    pb.RegisterGreeterServer(s, &server{})
    reflection.Register(s)
        
  // 启动服务端监听
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }

grpc.NewServer()

grpc.NewServer()会返回一个grpc.Server,它的结构如下:

// Server is a gRPC server to serve RPC requests.
type Server struct {
    opts serverOptions

    mu     sync.Mutex // guards following
    lis    map[net.Listener]bool
    conns  map[transport.ServerTransport]bool
    serve  bool
    drain  bool
    cv     *sync.Cond          // signaled when connections close for GracefulStop
    m      map[string]*service // service name -> service info
    events trace.EventLog

    quit               *grpcsync.Event
    done               *grpcsync.Event
    channelzRemoveOnce sync.Once
    serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}

由这个结构,我们可以略知一二,它使用了一个容器conns用来保存当前的所有连接;也有和优雅退出的waitgroup,猜测应该是需要等待所有请求处理完后退出;cond猜测是用来通知所有当前的连接,服务将被停止了;其他字段的用途暂时无法很明显的猜测出来,我们将在后面继续分析。

pb.RegisterGreeterServer

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

最终是调用了grpc.Server的 RegisterService 进行注册,第一个参数是pb生成代码生成的 _Greeter_serviceDesc ,它描述了rpc service的一些属性信息,内容如下:

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "demo.proto",
}

这里面描述了

  • 服务名
  • HandlerType(这个是做什么的?存疑)
  • 方法列表,方法列表包括方法名和一个Handler(Handler做什么的?存疑)
  • Streams,这个应该是只有stream类型的rpc service才会拥有具体的值
  • Metadata,这个代表生成这个go文件的原始pb文件的文件名

方法列表里,每一项是一个MethodDesc结构体,它的定义如下:

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
    MethodName string
    Handler    methodHandler
}

可以发现Handler其实是一个函数;

那么真实的 _Greeter_SayHello_Handler 包含了哪些信息呢?包含的信息如下:

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error)
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}

通过这段代码可以看出,它其实是真正调用处理rpc定义的接口方法的入口。

  • 有一个参数dec,这个首先会将其解码到request结构体上
  • 如果没有设置拦截器,那么直接调用srv实现的SayHello方法进行处理并返回。
  • 如果设置了拦截器,会通过传入的拦截器做一些特殊的处理。

可以猜测,在grpc server真实调用时,会将具体实现了rpc service的结构体作为第一个参数传入,将经过解码的request数据作为第三个参数传入,由实现了rpc service接口的对象来进行处理。

注册服务

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.printf("RegisterService(%q)", sd.ServiceName)
    if s.serve {
        grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
    }
    if _, ok := s.m[sd.ServiceName]; ok {
        grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
    }
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

可以根据之前描述的server结构体可以看出,server有一个属性s,这个集合可以存储多个pb定义的service。key为服务名,value为具体的服务结构体。意思是一个grpc server启动时,可以有多个pb定义的rpc service被注册到其中。并且同一个rpc service不能被注册两次。

注册服务时,将之前提到的pb生成的 _Greeter_serviceDesc 相关信息注册到了service集合中。 把service和method都注册到了server中

grpcServer.Serve

现在服务可以正式启动起来了,首先看一下官方对Serve方法的注释:

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.

大致翻译一下:

Serve对每一个listener监听到的连接创建一个新的ServerTransport和协程,这个协程读取grpc请求然后调用被注册的handler来响应这些请求。Serve方法在lis.Accept失败时会返回error,当这个方法返回时,lis会被关闭。当使用正常退出或者优雅退出时,Serve方法不会返回error。

Serve方法的大致工作逻辑:

for {
        rawConn, err := lis.Accept()
        if err != nil {
            //错误处理
            //...
          }
          s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
     }

整个Serve方法在一个大的for循环中不断获取listener监听到的请求,然后对每一个监听到的请求开了一个协程去处理。

也就是说核心其实是 handleRawConn 方法

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
    if s.quit.HasFired() {
        rawConn.Close()
        return
    }
    //设置连接超时,若没有的话会有默认的超时时间,默认两分钟
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
      
      //检查证书认证相关信息
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    if err != nil {
        // ErrConnDispatched means that the connection was dispatched away from
        // gRPC; those connections should be left open.
        if err != credentials.ErrConnDispatched {
            s.mu.Lock()
            s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
            s.mu.Unlock()
            grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
            rawConn.Close()
        }
        rawConn.SetDeadline(time.Time{})
        return
    }

      //进行http2的传输
    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(conn, authInfo)//获取到传输st
    if st == nil {
        return
    }

      rawConn.SetDeadline(time.Time{})
        
      //把http2传输连接缓存到server的conns结构中
      //为何要缓存呢?
    if !s.addConn(st) {//如果server的conns结构为空时,会返回false,并关闭掉传输st
        return
    }
      //直接go一个协程,异步处理请求
    go func() {
        s.serveStreams(st)

          //最后从连接缓存conns结构中删除掉传输st
          //并通过cond,发起一个广播
        s.removeConn(st)
    }()

//这里可以看到,起了一个协程去处理这个http2的stream,这是因为http2是长连接,可以复用这个流不断接收数据,所以需要起一个协程来做这件事。

这样看来,实际上我们应该重点关注下 serveStreams 方法,它做了什么呢?

func (s *Server) serveStreams(st transport.ServerTransport) {
    defer st.Close()
    var wg sync.WaitGroup
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
    wg.Wait()
}

可以看到,这里实际上的处理方法是:

func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }

HandleStream ,为何这里要使用waitGroup,并等待这个完成,而不是同步的方式呢?。这是因为,我们要了解一个前提,grpc是可以支持长连接的,因此 severStreams 方法处理的其实并不仅仅是单个请求,而是一个源源不断的http2流。所以我们应该查看st的 HandleStream 干了什么事情。

st是 transport.ServerTransport 类型的interface,而实际上这里的对象是 http2Server

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
        t.controlBuf.throttle()
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
        if err != nil {
            //错误处理
              //...
        }
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
            // TODO: Handle GoAway from the client appropriately.
        default:
            errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
        }
    }
}

因此,整个方法是在一个for循环中,不断处理二进制流,所以对每次获取到的数据都使用一个协程去处理,这就是为何handle参数要使用waitgroup。

handleStream 做了什么事呢?这里只截取关键代码:

sm := stream.Method()

      service := sm[:pos]
    method := sm[pos+1:]

    srv, knownService := s.m[service]
    if knownService {
        if md, ok := srv.md[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.sd[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }

总结起来是:

  • 从stream里取出method,service信息
  • 如果service与之前注册成功的rpc service名字匹配,则检查method是否与注册成功的rpc service的method相匹配,若匹配的话,直接调用 processUnaryRPCprocessStreamingRPC 进行处理。

接着看 processUnaryRPC ,发现了我们最开始提到的pb注册的 Handler: _Greeter_SayHello_Handler ,它是这样被调用的:

df := func(v interface{}) error {
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
        if sh != nil {
            sh.HandleRPC(stream.Context(), &stats.InPayload{
                RecvTime:   time.Now(),
                Payload:    v,
                WireLength: payInfo.wireLength,
                Data:       d,
                Length:     len(d),
            })
        }
        if binlog != nil {
            binlog.Log(&binarylog.ClientMessage{
                Message: d,
            })
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
        }
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)

最后处理完成后,会将resp写回:

if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
    ...
}

至此,整个grpc就处理完了一条请求,并返回了响应。

优雅退出是如何实现的?

之前我们有一个疑问,为什么要缓存所有的conns呢?这就是因为在优雅退出时,我们需要通知/等待所有当前的连接都释放再退出。

看一下优雅退出是如何实现的:

  • 第一阶段
s.mu.Lock()
    if s.conns == nil {
        s.mu.Unlock()
        return
    }

    for lis := range s.lis {
        lis.Close()
    }
    s.lis = nil
    if !s.drain {
        for st := range s.conns {
            st.Drain()
        }
        s.drain = true
    }

    // Wait for serving threads to be ready to exit.  Only then can we be sure no
    // new conns will be created.
    s.mu.Unlock()

第一阶段主要是首先将所有tcp listener关闭掉,并且若server状态不处于 停止接收请求的状态 ,那么就通知当前所有的连接停止再接收请求( Drain ),最后将状态drain置为true。

  • 第二阶段
s.serveWG.Wait()
    s.mu.Lock()

    for len(s.conns) != 0 {
        s.cv.Wait()
    }
    s.conns = nil
    if s.events != nil {
        s.events.Finish()
        s.events = nil
    }
    s.mu.Unlock()

首先等待所有的请求都处理完,然后加锁,等待缓存的conns连接都被关闭掉,若都被关闭掉,会有协程发起Broadcase进行通知,通知完毕后Wait()不会再阻塞,可以接着往下走,最后发送finish event,解锁即可。

什么时候会发起Broadcast呢?有两个地方:

  1. 处理完一个流,会将流从conns中删除
go func() {
        s.serveStreams(st)
        s.removeConn(st)
}()

而删除时,则会将st从conns中删除掉,并且发一个Broadcast

func (s *Server) removeConn(st transport.ServerTransport) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.conns != nil {
        delete(s.conns, st)
        s.cv.Broadcast()
    }
}

根据优雅退出代码,虽然发起了广播, s.cv.Wait 不再阻塞,但是若还有别的流没有释放,那么 s.conns 仍然不为0,因此又会进入到 s.cv.Wait 的阻塞中。

  1. server正常退出( Stop
s.mu.Lock()
    listeners := s.lis
    s.lis = nil
    st := s.conns
    s.conns = nil
    // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
    s.cv.Broadcast()
    s.mu.Unlock()

    for lis := range listeners {
        lis.Close()
    }
    for c := range st {
        c.Close()
    }

    s.mu.Lock()
    if s.events != nil {
        s.events.Finish()
        s.events = nil
    }
    s.mu.Unlock()

可以看到,在正常退出server时,直接将conns置为了nil,然后发送了Broadcasr,因此优雅退出的Wait也不会再等待了。

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK