3

源码解析Grpc拦截器(C#版本)

 2 years ago
source link: https://www.cnblogs.com/snailZz/p/15287774.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

其实Grpc拦截器是我以前研究过,但是我看网上相关C#版本的源码解析相对少一点,所以笔者借这篇文章给大家分享下Grpc拦截器的实现,废话不多说,直接开讲(Grpc的源码看着很方便,包自动都能还原成功。.Net源码就硬生啃。。。弄了半天没还原成功😂)。
ps:

  • 本篇文章主要是讲解源码,并不进行举例Demo,所以读者尽量先写一个小Demo,看看生成的代码,然后伴随着看文章。
  • 如果没有用过Grpc的读者,可以先写个小Demo,可以看官网点击这里,主要是查看下通过Proto文件生成的代码的格式。
  • 这篇文章讲解分别从客户端和服务端两部分讲解(实现有点不一样),篇幅原因只讲解一元调用的示例,其他形式的调用其实是类似的。

Client端

Interceptor和CallInvoker抽象类

public abstract class Interceptor
{
    //一元调用同步拦截器
    public virtual TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
        where TRequest : class
        where TResponse : class
    {
        return continuation(request, context);
    }
    //一元调用异步拦截器
    public virtual AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
        where TRequest : class
        where TResponse : class
    {
        return continuation(request, context);
    }
}
public abstract class CallInvoker
{
    //一元调用同步拦截器
    public abstract TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        where TRequest : class
        where TResponse : class;
    //一元调用异步拦截器
    public abstract AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
        where TRequest : class
        where TResponse : class;
}

首先我们要理解这两个抽象类分别是干什么的,上述代码讲解:

  • Interceptor我们知道,在实现自定义的拦截器时,需要继承这个类,并对某些方法进行自定义的实现,而continuation就是调用下一个拦截器。
  • 其实CallInvoker其实就是客户端构造的对象,主要用于调用远程服务,通过你自己实现的Demo可以看到,先创建Channel,然后通过Channe创建默认的CallInvoker,而在创建Client通过proto生成的文件里可以看到对应的重载构造函数。

添加拦截器

public static class CallInvokerExtensions
{
    //增加一个拦截器
    public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor)
    {
        return new InterceptingCallInvoker(invoker, interceptor);
    }
    //增加一组拦截器
    public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors)
    {
        //检查是否为Null
        GrpcPreconditions.CheckNotNull(invoker, nameof(invoker));
        GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors));
        //反转集合,构造对象
        foreach (var interceptor in interceptors.Reverse())
        {
            invoker = Intercept(invoker, interceptor);
        }

        return invoker;
    }
    //篇幅原因,这种方式这里不进行讲解,大家可以自己翻下源码看下,主要作用就是增加用户自定义的额外报文值,类似Http请求中的Header
    public static CallInvoker Intercept(this CallInvoker invoker, Func<Metadata, Metadata> interceptor)
    {
        return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor));
    }
}

上述代码总结:

  • 添加一个拦截器,则直接创建一个InterceptingCallInvoker对象返回,而它必定继承CallInvoker。
  • 添加一组拦截器,则将集合反转,然后构造Invoker。
  • 而在客户端proto生成的代码中可以看到,方法的调用是通过CallInvoker对象调用的,读者可以看一下你自己生成的代码。

InterceptingCallInvoker类

internal class InterceptingCallInvoker : CallInvoker
{
    //下一个invoker对象
    readonly CallInvoker invoker;
    //当前的拦截器
    readonly Interceptor interceptor;

    public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor)
    {
        this.invoker = GrpcPreconditions.CheckNotNull(invoker, nameof(invoker));
        this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
    }
    //一元同步调用
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        return interceptor.BlockingUnaryCall(
            request,
            new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
            //当前请求参数和上下文,调用下一个BlockingUnaryCall
            (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
    }
    //一元异步调用
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        return interceptor.AsyncUnaryCall(
            request,
            new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
            //当前请求参数和上下文,调用下一个BlockingUnaryCall
            (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
    }
}
//默认的CallInvoker,也就是不加任何拦截器时候的实现
public class DefaultCallInvoker : CallInvoker
{
    readonly Channel channel;

    public DefaultCallInvoker(Channel channel)
    {
        this.channel = GrpcPreconditions.CheckNotNull(channel);
    }
    //一元同步调用
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        var call = CreateCall(method, host, options);
        return Calls.BlockingUnaryCall(call, request);
    }
    //一元异步调用
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        var call = CreateCall(method, host, options);
        return Calls.AsyncUnaryCall(call, request);
    }
}

上述代码总结:

  • 构建InterceptingCallInvoker对象时,会保留当前拦截器对象和下一个invoker对象,以方便调用。
  • 当前拦截器对象的在调用方法时,第三个参数是委托,而这个委托就是Interceptor对应方法里面的continuation参数,客户端通过它来调用下一个拦截器。
  • 而DefaultInvoker里面其实是内部调用远程服务,也就是默认实现,而这个是在通过Channel来构造Client的时候构造出来的。

Client总结

  • 贯穿上面的代码可以看出,不管是调用单个添加拦截器,或者链式添加单个拦截器,又或者是添加一组拦截器,最终必然返回CallInvoker对象,而CallInvoker对象是在proto生成的代码中可以看到,在调用对应方法时是由CallInvoker对象调用的。
  • 关于构建InterceptingCallInvoker ,其实可以和设计模式中的装饰着模式关联下,刚开始只构建了默认的DefaultInvoke(这个里面其实是构建连接,调用server端),然后在这基础上添加其他不同的拦截器功能,返回最终的CallInvoker对象。
  • 需要注意的是,当链式添加单个拦截器时,比如Intercept(a).Intercept(b).Intercept(c),那么最终执行的顺序是c(continuation前)->b(continuation前)->a->b(continuation后)->c(continuation后)。如果一次添加一组拦截器Intercept(a,b,c),那么最终执行的顺序是:a(continuation前)->b(continuation前)->c->b(continuation后)->a(continuation后)。

Server端

Interceptor抽象类和ServerServiceDefinition类

public abstract class Interceptor
{
    //服务端一元调用拦截器
    public virtual Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation)
        where TRequest : class
        where TResponse : class
    {
        return continuation(request, context);
    }
}
public class ServerServiceDefinition
{
    //方法列表,也就是服务端写的那些方法
    readonly IReadOnlyList<Action<ServiceBinderBase>> addMethodActions;
    
    internal ServerServiceDefinition(List<Action<ServiceBinderBase>> addMethodActions)
    {
        this.addMethodActions = addMethodActions.AsReadOnly();
    }
    //给方法绑定服务,也就是绑定拦截器,一会的源码会提到
    internal void BindService(ServiceBinderBase serviceBinder)
    {
        //给每个方法都绑定一下拦截器
        foreach (var addMethodAction in addMethodActions)
        {
            addMethodAction(serviceBinder);
        }
    }
    //创建Builder,可以在proto文件中生成的代码看到,会有调用这个方法
    public static Builder CreateBuilder()
    {
        return new Builder();
    }
    
    public class Builder
    {
        //检测是否有同名方法,这是不被允许的
        readonly Dictionary<string, object> duplicateDetector = new Dictionary<string, object>();
        //服务端方法集合
        readonly List<Action<ServiceBinderBase>> addMethodActions = new List<Action<ServiceBinderBase>>();

        public Builder()
        {
        }
        //可以看到在proto生成的代码中,有调用AddMethod,将方法添加到集合中
        public Builder AddMethod<TRequest, TResponse>(
            Method<TRequest, TResponse> method,
            UnaryServerMethod<TRequest, TResponse> handler)
                where TRequest : class
                where TResponse : class
        {
            duplicateDetector.Add(method.FullName, null);
            addMethodActions.Add((serviceBinder) => serviceBinder.AddMethod(method, handler));
            return this;
        }

        //这中间省略了除一元调用的其他调用,有兴趣的可以自己翻下源码
        
        //初始化build,将上面的方法列表添加到其中
        public ServerServiceDefinition Build()
        {
            return new ServerServiceDefinition(addMethodActions);
        }
    }
}

上述代码总结:

  • 对应每个service,都会维护一个方法的集合,然后把用户定义的方法添加到集合中(在proto生成的代码中可以看到)。
  • 在给每个方法添加拦截器时(当然目前看不出来,下面会说),会给每个方法都加上,也就是说,它们之间是互不影响的。

添加拦截器

public static class ServerServiceDefinitionExtensions
{
    //单个添加拦截器
    public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, Interceptor interceptor)
    {
        GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition));
        GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
        //构造新的ServiceBinder
        var binder = new InterceptingServiceBinder(interceptor);
        //将拦截器绑定到每个方法上
        serverServiceDefinition.BindService(binder);
        //生成并返回新的service
        return binder.GetInterceptedServerServiceDefinition();
    }

    //添加一组拦截器
    public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, params Interceptor[] interceptors)
    {
        GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition));
        GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors));

        foreach (var interceptor in interceptors.Reverse())
        {    
            serverServiceDefinition = Intercept(serverServiceDefinition, interceptor);
        }

        return serverServiceDefinition;
    }

    //只保留了一元调用的代码
    private class InterceptingServiceBinder : ServiceBinderBase
    {
        //创建一个空的Builder
        readonly ServerServiceDefinition.Builder builder = ServerServiceDefinition.CreateBuilder();
        //当前拦截器
        readonly Interceptor interceptor;

        public InterceptingServiceBinder(Interceptor interceptor)
        {
            this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
        }
        //构造新的Builder
        internal ServerServiceDefinition GetInterceptedServerServiceDefinition()
        {
            return builder.Build();
        }
        //添加一元调用的方法,而这个就是你自定义的拦截器
        public override void AddMethod<TRequest, TResponse>(
            Method<TRequest, TResponse> method,
            UnaryServerMethod<TRequest, TResponse> handler)
        {
            builder.AddMethod(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler));
        }
        //这里省略了一部分代码。。。
    }
}

其实到这里,咱们再串联上个小部分的代码,应该就能看出一些端倪,上述代码总结:

  • 这里链式添加或者单次添加一组,它和客户端拦截器调用顺序其实是一致的。
  • 我们结合目前上面Server端的所有代码,可以大概看出,当我们不添加任何拦截器时,ServerServiceDefinition对象里面的方法集合列表仅仅包含用户定义的方法委托集合。然而当我们添加拦截器时,它代码的执行顺序则是,构建InterceptingServiceBinder->调用BindService方法,原来的委托集合开始执行,构造新的委托,而调用的AddMethod则是InterceptingServiceBinder对象里面的AddMethod,handler则是我们写的拦截器里面的continuation,用于传递。
  • 最终我们就会得到一个ServerServiceDefinition对象。当然,上述我们只看到了构造对象,而这个对象在哪里调用的呢?我们继续往下看。

DefaultServiceBinder类

internal static class ServerServiceDefinitionExtensions
{
    //在写服务端的时候,我们需要绑定服务,而在绑定服务的时候需要先调用静态BindService方法(可以在proto生成的代码中看到这个方法),然后添加Services时,内部会调用GetCallHandlers方法。
    internal static ReadOnlyDictionary<string, IServerCallHandler> GetCallHandlers(this ServerServiceDefinition serviceDefinition)
    {    
        //构建默认的ServiceBinder,里面其实是执行构造的最终handler
        var binder = new DefaultServiceBinder();
        //调用BindService方法,将执行集合委托
        serviceDefinition.BindService(binder);
        //返回集合列表
        return binder.GetCallHandlers();
    }

    private class DefaultServiceBinder : ServiceBinderBase
    {
        readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();

        internal ReadOnlyDictionary<string, IServerCallHandler> GetCallHandlers()
        {
            return new ReadOnlyDictionary<string, IServerCallHandler>(this.callHandlers);
        }

        public override void AddMethod<TRequest, TResponse>(
            Method<TRequest, TResponse> method,
            UnaryServerMethod<TRequest, TResponse> handler)
        {
            //每个方法名称对应的一个handler
            callHandlers.Add(method.FullName, ServerCalls.UnaryCall(method, handler));
        }
    }
}

上述代码总结:

  • 在构造出ServerServiceDefinition对象时,用户再将对象绑定到grpc的Servers时,开始执行GetCallHandlers方法,把它又重新构建一遍。
  • grpc默认的会构造一个集合,key是方法全名,value则是IServerCallHandler,实际上每次请求进来会检索方法名,然后执行IServerCallHandler内部的HandleCall方法(这个是在源码里面可以看到😜)。
  • ServerCalls.UnaryCall想了解的可以看下源码,实质上内部就是执行handler,而这个handler就是用户构建的最终ServerServiceDefinition。

Server总结

  • 通过上面我们可以看出,其大致思路可Client端实现很相像,只不过最终返回的是ServerServiceDefinition对象,而这个对象从刚开始默认handler(用户重写的Server端方法),到添加拦截器时在上面的封装,而这个拦截器又通过InterceptingServiceBinder类将其添加进去,它们都继承了ServiceBinderBase,通过构造最终的Builder对象来返回最终的ServerServiceDefinition。
  • 最终的ServerServiceDefinition在我们写的服务端Demo中可以看到,它被添加到Servers中,而在这时候调用GetCallHandlers生成最终的以方法名为key,handler为value的集合。
  • 当有请求进来时,我们只需要根据方法名找到对应的handler,然后把参数传递进去,再执行handler就可以把拦截器和自己定义的方法全部走一遍,这些有兴趣的可以参考下源码。

关于Grpc的拦截器,相信你看完之后会有一定的收获,这里我再额外说一些其他的关于阅读Grpc源码时的小tips:

  • 默认情况下,服务启动时,只有4个后台线程去消费请求(和计算机的CPU数量有关),但是请求的执行默认是通过添加线程池任务来执行的,当然也可以设置不通过线程池执行,直接执行时要注意防止阻塞。
  • 默认情况下,Grpc支持同一时间同时处理8000个请求(也和计算机的CPU数量有关),如果有更多的请求应该就被阻塞了。这个数量是可以开发人员去调节的。

以上就是笔者对Grpc拦截器的理解,本篇文章也主要是希望给读者提供源码阅读思路,可能会有偏差,还请评论指正😂。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK