3

延迟队列实现订单超时自动取消 - 码出code

 1 year ago
source link: https://www.cnblogs.com/jeremylai7/p/17174452.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在上一篇 Java 实现订单未支付超时自动取消,使用Java自带的定时任务TimeTask实现订单超时取消,但是有小伙伴提出这种实现,会有以下几个问题:

  • 线上服务挂了,导致服务下所有的定时任务失效。
  • 服务重启,定时任务也会失效。
  • 服务上线需要发布新的服务,原来服务也会关闭。

针对上述服务挂了、或者服务重启导致消息失效的问题,需要使用独立于项目的服务,比如消息中间件,比如Redis或者RabbitMQ。本文主要讲解消息队列RabbitMQ

创建一个订单,超时30分钟未支付就取消订单。

RabbitMQ本身是不支持延迟队列的,但可以利用RabbitMQ存活时间 + 死信队列来实现消息延迟。

TTL + DLX

存活时间 TTL

TTL全称为:time to live,意思为存活时间,当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息会被自动删除

RabbitMQ支持两种TTL设置:

  • 对消息本身设置存活时间,每条消息的存活时间可以灵活设置为不同的存活时间。
  • 对传递的队列设置存活时间,每条传到到队列的过期时间都一致。

当消息过期还没有被消费,此时消息会变成死信消息dead letter这是实现延迟队列的关键

消息变为死信的条件:

  • 消息被拒绝basic.reject/basic.nack,并且requeue=false
  • 消息的过期时间到期了。
  • 队列达到最大长度。

死信交换机 DLX

当上面的消息变成死信消息之后,它不会立即被删除,首先它要看有没有对应的死信交换机,如果有绑定的死信交换机,消息就会从发送到对应的死信交换机上。

DLX全程为Dead Letter Exchanges,意思为死信交换机。

死信交换机和普通交换机没什么区别,不同的是死信交换机会绑定在其他队列上,当队列的消息变成死信消息后,死信消息会发送到死信交换上。

队列绑定死信交换机需要两个参数:

  • x-dead-letter-exchange: 绑定的死信交换机名称。
  • x-dead-letter-routing-key: 绑定的死信交换机routingKey

死信交换机和普通交换机的区别就是死信交换机的ExchangeroutingKey作为绑定参数,绑定在其他队列上。

消息发送的流程图:

  • 生产者将带有TTL的消息发送给交换机,由交换机路由到队列中。
  • 队列由于没有消费,消息一直停留在队列中,一直等到消息超时,变成死信消息。
  • 死信消息转发到死信交换机在路由到死信队列上,最后给消费者消费。

创建死信队列

@Configuration
public class DelayQueueRabbitConfig {
  // 下面是死信队列
	/**
	 * 死信队列
	 */
	public static final String DLX_QUEUE = "queue.dlx";

	/**
	 * 死信交换机
	 */
	public static final String DLX_EXCHANGE = "exchange.dlx";

	/**
	 * 死信routing-key
	 */
	public static final String DLX_ROUTING_KEY = "routingKey.dlx";


	/**
	 * 死信队列
	 * @return
	 */
	@Bean
	public Queue dlxQueue() {
		return new Queue(DLX_QUEUE,true);
	}

	/**
	 * 死信交换机
	 * @return
	 */
	@Bean
	public DirectExchange dlxExchange() {
		return new DirectExchange(DLX_EXCHANGE,true,false);
	}

	/**
	 * 死信队列和交换机绑定
	 * @return
	 */
	@Bean
	public Binding bindingDLX() {
		return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
	}
}

创建延迟队列,并绑定死信队列

  // 下面的是延迟队列
	/**
	 * 订单延迟队列
	 */
	public static final String ORDER_QUEUE = "queue.order";

	/**
	 * 订单交换机
	 */
	public static final String ORDER_EXCHANGE = "exchange.order";

	/**
	 * 订单routing-key
	 */
	public static final String ORDER_ROUTING_KEY = "routingkey.order";


	/**
	 * 订单延迟队列
	 * @return
	 */
	@Bean
	public Queue orderQueue() {
		Map<String,Object> params = new HashMap<>();
		params.put("x-dead-letter-exchange", DLX_EXCHANGE);
		params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
		return new Queue(ORDER_QUEUE, true, false, false, params);
	}

	/**
	 * 订单交换机
	 * @return
	 */
	@Bean
	public DirectExchange orderExchange() {
		return new DirectExchange(ORDER_EXCHANGE,true,false);
	}

	/**
	 * 订单队列和交换机绑定
	 * @return
	 */
	@Bean
	public Binding orderBinding() {
		return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
	}

绑定死信交换通过添加x-dead-letter-exchangex-dead-letter-routing-key参数指定对应的交换机和路由。

设置五秒超时时间

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
        String date = DateUtil.dateFormat(new Date());
        String delayTime = "5000";
        System.out.println("【发送消息】延迟 5 秒 发送时间 " + date);
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
       return "ok";         
    }
    
    class DateUtil{
       public static String dateFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(date);
      }
    } 
}    
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msg,Channel channel,Message message){
    System.out.println("【接收消息】" + msg + " 接收时间" + DateUtil.dateFormat(new Date()));
}
    

控制台输出

【发送消息】延迟5 秒 发送时间 21:32:15
【接收消息】延迟5 秒 发送时间 21:32:15 接收时间21:32:20

发送消息,5秒之后消费者后会收到消息。说明延迟成功。

队列都有先进先出的特点,如果队列前面的消息延迟比后的消息延迟更长,会出现什么情况。

消息时序问题

发送三条消息,延迟分别是10s2s5s

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("/dlx")
    public String dlx() {
         dlxSend("延迟10秒","10000");
         dlxSend("延迟2 秒","2000");
         dlxSend("延迟5 秒","5000");
         return "ok";
    }
    
    private void dlxSend(String message,String delayTime) {
         System.out.println("【发送消息】" + message +  "当前时间" + DateUtil.dateFormat(new Date()));
         rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
    }

控制台输出:

【发送消息】延迟10秒当前时间21:54:36
【发送消息】延迟2 秒当前时间21:54:36
【发送消息】延迟5 秒当前时间21:54:36
【接收消息】延迟10秒 当前时间21:54:46
【接收消息】延迟2 秒 当前时间21:54:46
【接收消息】延迟5 秒 当前时间21:54:46

所有的消息都要等10s的消息消费完才能消费,当10s消息未被消费,其他消息也会阻塞,即使消息设置了更短的延迟。因为队列有先进先出的特征,当队列有多条消息,延迟时间就没用作用了,前面的消息消费后,后的消息才能被消费,不然会被阻塞到队列中。

插件实现解决消息时序问题

针对上面消息的时序问题,RabbitMQ开发一个延迟消息的插件delayed_message_exchange,延迟消息交换机。使用该插件可以解决上面时序的问题。

Github官网找到对应的版本,我选择的是3.8.17

将文件下载下来放到服务器的/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目录下,执行以下命令,启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动插件,交换机会有新的类型x-delayed-message:

x-delayed-message类型的交换机,支持延迟投递消息。发送消息给x-delayed-message类型的交换流程图:

  • x-delayed-message类型的交换机接收消息投递后,并未将直接路由到队列中,而是存储到mnesia(一个分布式数据系统),该系统会检测消息延迟时间。
  • 消息达到可投递时间,消息会被投递到目标队列。

配置延迟队列

@Configuration
public class XDelayedMessageConfig {
  /**
	 * 队列
	 */
	public static final String DIRECT_QUEUE = "queue.delayed";

	/**
	 * 延迟交换机
	 */
	public static final String DELAYED_EXCHANGE = "exchange.delayed";

	/**
	 * 绑定的routing key
	 */
	public static final String ROUTING_KEY = "routingKey.bind";

	@Bean
	public Queue directQueue() {
		return new Queue(DIRECT_QUEUE,true);
	}

	/**
	 * 定义延迟交换机
	 * 交换机的类型为 x-delayed-message
	 * @return
	 */
	@Bean
	public CustomExchange delayedExchange() {
		Map<String,Object> map = new HashMap<>();
		map.put("x-delayed-type","direct");
		return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
	}

	@Bean
	public Binding delayOrderBinding() {
		return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
	}

}

发送消息:

    @GetMapping("/delay")
    public String delay() {
	    delaySend("延迟队列10 秒",10000);
	    delaySend("延迟队列5 秒",5000);
	    delaySend("延迟队列2 秒",2000);
        return "ok";
    }
    
    private void delaySend(String message,Integer delayTime) {
        message = message + " " + DateUtil.dateFormat(new Date());
        System.out.println("【发送消息】" + message);
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setDelay(delayTime);
                    //message1.getMessageProperties().setHeader("x-delay",delayTime);
                    return message1;
                });
    }    

消费消息:

    @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
    public void delayProcess(String msg,Channel channel, Message message) {
        System.out.println("【接收消息】" + msg + " 当前时间" + DateUtil.dateFormat(new Date()));
   }

控制台输出:

【发送消息】延迟队列10 秒 22:00:01
【发送消息】延迟队列5 秒 22:00:01
【发送消息】延迟队列2 秒 22:00:01
【接收消息】延迟队列2 秒 22:00:01 当前时间22:00:03
【接收消息】延迟队列5 秒 22:00:01 当前时间22:00:05
【接收消息】延迟队列10 秒 22:00:01 当前时间22:00:10

解决了消息的时序问题。

  • 使用Java自带的延迟消息,系统重启或者挂了之后,消息就无法发送,不适于用在生产环境上。
  • RabbitMQ本身不支持延迟队列,可以使用存活时间ttl + 死信队列dlx实现消息延迟。
    • 发送的消息设置ttl,所在的队列不设置消费者。
    • 队列绑定死信队列,消息超时之后,变成死信消息,再发送给死信队列,最后发送给消费者。
  • 发送多条不同延迟时间消息,前面消息没有到延迟时间,会阻塞后面延迟更低的消息,因为队列有先进先出的特性。
  • RabbitMQx-delay-message插件可以解决消息时序问题。
    • 带有ttl的消息发送x-delayed-message类型的交换机,消息不会直接路由到队列中。而且存储到分布式数据系统中,该系统会检测消息延迟时间。
    • 消息到达延迟时间,消息才能会投递到队列中,最后发送给消费者。

Github 源码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK