43

用最简单的方式实现 Rust RPC 服务

 4 years ago
source link: https://www.tuicool.com/articles/UnAV3qE
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RPC是对进程内函数调用的分布式开展,它将进程内的函数调用,扩展成对远程机器上的相应的函数的调用。如何使用最少的代码,用最容易的方式调用远程函数呢?

首先需要知道远程机器的IP地址和端口,然后呢,使用原先进程内的方式直接调用即可,这是最容易的RPC调用。容易不意味着功能简单,而是rpc库背后默默为你承担了序列化、网络传输、路由、服务调用、服务治理的功能。

rpcx 秉承 Go 标准库简单而强大的设计理念,为 Rust 提供了一个原生的 rpc 库。

为什么说是原生呢?因为你可以使用任何你熟悉的编程语言通过HTTP或者JSON-RPC2.0的方式访问Go或者Java实现的rpcx服务,但是除了Go/Java编程语言你没有办法使用raw的rpcx protocol实现TCP的网络调用,而基于TCP的RPC性能要远远高于Request-Response这种类HTTP的调用模型。

现在rpcx for rust的库也发布了: rpcx-rs 。 你可以使用它原生的访问Go或者Java实现的rpcx服务,也可以使用Rust提供rpcx服务。

关键是,依然是那么的简单。

FZFBJzZ.png!web

目前 rpcx-rs 发布了 0.1.2 版本,可以方便的将 Rust 函数暴露成rpcx服务,也可以使用Rust访问rpcx服务,支持 JSON 和 MessagePack 两种序列化方式。

rpcx-rs秉承着Go语言版本的信念,采用最简单的方式实现rpc服务,小到一年级的小学生,大到花甲之年的老太太,都能轻松的实现rpc服务。

后续的功能会持续增加,包括protobuf序列化的支持、服务治理各种功能(路由、失败处理、重试、熔断器、限流)、监控(metrics、trace)、注册中心(etcd、consul)等众多的功能。

那么,让我们看看如何开发一个rust实现的rpcx服务,以及如何调用,相应的Go语言的代码也一并列出。

我们的例子是一个 乘法 服务,客户端传入两个整数, 服务器端计算出两个数的 ,然后返回给客户端。

客户端和服务器端公用的数据

客户端和服务器端交互,一般不会使用字节数组,而是封装的数据对象,像Rust和Go语言中的 struct 、Java中的Class等,所以我们先定义好交流用的数据结构,这样服务器和客户端都可以使用了。

你要在你的Cargo.toml文件中引入rpcx库:

rpcx =  "0.1.2"

然后定义数据结构。

Rust
use std::error::Error as StdError;

use rmp_serde as rmps; 
use serde::{Deserialize, Serialize};

use rpcx::*;

#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddArgs {
    #[serde(rename = "A")]
    pub a: u64,
    #[serde(rename = "B")]
    pub b: u64,
}
#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddReply {
    #[serde(rename = "C")]
    pub c: u64,
}
Go
type Args struct {
	A int
	B int
}

type Reply struct {
	C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A * args.B
	fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
	return nil
}

我们使用 serde 实现 JSON 和 MessagePack 序列化格式的支持。你不必手工实现 RpcxParam trait,通过 derive 属性就可以自动为你的数据架构加上JSON 和 MessagePack 序列化格式的支持。同时你也需要在derive属性上加上 DefaultSerialize , Deserialize ,以便rpcx实现自动的序列化和初始化。

当然这一切都是通过属性完成的,你只需要定义数据结构即可。为了和Go默认的JSON属性相一致(GO默认序列化好的字段名称首字母是大写的),我们这里也加上了serde的属性,让serde进行 JSON序列化的时候使用指定的名称。

这里我们定义了传入参数 ArithAddArgs 和传出参数( ArithAddReply )。

服务端实现

服务器端实现了一个函数 mul ,函数的名称不重要,因为我们注册的时候会手工传入服务的路径名称和方法名称,也就是Go语言中实现的 service_pathservice_method

函数的参数类型是 ArithAddArgs ,输出结果的类型是 ArithAddReply

在main函数中我们新建了一个服务器,线程池中线程的数量默认是CPU的两倍,你也可以根据需要设置更大的线程数量。

然后我们注册了这个服务(函数),使用 register_func! 宏来进行注册,第一个参数是服务器实例,第二个参数是 service_path 、第三个参数是 service_method ,第四个参数是要注册的函数,第五个和第六个参数是函数的传入参数和传出参数的类型。

然后调用服务的 start 就可以提供服务了。

Rust
use mul_model::{ArithAddArgs, ArithAddReply};
use rpcx::*;

fn mul(args: ArithAddArgs) -> ArithAddReply {
    ArithAddReply { c: args.a * args.b }
}

fn main() {
    let mut rpc_server = Server::new("0.0.0.0:8972".to_owned(), 0);

    register_func!(rpc_server, "Arith", "Mul", mul, ArithAddArgs, ArithAddReply);

    rpc_server.start().unwrap();
}
Go
package main

import (
	"context"
	"flag"
	"fmt"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/server"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

type Arith struct{}

// the second parameter is not a pointer
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
	reply.C = args.A * args.B
	fmt.Println("C=", reply.C)
	return nil
}

func main() {
	flag.Parse()

	s := server.NewServer()
	//s.Register(new(Arith), "")
	s.RegisterName("Arith", new(Arith), "")
	err := s.Serve("tcp", *addr)
	if err != nil {
		panic(err)
	}
}

客户端实现

客户端使用 Client::new 先连接一个服务器,你可以指定序列化格式,服务器也会使用你的这个序列化格式返回结果,然后指定要调用的 service_pathservice_method , 使用 call 返回结果(Option类型,因为有的调用不需要返回结果,返回结果是Result类型,可能是成功的结果,也可能是Error); 使用 acall 异步调用,会返回一个future。

你可以和Go语言的版本交叉着互相调用,可以看到很容易的我们就实现了rpc服务和调用。

Rust
use std::collections::hash_map::HashMap;

use mul_model::*;
use rpcx::Client;
use rpcx::{Result, SerializeType};

pub fn main() {
    let mut c: Client = Client::new("127.0.0.1:8972");
    c.start().map_err(|err| println!("{}", err)).unwrap();
    c.opt.serialize_type = SerializeType::JSON;

    let mut a = 1;
    loop {
        let service_path = String::from("Arith");
        let service_method = String::from("Mul");
        let metadata = HashMap::new();
        let args = ArithAddArgs { a: a, b: 10 };
        a = a + 1;

        let reply: Option<Result<ArithAddReply>> =
            c.call(service_path, service_method, false, metadata, &args);
        if reply.is_none() {
            continue;
        }

        let result_reply = reply.unwrap();
        match result_reply {
            Ok(r) => println!("received: {:?}", r),
            Err(err) => println!("received err:{}", err),
        }
    }
}
Go
package main

import (
	"context"
	"flag"
	"log"

	"github.com/smallnest/rpcx/protocol"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/client"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()

	d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	opt := client.DefaultOption
	opt.SerializeType = protocol.JSON

	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
	defer xclient.Close()

	args := example.Args{
		A: 10,
		B: 20,
	}

	reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK