
Building a Mini gRPC Framework with Minimal Code - Artech

source link: https://www.cnblogs.com/artech/p/16950268.html

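In "Simulating the Four gRPC Message Exchange Patterns with Minimal Code", I used very simple code to simulate the four gRPC message exchange patterns (Unary, Client Streaming, Server Streaming, and Duplex Streaming). This time we go one step further and try to build a gRPC framework in the most minimal way possible (the source is on GitHub). The framework simulates how ASP.NET Core gRPC works under the hood; it is not an attempt to reinvent the wheel.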

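1. A "Standard" gRPC Service: Definition, Hosting, and Invocation
2. Abstracting gRPC Methods as Delegates
3. Converting the Delegates to RequestDelegate
   UnaryCallHandler
   ClientStreamingCallHandler
   ServerStreamingCallHandler
   DuplexStreamingCallHandler
4. Route Registration
5. Defining an Interface for gRPC Services
6. Redefining and Hosting the Service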

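1. A "Standard" gRPC Service: Definition, Hosting, and Invocation

Some readers may not be familiar with ASP.NET Core gRPC yet, so let's first demonstrate how to define and host a simple gRPC service in an ASP.NET Core application and call it with the automatically generated client code. We create an empty solution and add three projects to it: the class library Proto, the ASP.NET Core application Server, and the console application Client.

[Image: the solution with its three projects]

The class library project Proto defines the Greeter service shown below, whose four operations simulate the four message exchange patterns. HelloRequest and HelloReply are the two ProtoBuf messages they use.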

syntax = "proto3";
import "google/protobuf/empty.proto";

service Greeter {
  rpc SayHelloUnary (HelloRequest) returns ( HelloReply);
  rpc SayHelloServerStreaming (google.protobuf.Empty) returns (stream HelloReply);
  rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply);
  rpc SayHelloDuplexStreaming (stream HelloRequest) returns (stream HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

The ASP.NET Core project defines the following GreeterService, which implements the four operations. The base class GreeterBase is the type generated from the .proto file above.

public class GreeterService: GreeterBase
{
    public override Task<HelloReply> SayHelloUnary(HelloRequest request, ServerCallContext context)
    => Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });

    public override async Task<HelloReply> SayHelloClientStreaming(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context)
    {
        var list = new List<string>();
        while (await reader.MoveNext(CancellationToken.None))
        {
            list.Add(reader.Current.Name);
        }
        return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };
    }

    public  override async Task SayHelloServerStreaming(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
    {
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });
        await Task.Delay(1000);
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });
        await Task.Delay(1000);
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });
    }

    public override async Task SayHelloDuplexStreaming(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context)
    {
        while (await reader.MoveNext())
        {
            await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });
        }
    }
}

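The hosting code is shown below. Using the Minimal API style, we call the AddGrpc extension method of IServiceCollection to register the required services, and call MapGrpcService<TService> to map the four methods defined in GreeterService to their corresponding route endpoints.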

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
var app = builder.Build();
app.MapGrpcService<GreeterService>();
app.Run();

In the console project Client, we use the generated client type GreeterClient to call the four gRPC methods, each with its corresponding message exchange pattern.

var channel = GrpcChannel.ForAddress("http://localhost:5000");
var client = new GreeterClient(channel);

Console.WriteLine("Unary");
await UnaryCallAsync();

Console.WriteLine("\nServer Streaming");
await ServerStreamingCallAsync();

Console.WriteLine("\nClient Streaming");
await ClientStreamingCallAsync();

Console.WriteLine("\nDuplex Streaming");
await DuplexStreamingCallAsync();

Console.ReadLine();

async Task UnaryCallAsync()
{
    var request = new HelloRequest { Name = "foobar" };
    var reply = await client.SayHelloUnaryAsync(request);
    Console.WriteLine(reply.Message);
}

async Task ServerStreamingCallAsync()
{
    var streamingCall = client.SayHelloServerStreaming(new Empty());
    var reader = streamingCall.ResponseStream;
    while (await reader.MoveNext(CancellationToken.None))
    {
        Console.WriteLine(reader.Current.Message);
    }
}

async Task ClientStreamingCallAsync()
{
    var streamingCall = client.SayHelloClientStreaming();
    var writer = streamingCall.RequestStream;

    await writer.WriteAsync(new HelloRequest { Name = "Foo" });
    await Task.Delay(1000);
    await writer.WriteAsync(new HelloRequest { Name = "Bar" });
    await Task.Delay(1000);
    await writer.WriteAsync(new HelloRequest { Name = "Baz" });
    // Completing the request stream tells the server that no more messages will follow.
    await writer.CompleteAsync();

    var reply = await streamingCall.ResponseAsync;
    Console.WriteLine(reply.Message);
}

async Task DuplexStreamingCallAsync()
{
    var streamingCall = client.SayHelloDuplexStreaming();
    var writer = streamingCall.RequestStream;
    var reader = streamingCall.ResponseStream;

    // Send the requests in the background while the replies are read below.
    _ = Task.Run(async () =>
    {
        await writer.WriteAsync(new HelloRequest { Name = "Foo" });
        await Task.Delay(1000);
        await writer.WriteAsync(new HelloRequest { Name = "Bar" });
        await Task.Delay(1000);
        await writer.WriteAsync(new HelloRequest { Name = "Baz" });
        await writer.CompleteAsync();
    });

    await foreach (var reply in reader.ReadAllAsync())
    {
        Console.WriteLine(reply.Message);
    }
}

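The client console prints the following output.

[Image: client console output]

2. Abstracting gRPC Methods as Delegates

As the demonstration above shows, the methods implemented by a hosted gRPC service type are ultimately registered as route endpoints, just as in MVC. What sets gRPC methods apart from the action methods defined in a Controller type is that their signatures are fixed. If we use Request and Reply to denote the request and response message types, the method signatures of the four message exchange patterns can be written as follows.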

Task<Reply> Unary(Request request, ServerCallContext context);
Task<Reply> ClientStreaming(IAsyncStreamReader<Request> reader, ServerCallContext context);
Task ServerStreaming(Request request, IServerStreamWriter<Reply> responseStream, ServerCallContext context);
Task DuplexStreaming(IAsyncStreamReader<Request> reader, IServerStreamWriter<Reply> writer, ServerCallContext context);

The IAsyncStreamReader<T> and IServerStreamWriter<T> interfaces, which the streaming methods use to read requests and write responses, are defined as follows.

public interface IAsyncStreamReader<out T>
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken = default);
}

public interface IAsyncStreamWriter<in T>
{
    Task WriteAsync(T message, CancellationToken cancellationToken = default);
}

public interface IServerStreamWriter<in T> : IAsyncStreamWriter<T>
{
}

public interface IClientStreamWriter<in T> : IAsyncStreamWriter<T>
{
    Task CompleteAsync();
}

The ServerCallContext type, which represents the server-side call context, has a rich set of members, but in essence it is a wrapper around the HttpContext, so we simplify it. As the following snippet shows, we give this context type two properties: HttpContext, which represents the request context, and StatusCode, which sets the response status. The enum behind the latter defines the complete set of gRPC status codes.

public class ServerCallContext
{
    public StatusCode StatusCode { get; set; } = StatusCode.OK;
    public HttpContext HttpContext { get; }
    public ServerCallContext(HttpContext httpContext)=> HttpContext = httpContext;
}

public enum StatusCode
{
    OK = 0,
    Cancelled = 1,
    Unknown = 2,
    InvalidArgument = 3,
    DeadlineExceeded = 4,
    NotFound = 5,
    AlreadyExists = 6,
    PermissionDenied = 7,
    Unauthenticated = 0x10,
    ResourceExhausted = 8,
    FailedPrecondition = 9,
    Aborted = 10,
    OutOfRange = 11,
    Unimplemented = 12,
    Internal = 13,
    Unavailable = 14,
    DataLoss = 0xF
}

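Because the method signatures are fixed, we can define the four kinds of gRPC methods as the following four delegates. The generic parameters TService, TRequest, and TResponse represent the service, request, and response types, respectively.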

public delegate Task<TResponse> UnaryMethod<TService, TRequest, TResponse>(TService service, TRequest request, ServerCallContext context)
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>;

public delegate Task<TResponse> ClientStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context)
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>;

public delegate Task ServerStreamingMethod<TService, TRequest, TResponse>(TService service, TRequest request, IServerStreamWriter<TResponse> writer, ServerCallContext context)
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>;

public delegate Task DuplexStreamingMethod<TService, TRequest, TResponse>(TService service, IAsyncStreamReader<TRequest> reader, IServerStreamWriter<TResponse> writer, ServerCallContext context)
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>;

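Routing is, in essence, a set of mappings between route patterns and their handlers. The route pattern here is simple: the route template is "{ServiceName}/{MethodName}" and the HTTP method is POST. The handler ultimately takes the form of a RequestDelegate. So once we can convert the four delegate types above into RequestDelegate, everything falls into place.

3. Converting the Delegates to RequestDelegate

To convert the four delegate types to RequestDelegate, we implement the latter as a ServerCallHandler type, for which we define the two base classes below. The HandleCallAsync method of ServerCallHandlerBase has exactly the signature of RequestDelegate, so it is the method that ends up processing gRPC requests. Each message exchange pattern handles requests differently and only needs to implement the abstract method HandleCallAsyncCore. Before calling this abstract method, HandleCallAsync sets the response Content-Type to the standard gRPC value "application/grpc". Afterwards it writes the status code as the "grpc-status" trailer, which is sent to the client in an HTTP/2 HEADERS frame after the DATA frames have been sent. Both operations are part of the gRPC protocol.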

public abstract class ServerCallHandlerBase
{
    public async Task HandleCallAsync(HttpContext httpContext)
    {
        try
        {
            var serverCallContext = new ServerCallContext(httpContext);
            var response = httpContext.Response;
            response.ContentType = "application/grpc";
            await HandleCallAsyncCore(serverCallContext);
            SetStatus(serverCallContext.StatusCode);
        }
        catch
        {
            SetStatus(StatusCode.Unknown);
        }
        void SetStatus(StatusCode statusCode)
        {
            httpContext.Response.AppendTrailer("grpc-status", ((int)statusCode).ToString());
        }
    }
    protected abstract Task HandleCallAsyncCore(ServerCallContext serverCallContext);
}

public abstract class ServerCallHandler<TService, TRequest, TResponse> : ServerCallHandlerBase
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>
{
    protected ServerCallHandler(MessageParser<TRequest> requestParser)=> RequestParser = requestParser;
    public MessageParser<TRequest> RequestParser { get; }
}

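ServerCallHandler<TService, TRequest, TResponse> derives from ServerCallHandlerBase and uses the three generic parameters TService, TRequest, and TResponse to represent the service, request, and response types. RequestParser provides the MessageParser<TRequest> object used to deserialize request messages. The ServerCallHandler types for all four message exchange patterns inherit from this generic base class.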
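The status handling in HandleCallAsync can be illustrated in isolation. The following is only a sketch under stated assumptions: RunAndGetGrpcStatusAsync is a hypothetical helper that is not part of the framework, the handler core is reduced to a function returning a numeric status, and the "grpc-status" trailer is reduced to the string that would be written. It shows that a normal return reports whatever status the call set, while any exception is reported as Unknown (2).

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for HandleCallAsync: runs the handler core and returns
// the value that would be written to the "grpc-status" trailer.
static async Task<string> RunAndGetGrpcStatusAsync(Func<Task<int>> core)
{
    try
    {
        // The core returns the status it wants to report (0 = OK, 5 = NotFound, ...).
        var status = await core();
        return status.ToString();
    }
    catch
    {
        // Any failure is reported as StatusCode.Unknown (2), as in the catch block above.
        return "2";
    }
}

var ok = await RunAndGetGrpcStatusAsync(() => Task.FromResult(0));
var notFound = await RunAndGetGrpcStatusAsync(() => Task.FromResult(5));
var failed = await RunAndGetGrpcStatusAsync(() => throw new InvalidOperationException("boom"));

Console.WriteLine($"{ok} {notFound} {failed}"); // 0 5 2
```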

UnaryCallHandler

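The concrete ServerCallHandler for the Unary pattern is UnaryCallHandler<TService, TRequest, TResponse>, which is built from the UnaryMethod<TService, TRequest, TResponse> delegate described above. In the overridden HandleCallAsyncCore method, we create the service instance using the IServiceProvider supplied by the HttpContext, read the request message from the request body, and pass both to the delegate to obtain the response message, which is then written back as the reply to the current request.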

internal class UnaryCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>
{
    private readonly UnaryMethod<TService, TRequest, TResponse> _handler;

    public UnaryCallHandler(UnaryMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser) : base(requestParser)
        => _handler = handler;

    protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
    {
        using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
        var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
        var httpContext = serverCallContext.HttpContext;
        var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync<TRequest>(RequestParser);
        var reply = await _handler(service, request!, serverCallContext);
        await httpContext.Response.BodyWriter.WriteMessageAsync(reply);
    }
}

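The request message is read by the ReadSingleMessageAsync<TMessage> method below. According to the gRPC protocol, every request and response message sent over the network is prefixed with 5 bytes: the first byte indicates whether the message is compressed, and the following four bytes are a big-endian integer giving the length of the message. The other message exchange patterns also call the TryReadMessage<TRequest> method of Buffers to read request messages from the buffer.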

public static async Task<TMessage> ReadSingleMessageAsync<TMessage>(this PipeReader reader, MessageParser<TMessage> parser) where TMessage:IMessage<TMessage>
{
    while (true)
    {
        var result = await reader.ReadAsync();
        var buffer = result.Buffer;
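        if (Buffers.TryReadMessage(parser, ref buffer, out var message))
        {
            // The buffer now starts right after the message, so mark everything before it as consumed.
            reader.AdvanceTo(buffer.Start);
            return message!;
        }
        // Nothing consumed yet, but everything examined: the next read waits for more data.
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted)
        {
            break;
        }
    }
    throw new IOException("Failed to read message.");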
}

internal static class Buffers
{
    public static readonly int HeaderLength = 5;
    public static bool TryReadMessage<TRequest>(MessageParser<TRequest> parser, ref ReadOnlySequence<byte> buffer, out TRequest? message) where TRequest: IMessage<TRequest>
    {
        if (buffer.Length < HeaderLength)
        {
            message = default;
            return false;
        }

        Span<byte> lengthBytes = stackalloc byte[4];
        buffer.Slice(1, 4).CopyTo(lengthBytes);
        var length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
        if (buffer.Length < length + HeaderLength)
        {
            message = default;
            return false;
        }

        message = parser.ParseFrom(buffer.Slice(HeaderLength, length));
        buffer = buffer.Slice(length + HeaderLength);
        return true;
    }
}

The WriteMessageAsync extension method below is responsible for writing the response message.

public static ValueTask<FlushResult> WriteMessageAsync(this PipeWriter writer, IMessage message)
{
    var length = message.CalculateSize();
    var span = writer.GetSpan(5 + length);
    span[0] = 0;
    BinaryPrimitives.WriteInt32BigEndian(span.Slice(1, 4), length);
    message.WriteTo(span.Slice(5, length));
    writer.Advance(5 + length);
    return writer.FlushAsync();
}
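To make the 5-byte prefix concrete, here is a small sketch that frames a payload by hand and inspects the resulting bytes. It is an illustration only: Frame is a hypothetical helper, and a UTF-8 string stands in for the serialized ProtoBuf message so that the example needs nothing beyond the base class library.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

// Hypothetical helper: builds "1-byte compression flag + 4-byte big-endian length + payload".
static byte[] Frame(ReadOnlySpan<byte> body)
{
    var framed = new byte[5 + body.Length];
    framed[0] = 0; // 0 = the payload is not compressed
    BinaryPrimitives.WriteInt32BigEndian(framed.AsSpan(1, 4), body.Length);
    body.CopyTo(framed.AsSpan(5));
    return framed;
}

// Stand-in for a serialized message; a real call would carry ProtoBuf bytes here.
var payload = Encoding.UTF8.GetBytes("foobar");
var frame = Frame(payload);

var declaredLength = BinaryPrimitives.ReadInt32BigEndian(frame.AsSpan(1, 4));
Console.WriteLine(Convert.ToHexString(frame.AsSpan(0, 5)));            // 0000000006
Console.WriteLine(Encoding.UTF8.GetString(frame, 5, declaredLength));  // foobar
```

The receiver needs only the first five bytes to know how many more bytes belong to the current message, which is what makes it possible to send several messages back to back on one stream.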

ClientStreamingCallHandler

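ClientStreamingCallHandler<TService, TRequest, TResponse> is the ServerCallHandler for the Client Streaming pattern, created from the corresponding ClientStreamingMethod<TService, TRequest, TResponse> delegate. In the overridden HandleCallAsyncCore method, besides the service instance, it needs an IAsyncStreamReader<TRequest> object for reading requests as a stream. Both are passed as arguments to the delegate, which returns the final response message when it completes. This message is likewise written back with the WriteMessageAsync extension method above.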

internal class ClientStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>
{
    private readonly ClientStreamingMethod<TService, TRequest, TResponse> _handler;
    public ClientStreamingCallHandler(ClientStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser)
        :base(requestParser)
    {
        _handler = handler;
    }
    protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
    {
        using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
        var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
        var reader = serverCallContext.HttpContext.Request.BodyReader;
        var writer = serverCallContext.HttpContext.Response.BodyWriter;
        var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);
        var response = await _handler(service, streamReader, serverCallContext);
        await writer.WriteMessageAsync(response);
    }
}

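The IAsyncStreamReader<T> interface is implemented by the HttpContextStreamReader<T> type below. Given the wire layout of request messages described earlier, the read logic in this type should be easy to follow.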

public class HttpContextStreamReader<T> : IAsyncStreamReader<T> where T : IMessage<T>
{
    private readonly PipeReader _reader;
    private readonly MessageParser<T> _parser;
    private ReadOnlySequence<byte> _buffer;
    public HttpContextStreamReader(HttpContext httpContext, MessageParser<T> parser)
    {
        _reader = httpContext.Request.BodyReader;
        _parser = parser;
    }
    public T Current { get; private set; } = default!;
    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        var completed = false;
        if (_buffer.IsEmpty)
        {
            var result = await _reader.ReadAsync(cancellationToken);
            _buffer = result.Buffer;
            completed = result.IsCompleted;
        }
        if (Buffers.TryReadMessage(_parser, ref _buffer, out var message))
        {
            Current = message!;
            _reader.AdvanceTo(_buffer.Start, _buffer.End);
            return true;
        }
        _reader.AdvanceTo(_buffer.Start, _buffer.End);
        _buffer = default;
        return !completed && await MoveNext(cancellationToken);
    }
}
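The core of streaming reads is that one buffer may hold several complete messages followed by an incomplete one. The sketch below shows that situation without any pipe or HTTP machinery. It is a simplified illustration, not the framework's code: TryReadFrame is a hypothetical helper that mirrors the logic of Buffers.TryReadMessage but returns raw payload bytes instead of a parsed ProtoBuf message.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper mirroring Buffers.TryReadMessage, minus the ProtoBuf parsing.
static bool TryReadFrame(ref ReadOnlySequence<byte> sequence, out byte[] body)
{
    body = Array.Empty<byte>();
    if (sequence.Length < 5) return false;           // header not complete yet

    Span<byte> lengthBytes = stackalloc byte[4];
    sequence.Slice(1, 4).CopyTo(lengthBytes);
    var length = BinaryPrimitives.ReadInt32BigEndian(lengthBytes);
    if (sequence.Length < 5 + length) return false;  // payload not complete yet

    body = sequence.Slice(5, length).ToArray();
    sequence = sequence.Slice(5 + length);           // advance past this message
    return true;
}

// Two complete frames ("Foo", "Bar") and a third one whose payload is cut off.
byte[] data =
{
    0, 0, 0, 0, 3, (byte)'F', (byte)'o', (byte)'o',
    0, 0, 0, 0, 3, (byte)'B', (byte)'a', (byte)'r',
    0, 0, 0, 0, 3, (byte)'B'
};
var remaining = new ReadOnlySequence<byte>(data);
var names = new List<string>();
while (TryReadFrame(ref remaining, out var bytes))
{
    names.Add(Encoding.UTF8.GetString(bytes));
}

Console.WriteLine(string.Join(",", names)); // Foo,Bar
Console.WriteLine(remaining.Length);        // 6
```

The six leftover bytes are the header and the first payload byte of the truncated message; a stream reader has to keep them and wait for more data before it can produce the next item.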

ServerStreamingCallHandler

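ServerStreamingCallHandler<TService, TRequest, TResponse> is the ServerCallHandler for the Server Streaming pattern, created from the corresponding ServerStreamingMethod<TService, TRequest, TResponse> delegate. In the overridden HandleCallAsyncCore method, besides the service instance, it needs an IAsyncStreamWriter<TResponse> object for writing responses as a stream. Both are passed as arguments to the delegate.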

internal class ServerStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>
{
    private readonly ServerStreamingMethod<TService, TRequest, TResponse> _handler;
    public ServerStreamingCallHandler(ServerStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser):base(requestParser)
        => _handler = handler;
    protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
    {
        using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
        var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
        var httpContext = serverCallContext.HttpContext;
        var streamWriter = new HttpContextStreamWriter<TResponse>(httpContext);
        var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync(RequestParser);
        await _handler(service, request, streamWriter, serverCallContext);
    }
}

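The IAsyncStreamWriter<T> interface is implemented by the HttpContextStreamWriter<T> type below, which simply calls the WriteMessageAsync extension method defined above to write the given message to the response body.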

public class HttpContextStreamWriter<T> : IServerStreamWriter<T> where T : IMessage<T>
{
    private readonly PipeWriter _writer;
    public HttpContextStreamWriter(HttpContext httpContext) => _writer = httpContext.Response.BodyWriter;
    public Task WriteAsync(T message, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return _writer.WriteMessageAsync(message).AsTask();
    }
}

DuplexStreamingCallHandler

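DuplexStreamingCallHandler<TService, TRequest, TResponse> is the ServerCallHandler for the Duplex Streaming pattern, created from the corresponding DuplexStreamingMethod<TService, TRequest, TResponse> delegate. In the overridden HandleCallAsyncCore method, besides the service instance, it needs to create the IAsyncStreamReader<TRequest> and IAsyncStreamWriter<TResponse> objects that read requests and write responses as streams. Their concrete types are the HttpContextStreamReader<TRequest> and HttpContextStreamWriter<TResponse> defined above.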

internal class DuplexStreamingCallHandler<TService, TRequest, TResponse> : ServerCallHandler<TService, TRequest, TResponse>
    where TService : class
    where TRequest : IMessage<TRequest>
    where TResponse : IMessage<TResponse>
{
    private readonly DuplexStreamingMethod<TService, TRequest, TResponse> _handler;
    public DuplexStreamingCallHandler(DuplexStreamingMethod<TService, TRequest, TResponse> handler, MessageParser<TRequest> requestParser) :base(requestParser)
        => _handler = handler;
    protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext)
    {
        using var scope = serverCallContext.HttpContext.RequestServices.CreateScope();
        var service = ActivatorUtilities.CreateInstance<TService>(scope.ServiceProvider);
        var reader = serverCallContext.HttpContext.Request.BodyReader;
        var writer = serverCallContext.HttpContext.Response.BodyWriter;
        var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext.HttpContext, RequestParser);
        var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext.HttpContext);
        await _handler(service, streamReader, streamWriter, serverCallContext);
    }
}

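4. Route Registration

So far we have abstracted the gRPC methods of the four message exchange patterns as generic delegates, and we can use them to create ServerCallHandler objects, which in turn supply the RequestDelegate that serves as the route endpoint handler. The mapping between the delegates and their ServerCallHandler types is as follows:

  • UnaryMethod<TService, TRequest, TResponse>: UnaryCallHandler<TService, TRequest, TResponse>
  • ClientStreamingMethod<TService, TRequest, TResponse>: ClientStreamingCallHandler<TService, TRequest, TResponse>
  • ServerStreamingMethod<TService, TRequest, TResponse>: ServerStreamingCallHandler<TService, TRequest, TResponse>
  • DuplexStreamingMethod<TService, TRequest, TResponse>: DuplexStreamingCallHandler<TService, TRequest, TResponse>

Now let's tie the whole route registration process together. To that end we define the IServiceBinder<TService> interface below, which offers two ways to register the gRPC methods defined in the service type TService as route endpoints.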

public interface IServiceBinder<TService> where TService : class
{
    IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;


    IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;
    IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;

    IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>;
}

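A route endpoint consists of two elements: the route pattern and the handler. The route pattern is essentially the route template formed from the gRPC service name and operation name. By default we use the name of the service type and the name of the method (with the Async suffix stripped). To allow both names to be customized, we define the two attributes GrpcServiceAttribute and GrpcMethodAttribute below, which can be applied to the service type and the operation method, respectively, to specify an arbitrary name.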

[AttributeUsage(AttributeTargets.Class)]
public class GrpcServiceAttribute: Attribute
{
    public string? ServiceName { get; set; }

}

[AttributeUsage(AttributeTargets.Method)]
public class GrpcMethodAttribute : Attribute
{
    public string? MethodName { get; set; }
}
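The default naming rule described above can be sketched as a plain function. This is only an illustration: it takes the service name as an argument instead of resolving it from the attribute through reflection, so that the example stays self-contained.

```csharp
using System;

// Builds the "{ServiceName}/{MethodName}" route template, stripping a trailing "Async".
static string GetPath(string serviceName, string methodName)
{
    const string suffix = "Async";
    if (methodName.EndsWith(suffix, StringComparison.Ordinal))
    {
        methodName = methodName[..^suffix.Length];
    }
    return $"{serviceName}/{methodName}";
}

Console.WriteLine(GetPath("Greeter", "SayHelloUnaryAsync"));      // Greeter/SayHelloUnary
Console.WriteLine(GetPath("Greeter", "SayHelloDuplexStreaming")); // Greeter/SayHelloDuplexStreaming
```

Stripping the suffix is what lets a server method named SayHelloUnaryAsync answer requests for the SayHelloUnary operation declared in the .proto file.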

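The ServiceBinder<TService> shown below implements IServiceBinder<TService> and wraps an IEndpointRouteBuilder object. In the first group of methods, we combine the supplied method name with the service name resolved from the TService type to obtain the URL template of the route endpoint. These methods also take a Func<TService,Func<…>> delegate that matches the gRPC method signature, which we use to produce the delegate needed to build the corresponding ServerCallHandler. Finally, we use the IEndpointRouteBuilder object to register the route endpoint.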

public class ServiceBinder<TService> : IServiceBinder<TService> where TService : class
{
    private readonly IEndpointRouteBuilder _routeBuilder;
    public ServiceBinder(IEndpointRouteBuilder routeBuilder) => _routeBuilder = routeBuilder;

    public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        Task<TResponse> GetMethod(TService service, TRequest request, ServerCallContext context) => methodAccessor(service)(request, context);
        var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(GetMethod, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, ServerCallContext, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        Task<TResponse> GetMethod(TService service, IAsyncStreamReader<TRequest> reader, ServerCallContext context) => methodAccessor(service)(reader, context);
        var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(GetMethod, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<TRequest, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        ServerStreamingMethod<TService, TRequest, TResponse> handler = (service, request, writer, context) => methodAccessor(service)(request, writer, context);
        var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(string methodName, Func<TService, Func<IAsyncStreamReader<TRequest>, IServerStreamWriter<TResponse>, ServerCallContext, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        DuplexStreamingMethod<TService, TRequest, TResponse> handler = (service, reader, writer, context) => methodAccessor(service)(reader, writer, context);
        var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(handler, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    private static string GetPath(string methodName)
    {
        var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
        if (methodName.EndsWith("Async"))
        {
            methodName = methodName.Substring(0, methodName.Length - 5);
        }
        return $"{serviceName}/{methodName}";
    }

    public IServiceBinder<TService> AddUnaryMethod<TRequest, TResponse>(Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        var method = CreateDelegate<UnaryMethod<TService, TRequest,TResponse>>(methodAccessor, out var methodName);
        var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
        var callHandler = new UnaryCallHandler<TService, TRequest, TResponse>(method, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddClientStreamingMethod<TRequest, TResponse>( Expression<Func<TService, Task<TResponse>>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        var method = CreateDelegate<ClientStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
        var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
        var callHandler = new ClientStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddServerStreamingMethod<TRequest, TResponse>(Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        var method = CreateDelegate<ServerStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
        var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
        var callHandler = new ServerStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    public IServiceBinder<TService> AddDuplexStreamingMethod<TRequest, TResponse>(Expression<Func<TService, Task>> methodAccessor, MessageParser<TRequest> parser)
        where TRequest : IMessage<TRequest>
        where TResponse : IMessage<TResponse>
    {
        var method = CreateDelegate<DuplexStreamingMethod<TService, TRequest, TResponse>>(methodAccessor, out var methodName);
        var serviceName = typeof(TService).GetCustomAttribute<GrpcServiceAttribute>()?.ServiceName ?? typeof(TService).Name;
        var callHandler = new DuplexStreamingCallHandler<TService, TRequest, TResponse>(method, parser);
        _routeBuilder.MapPost(ServiceBinder<TService>.GetPath(methodName), callHandler.HandleCallAsync);
        return this;
    }

    private TDelegate CreateDelegate<TDelegate>(LambdaExpression expression, out string methodName) where TDelegate : Delegate
    {
        var method = ((MethodCallExpression)expression.Body).Method;
        methodName = method.GetCustomAttribute<GrpcMethodAttribute>()?.MethodName ?? method.Name;
        return (TDelegate)Delegate.CreateDelegate(typeof(TDelegate), method);
    }
}

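Because the second group of methods takes an expression that calls the gRPC method, we can obtain the MethodInfo object describing that method. This object not only solves the problem of creating the delegate but also supplies the method name, so this group of methods does not need the gRPC method name to be passed in. However, the supplied expression does not have to match the method signature exactly, so no compile-time checking is possible. Each approach therefore has its own advantages and disadvantages.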
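The expression-based technique can be tried out on any instance method. In the sketch below, string.Substring stands in for a gRPC service method (an assumption made purely to keep the example self-contained): the MethodInfo is taken from the call expression, and Delegate.CreateDelegate turns it into an open-instance delegate whose first parameter is the target object, just as TService is the first parameter of the four delegate types.

```csharp
using System;
using System.Linq.Expressions;

// The argument 0 is only a placeholder; the expression is never executed.
Expression<Func<string, string>> accessor = s => s.Substring(0);

// The call expression carries the MethodInfo of string.Substring(int).
var method = ((MethodCallExpression)accessor.Body).Method;

// Open-instance delegate: the first parameter is the instance the method is invoked on.
var substring = (Func<string, int, string>)Delegate.CreateDelegate(
    typeof(Func<string, int, string>), method);

Console.WriteLine(method.Name);          // Substring
Console.WriteLine(substring("gRPC", 1)); // RPC
```

Note that the expression type (Func<string, string>) says nothing about the int parameter. If the delegate type passed to CreateDelegate did not match the method, the mismatch would surface only at run time, which is the lack of compile-time checking mentioned above.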

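5. Defining an Interface for gRPC Services

Because route endpoints are registered per service type, we let the service type itself do all the route registration work. Here we use a C# 11 feature, static abstract interface members, to define the IGrpcService<TService> interface below for service types. The route registration for all gRPC methods defined in the service type TService is done in the static method Bind, which takes the IServiceBinder<TService> described above as its parameter.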

public interface  IGrpcService<TService> where TService:class
{
     static abstract void Bind(IServiceBinder<TService> binder);
}

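We define the following extension method on IEndpointRouteBuilder to register the routes for a given service type. To distinguish it from the existing method, I deliberately named it MapGrpcService2. The method creates a ServiceBinder<TService> from the given IEndpointRouteBuilder and passes it to the service type's static Bind method. At this point the whole mini gRPC server framework is complete. Next, let's see whether it works.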

public static class EndpointRouteBuilderExtensions
{
    public static IEndpointRouteBuilder MapGrpcService2<TService>(this IEndpointRouteBuilder routeBuilder) where TService : class, IGrpcService<TService>
    {

        var binder = new ServiceBinder<TService>(routeBuilder);
        TService.Bind(binder);
        return routeBuilder;
    }
}

6. Redefining and Hosting the Service

We opened with a demonstration of defining, hosting, and calling a service with ASP.NET Core gRPC. If the mini gRPC framework we have built works properly, the client code can stay unchanged, so let's try it. In the Server project we change the GreeterService type to the following form: it no longer inherits from any base class and only implements the IGrpcService<GreeterService> interface. The bodies of the four methods for the four message exchange patterns stay the same, and in the static Bind method we use both forms to register the routes for these four methods.

[GrpcService(ServiceName = "Greeter")]
public class GreeterService : IGrpcService<GreeterService>
{
    public Task<HelloReply> SayHelloUnaryAsync(HelloRequest request, ServerCallContext context)
        => Task.FromResult(new HelloReply { Message = $"Hello, {request.Name}" });

    public async Task<HelloReply> SayHelloClientStreamingAsync(IAsyncStreamReader<HelloRequest> reader, ServerCallContext context)
    {
        var list = new List<string>();
        while (await reader.MoveNext(CancellationToken.None))
        {
            list.Add(reader.Current.Name);
        }
        return new HelloReply { Message = $"Hello, {string.Join(",", list)}" };
    }

    public async Task SayHelloServerStreamingAsync(Empty request, IServerStreamWriter<HelloReply> responseStream, ServerCallContext context)
    {
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Foo!" });
        await Task.Delay(1000);
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Bar!" });
        await Task.Delay(1000);
        await responseStream.WriteAsync(new HelloReply { Message = "Hello, Baz!" });
    }

    public async Task SayHelloDuplexStreamingAsync(IAsyncStreamReader<HelloRequest> reader, IServerStreamWriter<HelloReply> writer, ServerCallContext context)
    {
        while (await reader.MoveNext())
        {
            await writer.WriteAsync(new HelloReply { Message = $"Hello {reader.Current.Name}" });
        }
    }

    public static void Bind(IServiceBinder<GreeterService> binder)
    {
        binder
            // Expression-based registration: the method name is taken from the expression.
            .AddUnaryMethod<HelloRequest, HelloReply>(it => it.SayHelloUnaryAsync(default!, default!), HelloRequest.Parser)
            .AddClientStreamingMethod<HelloRequest, HelloReply>(it => it.SayHelloClientStreamingAsync(default!, default!), HelloRequest.Parser)
            // Name-based registration: the method name is passed explicitly.
            .AddServerStreamingMethod<Empty, HelloReply>(nameof(SayHelloServerStreamingAsync), it => it.SayHelloServerStreamingAsync, Empty.Parser)
            .AddDuplexStreamingMethod<HelloRequest, HelloReply>(nameof(SayHelloDuplexStreamingAsync), it => it.SayHelloDuplexStreamingAsync, HelloRequest.Parser);
    }
}

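The hosting program simply replaces the call to MapGrpcService<GreeterService> with MapGrpcService2<GreeterService>. Because the framework does not need any services to be registered in advance, the call to the AddGrpc extension method can also be removed.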

using GrpcMini;
using Microsoft.AspNetCore.Server.Kestrel.Core;

var builder = WebApplication.CreateBuilder(args);
builder.WebHost.ConfigureKestrel(kestrel => kestrel.ConfigureEndpointDefaults(options => options.Protocols = HttpProtocols.Http2));
var app = builder.Build();
app.MapGrpcService2<GreeterService>();
app.Run();

When we run the program again, the client still produces the same output.

[Image: client console output]
