4

七种武器:延迟队列的原理和实现总结

 2 years ago
source link: https://blog.yuanpei.me/posts/summary-of-the-principle-and-implementation-of-delay-queue/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

这是最好的时代,这是最坏的时代”,英国作家查尔斯·狄更斯在两百多年前写下的这句话,如果从辩证的角度来看,它或许可以适用于任何一个时代。我们生活在一个怎样的时代呢?我想,或许是一个矛盾的时代。因为,有时它让你对未来有无限的期待,有时它又会让你陷入无尽的绝望,特别是当集体和个人的命运形成强烈反差的时候,当实用主义、精致利己主义开始盛行的时候,我们偶尔会感慨罗曼蒂克的消亡、怀念从前慢、追忆芳华,可下一秒就被卷入到同时间赛跑的庸庸碌碌当中。生活节奏越来越快,人们越来越追求实时、速度、效率,选择当下的同时,意味着选择实时满足,譬如,我想吃一块美味的蛋糕,我现在就要吃。与之相对的,则被称之延迟满足,譬如,制定一个长期的写作计划以实现个人知识网络的构建。由此可见,人生本来就有快有慢、有张有弛,此时,便引入了这篇文章的主题——延迟队列。

什么是延迟队列

延迟队列,即 DelayQueue,所以,顾名思义,首先,它是一个队列,对于队列这种数据结构,相信大家都不陌生啦!这是一种先入先出(FIFO)的数据结构,就像现实生活中排队讲究先来后到一样,普通队列中的元素都是有序的。相比普通队列,延迟队列主要多了一个延迟的属性,此时,元素何时出队不再取决于入队顺序,而是入队时指定的延迟时间,它表示该元素希望在经过该指定时间后被处理。从某种意义上来讲,延迟队列更像是一种以时间作为权重的集合。我想,单纯地介绍概念,不一定能真正深入人心,所以,请允许我举几个生活中的例子:当你在网上购物的时候,如果下单后一段时间内没有完成付款,那这个订单就会被自动取消;当你通过 Outlook 预约了会议以后,Outlook 会在会议开始前 15 分钟提醒所有与会人员;当你在网上叫外卖以后,平台会在订单即将超时前 10 分钟通知外卖小哥…这样看起来,是不是顿时觉得延迟队列的使用场景还是挺广泛的呢?因为工作上的关系,博主接触类似场景的机会还是蛮多的,所以,想系统地研究下相关的技术,最终,就有了今天这篇博客,下面我们来看看具体的实现方式有哪些。

延迟队列的实现方式

延迟队列思维导图

我知道,在一个短视频横行的时代,人们的注意力注定要被那些实时满足的事物消耗掉,在我有预感到,不会有多少人愿意在我这篇自以为是的文字前驻留的时候,我唯有识趣地放出这个思维导图,TLDR的这种心理,其实我完全可以感同身受,因为看一部电影永远比看一本书容易,当媒介从文字变成图片再到视频,本质上是我们获取信息的能力下降了,我们变得只能接受低密度的信息。当然,这是一个时代的症结,你可以拥有你的选择,是独善其身还是随波逐流?

JDK 中提供了一个延迟队列的实现 DelayQueue,位于 Java.util.concurrent 这个包下面,它是一个 BlockingQueue,本质上封装了一个 PriorityQueue,队列中的元素只有到达了Delay时间,才允许从队列中取出。如下图所示,队列中放入三个订单,分别设置订单在当前时间的第 5、10、15 秒后取消:

延迟队列示意图

对于 Java 中的 DelayQueue 而言,其对应的代码实现如下面所示:

 1Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
 2Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
 3Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
 4
 5DelayQueue<Order> delayQueue = new DelayQueue<>();
 6delayQueue.put(Order1);
 7delayQueue.put(Order2);
 8delayQueue.put(Order3);
 9
10System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
11while (delayQueue.size() != 0) {
12    Order task = delayQueue.poll();
13    if (task != null) {
14        System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
15    }
16    Thread.sleep(1000);
17}

其中,Order 类要求实现 Delayed 接口,可以注意到这个 compareTo() 方法和 .NET 里的 IComparable 完全一样 :)

 1public class Order implements Delayed {
 2    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
 3    private long time;
 4    String name;
 5    
 6    public Order(String name, long time, TimeUnit unit) {
 7        this.name = name;
 8        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
 9    }
10    
11    @Override
12    public long getDelay(TimeUnit unit) {
13        return time - System.currentTimeMillis();
14    }
15
16    @Override
17    public int compareTo(Delayed o) {
18        Order Order = (Order) o;
19        long diff = this.time - Order.time;
20        if (diff <= 0) {
21            return -1;
22        } else {
23            return 1;
24        }
25    }
26}

此时,我们可以得到下面的结果,三个订单分别在第 5、10、15 秒后被执行,这样就实现了一个最简单的延时队列。我不会告诉你,为了得到这个演示结果,我特意搭建了一个 Java 环境:

Java 中的 DelayQueue 效果演示

.NET 中一直没有提供类似的实现,直到 .NET 6.0 中新增了 PriorityQueue 这个数据结构,它允许我们为队列中的元素定义一个优先级,此时,我们可以用下面的方法实现上面的功能:

 1var utcNow = DateTime.UtcNow;
 2var queue = new PriorityQueue<FooBar, long>();
 3queue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, new DateTimeOffset(utcNow.AddSeconds(5)).ToUnixTimeSeconds());
 4queue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, new DateTimeOffset(utcNow.AddSeconds(10)).ToUnixTimeSeconds());
 5queue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, new DateTimeOffset(utcNow.AddSeconds(15)).ToUnixTimeSeconds());
 6
 7while (queue.Count > 0)
 8{
 9    var current = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds();
10    var flag = queue.TryPeek(out var item, out var timestamp);
11    if (!flag || current < timestamp){
12        continue;
13    } else {
14        item = queue.Dequeue();
15        _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
16    }
17}

基本思路是,每次生成一个时间戳作为队列元素的“权重”,然后用当前时间和这个时间戳进行比较,如果时间到了,则从队列中出队,否则继续轮询:

.NET 中的 PriorityQueue 效果演示

可以注意到,它可以按照我们预期的时间和顺序,从队列中取出相应的元素,考虑到这个方法里使用了轮询,做法着实算不上优秀,不过对于我们理解 DelayQueue 非常有帮助,属于一种最基础的的实现。

接下来,我们来说第二种实现方式,定时任务,这种方式就非常的朴实无华啦,因为对于一个延迟执行的任务而言,其本质就是一个定点执行、执行一次的定时任务啦,所以,理论上普通的 Timer 一样可以做这件事情。不过,考虑到任务的持久化、分布式等等的问题,我们还是建议使用相对成熟的定时任务框架,例如 Quartz.NETHangfire 等等来实现。这里博主以 Quartz.NET 为例:

 1public async Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
 2{
 3    var jobDetail = JobBuilder.Create<DelayJob<T>>()
 4        .WithIdentity(Guid.NewGuid().ToString("N"), JobGroup)
 5        .UsingJobData(JobParameters, JsonConvert.SerializeObject(jobData))
 6        .Build();
 7
 8    jobDetail.JobDataMap[JobDelegate] = callback;
 9
10    var trigger = TriggerBuilder.Create()
11        .WithIdentity($"{jobDetail.Key.Name}Trigger", JobGroup)
12        .ForJob(jobDetail.Key)
13        .StartAt(DateTimeOffset.UtcNow.Add(delay))
14        .WithSimpleSchedule(x => x
15            .WithRepeatCount(0)
16            .WithIntervalInSeconds(0)
17        )
18        .Build();
19
20    await _scheduler.ScheduleJob(jobDetail, trigger);
21}

对于 Quartz 而言,核心的对象只有三个:JobTriggerSchedulerb,通过这三个对象,我们就可以创建一个定时任务,其中, DelayJob<T> 是表示一个带参数的任务,它实现了 IJob 接口,可以在任务执行时触发对应的委托:

 1internal class DelayJob<T> : IJob
 2{
 3    public Task Execute(IJobExecutionContext context)
 4    {
 5        var jobDetail = context.JobDetail;
 6        var callback = jobDetail.JobDataMap[QuartzDelayQueue.JobDelegate] as Action<T>;
 7        var jobData = context.MergedJobDataMap[QuartzDelayQueue.JobParameters]?.ToString();
 8        var jobParam = JsonConvert.DeserializeObject<T>(jobData);
 9        callback?.Invoke(jobParam);
10        return Task.CompletedTask;
11    }
12}

使用时非常简单,只要给一个延迟时间和回调函数即可:

1await _delayQueue.PutJob(
2    TimeSpan.FromSeconds(10),
3    new FooBar() { Foo = "Foo", Bar = "Bar" },
4    x => _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {x.Foo}, {x.Bar}.")
5);
基于 Quartz 实现延时任务

可以注意到,Quartz 在指定时间成功触发了回调函数,这样就达到了延时执行的目的。

Redis

接下来,分享两种基于 Redis 实现延迟队列的做法,分别基于 Redis 的 Key 过期机制 和 RedisZSet 结构,前者依赖 Redis 提供的发布-订阅机制,后者则是利用 ZSet 里每个成员的 score 属性实现排序。

基于 Redis 的 Key 过期机制

这个做法主要是利用 Redis 中的 Key 过期机制,简单来讲,就是利用 Redis 中的发布/订阅功能,如果我们开启了 Redis 的 Key 过期事件监听,那么,当某个 Key 过期的时候,Redis 就会把这条消息发布出来,通过订阅这个事件,从而达到延迟队列的效果。首先,确保 Redis 开启了 Key 过期事件监听,修改 Redis 的配置文件 redis.conf 如下:

notify-keyspace-events Ex

在这种情况下,如果我们为某一个 Key 指定了过期时间,那么,当到达这个过期时间以后,Redis 会向名为 __keyevent@0__:expired 的频道中推送一条消息,消息的内容为过期的这个 Key,其中 @0 表示默认的 Redis 库,这里以 CSRedis 这个库为例来进行演示:

 1public Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
 2{
 3    var guid = Guid.NewGuid().ToString("N");
 4
 5    // Default Database
 6    // EXPIRED_KEYS_CHANNEL = "__keyevent@{0}__:expired";
 7    var channel = string.Format(EXPIRED_KEYS_CHANNEL, 0);
 8
 9    _redisClient.Set(guid, jobData, delay);
10    _redisClient.Subscribe((channel, new Action<CSRedisClient.SubscribeMessageEventArgs>(msg =>
11    {
12        if (msg.Body != guid) return;
13        callback?.Invoke(jobData);
14    })));
15
16    _logger.LogInformation($"{DateTimeOffset.UtcNow}:Put a new delay job.");
17
18    return Task.CompletedTask;
19}

代码非常好理解,写入 Key 的时候设置一个过期时间,然后订阅 Key 过期的事件,因为 Key 过期事件的内容就是对应的 Key,所以,需要做一次判断避免重复触发。此时,我们可以得到下面的结果:

基于 Redis 的 Key 过期机制实现延迟队列

可以注意到,该任务在第 29 秒时创建,经过 5 秒后,因为 Key 过期而触发回调函数。需要说明的是,Redis 里的发布/订阅是不保证可靠性的,针对所有试图通过 Redis 实现消息队列的想法,我只想说,如果数据量不大,并且不需要可靠性保证的话,可以凑活着用一用,否则,还是建议使用专业的消息队列。

基于 Redis 的 ZSet 结构

接下来,我想介绍的是 Redis 中的 ZSet,即有序集合。其实,从一开始的 DelayQueue 大家就能注意到一件事情,那就是这个延迟队列最重要的是,要给一个“权重”来实现排序。所以,在 .NET 6.0 没有发布以前,人们为了实现类似 DelayQueue 的数据结构,通常只能通过 SortedList 这个类型来实现,感兴趣的朋友不妨参考这个项目:DelayQueue,这里面最大的难点是什么呢?SortedList是一个线程不安全的集合,需要考虑锁的问题,这说明什么呢?这说明模拟 DelayQueue 的关键是找到这样一个有序集合,显然 ZSet 刚好就是这样一个类型,它里面有一个 score 属性,我们只需要把延迟时间放到这个属性上即可。

 1public class ZSetDelayQueue<T> where T : class
 2{
 3    private readonly CSRedisClient _redisClient;
 4    private const string QueueName = "DelayQueue";
 5
 6    public ZSetDelayQueue(CSRedisClient redisClient)
 7    {
 8        _redisClient = redisClient;
 9    }
10
11    public Task Enqueue(T item, TimeSpan delay)
12    {
13        var score = new DateTimeOffset(DateTime.UtcNow.Add(delay)).ToUnixTimeSeconds();
14        _redisClient.ZAdd(QueueName, (score, JsonConvert.SerializeObject(item)));
15        return Task.CompletedTask;
16    }
17
18    public async Task<T> Dequeue()
19    {
20        var score = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds(); ;
21        var records = _redisClient.ZRangeByScore(QueueName, 0, score, 1);
22        if (records.Count() > 0) {
23            var item = JsonConvert.DeserializeObject<T>(records[0]);
24            await _redisClient.ZRemAsync(QueueName, item);
25            return item;
26        }
27
28        return null;
29    }
30
31    public bool IsEmpty()
32    {
33        var count = _redisClient.ZCount(QueueName, 0, decimal.MaxValue);
34        return count == 0;
35    }
36}

好了,现在一切都顺利成章了,元素入队的时候计算出对应的时间戳,这个时间戳就是 ZSet 里的 score 属性,调用ZAdd() 即可;同理,元素出队,则是利用 ZRangeByScore() 返回从 0 到 当前时间戳内的一个元素,显然,如果当前时间戳大于或者等于该元素的时间戳,表示这个元素设定的延迟时间已经到了,此时,我们需要调用ZRem() 命令将其从集合中移除,和 Java 里面的 DelayQueue 类似,Redis 会按照 score 属性由小到大排序,这样时间早的会被先取出来,时间晚的会被后取出来,不得不说,这一切堪称完美,接下来就非常简单啦!

 1var redisZSetDelayQueue = _serviceProvider.GetService<ZSetDelayQueue<FooBar>>();
 2await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, TimeSpan.FromMinutes(1));
 3await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, TimeSpan.FromMinutes(2));
 4await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, TimeSpan.FromMinutes(3));
 5
 6while (!redisZSetDelayQueue.IsEmpty())
 7{
 8    var item = await redisZSetDelayQueue.Dequeue();
 9    if (item == null) {
10        continue;
11    } else {
12        _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
13    }
14}

这个就和一开始的例子非常接近了,对吧? 效果如何呢,我们一起来看看:

基于 Redis 的 ZSet 类型实现延迟队列

可以看到,三个任务分别在 1 分钟、2 分钟 和 3 分钟后执行,这个延迟队列,个人表示还行,哈哈!事实上,基于 Redis 的延迟队列,业界的方案还是蛮多的,个人比较推荐 有赞 技术团队的方案,感兴趣的朋友可以在本文的基础上做进一步的探究,我个人关注这个话题,是因为我不太喜欢定时任务轮询的做法,虽然这是一种万金油式的做法,我个人更喜欢下面的做法。

OK,提到消息队列的话,参照面试八股文,我们会说,消息队列最主要的作用是削峰平谷,换句话说,消息队列可以将短时间内堆积的大量的请求任务“削峰”,然后“平摊”到平时请求任务较少的时段,所以,好像平时一提起 RabbitMQ 或者 Kafka 这样的东西,大家脑海中浮现出来的就是高并发、高吞吐、高性能这种类似糖尿病“三多一少”的存在,回顾我们一开始从生活中得到的启示,有没有一种可能,我们使用消息队列,并不单单是为了让这条消息被快速地消费,而是可以“让子弹飞一会儿”呢?我想,一切皆有可能。下面,我们以 RabbitMQ 为例,来展示如何实现一个延迟队列:

RabbitMQ 死信队列工作流程示意图

如图所示,假设消息发送方把消息投递到延迟交换机 default.delay.exchange,该交换机绑定了延迟队列 default.delay.queue,显然,正常情况下,消息会出现在这个延迟队列中。接下来,为了让死信机制生效,我们必须对这个延迟队列做一点设置,这里主要有三个参数,x-message-ttl 表示队列中消息的存活时间,x-dead-letter-exchange 表示消息过期以后再次投递时的死信交换器,x-dead-letter-routing-key 表示消息过期以后再次投递时的路由键名。通常情况下,在 RabbitMQ 中消息进入死信队列的前提有三种,即消息过期、队列已满和消息被拒绝。其中,x-max-lengthx-max-length-bytes 这两个属性,可以分别用来指队列中的最大消息数、最大字节数、而消息被拒绝,则是指主动调用BasicReject() 方法,针对这两种情况触发的死信,我们这里可以不用太关心,因为我们显然考虑的是因为消息过期而触发的死信。OK,讲完了理论,我们来看看代码层面具体是如何实现的吧!

 1using (var channel = _connection.CreateModel())
 2{
 3    // 普通/延迟交换机 default.delay.exchnage
 4    var exchangeNormal = "default.delay.exchnage";
 5    channel.ExchangeDeclare(exchangeNormal, "direct", true, false, null);
 6
 7    // 普通/延迟队列
 8    var queueNormal = "default.delay.queue";
 9    var arguments = new Dictionary<string, object>
10    {
11        ["x-message-ttl"] = 5000,
12        ["x-dead-letter-exchange"] = "default.deadletter.exchange",
13        ["x-dead-letter-routing-key"] = "dead.routingKey"
14    };
15    channel.QueueDeclare(queue: queueNormal, true, false, false, arguments: arguments);
16
17    // 绑定交换器
18    channel.QueueBind(queueNormal, exchangeNormal, "normal.routingKey");
19
20    // 发送消息
21    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(jobData));
22    var properties = channel.CreateBasicProperties();
23    properties.DeliveryMode = 2;
24    channel.BasicPublish(exchange: exchangeNormal, routingKey: "normal.routingKey", mandatory: true, basicProperties: properties, body: body);
25}

简单来说,某一个队列如果需要死信队列,那么你就需要为其设置x-message-ttlx-dead-letter-exchangex-dead-letter-routing-key 这三个属性即可,你完全不用关心消息是如何投递到这个死信队列中,而对于消息的消费者来说,它只需要从这个死信队列中接收消息即可,因为能被投递到死信队列里的消息,一定是因为消息时间到了或者说过期了,这样就等于间接实现了延迟队列:

 1// 死信交换机 default.deadletter.exchange
 2var exchangeDead = "default.deadletter.exchange";
 3_consumerChannel.ExchangeDeclare(exchangeDead, "direct", true, false, null);
 4
 5// 死信队列 default.deadletter.queue
 6var queueDead = "default.deadletter.queue";
 7_consumerChannel.QueueDeclare(queue: queueDead, true, false, false, null);
 8
 9// 绑定交换器
10_consumerChannel.QueueBind(queueDead, exchangeDead, "dead.routingKey");
11
12// 消费消息
13_basicConsumer = new EventingBasicConsumer(_consumerChannel);
14_consumerChannel.BasicConsume(queue: queueDead, autoAck: false, consumer: _basicConsumer);
15_basicConsumer.Received += (s, e) =>
16{
17    var body = Encoding.UTF8.GetString(e.Body.ToArray());
18
19    // TODO:
20
21    _consumerChannel.BasicAck(e.DeliveryTag, false);
22};

如下图所示,消息首先会被发送到延迟队列 default.delay.queue 中,此时,消息还没有过期,不会触发死信机制,注意到,这时候队列中会有 4 条消息:

RabbitMQ 死信队列工作流程-01

一段时间后,消息过期,触发死信机制。此时,消息会被在再次转发到死信交换机 default.deadletter.exchange 中,并最终达到死信队列 default.deadletter.queue

RabbitMQ 死信队列工作流程-02

至此,我们就利用 RabbitMQ 里的 TTL + DLX 特性实现了一个延迟队列,达到了延迟执行的目的。不过,只要你使用消息队列,就一定会遇到消息堆积的问题,而一旦发生消息堆积,延迟执行的这个时间可能就会不准,如果你特别看重这个时间准确与否,那么,实际运作中还有一部分工作完要做。我们目前用定时任务轮训的做法,最大的问题是它产生大量重复且无用的请求,每天单单是相关日志就上百兆,这就算是我下班以后的一点探索,我现在依然觉得,那个定时任务的 API 设计得莫名其妙。

RabbitMQ 死信队列工作流程-03

最后,我们来说说 Kafka,虽然 Kafka 单机的 QPS 要远远超过 RabbitMQ 1 到 2 个数量级,但这种快是以牺牲一部分功能作为代价的,像典型的重试和死信,这两样儿都需要使用者自己去实现,比如死信,我们现在是为每个 topic 创建一个对应的死信的 topic 来实现的,比如,我们有一个 topic 叫做 orderInfo,与之相对应地,我们会同时创建一个叫做 orderInfo_DLQ 的 topic,作为它的死信队列。当然,你还需要一个机制去收集和转发过期消息,基本上你还是需要一个 Timer 去做某种轮询,也许,是因为它选择了 Kafka,所以,需要一个定时任务系统来作为补充,毕竟,技术选型这种问题,注定是要政治正确的啦!

《七种武器》是著名武侠小说家古龙先生的代表作之一,原本指长生剑、孔雀翎、碧玉刀、多情环、霸王枪、离别钩等七种精妙绝伦的武器,这里则是用来指实现延迟队列的各种方法,延迟队列适用于那些需要延迟执行的场合,在如今这样一个追求实时性、快节奏生活的时代,人们对快乐和满足的要求有实时和延时的区别,用罗翔老师的话来讲,即时快乐是一种低级的快乐,是一种短暂的、易得的快乐,从这个角度来看,延时满足则是一种需要培养和付出的高级快乐。此中优劣,我们不必去分个泾渭分明,就像这些不同的实现方式,更多的只是场景上的差异,而非功能上的差异,延迟队列可以认为是一种以时间作为权重的、有序的集合;Java 里的 DelayQueue,.NET 里的 PriorityQueue,可以实现进程内的、单机版的延迟队列;而像 QuartzHangfire 这类任务调度系统,则可以更精确地控制时间;通过 Redis 里的发布-订阅、ZSet,我们让 DelayQueue 离分布式稍微接近了一点;而 RabbitMQ 里的 TTL + DLX 特性,则让博主比两年前更加理解死信队列……这难道不是一种延时满足吗?你以前不明白的概念,有一天突然有了新的认识,我想,这就是整个过程的意义所在。当然,时间轮算法对我来说还有一点点难,我能留到未来的某一天争取搞懂它吗?好了,以上就是这篇博客的全部内容啦,祝各位晚安,谢谢大家。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK