1

快速入门一篇搞定RocketMq-实现微服务实战落地 - sowler

 1 week ago
source link: https://www.cnblogs.com/sowler/p/18173752
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

快速入门一篇搞定RocketMq-实现微服务实战落地

1、RocketMq介绍

RocketMQ起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在2016年开源分布式消息中间件,并逐渐成为Apache顶级项目。现在是Apache的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。

官网地址:https://rocketmq.apache.org

快速开始文档:https://rocketmq.apache.org/docs/

Github地址:https://github.com/apache/rocketmq

2、RocketMq架构说明

RocketMQ的架构主要由Producer(消息生产者)、Consumer(消息消费者)、Broker(消息中转角色)和Name Server(网络路由角色)四个核心组件组成。Name Server负责维护Broker集群和Topic信息的路由中心,而Broker负责存储和传输消息。RocketMQ采用类似于Kafka的发布订阅模型,支持消息的顺序传输和事务性传输,同时可以配置不同的消息过滤规则和重试策略。

查看微服务对应版本信息,下载相关版本。查看连接:https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明#2021x-分支

2661519-20240505185635454-735408773.png

根据自己使用的Spring Cloud Alibaba Version选择对应的版本进行下载即可。这里下载4.4.0版本,下载地址:https://rocketmq.apache.org/download 下载成功后,为一个压缩包文件。把文件上传linux

2661519-20240505185658258-902980621.png

使用命令解压zip文件并重命名文件夹命令:

unzip rocketmq-all-4.4.0-bin-release.zip -d rocketmq-4.4.0

解压成功后,如图:

2661519-20240505185712998-1718840721.png

进入rocketmq-4.4.0目录,查看目录结构。

2661519-20240505185739534-968972443.png
  • benchmark:性能测试相关的资源,如果想要了解RocketMQ的基准测试,可以考虑使用该压测工具。这个工具可以模拟生产者和消费者来测试RocketMQ集群的性能。
  • bin:里面是一些可执行文件,管理rocketmq服务
  • conf:里面就是一些配置文件,包括broker配置文件和logback配置文件
  • lib:所依赖的第三方jar包
4.1、启动Name Server命令
nohup sh bin/mqnamesrv -n 192.168.42.130:9876 > /dev/null 2>&1 &   # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败

启动成功后,默认启动日志在root目录下。可以查看启动日志信息:

tail -f ~/logs/rocketmqlogs/namesrv.log

输出下面信息启动成功:

2661519-20240505185814373-829752988.png

也可以通过端口9876查看是否启动成功

ps -ef|grep 9876
2661519-20240505185827894-688306540.png
4.2 启动Broker命令
nohup sh bin/mqbroker -n 192.168.42.130:9876 -c conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 & # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败

启动日志和启动Name Server日志在一个文件夹里面。查看启动日志信息:

tail -n 50 ~/logs/rocketmqlogs/broker.log
2661519-20240505185841695-1970031390.png

可以通过jps 查看启动信息如果能看到 NamesrvStartup 和 BrokerStartup 的话就表明单机版的 RocketMQ 启动成功了

2661519-20240505185851079-1365015946.png
4.3 Rocketmq服务关闭

关闭 MQ使用 bin 目录下的mqshutdown关闭服务

sh bin/mqshutdown namesrv #关闭namesrv服务

sh bin/mqshutdown broker #关闭broker服务 
4.4 启动脚本命令参数修改

在启动的过程中,如果服务器内存不足或者满足不了启动脚本里面的默认内存配置,启动的时候会启动报错。这是因为 apache-rocketmq/bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内存太大,而系统实际内存却太小导致启动失败。解决办法就是修改runbroker.sh 和 runserver.sh里的内存配置,调小一些即可。

首先先备份一份runbroker.sh 和 runserver.sh文件,以防万一改错了。

cp runserver.sh runserver.sh.init

cp runbroker.sh runbroker.sh.init

修改:runserver.sh脚本文件,找到配置JVM参数的内容,把JVM配置参数调小:

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"

2661519-20240505185929920-1493493819.jpg

修改:runbroker.sh脚本文件

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m"
2661519-20240505185940049-241064264.jpg

5、测试消息

通过上面的步骤,RocketMQ就启动成功了。接下来我们可以在服务器上面通过提供的测试脚本进行消息测试,验证RocketMq是否可以正常使用。

生产者发送消息:

export NAMESRV_ADDR=127.0.0.1:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2661519-20240505185952285-1182172636.png

通过输出内容,我们可以查看到消息发送成功了。下面运行监听脚本。测试消费者接受消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2661519-20240505190003455-207905877.jpg

成功拿到消息,可以说明RocketMq服务启动成功了。

6、监控程序rocketmq-console

6.1、配置rocketmq-console

rocketmq-externals是RocketMq的扩展插件项目。GitHub地址: https://github.com/apache/rocketmq-externals 之前rocketmq-console也在rocketmq-externals项目中。如今在GitHub apache/rocketmq-externals 项目下已经找不到 rocketmq-console模块了,官方已经从 apache/rocketmq-externals 独立出来并更名为 rocketmq-dashboard。 我们可以查看RocketMq官网配置仪表板说明 :RocketMQ 仪表板 |MQ (apache.org)

https://rocketmq.apache.org/docs/deploymentOperations/04Dashboard/

根据提示可以下载到源码内容

2661519-20240505190059777-1963182592.png

Github下载地址:https://github.com/apache/rocketmq-dashboard

如果是 5.0 版本的直接拉取最新的代码

 git clone https://github.com/apache/rocketmq-dashboard.git  

releases标签中的rocketmq-dashboard-1.0.0版本试用于5.0版本以下的。

https://github.com/apache/rocketmq-dashboard/releases/tag/rocketmq-dashboard-1.0.0

下载成功后,使用IDEA打开修改配置,改一下namesrvAddr配置项即可,如果没有指定默认就是localhost:9876,如果namesrvAddr是集群环境,每个节点使用;隔开。本地测试运行,运行成功后打包发布的linux系统。

mvn clean package -Dmaven.test.skip=true #跳过测试
6.2 启动rocketmq-console

指定NameServer的地址和启动端口(8830)以及输出日志。由于内部不够,设置JVM参数启动,如果使用的linux系统内存足够可以忽略jvm参数。启动命令如下:

nohup java -jar -Xmx256M -Xms256M -XX:MaxMetaspaceSize=128M -XX:MetaspaceSize=128M rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &

不指定JVM参数:

nohup java -jar  rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &

执行成功后,查看启动日志:

 tail -f ~/logs/consolelogs/rocketmq-console.log 
2661519-20240505190133260-2072845500.png

启动成功。开放8830端口进行公网访问。

2661519-20240505190158760-1918474845.png

监控成功。可以在集群导航中查看当前节点部署节点。

2661519-20240505190210318-629261692.png

也可以看到上面测试的数据输出:

2661519-20240505190240242-1339904326.png

7、微服务连接RockerMq

安全组需要开放10909、10911端口和9876端口,其中10909是VIP通道,10911是非VIP通道,9876是对外连接提供端口。不然连接发送会报错发送超时 sendDefaultImpl call timeout; nested exception is org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

maven引入依赖

        <!--RocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.1</version>
        </dependency>

配置中心加入RocketMq配置

rocketmq: # rocketMQ配置
  # name server地址
  name-server: 192.168.42.130:9876
  consumer:
    pull-batch-size: 10
    group: blog_message
  producer:
    group: blog_message
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 10000
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false

编写RocketEnhanceConfig文件,解决不支持Java时间类型配置

@Configuration
public class RocketEnhanceConfig {

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}
7.1 编写消息生产者:
@Slf4j
@Service
public class RocketStorage implements IDataStorage {
  
		@Autowired(required = true)
    private RocketMQTemplate rocketMQTemplate;

		@Value("${rocketMq.topic:blog_notify_sow}")  
    private String topic;

    @Override
    public void store(String value, Integer type, Long timestamp) {
        String message = String.format("%s,%s,%s",value,type,timestamp);
        rocketMQTemplate.convertAndSend(topic,message); //发送数据
        log.info("RocketMQ|data sent,value: {}, type:{}, timestamp: {}", value, type, timestamp);
    }

    @Override
    public String getType() {
        return "RocketMQ";
    }
}

编写接口:IDataStorage

/**
 * 数据发送到Mq里...
 */
public interface IDataStorage {

    /**
     * persistence data
     *
     * @param value 接收内容
     * @param type  数据类型
     * @param timestamp 当前时间(时间戳)
     */
    void store(String value,Integer type,Long timestamp);

    String getType();

}

在Controller中调用接口发送数据。

@RestController
@RequestMapping("/dataStorage")
public class DataStorageController {

    @Autowired
    private IDataStorage dataStorage;

    @GetMapping
    public Response sendDataStorage(String value){
        dataStorage.store(value,type,System.currentTimeMillis());
        return Response.success();
    }

}
7.2 编写消息消费者

编写一个RocketMq消息监听类实现消息监听 RocketDataConsumer :

@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "blog_message",topic = "blog_notify_sow")
public class RocketDataConsumer implements RocketMQListener {

    @PostConstruct
    public void post() {
        log.warn("***** RocketMq Data Consumer Activated");
    }

    @Autowired
    @Qualifier("dataPersist")
    private IDataPersist dataPersist;


    @Override
    public void onMessage(Object o) {
        log.info("RocketMq 接收到的信息 . . . . . .:{}",o);
        dataPersist.put(o.toString(),1,System.currentTimeMillis());
    }
}
7.3 测试消息发送和接收

启动项目,通过postman调用接口:

2661519-20240505190311291-102241310.png

调用接口后,发现接口调用成功了。下面我们查看控制台消息消费者是否接收到消息。

2661519-20240505190321979-642915427.png

通过上面输出的消息可以看到消息接收成功了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK