35

Spring Boot 整合 RabbitMQ

 3 years ago
source link: http://www.eknown.cn/index.php/default/springboot-rabbitmq.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

上一节介绍了 RabbitMQ 的基本概念,这一节介绍 RabbitMQ 的安装使用,以及 SpringBoot 中如何整合使用 RabbitMQ,将介绍 direct/fanout/topic 三种类型的交换器的使用,最后介绍死信队列的概念与使用。

本节示例代码已经上传至 GitHub:https://github.com/laolunsi/spring-boot-examples,该仓库内容包括 spring/springboot/springcloud,希望可以对你有所帮助!

下面进入正文。


RabbitMQ 安装

从官网下载安装包:https://www.rabbitmq.com/download.html


以 windows 为例,直接下载 EXE 文件:

image-1599987824553.png

运行 EXE 文件,等待安装完成后。可以在 windows 的应用中看到 RabbitMQ Service,启动它。

image-1599987854463.png

开启可视化管理插件,找到 RabbitMQ 的安装目录,切换到 sbin 文件夹下,打开命令行,输入:

rabbitmq-plugins enable rabbitmq_management
image-1599987882311.png

打开浏览器,输入:http://127.0.0.1:15672,可以看到管理页面了。在这个页面,我们可以手动操作 RabbitMQ,包括新建 exchange,新建队列,设置队列绑定,查看队列信息等等。


注:rabbitmq 默认的账号和密码都是 guest !

image-1599987910183.png

下面我们快速的看一下 Spring Boot 中如何整合使用 RabbitMQ


SpringBoot 中 RabbitMQ 基本使用

上一节提到 RabbitMQ 中的 exchange 有四种类型,其中 headers 类型很少使用,这里分别以 direct/fanout/topic 三种 exchange 类型为例,记录 默认/广播/模式匹配 这三种常见场景中 RabbitMQ 的使用。  


Spring Boot 提供了快速接入 RabbitMQ 的依赖,我们只要引入这个依赖并添加很少的一些配置就可以了!

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

direct

producer

在配置文件中添加 RabbitMQ 的配置信息:

server:
  port: 8701
spring:
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
    port: 5672
    virtual-host: /

发送消息:

@RestController
@RequestMapping(value = "msg")
public class HelloAction {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "direct")
    public String sayHello(String msg) {
        rabbitTemplate.convertAndSend("demo", msg);
        return "消息发送成功";
    }
}

这时候 demo 队列可能不存在,我们可以手动在 mq 中的创建,也可以通过 @Bean 来 declare:

@Configuration
public class RabbitConfig {
    @Bean
    public Queue demo() {
        return new Queue("demo", true);
    }
}
image-1599987945914.png

consumer

同样创建一个 spring boot 项目,引入 rabbitmq 依赖,并添加相关配置。
利用 @RabbitListener 注解接收数据:

@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"demo"})
    public void consume(Message message, Channel channel) throws IOException {
        System.out.println("接收到消息:" + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

启动项目:

image-1599987973765.png

这里用的是 Message 类型,我们可以通过 @Payloa 注解来直接接收 body 数据:

    @RabbitListener(queues = {"demo"})
    public void consume(@Payload String body, Channel channel) throws IOException {
        System.out.println("接收到消息:" + body);
    }

Spring Boot RabbitMQ 中可以使用 @RabbitListener 和 @RabbitHandler 两个注解联合来从同样的接口中接收不同参数类型的数据并处理,比如 String、Object 等。
不过个人建议发送数据的时候直接用 String,如果是对象,可以用 JSON 工具将对象转换为字符串。接收数据时再用 JSON 工具将字符串还原为对象。


注意,在这种情况下,我们使用的默认的 direct exchange,默认的 routing key 就是队列名。
如果需要自定义 exchange,那么就要指定 binding。比如新建一个 direct 类型的 exchange,名为 demoex,然后添加一个 binding 到 demo,这个 routing key 是完全匹配的:

image-1599987997265.png

fanout

fanout 类型的 exchange 表示广播。
我们这里创建三个队列:pub_demo1, pub_demo2, pub_demo3
然后创建一个 exchange:demoex_fanout
将这三个队列绑定到这个exchange上,不需要 routing_key。这些操作都是通过 management 页面进行的。

image-1599988018343.png

发送广播消息:

    @GetMapping(value = "broadCast")
    public String broadCast(String msg) {
        // 广播消息到 demoex_public 这个 exchange 绑定的所有队列
        rabbitTemplate.convertAndSend("demoex_fanout", "", msg);
        return "广播成功";
    }

测试接口,可以看到这三个队列中都有数据了:

image-1599988037997.png

topic

topic 的模式匹配包括两个通配符:#和 ,其中 # 匹配 0 或多个单词, 匹配一个单词。
需要注意的是,这里的匹配符是用在 exchange 的 bindling 里的,如下,将 pat_demo1,2,3 绑定到 demoex_topic 这个 topic 类型的 exchange 上:

image-1599988056341.png
    @GetMapping(value = "pattern")
    public String pattern(String msg) {
        rabbitTemplate.convertAndSend("demoex_topic", "demo.a.b.c", msg);
        return "模式传播成功";
    }

查看 mq 中的 pat_demo1,2,3 队列:

image-1599988079583.png

死信队列的使用

思考一个问题,如果某个消息被消费者 nack 或者 reject 怎么办?这些消息是直接丢掉还是返回到原队列中?如果这些不能被消费的消息堆积,超过最大队列长度怎么办?

死信消息与死信队列的概念

首先需要理解一个概念 —— 死信消息,即不能被消费的消息。当一个消息出现以下情况之一时,消息会变成死信消息:

  • 消息被否认确定,使用 channel.basicNack 或 channel.basicReject,且设置了 requeue = false
  • 消息在队列中的存活时间超过了设置的 TTL 
  • 消息队列的消息数量超过了最大队列长度

出现死信消息时,要怎么处理呢?
如果消息不重要,那就直接丢掉好了,最多记个日志方便回查。如果消息比较重要呢,最好还是有个专门的程序来处理这些死信消息。在 RabbitMQ 中,可以利用死信队列来处理死信消息。


死信队列是怎么配置的?它又是怎么工作的呢?
非常简单!
死信队列也是普通队列,只是在原队列 A 上配置一下死信队列 D 的参数,在概念上,这个队列 D 就变成了死信队列。

Spring Boot 使用死信队列

可以通过 RabbitMQ 的面板来绑定死信队列,也可以用程序来指定参数。

首先创建一下死信队列需要的交换器和队列,direct exchange 取名 demoex_dead,然后创建队列 demo_dead,绑定到 demoex_dead 这个交换器上。

下面分别以面板和程序来展示如何给 demo 队列绑定死信队列:

使用面板:

image-1599988102397.png

使用程序:

@Configuration
public class RabbitConfig {
    
    @Bean
    public Queue demo() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "demoex_dead");
        args.put("x-dead-letter-routing-key", "demo_dead");
        return new Queue("demo", true, false, false, args);
    }
}

绑定好死信队列后,我们来测试一下:

@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"demo"})
    public void consume(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        if (msg.contains("dead")) {
            System.out.println("拒绝包含dead的消息:" + msg);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            System.out.println("接收到正常消息:" + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = {"demo_dead"})
    public void consumeDead(Message message, Channel channel) throws IOException {
        System.out.println("死信队列消息:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

测试,首先推送消息 deadA,发现消息被放到死信队列了。
然后推送消息 da,消息接收正常。

image-1599988124932.png

好了,到此为止,我们已经介绍了 RabbitMQ 的基础知识、如何安装 RabbitMQ ,以及 SpringBoot 如何快速接入 RabbitMQ 了。
希望我的文章对你有所帮助!thanks for your reading...


看了一下,大概有一个月没更新了,最近比较忙,心情也比较累,困于生活,找不到理想的方向。很难用言语去表达吧,身边也没什么可以倾诉的人,而越是这种糟糕的情况,越导致个人工作学习效率低下,很多时候时候都像是在假装忙碌。


有时候挺惆怅的,感觉看不到方向。有时候做了决定之后,又在怀疑自己的选择是错的,但不选择,就不会有新的机会吧。在漫无边际的荒野里,到底是向目标前进,还是在远离呢?


无论如何,希望我们都能在满是泥泞的日子里,孤独与寂寥的日子里,保持斗志,稳定心态,默默积攒力量,不说什么茧化蝶之类的玄乎话,就简简单单,能够早一点过上自己想要的生活吧!


与君共勉!与君共期!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK