8

AlterNats是如何做到高性能的发布订阅的? - InCerry

 1 year ago
source link: https://www.cnblogs.com/InCerry/p/highperformance-alternats.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

AlterNats是如何做到高性能的发布订阅的?

在过去的一些文章里面,我们聊了一些.NET平台上高性能编程的技巧,今天带大家了解一下AlterNats这个库是如何做到远超同类SDK性能的。

NATS:NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的Publish/Subscribe模型。NATS的开发哲学认为高质量的QoS应该在客户端构建,故只建立了Request-Reply,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列等功能,所以它的性能可以非常好。

NATS.NET:NATS.NET是NATS官方实现的C#语言客户端,它的架构和Go版本保持一致,导致没有使用一些高性能的API和新的语法,性能整体较弱,不过它支持.NET4.6+和.NETStandard1.6+几乎兼容主流的.NET版本。

AlterNats:因为官方实现的NATS.NET性能较弱,所以大佬又实现使用了C#和.NET新特性和API编写了这个高性能NATS客户端,它的发布订阅性能比StackExchange.Redis和官方的Nats.Net快三倍以上。

997046-20220712092639156-1718791881.png

上图是8byte数据发布订阅性能对比,可以看到AlterNats遥遥领先,比官方的实现快了很多。下面就带大家了解一下如何使用AlterNats和为什么它能实现这么高的性能。

AlterNats的API完全采用async/await并保持C#原生风格。

// 创建一个链接
await using var conn = new NatsConnection();

// 订阅消息
var subscription = await conn.SubscribeAsync<Person>("foo", x =>
{
    Console.WriteLine($"Received {x}");
});

// 发布消息
await conn.PublishAsync("foo", new Person(30, "bar"));

// 退订
subscription.Dipose();

// ---

public record Person(int Age, string Name);

NatsOptions/ConnectOptions是不可变的记录,它可以使用C#的new和with语法,非常的方便。

// 可选配置项可以通过`with`关键字
var options = NatsOptions.Default with
{
    Url = "nats://127.0.0.1:9999",
    LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information),
    Serializer = new MessagePackNatsSerializer(),
    ConnectOptions = ConnectOptions.Default with
    {
        Echo = true,
        Username = "foo",
        Password = "bar",
    }
};

await using var conn = new NatsConnection(options);

它还提供了用于接收结果的标准协议。可以将其用作服务器之间的简单RPC,在某些情况下可能很有用。

// 服务端
await conn.SubscribeRequestAsync("foobar", (int x) => $"Hello {x}");

// 客户端
var response = await conn.RequestAsync<int, string>("foobar", 100);

如何做到高性能的?

在之前的文章中,和大家聊过,高性能就是在相同的资源的情况下,能处理更多的数据。我们需要降低单次数据处理所耗费的资源(CPU、内存、磁盘等等),下面就带大家了解AlterNats做了什么,来节省这些资源。

高性能Socket编程

在C#中,最底层的网络处理类是Socket,如果你想要异步、高性能的处理网络请求,你需要重用带回调的SocketAsyncEventArgs
然而,现在我们有更简单的方式使用async/await方法,不需要复杂的SocketAsyncEventArgs,不过它有许多使用异步的方法,需要你选择正确的一个去使用。最简单选择的方法就是,用返回值为ValueTask的API就好了。

// 使用这些
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken)

public ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken)

public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken))

// 不要使用这些
public Task ConnectAsync(string host, int port)

public Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)

public Task<int> SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)

返回ValueTask的API内部使用的AwaitableSocketAsyncEventArgs,它非常的高效,另外由于ValueTask是结构体类型,无需像Task一样在堆上分配,还能简单的享受到异步带来的性能提升。与SocketAsyncEventArgs相比,这是一个非常大的改进,SocketAsyncEventArgs非常难用,我强烈推荐上面提到的ValueTask API.

另外要注意的是,同步API可以使用Span<T>类型,但异步的API只能使用Memroy<T>(因为数据需要存在在堆上),这个点不仅限于Socket网络编程,其它API也是一样,如果整个系统在设计的时候就没有考虑这些,那么无法使用Span<T>类型可能为成为障碍。但是你必须保证你可以随心所欲的使用Memory<T>

使用二进制解析文本协议

NATS的协议是基于文本的协议,和Redis等协议类似,它可以简单通过字符串函数来拆分和处理。可以使用StreamReader很容易的实现这个协议,因为你所需要就是ReadLine来读取数据就好。然而,在网络上传输的是UTF-8格式的二进制数据,将其作为字符串来处理开销较大,如果我们需要高性能,那么必须将其作为二进制数据来处理。

NATS的协议可以通过前导字符串(INFO、MSG、PINT、+OK、-ERR等等)来确定消息的类型。虽然可以很容易的使用if(msg == "INFO")这样的代码来分割字符串处理,但是出于性能原因,这样的开销是不可接受的。

那么我们使用什么样的方法来提升性能呢?举一个例子,字符串INFO的UTF-8编码是[73, 78, 70, 79]

997046-20220712092638833-87867595.png

我们可以直接使用ReadOnlySpan<byte>.SequenceEqual()函数来处理(这个函数优化非常好,它会使用SIMD来提高性能)。

不过在我们的场景里,因为NATS的前导字符串都在4byte以内,所以我们可以将INFO转换为一个int类型来处理,比如将字符串INFO的UTF-8编码转换为int就是1330007625.

997046-20220712092638510-99175447.png

所以在AlterNats里面的代码就是这样处理INFO的。

var msg = new byte[] {73,78,70,79};
if (Unsafe.ReadUnaligned<int>(ref MemoryMarshal.GetReference<byte>(msg)) == 1330007625) // INFO
{
	"Command is INFO".Dump();
}

997046-20220712092638143-239589147.png


这应该是在理论上最快的判断方式了,3个字符的指令后面总是紧跟着空格或者换行符,所以可以使用下面这些常量来判断其它的类型。


internal static class ServerOpCodes
{
    public const int Info = 1330007625;  // Encoding.ASCII.GetBytes("INFO") |> MemoryMarshal.Read<int>
    public const int Msg = 541545293;    // Encoding.ASCII.GetBytes("MSG ") |> MemoryMarshal.Read<int>
    public const int Ping = 1196312912;  // Encoding.ASCII.GetBytes("PING") |> MemoryMarshal.Read<int>
    public const int Pong = 1196314448;  // Encoding.ASCII.GetBytes("PONG") |> MemoryMarshal.Read<int>
    public const int Ok = 223039275;     // Encoding.ASCII.GetBytes("+OK\r") |> MemoryMarshal.Read<int>
    public const int Error = 1381123373; // Encoding.ASCII.GetBytes("-ERR") |> MemoryMarshal.Read<int>
}

使用栈上分配

在请求发送中,有很多小的字符串和byte[]对象,这些小对象会比较频繁产生从而影响GC标记时间,在AlterNats中,比较多的使用了stackalloc byte[10]将这些小的对象分配在栈上,当方法结束时,对象就自动释放了,无需GC再参与,有利于降低内存占用率和GC的暂停时间。

public void WritePublish<T>(in NatsKey subject, ReadOnlyMemory<byte> inboxPrefix, int id, T? value, INatsSerializer serializer)  
{  
    // 栈上分配小对象
    Span<byte> idBytes = stackalloc byte[10];  
    if (Utf8Formatter.TryFormat(id, idBytes, out var written))  
    {  
        idBytes = idBytes.Slice(0, written);  
    }  
  
    var offset = 0;  
    var maxLengthWithoutPayload = CommandConstants.PubWithPadding.Length  
        + subject.LengthWithSpacePadding  
        + (inboxPrefix.Length + idBytes.Length + 1) // with space  
        + MaxIntStringLength  
        + NewLineLength;
}

自动管道批处理

在NATS协议中,所有的写入和读取操作都是流水线的(批处理)。这很容易用Redis的流水线来解释。比如,如果你同一时间发送3个消息,每次发送一个,然后等待响应,那么多次往返的发送和接收会成为性能瓶颈。

在发送消息中,AlterNats自动将它们组织成流水线:使用System.Threading.Channels,消息被打包进入队列,然后由一个写循环检索它们,并将它们通过网络成批的发送出去。一旦网络传输完成,写循环的方法又会将等待网络传输时累积的消息再次进行批处理。

997046-20220712092637768-463911399.png

这不仅能节省往返的时间(在NATS中,发布和订阅都是独立的,所以不需要等待响应),另外它也能减少连续的系统调用。.NET最快的日志记录组件ZLogger也采用了相同的方法。

将许多功能整合到单个对象中

为了实现这样的PublishAsync方法,我们需要将数据放入队列的Channel中,并且将其固定在堆上。我们还需要一个异步方法的Task,以便我们可以用await等待它写入完成。

await connection.PublishAsync(value);

为了高效地实现这样一个API,避免多余的分配,我们把所有的功能都在一个消息对象(内部名称叫Command)里面,这样的话只有它会被分配内存。

class AsyncPublishCommand<T> : 
    ICommand,
    IValueTaskSource, 
    IThreadPoolWorkItem, 
    IObjectPoolNode<AsyncPublishCommand<T>>

internal interface ICommand
{
    void Write(ProtocolWriter writer);
}

internal interface IObjectPoolNode<T>
{
    ref T? NextNode { get; }
}

这个对象(AsyncPublicCommand)本身就有用于保存T类型数据和将其二进制数据写入Socket的角色(ICommand)。

此外,通过实现IValueTaskSource接口,该对象本身也变成了ValueTask。

然后,await后面的回调需要交给线程池处理,以避免阻塞写循环。使用传统的ThreadPool.QueueUserWorkItem(callback)会有额外的内存分配,因为它会在内部创建一个ThreadPoolWorkItem并将其塞入线程池队列中。在.NET Core 3.0以后我们可以通过实现IThreadPoolWorkItem来避免内部ThreadPoolWorkItem对象的内存分配。

最后,我们只有一个对象需要分配,另外我们还可以池化这个对象,使其达到零分配(zero allocated)。可以使用ConcurrentQueue<T>或者类似的轻松实现对象池,上面的类中,通过实现IObjectPoolNode<T>接口,使它自己成为栈中的节点,避免分配数组。堆栈也可以提供一个无效的实现,为这种缓存的使用进行优化。

零拷贝架构

需要发布、订阅的数据通常是序列化的C#类型,比如Json、MessagePack等。在这种情况下,它们不可避免的会使用bytes[]交换数据,例如,StackExchange.Redis中的RedisValue内容实际上就是bytes[],无论是发送还是接收,我们都需要创建和保存bytes[]

为了避免这种情况,通常会使用ArrayPool来实现零分配,但是这仍然会产生复制的成本。当然,零分配是我们的目标,但是我们也要朝着zero-copy去努力。

AlterNats序列化要求使用IBufferWriter<byte>写入,使用ReadOnlySequence<byte>来读取。

public interface INatsSerializer
{
    int Serialize<T>(ICountableBufferWriter bufferWriter, T? value);
    T? Deserialize<T>(in ReadOnlySequence<byte> buffer);
}

public interface ICountableBufferWriter : IBufferWriter<byte>
{
    int WrittenCount { get; }
}

// ---

// 举个例子,使用MessagePack来实现序列化和反序列化

public class MessagePackNatsSerializer : INatsSerializer
{
    public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
    {
        var before = bufferWriter.WrittenCount;
        MessagePackSerializer.Serialize(bufferWriter, value);
        return bufferWriter.WrittenCount - before;
    }

    public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
    {
        return MessagePackSerializer.Deserialize<T>(buffer);
    }
}

C#的System.Text.Json或MessagePack有接收IBufferWriter<byte>参数的序列化重载方法。序列化器通过IBufferWriter直接读取和写入Socket提供的缓冲区,从而消除了Socket和序列化器之间的bytes[]复制。

997046-20220712092637230-5490040.png

在读取时,ReadOnlySequence<byte>是必须的,因为从Socket接收的数据通常是分段的。

一种常见的设计模式就使用System.IO.Pipelines的PipeReader来读取和处理数据,它目的是一个简单使用的高性能I/O库。但是AlterNats没有使用Pipelines,而是使用了自己的读取机制和ReadOnlySequence<byte>

System.Text.Json和MessagePack for C#的序列化方法提供了一个接受IBufferWriter<byte>参数的重载,反序列化方法接受ReadOnlySequence<byte>。换句话说,现代序列化器必须支持IBufferWriter<byte>ReadOnlySequence<byte>

本文内容70%来自AlterNats作者的博客文章,这是一篇不可多得的好文章,详细的说明了AlterNats是如何做到高性能的,让我们在回顾一下。

  • 使用最新的Socket ValueTask API
  • 将所有的功能放到单个对象中,降低SDK的内存分配
  • 池化SDK使用类,栈上分配数据,做到堆上零分配
  • 使用二进制方式解析NATS协议
  • 对读取和写入自动进行批处理
  • 使用IBufferWriter<byte>ReadOnlySequence<byte>,对网络数据处理做到zero-copy

AlterNats项目地址: https://github.com/Cysharp/AlterNats


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK