7

RocketMQ Broker 深入学习

 3 years ago
source link: https://www.sevenyuan.cn/2020/10/31/jms/2020-11-01-RocketMQ-Learning-Broker/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

RocketMQ Broker 深入学习

2020-10-31

| RocketMQ

| 游览 9次

MontageJupiterIo_ZH-CN2512372897_1920x1080.jpg

学习一下火箭消息 - Broker 的原理和使用🚀

参数 说明 brokerClusterName=rocketmq-cluster-1 所属集群名字 brokerName=broker-a broker名字,注意此处不同的配置文件填写的不一样 brokerId=0 0 表示Master, > 0 表示slave namesrvAddr=127.0.0.1:9876;127.0.0.2:9876 nameServer 地址,分号分割 defaultTopicQueueNums=4 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数 autoCreateTopicEnable=false 是否允许Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateSubscriptionGroup=false 是否允许Broker自动创建订阅组,建议线下开启,线上关闭 useEpollNativeSelector=true

listenPort=10923 Broker 对外服务的监听端口 haListenPort=10924

deleteWhen=04 删除时间,默认是凌晨四点 fileReservedTime=120 文件保留时间,默认48小时,单位是 hour mapedFileSizeCommitLog=1073741824 commitLog每个文件的大小默认1G mapedFileSizeConsumeQueue=300000 ConsumeQueue每个文件默认存30W条,根据业务情况调整 #redeleteHangedFileInterval=120000 destroyMapedFileIntervalForcibly=120000 diskMaxUsedSpaceRatio=88 检测物理文件磁盘空间 storePathRootDir=/data/store 存储路径 storePathCommitLog=/data/store/commitlog commitLog存储路径 storePathConsumeQueue=/data/store/consumequeue 消费队列存储路径 storePathIndex=/data/store/index 消息索引存储路径 storeCheckpoint=/data/store/checkpoint checkpoint 文件存储路径 abortFile=/data/store/abort abort 文件存储路径 maxMessageSize=5242880 限制的消息大小,默认 1M # flushCommitLogLeastPages=4

# flushConsumeQueueLeastPages=2

# flushCommitLogThoroughInterval=10000

# flushConsumeQueueThoroughInterval=60000

brokerRole=SYNC_MASTER Broker 的角色 - ASYNC_MASTER 异步复制Master - SYNC_MASTER 同步双写Master - SLAVE flushDiskType=ASYNC_MASTER 刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 #checkTransactionMessageEnable=false

sendMessageThreadPoolNums=128 发消息线程池数量 #pullMessageTreadPoolNums=128 拉消息线程池数量 useReentrantLockWhenPutMessage=false

waitTimeMillsInSendQueue=2500 刷盘等待时间,超时将会返回发送失败码给发送者 transferMsgByHeap=false

slaveReadEnable=true 是否允许 slave 读

上面罗列的是常见的参数,例如 Broker 集群的名称,主从角色,消息存储路径、文件大小、清理时间等等

之前也遇到默认参数不适合使用的常见,例如遇到业务方瞬间发送大量消息, Broker 同步刷盘时间超过默认的 2.5s,导致其它业务方遇到发送消息失败的场景,于是在 Broker 能力完全充足的情景下,调整了 waitTimeMillsInSendQueue 到 5s,避免影响其它业务方使用

文件存储机制

重要说明:

想要深入了解 RocketMQ 消息存储的内幕,需要了解这两方面

  • 文件存储的数据结构
  • 灵活利用 Linux 的文件机制 mmap

这次学习记录,参考了 STAR 皆空 大神,这里记录的是『消息存储的数据结构』,关于 mmap 的内容,可以点击参考链接深入学习。


RocketMQ 有很多亮点,其中一个是选择直接使用操作系统来提升存储效率,写入二进制格式的文件,消息持久化过程最大化的转成顺序写,避免随机写的额外开销。

这里记录一下跟 Broker 消息存储相关的内容

  • CommitLog(消息内容)
  • ConsumeQueue(位点数据)
  • Index(检索索引)
  • Broker 接收到【发送消息】操作
  • Broker 接收到【获取消息】操作

Broker 端整体架构

Broker 作为消息中转器,提供了消息发送、存储、查询,还有高可用的功能。

其中有几个重要模块:

RocketMQ_Broker_Architecture.png
  • Remoing Module: 请求的入口,使用了 Netty 作为远程通讯工具,处理发送过来的请求。
  • Client Manager: 管理客户端(发送者、消费者)并维护消费者的主题订阅。
  • Store Service: 提供 API 来存储或查询物理磁盘上的消息。
  • HA Service: 高可用服务,为主从节点之间提供数据同步功能。
  • Index Service: 索引服务,可以根据特定 Key,建立消息索引,并提供快速消息查询功能。

消息物理存储结构

从前面表格中的参数 storePathXXXX 可以知道,文件存储相关位置在 /data/store,使用 tree 命令查看这些消息文件的存储结构

$tree commitlog/ consumequeue/ index/
commitlog/
├── 00000000051539607552
├── 00000000052613349376
├── 00000000053687091200
├── 00000000054760833024
├── 00000000055834574848
├── 00000000056908316672
consumequeue/
├── prod%test-events
│   ├── 0
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 1
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 10
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 11
│   │   ├── 00000000000000000000
│   ├── 2
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 3
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
│   ├── 4
│   │   ├── 00000000000000000000
│   │   └── 00000000000006000000
index/
├── 20200930012239943
├── 20201012111555094
└── 20201017045900966

CommitLog 消息数据文件

借鉴于 Kafka,RocketMQ 也是以 Topic 作为文件存储的基本单元,每个 Topic 都有其对应的数据文件和索引文件。

RocketMQ 与 Kafka 不同点在于,Kafka 将消息数据文件按 Topic 分开存储,如果存在大量 Topic 情况下,消息持久化会逐渐变成随机磁盘读写,消息中间件的高性能被磁盘IO 所限制;而 RocketMQ 对其进行改进,将全部 Topic 的数据文件写入同一个文件(commitLog)中,实现消息的顺序写。

单个 CommitLog 的大小为 1GB,每条消息及其元信息被顺序追加至文件,文件的尾部可能存在空闲区域。

除了记录消息本身的属性(消息长度、消息体、Topic 长度、Topic、消息属性长度、消息属性),CommitLog 同时记录了消息所在 ConsumeQueue 消费队列的信息(消费队列 ID 和偏移量)。

由于存储条目具备不定长的特性,当 CommitLog 剩余空间无法满足下一条消息的存储,会在当前 CommitLog 的尾部追加一个 MAGIC CODE 等于 BLANK_MAGIC_CODE 的存储条目作为结束标记,并开始下一个 CommitLog 文件的操作。

RocketMQ_Broker_CommitLog_Item.png

ConsumeQueue 消息队列文件

Topic 是个抽象概念,消息实际发往的是 consumeQueue 这个逻辑队列中,在 consumeQueue 中,记录了消息在 CommitLog 中的位置信息

单个 ConsumeQueue 文件大小为 6000000 Byte,存储 30W 条记录,每条记录固定 20B

与 CommitLog 不同,ConsumeQueue 的存储条目采用定长存储结构。

为了实现定长存储,ConsumeQueue 存储到了消息 Tag 的 HashCode,在 Broker 端进行消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag 的 HashCode 是否一致来决定是否消费消息。

RocketMQ_Broker_ConsumeQueue_Item.png

Index 索引文件

在已有的 CommitLog 和 ConsumeQueue 基础上,已经满足一个消息中间件的消息发送和消费功能,RMQ 提供 Index 目的是为了检索消息更快,方便排查问题。

这个目录下的文件,提供了跟数据库索引一样的作用,「给定 Topic 和消息 Key,通过索引文件能快速找到消息」,提供了消息检索的作用,监控端的【消息查询】界面使用了这个功能。

单个 Index 文件大小等于 420000040 B,包含索引头(IndexHeader)、哈希槽(HashSlot)和消息索引(MessageIndex)

Index 存储条目的结构有点像 HashMap,使用链式地址法解决哈希冲突:
「每个 Hash Slot 关联一个 Message Index 链表,多个 Message Index 通过 preIndexOffset 连接。」

RocketMQ_Broker_Index_Item.png

Broker 代码实现初探

Broker 启动阶段:

Broker Startup

org.apache.rocketmq.broker.BrokerStartup#main

#org.apache.rocketmq.broker.BrokerStartup#main
public class BrokerStartup {
public static void main(String[] args) {
start(createBrokerController(args));
#org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {
* SendMessageProcessor
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
* PullMessageProcessor
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

Broker 将消息处理器注册到核心控制器 BrokerControllerBroker 定义了很多种消息处理器,查看 AsyncNettyRequestProcessor 继承图:

RocketMQ_Broker_Processor_Implemention.png
  • AdminBrokerProcessor
  • ClientManageProcessor
  • ConsumerManageProcessor
  • EndTransactionProcessor
  • ForwardRequestProcessor
  • PullMessageProcessor
  • QueryMessageProcessor
  • ReplyMessageProcessor
  • SendMessageProcessor

其中 SendMessageProcessor 负责处理【Producer 发送消息】的请求,PullMessageProcessor 负责处理【Consumer 消费消息】的请求。

SendMessageProcessor 处理器实现了 NettyRequestProcessor 接口,处理请求的 processRequest 方法,然后在 BrokerController 启动时,往 RemotingServer 中按照 RequestCode 来注册处理器。


Broker 接收到发送消息的请求

如果请求中的标志是 RequestCode.SEND_MESSAGE,那么就会交给 SendMessageProcessor 进行处理,同时底层大量使用线程池技术。

例如 Debug Broker 端代码:

RocketMQ_Broker_SendMessageProcessor.png

可以看到,处理的线程名字前缀是 SendMessageThread_,异步回调处理请求。


Broker 接收到消费消息的请求

如果请求中的标志是 RequestCode.PULL_MESSAGE,那么就会交给 PullMessageProcessor 来进行处理,代码细节可以后续跟进看看。

测试发送消息的接口为:org.springframework.messaging.core.AbstractMessageSendingTemplate#convertAndSend(D, java.lang.Object)

RocketMQ_Broker_SendMessageProcessor_flow.png

核心方法:org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand, org.apache.rocketmq.remoting.netty.RemotingResponseCallback)

  • 反序列化请求头 requestHeader
  • 从消息上下文恢复 mqTrace 链路
  • 判断是否批量发送
  • 进入单条发送逻辑
  • 预发送 preSend 校验
  • 完善消息详情,用户参数等
  • 消息存储 messageStore

进入消息存储:org.apache.rocketmq.store.MessageStore#asyncPutMessage

  • 检查存储状态 checkStoreStatus,主要是确认服务状态,主从节点角色,是否可写入,还有页面缓存是否繁忙 pageCacheBusy
  • 消息常规性校验,topic 长度,参数合法性
  • 写入 commitLog 的方法 this.commitLog.asyncPutMessage(msg)

简单小结:

  • 客户端在调用 API 发送消息时,构造 RemotingCommand,头部信息设置 RequestCode.SEND_MESSAGE_V2,还有相关属性以及消息体。
  • 消息中转器 Broker,启动着 NettyRemotingServer,接收到请求
  • 识别请求头的 RequestCode.SEND_MESSAGE_V2 状态码,将请求交给对应的处理器 SendMessageProcessor
  • Request 中恢复消息,校验消息合法性
  • 消息存储,将消息详情写入到 commitLog 中
  • 返回处理结果 putMessageResult

Broker 的良好性能,有一半得归功于 Netty 这个优秀的通讯框架,扒了一下 Broker 上代码实现还有网上资料,记录一下使用到的 Netty 网络模型。

Netty 网络模型

RocketMQ-Broker-Netty-Model.png

各模块作用:

  • eventLoopGroupBoss

作为 acceptor 负责接收客户端的连接请求

  • eventLoopGroupSelector

负责 NIO 的读写操作

  • NettyServerHandler

读取 IO 数据,并对消息头进行解析

  • disatch

过程根据注册的消息 code 和 processsor 把不同的事件分发给不同的线程。
由 processTable 维护(类型为 HashMap)

线程池 & 请求码

Broker 中大量使用线程池技术,通过状态码对请求进行分类,将请求分发到不同的线程池,以此达到资源隔离的目的,每个线程池接收到请求,经过解码 decode 请求体和组装上下文 ctx,接着交给相应的处理器 xxxProcessor 差异化处理网络请求。

RequestCode 位置在:

org.apache.rocketmq.common.protocol.RequestCode

RocketMQ-RequestCode.png

broker 启动时,注册处理线程池的位置在:

org.apache.rocketmq.broker.BrokerController#registerProcessor

RocketMQ-RequestCode.png
作用 线程池 处理器 请求码 发送消息 sendMessageExecutor SendMessageProcessor RequestCode.SEND_MESSAGE RequestCode.SEND_MESSAGE_V2 RequestCode.SEND_BATCH_MESSAGE RequestCode.CONSUMER_SEND_MSG_BACK 拉取消息 pullMessageExecutor PullMessageProcessor RequestCode.PULL_MESSAGE 消息重试 replyMessageExecutor replyMessageProcessor RequestCode.SEND_REPLY_MESSAGE RequestCode.SEND_REPLY_MESSAGE_V2 查询消息 queryMessageExecutor NettyRequestProcessor RequestCode.QUERY_MESSAGE RequestCode.VIEW_MESSAGE_BY_ID 客户端注册、心跳检测 heartbeatExecutor、clientManageExecutor ClientManageProcessor RequestCode.HEART_BEAT RequestCode.UNREGISTER_CLIENT, RequestCode.CHECK_CLIENT_CONFIG 消费端处理(例如位点更新) consumerManageExecutor ConsumerManageProcessor RequestCode.GET_CONSUMER_LIST_BY_GROUPRequestCode.UPDATE_CONSUMER_OFFSET RequestCode.QUERY_CONSUMER_OFFSET** 事务消息处理 endTransactionExecutor EndTransactionProcessor RequestCode.END_TRANSACTION 处理集群信息 adminBrokerExecutor AdminBrokerProcessor Default,不属于前面的请求码都由它进行处理

本次学习主要从以下三个方面去了解:

  • Broker 启动参数说明
  • 消息文件的存储机制
  • 使用的 Netty 网络模型

从简单的使用到底层原理学习,逐渐剖开 MQ 框架的深层,慢慢将文件操作使用到的技术也文件存储机制了解,接着再去学习 Netty 网络模型,将网络通讯的基础也开始补起来,从一个点发散,将知识体系补全,还有很多需要去学习和了解的,后面继续补充吧~

RocketMQ高性能揭秘


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK