1

RocketMQ部署 - POCOPOCOPOCO

 1 year ago
source link: https://www.cnblogs.com/POCOPOCOPOCO/p/16697680.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RocketMQ部署手册

单MasterRocketMQ集群#

系统要求与准备条件#

  1. 64位操作系统,推荐 Linux/Unix/macOS

  2. 64位 JDK 1.8+

  3. Maven

    tips

    检验java环境与maven环境

    java -version

    截屏2022-09-15 15.20.46

    mvn -v

    截屏2022-09-15 15.20.09

下载安装Apache RocketMQ#

RocketMQ 的安装包分为两种,二进制包和源码包,二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

截屏2022-09-15 15.31.08

启动NameServer#

### 启动namesrv$ nohup sh bin/mqnamesrv & ### 验证namesrv是否启动成功$ tail -f ~/logs/rocketmqlogs/namesrv.logThe Name Server boot success...

截屏2022-09-15 16.28.15

我们可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功启动。

截屏2022-09-15 16.28.50

启动Broker#

### 先启动broker$ nohup sh bin/mqbroker -n localhost:9876 &### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a$ tail -f ~/logs/rocketmqlogs/Broker.log The broker[broker-a,192.169.1.2:10911] boot success...

截屏2022-09-15 16.30.48

我们可以在 Broker.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。

截屏2022-09-15 16.37.38

工具测试消息收发#

在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

截屏2022-09-15 16.42.18
$ export NAMESRV_ADDR=localhost:9876$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ...$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...

截屏2022-09-15 16.44.46

截屏2022-09-15 16.44.15

安装可视化控制台#

1.下载项目

在 GitHub 中搜索 rocketmq-externals,其中 rocketmq-console 就是 RocketMQ 可视化控制台,我们可以将源码克隆下来,然后自己 mvn package,然后运行 jar 包。

或者直接下载官方提供的 1.0.0 版本的 rocketmq-console

https://github.com/apache/rocketmq-externals/releases/tag/rocketmq-console-1.0.0

下载 zip 包或者 tar 包

截屏2022-09-15 17.29.41
  • 修改配置文件application.properties

    配置rocketmq.config.namesrvAddr属性的值,即nameserver的服务地址

    rocketmq.config.namesrvAddr=localhost:9876

截屏2022-09-15 17.32.25
  • 保存修改后的配置文件,返回rocketmq-console目录

  • 使用maven打包命令打包

    mvn clean package -Dmaven.test.skip=true
  • 打包完成后进入target目录

截屏2022-09-15 17.33.53

rocketmq-console-ng-2.0.0.jar即为打包后得到的jar包

  • nohup java -jar rocketmq-console-ng-2.0.0.jar &

    截屏2022-09-15 17.38.01

截屏2022-09-15 17.38.48

SDK测试消息收发#

准备工作#

启动 NameServer 和 broker

nohup sh bin/mqnamesrv >mqnamesrv-log.txt & nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &

启动控制台

mvn spring-boot:run

创建一个 topic名为 test_quick_topic

截屏2022-09-15 17.41.37

工具测试完成后,我们可以尝试使用 SDK 收发消息。这里以 Java SDK 为例介绍一下消息收发过程。

  • 在IDEA中创建一个Java工程。

  • pom.xml 文件中添加以下依赖引入Java依赖库。

<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.13.graal</version> </dependency> </dependencies>

生产者#

  • 同步投递10条消息
package TestProducer;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.exception.RemotingException;public class TestProducer01 { public static final String NAMESRV_ADDR = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name"); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i <10; i++) { Message message = new Message("test_quick_topic",//主题 "tagA", //标签 "key" + i, //自定义key,唯一标识 ("第" + i+"次消息").getBytes()); //消息内容实体 (byte[]) SendResult result = producer.send(message); System.out.println("第" + i + "条消息发出,结果:" + result); } producer.shutdown(); }}

截屏2022-09-15 17.42.14

消费者#

  • 消费上面生产者生产的10条消息
package TestConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class TestConsumer01 { public static final String NAMESRV_ADDR = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name"); consumer.setNamesrvAddr(NAMESRV_ADDR); //从最后开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// consumer.subscribe("test_quick_topic","tagA"); //过滤:消费tag为tagA的消息 consumer.subscribe("test_quick_topic", "*"); //消费所有的 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt messageExt = list.get(0); try { String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String keys = messageExt.getKeys(); String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ", body: " + msgBody); } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("comsumer start"); }}

截屏2022-09-15 17.43.25

关闭服务器#

$ sh bin/mqshutdown brokerThe mqbroker(36695) is running...Send shutdown request to mqbroker(36695) OK$ sh bin/mqshutdown namesrvThe mqnamesrv(36664) is running...Send shutdown request to mqnamesrv(36664) OK

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK