![](/style/images/good.png)
![](/style/images/bad.png)
rocketmq-spring : 实战与源码解析一网打尽 - 勇哥编程游记
source link: https://www.cnblogs.com/makemylife/p/17280455.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。
这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑。
![up-2a1bfce5e946b03d8d20727b33e20cd3610.png](https://oscimg.oschina.net/oscnet/up-2a1bfce5e946b03d8d20727b33e20cd3610.png)
1 SDK 简介
![up-24ce02d7ee3bdba4e10a358773de2425468.png](https://oscimg.oschina.net/oscnet/up-24ce02d7ee3bdba4e10a358773de2425468.png)
项目地址:
rocketmq-spring 的本质是一个 Spring Boot starter 。
Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。
Spring Boot starter 构造的启动器使用起来非常方便,开发者只需要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。
下面我们看下 rocketmq-spring-boot-starter 的配置:
1、引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2、约定配置
![up-f30849217013b74f2995b5d348f0f7a016f.png](https://oscimg.oschina.net/oscnet/up-f30849217013b74f2995b5d348f0f7a016f.png)
接下来,我们分别按照生产者和消费者的顺序,详细的讲解消息收发的操作过程。
首先我们添加依赖后,进行如下三个步骤:
1、配置文件中配置如下
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: platform-sms-server-group
# access-key: myaccesskey
# secret-key: mysecretkey
topic: sms-common-topic
生产者配置非常简单,主要配置名字服务地址和生产者组。
2、需要发送消息的类中注入 RcoketMQTemplate
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic}")
private String smsTopic;
3、发送消息,消息体可以是自定义对象,也可以是 Message 对象
rocketMQTemplate 类包含多钟发送消息的方法:
- 同步发送 syncSend
- 异步发送 asyncSend
- 顺序发送 syncSendOrderly
- oneway发送 sendOneWay
下面的代码展示如何同步发送消息。
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
rocketMQTemplate.syncSend(
destination,
MessageBuilder.withPayload(messageContent).
setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
build()
);
if (sendResult != null) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// send message success ,do something
}
}
syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags ,
第二个参数是:spring-message 规范的 message 对象 ,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。
1、配置文件中配置如下
rocketmq:
name-server: 127.0.0.1:9876
consumer1:
group: platform-sms-worker-common-group
topic: sms-common-topic
2、实现消息监听器
@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", //消费组
topic = "${rocketmq.consumer1.topic}" //主题
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("普通短信:" + message);
}
}
消费者实现类也可以实现 RocketMQListener<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据 。
public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), "UTF-8");
logger.info("普通短信:" + message);
} catch (Exception e) {
logger.error("common onMessage error:", e);
}
}
4 源码概览
![up-d0d1a0a912779866786f0af63e8213c02a3.png](https://oscimg.oschina.net/oscnet/up-d0d1a0a912779866786f0af63e8213c02a3.png)
最新源码中,我们可以看到源码中包含四个模块:
1、rocketmq-spring-boot-parent
该模块是父模块,定义项目所有依赖的 jar 包。
2、rocketmq-spring-boot
核心模块,实现了 starter 的核心逻辑。
3、rocketmq-spring-boot-starter
SDK 模块,简单封装,外部项目引用。
4、rocketmq-spring-boot-samples
示例代码模块。这个模块非常重要,当用户使用 SDK 时,可以参考示例快速开发。
5 starter 实现
我们重点分析下 rocketmq-spring-boot 模块的核心源码:
![up-3a29f1daa24e10f4bd8051c24bbc9126ba2.png](https://oscimg.oschina.net/oscnet/up-3a29f1daa24e10f4bd8051c24bbc9126ba2.png)
spring-boot-starter 实现需要包含如下三个部分:
1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;
2、定义spring.factories 文件
在 resources 包下创建 META-INF 目录后,新建 spring.factories 文件,并在文件中定义自动加载类,文件内容是:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。
3、实现自动加载类
在 RocketMQAutoConfiguration 类的具体实现中,我们重点分析下生产者和消费者是如何分别启动的。
▍生产者发送模板类:RocketMQTemplate
RocketMQAutoConfiguration 类定义了两个默认的 Bean :
![up-869c8a4c21f2319dcdf07dbe0c2b3b17f08.png](https://oscimg.oschina.net/oscnet/up-869c8a4c21f2319dcdf07dbe0c2b3b17f08.png)
首先SpringBoot项目中配置文件中的配置值会根据属性条件绑定到 RocketMQProperties 对象 中,然后使用 RocketMQ 的原生 API 分别创建生产者 Bean 和拉取消费者 Bean , 分别将两个 bean 设置到 RocketMQTemplate 对象中。
两个重点需要强调:
-
发送消息时,将 spring-message 规范下的消息对象封装成 RocketMQ 消息对象
-
默认拉取消费者 litePullConsumer 。拉取消费者一般用于大数据批量处理场景 。
RocketMQTemplate 类封装了拉取消费者的receive方法,以方便开发者使用。
![up-9cadfb3630db144943ce42e9ab8a5a5f792.png](https://oscimg.oschina.net/oscnet/up-9cadfb3630db144943ce42e9ab8a5a5f792.png)
▍自定义消费者类
下图是并发消费者的例子:
![消费者示例代码](https://oscimg.oschina.net/oscnet/up-ff48ee7324c64f181529ec6c41e5d8cf089.png)
那么 rocketmq-spring 是如何自动启动消费者呢 ?
![up-5a880e266ef563a75f7b4c90e6a9257e8e4.png](https://oscimg.oschina.net/oscnet/up-5a880e266ef563a75f7b4c90e6a9257e8e4.png)
spring 容器首先注册了消息监听器后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。
对比并发消费者的例子,我们可以看到: DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 消费逻辑的封装。
![up-78beb4468f1372129f13ba5f2f01868d010.png](https://oscimg.oschina.net/oscnet/up-78beb4468f1372129f13ba5f2f01868d010.png)
封装消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。
首先DefaultRocketMQListenerContainer初始化之后, 获取 onMessage 方法的参数类型 。
![up-6ca2839044cdbcc20f49ae54104c5be55ee.png](https://oscimg.oschina.net/oscnet/up-6ca2839044cdbcc20f49ae54104c5be55ee.png)
然后消费者调用 consumeMessage 处理消息时,封装了一个 handleMessage 方法 ,将原生 RocketMQ 消息对象 MessageExt 转换成 onMessage 方法定义的参数对象,然后调用 rocketMQListener 的 onMessage 方法。
![mnjh9](https://oscimg.oschina.net/oscnet/up-59f548d1cf98228a35a035274bff4ae8324.png)
上图右侧标红的代码也就是该方法的精髓:
rocketMQListener.onMessage(doConvertMessage(messageExt));
6 写到最后
开源项目 rocketmq-spring 有很多值得学习的地方 ,我们可以从如下四个层面逐层进阶:
1、学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;
2、模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);
3、starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;
4、举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。
如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK