Notes on the go-RPC Implementation

 1 year ago
source link: https://ninokop.github.io/2018/03/27/go-rpc/

Remote Procedure Call

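RPC is an inter-process communication protocol that lets a program call a procedure or function in another address space. The motivation for RPC, and its core problem, is how to execute a function or method that lives in another address space as if it were a local call.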

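In network communication, RPC acts as a protocol that constrains the shape of the Request and the Response. Current RPC frameworks fall roughly into two camps. One leans toward service governance and offers rich features suited to splitting large services into microservices and managing them. The other focuses on cross-language calls, for example gRPC. smallnest.gitbooks.io/go-rpc surveys many RPC frameworks from China and abroad and summarizes the basic flow of an RPC call as follows: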

[Figure: rpc.png, the basic flow of an RPC call]

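As I understand it, RPC describes a point-to-point exchange between a client and a server, and it has to implement three parts: the stub, the transport, and message parsing. The rest of these notes record how the Go standard library's RPC implements each of them.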

  • The stub implements the wire protocol. It works together with serialization and deserialization to read and convert messages.
  • The transport can be TCP or HTTP.
  • Serialization can use protobuf, JSON, and so on. go-RPC serializes with gob.
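To make the three parts concrete, here is a minimal sketch of a header and a body travelling through gob. It is not the package's own code: Header is a stand-in for rpc.Request, Args is a hypothetical argument type, and a bytes.Buffer plays the role of the transport.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Header is a stand-in for rpc.Request: it is sent first so the receiver
// knows which method and which call the following body belongs to.
type Header struct {
	ServiceMethod string
	Seq           uint64
}

// Args is a hypothetical argument type for Arith.Multiply.
type Args struct{ A, B int }

// roundTrip gob-encodes a header and a body into a buffer and decodes them
// again. The buffer plays the role of the transport (TCP or HTTP).
func roundTrip(h Header, a Args) (Header, Args, error) {
	var wire bytes.Buffer
	enc := gob.NewEncoder(&wire)
	if err := enc.Encode(h); err != nil {
		return Header{}, Args{}, err
	}
	if err := enc.Encode(a); err != nil {
		return Header{}, Args{}, err
	}

	dec := gob.NewDecoder(&wire)
	var gotH Header
	var gotA Args
	if err := dec.Decode(&gotH); err != nil {
		return Header{}, Args{}, err
	}
	if err := dec.Decode(&gotA); err != nil {
		return Header{}, Args{}, err
	}
	return gotH, gotA, nil
}

func main() {
	h, a, err := roundTrip(Header{ServiceMethod: "Arith.Multiply", Seq: 1}, Args{A: 7, B: 8})
	fmt.Println(h, a, err)
}
```

The header-then-body order matters: the receiver cannot choose a type to decode the body into until it has read the method name from the header.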

Go-RPC-Client

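In the official client example, the client obtains the result simply by calling Call. The key to an RPC implementation is how the client stub turns this call into a request sent to the server.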

func main() {
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    args := &schema.Args{7, 8}
    var reply int
    err = client.Call("Arith.Multiply", args, &reply)
    if err != nil {
        log.Fatal("arith error:", err)
    }
    log.Println("Arith:", args.A, "*", args.B, "=", reply)
}

DialHTTP

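With RPC over HTTP, rpc.DialHTTP simply sends a CONNECT request to the server on the default path /_goRPC_. If the server responds normally and the connection is established, an rpc Client is created to handle everything that follows.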

func DialHTTPPath(network, address, path string) (*Client, error) {
    var err error
    conn, err := net.Dial(network, address)
    ...
    io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
    resp, err := http.ReadResponse(bufio.NewReader(conn),
        &http.Request{Method: "CONNECT"})
    if err == nil && resp.Status == connected {
        return NewClient(conn), nil
    }
    ...
    conn.Close()
    return nil, &net.OpError{Op: "dial-http"}
}

Go & Call

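The go-RPC package uses gob for encoding and decoding by default; this section skips the codec. Creating the rpc Client starts the input goroutine, which waits for responses.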

type Client struct {
    codec ClientCodec

    reqMutex sync.Mutex // protects following
    request  Request

    mutex    sync.Mutex // protects following
    seq      uint64
    pending  map[uint64]*Call
    closing  bool // user has called Close
    shutdown bool // server has told us to stop
}

func NewClientWithCodec(codec ClientCodec) *Client {
    client := &Client{
        codec:   codec,
        pending: make(map[uint64]*Call),
    }
    go client.input()
    return client
}

The Call method wraps the method name, the request arguments, and the reply in a Call struct, which is eventually sent by client.send.

type Call struct {
    ServiceMethod string
    Args          interface{}
    Reply         interface{}
    Error         error
    Done          chan *Call
}

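The method the rpc package exposes to callers is Call. It invokes the asynchronous Go and then waits on the Done channel of the returned Call, which makes Call the package's default synchronous method. Call offers no timeout, however, so a caller blocks indefinitely if the response never arrives.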

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}
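Because Go returns immediately and reports completion on the Done channel, a caller can add a timeout of its own with select. The following is a rough sketch under stated assumptions: callWithTimeout, errTimeout, demo, and the Sleep method are hypothetical names that are not part of net/rpc, and the server runs in the same process over net.Pipe so the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"time"
)

type Args struct{ A, B int }

type Arith struct{}

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Sleep is a hypothetical slow method, used only to trigger the timeout.
func (t *Arith) Sleep(d *time.Duration, reply *int) error {
	time.Sleep(*d)
	return nil
}

var errTimeout = errors.New("rpc: call timed out")

// callWithTimeout issues the call through the asynchronous Go and gives up
// waiting after d. The call itself is not cancelled: it stays in pending
// until the response arrives or the connection closes.
func callWithTimeout(c *rpc.Client, method string, args, reply interface{}, d time.Duration) error {
	call := c.Go(method, args, reply, make(chan *rpc.Call, 1))
	select {
	case <-call.Done:
		return call.Error
	case <-time.After(d):
		return errTimeout
	}
}

// demo runs one fast and one slow call against an in-memory server.
func demo() (product int, fastErr, slowErr error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return 0, err, err
	}
	cliConn, srvConn := net.Pipe()
	go server.ServeConn(srvConn)
	client := rpc.NewClient(cliConn)
	defer client.Close()

	fastErr = callWithTimeout(client, "Arith.Multiply", &Args{7, 8}, &product, time.Second)
	var ignored int
	slowErr = callWithTimeout(client, "Arith.Sleep", 200*time.Millisecond, &ignored, 20*time.Millisecond)
	return product, fastErr, slowErr
}

func main() {
	fmt.Println(demo())
}
```

One caveat of this approach: after a timeout the reply value may still be written when the late response arrives, so the caller should not reuse it for another call.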

send & input

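An earlier look at the http client showed that it is essentially about connection reuse: http reuses a DefaultTransport that maintains a connection pool. The rpc client instead uses the seq number and the pending map to track every request on a client.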

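send takes locks because a single rpc Client supports sending requests concurrently. The pending map stores the calls the client currently has in flight, keyed by seq. seq increases monotonically, so it effectively numbers the requests on the client side.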

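Finally, the codec serializes the request arguments and writes them to the socket. If the write returns an error, the request is removed from pending and the caller is notified through call.done that this call has finished.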

func (client *Client) send(call *Call) {
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()

    client.mutex.Lock()
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()

    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

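input reads every response on this RPC connection. When a request succeeds, input removes the matching entry from pending, deserializes the response body into call.Reply, and then notifies the caller through call.done.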

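An error while reading the response header means the connection was closed, hit EOF, or failed in some similar way. In that case every request still pending on this client is completed through call.done.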

func (client *Client) input() {
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            // No pending call for this seq, typically because its send failed.
        case response.Error != "":
            call.Error = ServerError(response.Error)
            call.done()
        default:
            err = client.codec.ReadResponseBody(call.Reply)
            call.done()
        }
    }
    // Terminate pending calls.
    client.reqMutex.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.reqMutex.Unlock()
}
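The interplay of send and input can be reduced to a toy model: register a call under a fresh seq, then complete it when a response carrying that seq arrives. This is only a sketch of the bookkeeping, with no network involved; mux, call, register, and complete are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// call is a cut-down stand-in for rpc.Call.
type call struct {
	reply string
	done  chan *call
}

// mux tracks in-flight calls on one connection, keyed by sequence number.
type mux struct {
	mu      sync.Mutex
	seq     uint64
	pending map[uint64]*call
}

// register plays the role of send: it assigns the next seq and records the call.
func (m *mux) register() (uint64, *call) {
	c := &call{done: make(chan *call, 1)}
	m.mu.Lock()
	seq := m.seq
	m.seq++
	m.pending[seq] = c
	m.mu.Unlock()
	return seq, c
}

// complete plays the role of input: it looks up the call by the seq echoed
// in the response, removes it from pending, and signals the waiting caller.
// It reports false when no call is pending under that seq.
func (m *mux) complete(seq uint64, reply string) bool {
	m.mu.Lock()
	c := m.pending[seq]
	delete(m.pending, seq)
	m.mu.Unlock()
	if c == nil {
		return false
	}
	c.reply = reply
	c.done <- c
	return true
}

func main() {
	m := &mux{pending: make(map[uint64]*call)}
	s0, c0 := m.register()
	s1, c1 := m.register()
	// Responses may arrive in any order; seq routes each to its caller.
	m.complete(s1, "second")
	m.complete(s0, "first")
	fmt.Println((<-c0.done).reply, (<-c1.done).reply)
}
```

Because responses are matched by seq rather than by arrival order, many goroutines can share one connection without waiting for each other's replies.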

Go-RPC-Server

This is the example from the official go-rpc comments, and it is clearly RPC over HTTP. rpc.HandleHTTP registers rpc.DefaultServer on http's default ServeMux under the default rpcPath = /_goRPC_.

func main() {
    arith := new(schema.Arith)
    rpc.Register(arith)
    rpc.HandleHTTP()
    e := http.ListenAndServe(":1234", nil)
    if e != nil {
        log.Fatal("listen error:", e)
    }
}

func (server *Server) HandleHTTP(rpcPath, debugPath string) {
    http.Handle(rpcPath, server)
    http.Handle(debugPath, debugHTTP{server})
}
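For experimenting without two processes, the client and server examples can be folded into one program. The sketch below is an assumption-laden variant rather than the official example: it defines its own Args and Arith instead of importing schema, skips HTTP entirely, and connects the two sides with net.Pipe; multiply is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }

type Arith struct{}

// Multiply has the shape net/rpc requires: func(args, *reply) error.
func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// multiply performs one RPC round trip over an in-memory connection.
func multiply(a, b int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return 0, err
	}
	cliConn, srvConn := net.Pipe()
	go server.ServeConn(srvConn) // serves until the client closes its end

	client := rpc.NewClient(cliConn)
	defer client.Close()

	var reply int
	err := client.Call("Arith.Multiply", &Args{a, b}, &reply)
	return reply, err
}

func main() {
	fmt.Println(multiply(7, 8))
}
```

Using rpc.NewServer instead of the package-level rpc.Register keeps the example from touching rpc.DefaultServer, so it can be run repeatedly in one process.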

serveHTTP & serveConn

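As the previous section showed, a client that uses DialHTTP first establishes the connection by sending a CONNECT request. Accordingly, the RPC server handles only CONNECT requests: it takes over the underlying conn through Hijack and then serves that TCP connection.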

func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    if req.Method != "CONNECT" {
        w.Header().Set("Content-Type", "text/plain; charset=utf-8")
        w.WriteHeader(http.StatusMethodNotAllowed)
        io.WriteString(w, "405 must CONNECT\n")
        return
    }
    conn, _, err := w.(http.Hijacker).Hijack()
    ...
    io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
    server.ServeConn(conn)
}

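ServeConn covers everything the RPC server does for a connection: read a request, start a goroutine to handle it, and send back the response. The loop itself lives in ServeCodec, which ServeConn calls with a gob codec.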

func (server *Server) ServeCodec(codec ServerCodec) {
    sending := new(sync.Mutex)
    for {
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        if err != nil {
            // Stop serving once the connection can no longer be read.
            if !keepReading {
                break
            }
            // send a response if we actually managed to read a header.
            if req != nil {
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)
            }
            continue
        }
        go service.call(server, sending, mtype, req, argv, replyv, codec)
    }
    codec.Close()
}

register & call

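Given the service, the method, and the request arguments, the server has to perform the call and return the response. How a service is registered, and how the method to run is found among the registered services, both rely on type information obtained through reflection.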

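Below is the Server data structure. serviceMap holds all registered services. Each service keeps the service name, the receiver type, and the receiver instance, which is usually a pointer value. Its method map keeps the methods the service exports, along with each method's name, argument type, and reply type.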

type Server struct {
    mu         sync.RWMutex // protects the serviceMap
    serviceMap map[string]*service
    reqLock    sync.Mutex // protects freeReq
    freeReq    *Request
    respLock   sync.Mutex // protects freeResp
    freeResp   *Response
}

type service struct {
    name   string                 // name of service
    rcvr   reflect.Value          // receiver of methods for the service
    typ    reflect.Type           // type of the receiver
    method map[string]*methodType // registered methods
}

type methodType struct {
    sync.Mutex // protects counters
    method     reflect.Method
    ArgType    reflect.Type
    ReplyType  reflect.Type
    numCalls   uint
}

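Taking the arith example, *schema.Arith is the type of the registered service instance. s.rcvr is the Value of that pointer, i.e. the address of the instance. sname is obtained by calling Indirect to get the Elem of the pointer Value, that is, the instance the pointer points to. Type then yields schema.Arith, so the service is registered under the name Arith.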

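For a pointer Value, reflect.Indirect returns its Elem. That Value refers to the pointee itself rather than to a copy, so it is addressable and its exported fields can be modified.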
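A small experiment shows what Indirect hands back for a pointer. The names here are illustrative only: Arith is a hypothetical service type with a Calls field added just so there is something to set, and serviceName and bump are helper functions invented for this sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// Arith is a hypothetical service type; Calls exists only to have a field to set.
type Arith struct{ Calls int }

// serviceName derives the registration name the way register does:
// dereference the receiver if it is a pointer, then take the type name.
func serviceName(rcvr interface{}) string {
	return reflect.Indirect(reflect.ValueOf(rcvr)).Type().Name()
}

// bump increments Calls through reflection. It succeeds only because the
// Value returned by Indirect refers to the pointee, not to a copy.
func bump(a *Arith) bool {
	elem := reflect.Indirect(reflect.ValueOf(a))
	if !elem.CanSet() {
		return false
	}
	f := elem.Field(0)
	f.SetInt(f.Int() + 1)
	return true
}

func main() {
	a := &Arith{}
	fmt.Println(serviceName(a), bump(a), a.Calls)
}
```

Passing a non-pointer to Indirect returns the Value unchanged, which is why serviceName yields the same name for Arith{} and &Arith{}.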

func (server *Server) register(rcvr interface{}) error {
    server.mu.Lock()
    defer server.mu.Unlock()

    s := new(service)
    s.typ = reflect.TypeOf(rcvr)   // type of receiver
    s.rcvr = reflect.ValueOf(rcvr) // receiver itself
    sname := reflect.Indirect(s.rcvr).Type().Name()
    s.name = sname
    // Install the methods
    s.method = suitableMethods(s.typ, true)
    if len(s.method) == 0 {
        return errors.New("rpc.Register: type " + sname + " has no exported methods of suitable type")
    }
    server.serviceMap[s.name] = s
    return nil
}

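During register, every method of the type is examined. NumIn and In are used to check the argument types and whether they are exported, and the reply type must be a pointer. Finally, NumOut and Out check that there is exactly one return value and that its type is error.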

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
    methods := make(map[string]*methodType)
    for m := 0; m < typ.NumMethod(); m++ {
        method := typ.Method(m)
        // method.Type ~ func(*schema.Arith, *schema.Args, *schema.Quotient) error
        // method.Name ~ Divide
        mtype := method.Type
        mname := method.Name

        // Method needs three ins: receiver, *args, *reply.
        if mtype.NumIn() != 3 {
            continue
        }
        // First arg need not be a pointer.
        argType := mtype.In(1)
        // Second arg must be a pointer.
        replyType := mtype.In(2)
        if replyType.Kind() != reflect.Ptr {
            continue
        }
        // Method needs one out.
        if mtype.NumOut() != 1 {
            continue
        }
        // The return type of the method must be error.
        if returnType := mtype.Out(0); returnType != typeOfError {
            continue
        }
        methods[mname] = &methodType{method: method,
            ArgType: argType, ReplyType: replyType}
    }
    return methods
}
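The filtering rules are easy to try out on a type that has both suitable and unsuitable methods. The sketch below is a simplified version of the checks, not the library function: rpcMethods is a hypothetical helper, and it omits the exported-type checks on the argument and reply types.

```go
package main

import (
	"fmt"
	"reflect"
)

type Args struct{ A, B int }

type Arith struct{}

// Multiply has the required shape: func(*Args, *int) error.
func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// ByValue is skipped: its reply parameter is not a pointer.
func (t *Arith) ByValue(args Args, reply int) error { return nil }

// NoError is skipped: it does not return exactly one error.
func (t *Arith) NoError(args *Args, reply *int) {}

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

// rpcMethods lists the exported methods of rcvr that pass the shape checks.
func rpcMethods(rcvr interface{}) []string {
	typ := reflect.TypeOf(rcvr)
	var names []string
	for i := 0; i < typ.NumMethod(); i++ {
		m := typ.Method(i)
		mt := m.Type
		// Three ins: receiver, args, reply; reply must be a pointer.
		if mt.NumIn() != 3 || mt.In(2).Kind() != reflect.Ptr {
			continue
		}
		// One out, and it must be error.
		if mt.NumOut() != 1 || mt.Out(0) != typeOfError {
			continue
		}
		names = append(names, m.Name)
	}
	return names
}

func main() {
	fmt.Println(rpcMethods(new(Arith)))
}
```

Note that the receiver counts as the first input of method.Type, which is why a two-parameter method reports NumIn() == 3.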

read request & header

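Handling a connection starts with reading the header. To reuse Request structs, go-RPC keeps them on a linked free list. The slightly confusing part is that Request is only a header, carrying Seq and ServiceMethod. The ServiceMethod read from the request usually looks like Arith.Divide. The service is then looked up by name in server.serviceMap, and the method by name in service.method.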

type Request struct {
    ServiceMethod string   // format: "Service.Method"
    Seq           uint64   // sequence number chosen by client
    next          *Request // for free list in Server
}

func (server *Server) readRequestHeader(codec ServerCodec) (service *service,
    mtype *methodType, req *Request, keepReading bool, err error) {
    req = server.getRequest()
    err = codec.ReadRequestHeader(req)
    ...
    keepReading = true
    dot := strings.LastIndex(req.ServiceMethod, ".")
    serviceName := req.ServiceMethod[:dot]
    methodName := req.ServiceMethod[dot+1:]

    server.mu.RLock()
    service = server.serviceMap[serviceName]
    server.mu.RUnlock()
    mtype = service.method[methodName]
    return
}
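The excerpt above slices the string without checking that a dot is present. A standalone sketch of the split with that check added might look like this; splitServiceMethod and its error text are hypothetical, not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitServiceMethod splits "Service.Method" at the last dot, so a service
// name that itself contains dots stays intact.
func splitServiceMethod(s string) (service, method string, err error) {
	dot := strings.LastIndex(s, ".")
	if dot < 0 {
		return "", "", errors.New("ill-formed service/method: " + s)
	}
	return s[:dot], s[dot+1:], nil
}

func main() {
	fmt.Println(splitServiceMethod("Arith.Divide"))
	fmt.Println(splitServiceMethod("NoDotHere"))
}
```

Without the check, a ServiceMethod lacking a dot would make LastIndex return -1 and the slice expression would panic.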

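Once the method is known from the header, so is its argument type. reflect.New creates a value of type PtrTo(typ) for that type, here *schema.Args. The Interface method returns the current value of argv, i.e. &{0 0}. In the same way, replyv ends up as a pointer to a newly created object that will hold the reply.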

func (server *Server) readRequest(codec ServerCodec) (service *service,
    mtype *methodType, req *Request, argv, replyv reflect.Value,
    keepReading bool, err error) {
    service, mtype, req, keepReading, err = server.readRequestHeader(codec)
    if err != nil {
        if !keepReading {
            return
        }
        // discard body
        codec.ReadRequestBody(nil)
        return
    }
    // Decode the argument value.
    argIsValue := false
    if mtype.ArgType.Kind() == reflect.Ptr {
        argv = reflect.New(mtype.ArgType.Elem())
    } else {
        argv = reflect.New(mtype.ArgType)
        argIsValue = true
    }
    // argv guaranteed to be a pointer now.
    if err = codec.ReadRequestBody(argv.Interface()); err != nil {
        return
    }
    if argIsValue {
        argv = argv.Elem()
    }
    replyv = reflect.New(mtype.ReplyType.Elem())
    return
}
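The pointer-versus-value handling of the argument is easier to follow in isolation. In this sketch, newArg and kinds are hypothetical helpers that mirror the branch above; the codec step is left out and only the kinds of the resulting Values are reported.

```go
package main

import (
	"fmt"
	"reflect"
)

type Args struct{ A, B int }

// newArg allocates storage for an argument of type argType. The returned
// ptr is always a pointer, so a codec can decode into ptr.Interface().
func newArg(argType reflect.Type) (ptr reflect.Value, argIsValue bool) {
	if argType.Kind() == reflect.Ptr {
		return reflect.New(argType.Elem()), false
	}
	return reflect.New(argType), true
}

// kinds reports the kind of the value the codec decodes into and the kind
// of the value finally passed to the method, for a sample argument.
func kinds(sample interface{}) (decodeInto, passed string) {
	ptr, argIsValue := newArg(reflect.TypeOf(sample))
	argv := ptr
	if argIsValue {
		argv = ptr.Elem()
	}
	return ptr.Kind().String(), argv.Kind().String()
}

func main() {
	fmt.Println(kinds(&Args{})) // method declared with *Args
	fmt.Println(kinds(Args{}))  // method declared with Args
}
```

In both cases decoding targets a pointer; the only difference is whether the method then receives that pointer or the struct it points to.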

call & response

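In reflect, a Method's Type field stores the method's type and its Func field stores the method itself, which takes the receiver as its first argument. The return value is converted back with Interface and is either nil or an error.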

func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType,
    req *Request, argv, replyv reflect.Value, codec ServerCodec) {
    function := mtype.method.Func
    // Invoke the method, providing a new value for the reply.
    returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
    // The return value for the method is an error.
    errInter := returnValues[0].Interface()
    errmsg := ""
    if errInter != nil {
        errmsg = errInter.(error).Error()
    }
    server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
    server.freeRequest(req)
}
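The same reflective invocation can be tried outside the server. This is a minimal sketch: invoke is a hypothetical helper, Arith and Divide are defined locally, and it assumes the caller passes arguments of exactly the types the method declares, since reflect's Call panics on a mismatch.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

type Args struct{ A, B int }

type Arith struct{}

func (t *Arith) Divide(args *Args, quo *int) error {
	if args.B == 0 {
		return errors.New("divide by zero")
	}
	*quo = args.A / args.B
	return nil
}

// invoke looks a method up by name and calls it through Method.Func,
// passing the receiver as the first argument.
func invoke(rcvr interface{}, name string, args, reply interface{}) error {
	m, ok := reflect.TypeOf(rcvr).MethodByName(name)
	if !ok {
		return errors.New("no such method: " + name)
	}
	out := m.Func.Call([]reflect.Value{
		reflect.ValueOf(rcvr), reflect.ValueOf(args), reflect.ValueOf(reply),
	})
	// The single return value is an error interface; nil means success.
	if errInter := out[0].Interface(); errInter != nil {
		return errInter.(error)
	}
	return nil
}

func main() {
	var quo int
	err := invoke(new(Arith), "Divide", &Args{7, 2}, &quo)
	fmt.Println(quo, err)
	fmt.Println(invoke(new(Arith), "Divide", &Args{1, 0}, &quo))
}
```

The server avoids the per-call MethodByName lookup by storing the reflect.Method in methodType at registration time.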

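The response must echo the seq sent by the client. Writing it requires holding a lock, because the handler goroutines of one connection run concurrently and share the same codec.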

type Response struct {
    ServiceMethod string    // echoes that of the Request
    Seq           uint64    // echoes that of the request
    Error         string    // error, if any.
    next          *Response // for free list in Server
}

func (server *Server) sendResponse(sending *sync.Mutex, req *Request,
    reply interface{}, codec ServerCodec, errmsg string) {
    resp := server.getResponse()
    // Encode the response header
    resp.ServiceMethod = req.ServiceMethod
    if errmsg != "" {
        resp.Error = errmsg
        reply = invalidRequest
    }
    resp.Seq = req.Seq
    sending.Lock()
    err := codec.WriteResponse(resp, reply)
    sending.Unlock()
    server.freeResponse(resp)
}
