3

C#实现一个万物皆可排序的队列

 1 year ago
source link: https://www.cnblogs.com/bossma/p/16441581.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

C#实现一个万物皆可排序的队列

产品中需要向不同的客户推送数据,原来的实现是每条数据产生后就立即向客户推送数据,走的的是HTTP协议。因为每条数据都比较小,而数据生成的频次也比较高,这就会频繁的建立HTTP连接,而且每次HTTP传输中携带的业务数据都很小,对网络的实际利用率不高。希望能够提高网络的利用率,并降低系统的负载。

一个很自然的想法就是将多条数据一起发送,这里有几个关键点:

1、多条数据的聚合逻辑: 是攒够几条发送,还是按照时间周期发送。如果是攒够几条发送,在数据比较稀疏或者产生频率不那么稳定的时候,攒够需要的数据条数可能比较困难,这时候还得需要一个过期时间,因为客户可能接受不了太多的延迟。既然不管怎样都需要使用时间进行控制,我这里索性就选择按照时间周期发送了。思路是:自上次发送时间起,经过了某个时长之后,就发送客户在这段时间内产生的所有数据。

2、数据到期判断方法:既然选择了按照时间周期发送,那么就必须有办法判断是否到了发送时间。一个很简单的想法就是轮询,把所有客户轮询一遍,看看谁的数据到期了,就发送谁的。这个算法的时间复杂度是O(N),如果客户比较多,就会消耗过多的时间在这上边。还有一个办法:如果客户按照时间排序好了,那么只需要取时间最早的客户的数据时间判断就好了,满足就发送,一直向后找,直到获取的客户数据时间不符合条件,则退出处理,然后等一会再进行判断处理。这就需要有一个支持排序的数据结构,写入数据时自动排序,这种数据结构的时间复杂度一般可以做到O(log(n))。对于这个数据结构的读写操作原理上就是队列的操作方式,只不过是个可排序的队列。

3、区分客户:不同客户的数据接收地址不同,向具体某个客户发送数据时,应该能比较方便的聚合他的数据,最好是直接就能拿到需要发送的数据。可以使用字典数据结构来满足这个需求,取某个客户数据的时间复杂度可以降低到O(1)。

4、数据的安全性问题:如果程序在数据发送成功之前退出了,未发送的数据怎么办?是还能继续发送,还是就丢掉不管了。如果要在程序重启后恢复未发送成功的数据,则必须将数据同步到别的地方,比如持久化到磁盘。因为我这里的数据安全性要求不高,丢失一些数据也是允许的,所以要发送的数据收到之后放到内存就行了。

上文提到可排序的数据结构,可以使用SortedList<TKey,TValue>,键是时间,值是这个时间产生了数据的客户标识列表。不过它的读写操作不是线程安全的,需要自己做同步,这里简单点就使用lock了。

对于不同客户的数据,为了方便获取,使用Dictionary<TKey,TValue>来满足,键是客户的标识,值是累积的未发送客户数据。这个数据读写也不是线程安全的,可以和SortedList的读写放到同一个lock中。

下边是它们的定义:

SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();

插入数据的时候,需要先写入SortedList,然后再写入Dictionary。代码逻辑比较简单,请看:

    public void Publish(TKey key, TValue value)
    {
        DateTime now = DateTime.Now;
        lock (_lock)
        {
            if (_queue.TryGetValue(now, out List<TKey>? keys))
            {
                if (!keys!.Contains(key))
                {
                    keys.Add(key);
                }
            }
            else
            {
                _queue.Add(now, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }

对于消费数据,这里采用拉数据的模式。最开始写的方法逻辑是:读取一条数据,处理它,然后从队列中删除。但是这个逻辑需要对队列进行读写,所以必须加锁。一般处理数据比较耗时,比如这里要通过HTTP发送数据,加锁的话就可能导致写数据到队列时阻塞的时间比较长。所以这里实现的是把可以发送的数据全部提取出来,然后就释放锁,数据的处理放到锁的外部实现,这样队列的读写性能就比较好了。

    public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        DateTime now = DateTime.Now;

        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var first = _queue.First();
                var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
                if (diffMillseconds < _valueDequeueMillseconds)
                {
                    break;
                }

                var keys = first.Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }

这段代码比较长一些,我梳理下逻辑:取队列的第一条数据,判断时间是否达到发送周期,未达到则直接退出,方法返回空列表。如果达到发送周期,则取出第一条数据中存储的客户标识,然后根据这些标识获取对应的客户未发送数据,将这些数据按照客户维度添加到返回列表中,将这些客户及其数据从队列中移除,返回有数据的列表。这里还增加了一个拉取数据的条数限制,方便根据业务实际情况进行控制。

再来看一下怎么使用这个队列,这里模拟多个生产者加一个消费者,其实可以任意多个生产者和消费者:

TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);

List<Task> publishTasks = new List<Task>();

for (int i = 0; i < 4; i++)
{
    var j = i;
    publishTasks.Add(Task.Factory.StartNew(() =>
    {
        int k = 0;
        while (true)
        {
            queue.Publish($"key_{k}", $"value_{j}_{k}");
            Thread.Sleep(15);
            k++;
        }
    }, TaskCreationOptions.LongRunning));
}

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var list = queue.Pull(100);
        if (list.Count <= 0)
        {
            Thread.Sleep(100);
            continue;
        }

        foreach (var item in list)
        {
            Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
        }
    }

}, TaskCreationOptions.LongRunning);

Task.WaitAll(publishTasks.ToArray());

以上就是针对这个特定需求实现的一个按照时间进行排序的队列。

万物皆可排序的队列
我们很容易想到,既然可以按照时间排序,那么按照别的数据类型排序也是可以的。这个数据结构可以应用的场景很多,比如按照权重排序的队列、按照优先级排序的队列、按照年龄排序的队列、按照银行存款排序的队列,等等。这就是一个万物皆可排序的队列。

我这里把主要代码贴出来(完整代码和示例请看文末):

public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
    Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();

    SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();

    readonly object _lock = new object();

    /// <summary>
    /// Create a new instance of SortedQueue
    /// </summary>
    public SortedQueue(int maxNumberOfMessageConsumedOnce)
    {
    }

    /// <summary>
    /// Publish a message to queue
    /// </summary>
    /// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public void Publish(TSortKey sortKey, TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
            {
                keys.Add(key);
            }
            else
            {
                _queue.Add(sortKey, new List<TKey> { key });
            }

            if (_data.TryGetValue(key, out List<TValue>? values))
            {
                values.Add(value);
            }
            else
            {
                _data.Add(key, new List<TValue> { value });
            }
        }
    }


    /// <summary>
    /// Pull a batch of messages.
    /// </summary>
    /// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
    /// <returns></returns>
    public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
    {
        List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
        lock (_lock)
        {
            int messageCount = 0;
            while (true)
            {
                if (!_queue.Any())
                {
                    break;
                }

                var keys = _queue.First().Value;
                foreach (var key in keys)
                {
                    if (_data.TryGetValue(key, out List<TValue>? keyValues))
                    {
                        result.Add((key, keyValues));
                        _data.Remove(key);
                        messageCount += keyValues!.Count;
                    }
                }
                _queue.RemoveAt(0);

                if (messageCount >= maxNumberOfMessages)
                {
                    break;
                }
            }
        }

        return result;
    }
}

代码逻辑还是比较简单的,就不罗嗦了,如有问题欢迎留言交流。

再说数据安全

因为在这个实现中所有待处理的数据都在内存中,丢失数据会带来一定的风险,因为我这个程序前边还有一个队列,即使程序崩溃了,也只损失没处理的一小部分数据,业务上可以接受,所以这样做没有问题。如果你对这个程序感兴趣,需要慎重考虑你的应用场景。

来看看数据丢失可能发生的两种情况:

一是数据还在队列中时程序重启了:对于这种情况,前文提到将数据同步到其它地方,比如写入Redis、写入数据库、写入磁盘等等。不过因为网络IO、磁盘IO较慢,这往往会带来吞吐量的大幅下降,想要保证一定的吞吐量,还得引入一些分片机制,又因为分布式的不可靠,可能还得增加一些容错容灾机制,比较复杂,可以参考Kafka。

二是数据处理的时候失败了:对于这种情况,可以让程序重试;但是如果异常导致程序崩溃了,数据已经从内存或者其它存储中移除了,数据还是会发生丢失。这时候可以采用一个ACK机制,处理成功后向队列发送一个ACK,携带已经处理的数据标识,队列根据标识删除数据。否则消费者还能消费到这些数据。

这些问题并不一定要完全解决,还是得看业务场景,有可能你把数据持久化到Redis就够了,或者你也不用引入ACK机制,记录下处理到哪一条了就行了。


以上就是本文的主要内容了,完整代码和示例请访问Github:https://github.com/bosima/dotnet-demo/tree/main/CSharp-SortedList


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK