
7

快速入门使用ActiveMQ
source link: https://segmentfault.com/a/1190000040290217
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

MQ老演员了,这篇文章主要是记录ActiveMQ从下载到应用到项目中的去的一个配置、编写的过程
让大家快速上配置到应用
下载并修改配置
首先,下载ActiveMQ,这是官网下载地址:https://activemq.apache.org/c...
下载下来之后,找到config目录里的activemq.xml文件
找到<policyEntries>标签,在标签下里添加配置
<!--死信队列-->
<policyEntry topic=">" >
<deadLetterStrategy>
<!--
queuePrefix:设置死信队列前缀
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>
这是配置死信队列
需要用到的Maven依赖
<!--activemq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
配置SpringBoot的YML文件
spring:
#ActiveMQ基础配置
activemq:
#通信地址,不是http,是tcp,61616是;连接地址
broker-url: tcp://127.0.0.1:61616
#账号密码可以在MQ的users.properties配置文件中配置
user: admin
password: admin
pool:
#是否启用
enabled: true
#最大连接数
max-connections: 100
生产者配置使用
生产者——config类
//MQ的配置
@Configuration
public class ActiveMQConfig {
//yml配置文件中的那个连接地址
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
//队列的Bean
@Bean
public Queue getQueue(){
return new ActiveMQQueue("ActiveMQQueue");
}
//连接工厂的Bean,brokerUrl连接地址
@Bean
public ActiveMQConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(brokerUrl);
}
}
生产者——使用
使用@Scheduled(cron = "0/5 ?")定时任务注解,需要在启动类加@EnableScheduling注解,来扫描
@Autowired
private JmsMessagingTemplate messagingTemplate;
@Autowired
private Queue queue;
//定时任务的注解(启动后每隔5秒执行一次)
@Scheduled(cron = "0/5 * * * * ?")
//事务注解
@Transactional(rollbackFor = Exception.class)
public void task() {
System.out.println("定时任务执行...");
//放入队列
messagingTemplate.convertAndSend(queue, "MingLog");
System.out.println("已放入队列...");
}
消费者配置使用
消费者——config类
//RctiveMQ 消费者的配置
@Configuration
public class ActiveMQConfig {
//yml配置文件中的那个连接地址
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
//连接工厂的Bean
@Bean
public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
//账号、密码、连接地址
new ActiveMQConnectionFactory("admin","admin",brokerUrl);
//配置控制消息在回滚时如何重新传递
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
//messageConsumer的配置选项Bean
@Bean
public RedeliveryPolicy getRedeliveryPolicy(){
return new RedeliveryPolicy();
}
//消息的监听连接工厂Bean
@Bean
public JmsListenerContainerFactory getJmsListenerContainerFactory(ActiveMQConnectionFactory connectionFactory){
//创建默认消息监听工厂
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
//将连接工厂set进去
defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
//设置消费确认方式 1: 自动确认,2: 客户端手动确认,3:自动批量确认,4 事务提交并确认。
defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(2);
return defaultJmsListenerContainerFactory;
}
}
消费者——使用
一般使用消息监听器,来执行任务/方法/业务
消息成功,手动确认acknowledge()
消息消费失败,手动回滚,recover()
单个消息消费失败6次,该消息进入死信队列
//消息监听注解,destination要监听的队列,containerFactory消息监听的连接工厂
@JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
//TextMessage 监听到的消息
public void mqListenerEvent(TextMessage textMessage, Session session) throws JMSException {
try {
String text = textMessage.getText();
System.out.println("收到的消息:" + text);
//业务代码
//消息消费确认
textMessage.acknowledge();
}catch (Exception e){
System.out.println("异常了...");
e.getMessage();
//消息回滚
session.recover();
}
}
所以,一般我们会再去单独维护死信队列,将为消费成功的消息,做补偿、记录日志等操作
死信队列进行监听,和上面的监听类似,只不过监听的对象不同
//监听死信队列
@JmsListener(destination = "ActiveMQ.DLQ")
public void receive2(TextMessage textMessage, Session session) throws JMSException {
try {
//做日志记录、补偿策略、记录DB、Redis等等等等
System.out.println("死信队列:"+textMessage.getText());
//记录好,手动确认
textMessage.acknowledge();
}catch (Exception e){
System.out.println("异常了...");
e.getMessage();
//消息回滚
session.recover();
}
}
好啦,到这就结束了,小伙伴们快去启动试试吧
嘿嘿,大家喜欢的可以关注我的微信公众号哦,听说现在关注的,以后都是尊贵的老粉了
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK