69

gRPC的HTTP2实现

 5 years ago
source link: https://ninokop.github.io/2018/06/18/gRPC的HTTP2实现/?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

因为最近工作上要做跟gRPC代理相关的东西,在端午假期出门看央美毕业展累得半死之后,还是决定最后一天宅家里看看gRPC的transport的实现,顺便能更了解HTTP2协议。之前写过gRPC负载均衡接口和拦截器相关的笔记,感觉挖的坑越来越多啦。

grpc server

之前就大致看过grpc和go-rpc的实现,总结来说go写的服务端都是一个意思,都是accept conn之后开单独的goroutine来处理这条连接。处理的过程也大概分成固定的几部分:

transport
codeC
registerService

本篇打算从transport开始记录下grpc里HTTP2协议的实现,最主要是想了解steaming形式的流请求是怎么做的,跟long polling、分块传输有什么本质区别。

func (s *Server) handleRawConn(rawConn net.Conn) {
    rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    ...
    var serve func()
    c := conn.(io.Closer)
    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }
    c = st
    serve = func() { s.serveStreams(st) }

    rawConn.SetDeadline(time.Time{})
    if !s.addConn(c) {
        return
    }
    go func() {
        serve()
        s.removeConn(c)
    }()
}

grpc用http2server实现了这个接口,并实现了以下方法。handleStreams提供了处理stream的方法,drain告诉client这个连接上拒绝接收新的RPC请求,write相关的方法允许在这个stream上写东西。

type ServerTransport interface {
    HandleStreams(func(*Stream), func(context.Context, string) context.Context)
    WriteHeader(s *Stream, md metadata.MD) error
    Write(s *Stream, hdr []byte, data []byte, opts *Options) error
    WriteStatus(s *Stream, st *status.Status) error
    Close() error
    RemoteAddr() net.Addr
    Drain()
    IncrMsgSent()
    IncrMsgRecv()
}

HTTP2 transport

WireShark默认支持HTTP2,只要在Analysize的decode as里选择这个端口对应的协议为HTTP2就可以了。在TCP三次握手之后server端主动发送了SETTINGS帧,client端也会回Mgic和SETTINGS。之后发送HEADERS和DATA进行通信,过程中可能伴随WINDOW_UPDATE和PING。

uUru2em.png!web

newHTTP2Server

这个类似于HTTP2协议的握手,在新建transport时可以看到这个过程。服务端发送的SETTINGS包含一些设置,比如最大并发stream数目。客户端先回应的MAGIC+SETTINGS,其中MAGIC就是PRI表明支持HTTP2.0协议。

The client connection preface starts with a sequence of 24 octets,

which in hex notation are:

0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a

(the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n). This sequence is

followed by a SETTINGS [SETTINGS] frame (Section 6.5).

func newHTTP2Server(conn net.Conn, config *ServerConfig)
    (_ ServerTransport, err error) {
    framer := newFramer(conn, writeBufSize, readBufSize)
    // Send initial settings as connection preface to client.
    var isettings []http2.Setting
    isettings = append(isettings, http2.Setting{
        ID:  http2.SettingMaxConcurrentStreams,
        Val: maxStreams,
    })

    if err := framer.fr.WriteSettings(isettings...); err != nil {
        return nil, connectionErrorf(false, err, "transport: %v", err)
    }
    // Adjust the connection flow control window if needed.
    if delta := uint32(icwz - defaultWindowSize); delta > 0 {
        if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
            return nil, connectionErrorf(false, err, "transport: %v", err)
        }
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    t := &http2Server{...}
    t.controlBuf = newControlBuffer(t.ctxDone)
    t.framer.writer.Flush()

    // Check the validity of client preface.
    preface := make([]byte, len(clientPreface))
    io.ReadFull(t.conn, preface)
    if !bytes.Equal(preface, clientPreface) {
        // transport: http2Server.HandleStreams received bogus greeting
    }

    frame, err := t.framer.fr.ReadFrame()
    if err == io.EOF || err == io.ErrUnexpectedEOF {
        return nil, err
    }
    atomic.StoreUint32(&t.activity, 1)
    sf, ok := frame.(*http2.SettingsFrame)
    if !ok {
        // transport: http2Server.HandleStreams saw invalid preface type
    }
    t.handleSettings(sf)

    go func() {
        t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
        t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
        if err := t.loopy.run(); err != nil {
            errorf("transport: loopyWriter.run returning. Err: %v", err)
        }
        t.conn.Close()
        close(t.writerDone)
    }()
    go t.keepalive()
    return t, nil
}

这个 newLoopyWriter 是用来写controlBuf中的控制信息的,在需要发SETTINGS帧或者WindowUpdate帧时通常把信息写入controlBuf.put中。

keepalive 用来关闭超过最大空闲连接数或者最大空闲时间的conn。

handleStreams

对conn的处理都封装在了transport的HandleStreams方法中。从frame中读取帧并按照帧类型处理。一次通信过程通常以SETINGS+HEADERS+DATA开始。

func (t *http2Server) HandleStreams(handle func(*Stream),
    traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreUint32(&t.activity, 1)
        ....
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
        default:
        }
    }
}

每个stream都是从收到HEADER帧开始创建和处理的,根据header里带的fields去处理decodeState,新建Stream,下面就是这个header可能带上的fields。

ZbiyUfA.png!web

stream以streamID为区分,包含recvBuffer以及涉及编解码,方法和序列化方式,流控参数等一系列配置。最后交给handle这个回调方法去处理,在这里面真正开始读请求,进行本地调用和回写响应。

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, 
    handle func(*Stream), traceCtx func(context.Context, string) context.Context) 
    (close bool) {
    streamID := frame.Header().StreamID
    var state decodeState
    for _, hf := range frame.Fields {
        if err := state.processHeaderField(hf); err != nil {
        }
    }
    buf := newRecvBuffer()
    s := &Stream{
        id:             streamID,
        st:             t,
        buf:            buf,
        fc:             &inFlow{limit: uint32(t.initialWindowSize)},
        recvCompress:   state.encoding,
        method:         state.method,
        contentSubtype: state.contentSubtype,
    }
    t.maxStreamID = streamID
    t.activeStreams[streamID] = s
    ...
    handle(s)
    return
}

从stream里读出的method就是 helloworld.Greeter/SayHello ,前者是服务名,后者是方法名。根据在Server上通过RegisterService注册的服务可以获得对应的method,再根据是Unary还是Stream类型分别处理。

func (s *Server) handleStream(t transport.ServerTransport, 
    stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    pos := strings.LastIndex(sm, "/")
    service := sm[:pos]
    method := sm[pos+1:]
    srv, ok := s.m[service]

    // Unary RPC or Streaming RPC?
    if md, ok := srv.md[method]; ok {
        s.processUnaryRPC(t, stream, srv, md, trInfo)
        return
    }
    if sd, ok := srv.sd[method]; ok {
        s.processStreamingRPC(t, stream, srv, sd, trInfo)
        return
    }
}

processUnaryRPC

grpc的协议文档 里解释了request和response格式。总结来说就是一个请求分为HEADER帧,可以从帧里读出HTTP头域的信息,其次是连续的DATA帧,用来发请求message,最后有个DATA帧代表end of stream。其中中间用来发message的连续帧的格式是:

Request → Request-Headers *Length-Prefixed-Message EOS

  • Length-Prefixed-Message → Compressed-Flag Message-Length Message
  • Compressed-Flag → 0 / 1 # encoded as 1 byte unsigned integer
  • Message-Length → { length of Message } # encoded as 4 byte unsigned integer
  • Message → *{binary octet}

对应代码来看grpc在处理上直接把这个Length-prefixed-message作为Header帧的后续进行处理,并没有放在单独的dataframe的处理函数中。所以在下面processUnaryRPC的处理过程中recvMsg

func (s *Server) processUnaryRPC(t transport.ServerTransport, 
    stream *transport.Stream, srv *service, md *MethodDesc, 
    trInfo *traceInfo) (err error) {

    p := &parser{r: stream}
    pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)

    df := func(v interface{}) error {
        s.getCodec(stream.ContentSubtype()).Unmarshal(req, v)
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
    ....
    opts := &transport.Options{
        Last:  true,
        Delay: false,
    }
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        return err
    }
    return t.WriteStatus(stream, status.New(codes.OK, ""))
}

对应协议,每个parser的头对应Compressed-Flag和Message-Length,在UnaryRPC模式下请求Message只有一个。读出后根据不同的codec和注册的方法入参进行反序列化,调用本地方法获得响应。最后通过sendresponse发送响应。收到EOS的data帧之后给stream写入EOF,并处理server端activeStream相关记录表。

type parser struct {
    r io.Reader
    header [5]byte
}
func (p *parser) recvMsg(maxReceiveMessageSize int) 
    (pf payloadFormat, msg []byte, err error) {
    if _, err := p.r.Read(p.header[:]); err != nil {
        return 0, nil, err
    }

    pf = payloadFormat(p.header[0])
    length := binary.BigEndian.Uint32(p.header[1:])
    if length == 0 {
        return pf, nil, nil
    }

    msg = make([]byte, int(length))
    if _, err := p.r.Read(msg); err != nil {
        return 0, nil, err
    }
    return pf, msg, nil
}

processStreamingRPC

grpc支持stream流式的请求和响应,sd是从注册的服务中读取的StreamDesc。

func (s *Server) processStreamingRPC(t transport.ServerTransport,
    stream *transport.Stream, srv *service, sd *StreamDesc, 
    trInfo *traceInfo) (err error) {
    ...
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    ss := &serverStream{
        ctx:   ctx,
        t:     t,
        s:     stream,
        p:     &parser{r: stream},
        codec: s.getCodec(stream.ContentSubtype()),
        maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
        maxSendMessageSize:    s.opts.maxSendMessageSize,
        trInfo:                trInfo,
        statsHandler:          sh,
    }
    appErr = sd.Handler(server, ss)
    ...
    return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}

这个StreamDesc注册的函数格式是StreamHandler,每个 sd.Handler 执行之前由processStreamingRPC生成新的 serverStream 来实现协议基本的SendMsg和RecvMsg方法。

type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
    StreamName string
    Handler    StreamHandler

    // At least one of these is true.
    ServerStreams bool
    ClientStreams bool
}

SendMsg和RecvMsg的实现跟processUnaryRPC里的一致。之所以是称为stream是指在server端的实现上通常是下面这种循环模式。最终以_User_Chat_Handler注册到StreamDesc里。

func (svr *UserService) Chat(stream pb.User_ChatServer) error {
    log.Printf("Begin Chat")
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        msg := in.GetTalk()
        stream.Send(&pb.UserMessage{
            Talk: "You Talk: " + msg,
        })
    }
}

这个示例转自某个博客,暂时没找到原始出处,之后补上。

func _User_Chat_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(UserServer).Chat(&userChatServer{stream})
}
type User_ChatServer interface {
    Send(*UserMessage) error
    Recv() (*UserMessage, error)
    grpc.ServerStream
}
type userChatServer struct {
    grpc.ServerStream
}
func (x *userChatServer) Send(m *UserMessage) error {
    return x.ServerStream.SendMsg(m)
}
func (x *userChatServer) Recv() (*UserMessage, error) {
    m := new(UserMessage)
    if err := x.ServerStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

register service

在server端的实现里通常pb会定义这个ServiceDesc以及包含的MethodDesc。外部可以自定义满足GreeterServer接口的具体实例通过RegisterGreeterServer注册到grpcServer里。

type GreeterServer interface {
    SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

注册的这个MethodDesc的Handler是满足特定func入参格式的handler。在processUnaryRPC处理函数中会根据服务名和方法名找到Handler,并调用 _Greeter_SayHello_Handler 。其中srv就是register的那个实例,dec是加入在这里做compress和反序列化的函数,interceptor是拦截器可参考我之前博客 gRPC拦截器interceptor与chain

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, 
    dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor)
    (interface{}, error) {
    in := new(HelloRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(GreeterServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/helloworld.Greeter/SayHello",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
    }
    return interceptor(ctx, in, info, handler)
}

grpc注册和读取的方式都很直接,pb的相关东西也是自动生成的。并不需要像go-rpc那样通过反射来存取相关函数的入参等信息。

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    ....
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

codec

grpc的encoding包提供了proto和gzip两种方式,以插件的形式存在。一般都可以直接proto.Unmarshal。

func (codec) Unmarshal(data []byte, v interface{}) error {
    protoMsg := v.(proto.Message)
    protoMsg.Reset()

    if pu, ok := protoMsg.(proto.Unmarshaler); ok {
        // object can unmarshal itself, no need for buffer
        return pu.Unmarshal(data)
    }

    cb := protoBufferPool.Get().(*cachedProtoBuffer)
    cb.SetBuf(data)
    err := cb.Unmarshal(protoMsg)
    cb.SetBuf(nil)
    protoBufferPool.Put(cb)
    return err
}

summary

主要写了HTTP2协议结合transport的实现,理解stream和grpc server端服务注册相关的内容。下一节写grpc client端和proto相关的内容吧。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK