16

go-kit 整体学习,微服务?

 3 years ago
source link: https://studygolang.com/articles/29278
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

首先,go-kit 不是一个 MVC 框架,它的架构层次要更多一些。

大致上分为 transport , endpoint , service

http-demo

import (
	"context"
	"encoding/json"
	"fmt"
	transport "github.com/go-kit/kit/transport/http"
	"net/http"
)

// InfoDto is the request payload of the HTTP demo; the transport's decode
// function fills it from the JSON request body.
type InfoDto struct {
	Version string // printed by the demo endpoint
}

// main wires a go-kit HTTP server together from an endpoint (the business
// logic), a request decoder and a response encoder, then serves it on :8888.
func main() {

	// The endpoint plays the role of the service: it receives the decoded
	// request and returns the value that will be encoded as the response.
	var endpoint = func(ctx context.Context, request interface{}) (response interface{}, err error) {
		dto, ok := request.(*InfoDto)
		if !ok {
			// Report an unexpected request type as an error instead of panicking.
			return nil, fmt.Errorf("unexpected request type %T", request)
		}
		fmt.Println("version :", dto.Version)
		return map[string]interface{}{
			"data": "ok",
		}, nil
	}

	// The transport glues the endpoint together with the codec functions.
	hand := transport.NewServer(endpoint, func(_ context.Context, req *http.Request) (interface{}, error) {
		// Decode straight into the concrete value rather than through a
		// pointer to the interface that holds it.
		dto := &InfoDto{}
		if err := json.NewDecoder(req.Body).Decode(dto); err != nil {
			return nil, err
		}
		return dto, nil
	}, func(_ context.Context, writer http.ResponseWriter, response interface{}) error {
		return json.NewEncoder(writer).Encode(response)
	})

	http.Handle("/", hand)
	// ListenAndServe only returns on failure (for example when the port is
	// taken), so surface the error instead of exiting silently.
	if err := http.ListenAndServe(":8888", nil); err != nil {
		panic(err)
	}

}
复制代码

简单来看,其实它就是将 transport 抽象了一下,transport 做的其实是 controller 的事情,endpoint 可以理解为 service。

定义middleware

中间件其实修饰的是 endpoint ,类似于Spring的interceptor

// Limier is the rate-limiter abstraction consulted by the limiting middleware.
type Limier interface {
	// Allow reports whether the current request may proceed.
	Allow() bool
}

// defaultLimiter is a demo Limier that admits requests at random.
type defaultLimiter struct{}

// Allow draws a random integer in [0, 3) and admits the request only when
// the draw equals 1.
func (*defaultLimiter) Allow() bool {
	draw := rand.Intn(3)
	return draw == 1
}

// addLimier builds an endpoint middleware that consults limier before every
// call: admitted requests are forwarded to next, rejected ones are answered
// by endpoint.Nop.
func addLimier(limier Limier) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		limited := func(ctx context.Context, request interface{}) (interface{}, error) {
			if !limier.Allow() {
				return endpoint.Nop(ctx, request)
			}
			return next(ctx, request)
		}
		return limited
	}
}

// Usage: wrap an existing endpoint with the limiting middleware.
end = addLimier(&defaultLimiter{})(end)
复制代码

定义前置、后置器等

类似于Java的Servlet的Filter,但是不具备拦截功能

// Functional options are a neat substitute for the overloading Go lacks.
// This option registers a hook that prints a message and returns the
// context unchanged.
option := transport.ServerBefore(func(ctx context.Context, request *http.Request) context.Context {
  fmt.Println("http before")
  return ctx
})
复制代码

然后看看zipkin的组合实现,其实就是在前置、后置和结束三个阶段各实现了一个拦截,但是这真的符合我们的要求吗?显然不符合。

// HTTPServerTrace combines a before hook, an after hook and a finalizer into
// a single ServerOption. The hook arguments are elided in this excerpt.
func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption {
	serverBefore := kithttp.ServerBefore(
	)

	serverAfter := kithttp.ServerAfter(
	)

	serverFinalizer := kithttp.ServerFinalizer(
	)
	// The returned option applies all three options to the same server.
	return func(s *kithttp.Server) {
		serverBefore(s)
		serverAfter(s)
		serverFinalizer(s)
	}
}
复制代码

go-kit http整体设计

结构

// Server bundles an endpoint with the codec functions and hooks used to
// serve it over HTTP.
type Server struct {
	e            endpoint.Endpoint // the service endpoint
	dec          DecodeRequestFunc // request decoder
	enc          EncodeResponseFunc // response encoder
	before       []RequestFunc // pre-processing hooks
	after        []ServerResponseFunc // post-processing hooks
	errorEncoder ErrorEncoder// writes an error to the response
	finalizer    []ServerFinalizerFunc // finalizer hooks
	errorHandler transport.ErrorHandler// receives errors via Handle
}
复制代码

处理逻辑

// ServeHTTP implements http.Handler.
// It runs the before hooks, decodes the request, invokes the endpoint, runs
// the after hooks and encodes the response. An error from decoding, from the
// endpoint or from encoding is passed to errorHandler and errorEncoder, after
// which the method returns.
func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

  // Finalizers: when any are registered, wrap the writer so that the deferred
  // call can hand them the response headers, written size and status code.
	if len(s.finalizer) > 0 {
		iw := &interceptingWriter{w, http.StatusOK, 0}
		defer func() {
			ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header())
			ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written)
			for _, f := range s.finalizer {
				f(ctx, iw.code, r)
			}
		}()
		w = iw
	}

  // Before hooks: each one may replace the context.
	for _, f := range s.before {
		ctx = f(ctx, r)
	}

  // Decode the request.
	request, err := s.dec(ctx, r)
	if err != nil {
    // Error path: notify the handler, then encode the error to the client.
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}

  // Invoke the endpoint; errors take the same error path as above.
	response, err := s.e(ctx, request)
	if err != nil {
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}

  // After hooks: each one receives the response writer and may replace the context.
	for _, f := range s.after {
		ctx = f(ctx, w)
	}

  // Encode the response.
	if err := s.enc(ctx, w, response); err != nil {
		s.errorHandler.Handle(ctx, err)
		s.errorEncoder(ctx, err, w)
		return
	}
}
复制代码

基本流程

qmIJbi.png!web

go-kit 整合grpc

首先,整合过程很麻烦,它基本上是基于 service 实现的。

syntax = "proto3";

package grpc_demo;

// Add is the demo service; it exposes a single RPC.
service Add {
    // Sum takes a SumRequest and returns a SumReply.
    rpc Sum (SumRequest) returns (SumReply) {
    }
}

// SumRequest carries the two integer inputs.
message SumRequest {
    int64 a = 1;
    int64 b = 2;
}

// SumReply carries the result value and an error string.
message SumReply {
    int64 v = 1;
    string err = 2;
}
复制代码

脚本

#!/usr/bin/env sh

# Install proto3 from source
#  brew install autoconf automake libtool
#  git clone https://github.com/google/protobuf
#  ./autogen.sh ; ./configure ; make ; make install
#
# Update protoc Go bindings via
#  go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
#
# See also
#  https://github.com/grpc/grpc-go/tree/master/examples

# Generate Go code for addsvc.proto with the grpc plugin enabled, writing the
# output relative to the current directory.
protoc addsvc.proto --go_out=plugins=grpc:.
复制代码

程序

package grpc_demo

import (
	"context"
	"fmt"
	grpctransport "github.com/go-kit/kit/transport/grpc"
)

// grpcServer implements the Sum method by delegating to go-kit gRPC handlers.
type grpcServer struct {
	sum    grpctransport.Handler // serves the Sum RPC
	concat grpctransport.Handler // NOTE(review): never assigned or used in this excerpt
}


// NewGRPCServer builds an AddServer whose Sum handler wraps the given
// endpoint with the Sum request decoder and response encoder.
func NewGRPCServer(service endpoint.Endpoint) AddServer {
	sumHandler := grpctransport.NewServer(service, decodeGRPCSumRequest, encodeGRPCSumResponse)
	return &grpcServer{sum: sumHandler}
}


// Sum serves the Sum RPC through the go-kit handler and returns its reply.
// This glue code can be produced by a code generator.
func (s *grpcServer) Sum(ctx context.Context, req *SumRequest) (*SumReply, error) {
	_, reply, err := s.sum.ServeGRPC(ctx, req)
	if err == nil {
		return reply.(*SumReply), nil
	}
	return nil, err
}

// decodeGRPCSumRequest converts the incoming gRPC message into the request
// value handed to the endpoint. It returns an error when grpcReq is not a
// non-nil *SumRequest, instead of dereferencing a nil pointer.
// This glue code can be produced by a code generator.
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
	req, ok := grpcReq.(*SumRequest)
	if !ok {
		return nil, fmt.Errorf("decode sum request: unexpected type %T", grpcReq)
	}
	if req == nil {
		return nil, fmt.Errorf("decode sum request: nil request")
	}
	// Copy the fields so the endpoint works on its own value.
	return &SumRequest{A: req.A, B: req.B}, nil
}

// encodeGRPCSumResponse converts the endpoint's response into the gRPC reply
// message. It returns an error when response is not a *SumReply, instead of
// silently passing a nil reply on to the caller.
// This glue code can be produced by a code generator.
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
	resp, ok := response.(*SumReply)
	if !ok {
		return nil, fmt.Errorf("encode sum response: unexpected type %T", response)
	}
	return resp, nil
}
复制代码

以上代码均可以使用代码生成器生成

以下是主方法

func Main() {
	grpcListener, err := net.Listen("tcp", ":8888")
	if err != nil {
		panic(err)
	}
	g := grpc.NewServer()

	RegisterAddServer(g, NewGRPCServer(func(ctx context.Context, request interface{}) (response interface{}, err error) {
		req := request.(*SumRequest)
		return &SumReply{V: req.B + req.A, Err: ""}, nil
	}))
	err = g.Serve(grpcListener)
	if err != nil {
		panic(err)
	}
}
复制代码

客户端方法

// NewClient dials the server on :8888, issues one Sum(1, 2) call with a
// one-second timeout and prints either the result or the error.
func NewClient() {
	// 1. Open the connection; WithBlock makes Dial wait until it is up.
	conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	// 2. Build the client stub on top of the connection.
	client := NewAddClient(conn)

	// 3. Perform the RPC under a deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	reply, callErr := client.Sum(ctx, &SumRequest{A: 1, B: 2})
	if callErr == nil {
		fmt.Printf("结果 : %v\n", reply.V)
		return
	}
	fmt.Printf("超时 , 原因 : %s\n", callErr)
}
复制代码

这里我并没有 get 到 go-kit 的好处:很多东西(比如编解码器)在这里根本没有用处。我觉得它的源码中是不是可以加入这样的逻辑:如果编解码器为空,就直接跳过呢?

其实从性能压测来看,grpc本身的效率只比http快2-3倍,并没有想象中那么快。

grpc真的那么快吗

我自己在本地测试时,grpc并没有比http快,可能是因为go的http transport做得比较好,能够复用连接,不需要重复建立。由此可见,http在大多数场景下还是可以起到很大作用的。

我感觉dubbo会很快。

import (
	"bytes"
	"encoding/json"
	"fmt"
	"go-kit/demo"
	"io"
	"io/ioutil"
	"net/http"
	"runtime"
	"time"
)

// main starts the HTTP server in the background and then runs the benchmark
// client against it.
func main() {
	server()
	// Yield so the server goroutine gets a chance to run before the client.
	// NOTE(review): a single Gosched presumably does not guarantee that the
	// listener is ready — confirm.
	runtime.Gosched()
	client()
}

// server registers the /add handler on the default mux and starts serving on
// :8888 in a background goroutine. The handler decodes a JSON AddReqeust and
// answers with {"result": A+B}; malformed input gets a 400 response.
func server() {
	http.HandleFunc("/add", func(writer http.ResponseWriter, request *http.Request) {
		defer request.Body.Close()

		req := demo.AddReqeust{}
		if err := json.NewDecoder(request.Body).Decode(&req); err != nil {
			// A bad payload is the client's fault: reply 400 rather than panic.
			http.Error(writer, err.Error(), http.StatusBadRequest)
			return
		}
		// The encode error is ignored on purpose: once writing has started
		// there is no way left to report a failure to the client.
		_ = json.NewEncoder(writer).Encode(map[string]interface{}{
			"result": req.A + req.B,
		})
	})
	go func() {
		// ListenAndServe only returns on failure; make that visible.
		if err := http.ListenAndServe(":8888", nil); err != nil {
			fmt.Println("http server stopped:", err)
		}
	}()
}

// client issues 10000 sequential requests and prints the total elapsed time
// in milliseconds.
func client() {
	const count = 10000
	start := time.Now()
	for i := 0; i < count; i++ {
		request()
	}
	fmt.Println(time.Since(start).Milliseconds())
}

// request posts one {A: 1, B: 2} payload to the /add endpoint and prints the
// response body. Failures are printed instead of being dropped silently, so
// that failed requests cannot pass for fast ones in the benchmark.
func request() {
	reader, err := addJsonRequestParams(&demo.AddReqeust{
		A: 1,
		B: 2,
	})
	if err != nil {
		fmt.Printf("build request body: %v\n", err)
		return
	}
	resp, err := http.Post("http://127.0.0.1:8888/add", "application/json", reader)
	if err != nil {
		fmt.Printf("post /add: %v\n", err)
		return
	}
	defer resp.Body.Close()

	all, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Printf("read response: %v\n", err)
		return
	}
	fmt.Printf("%s", all)
}

// addJsonRequestParams JSON-encodes params and returns a reader over the
// encoded bytes, or a nil reader and the encoding error.
//
// The *bytes.Buffer is returned directly rather than wrapped in a NopCloser:
// net/http recognises *bytes.Buffer bodies and sets the request's
// Content-Length from them, which a wrapper would hide.
func addJsonRequestParams(params interface{}) (io.Reader, error) {
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(params); err != nil {
		return nil, err
	}
	return buf, nil
}
复制代码

grpc测试

import (
	"context"
	"fmt"
	"go-kit/grep_demo"
	"google.golang.org/grpc"
	"log"
	"net"
	"runtime"
	"time"
)

// demos is the plain gRPC implementation of the Add service used by the
// benchmark.
type demos struct{}

// Sum returns a reply whose V is req.A + req.B and whose Err is empty.
func (*demos) Sum(ctx context.Context, req *grep_demo.SumRequest) (*grep_demo.SumReply, error) {
	reply := &grep_demo.SumReply{Err: ""}
	reply.V = req.A + req.B
	return reply, nil
}

// main starts the gRPC server in the background and then runs the benchmark
// client against it.
func main() {
	rpcserver()
	// Yield so the server goroutine gets a chance to run before the client.
	runtime.Gosched()
	rpcclient()
}

// rpcserver listens on :8888, registers the demos implementation of the Add
// service and serves gRPC in a background goroutine.
func rpcserver() {
	listener, err := net.Listen("tcp", ":8888")
	if err != nil {
		// Without a listener the blocking dial in the client would never
		// succeed, so stop here instead of returning silently.
		log.Fatalf("failed to listen: %v", err)
	}
	server := grpc.NewServer()
	dd := demos{}
	grep_demo.RegisterAddServer(server, &dd)
	go func() {
		if err := server.Serve(listener); err != nil {
			log.Printf("grpc server stopped: %v", err)
		}
	}()
}

// rpcclient dials the server on :8888, performs 10000 sequential Sum(1, 2)
// calls with a one-second timeout each, printing every result or error, and
// finally prints the total elapsed time in milliseconds.
func rpcclient() {
	// 1. Open the connection; log.Fatalf exits the process on failure.
	conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	// 2. Build the client stub on top of the connection.
	client := grep_demo.NewAddClient(conn)

	// 3. One RPC per invocation; the closure lets each call's cancel run
	// as soon as that call finishes.
	callOnce := func() {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()
		reply, callErr := client.Sum(ctx, &grep_demo.SumRequest{A: 1, B: 2})
		if callErr != nil {
			fmt.Printf("err=%s\n", callErr)
			return
		}
		fmt.Printf("{\"result\":%d}\n", reply.V)
	}

	start := time.Now()
	for i := 0; i < 10000; i++ {
		callOnce()
	}
	fmt.Println(time.Since(start).Milliseconds())
}
复制代码

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK