3

MQ系列7:消息通信,追求极致性能 - Hello-Brand

 1 year ago
source link: https://www.cnblogs.com/wzh2010/p/16631103.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

MQ系列7:消息通信,追求极致性能

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费

前面的章节我学习了 NameServer的原理,消息的生产发送,以及消息的消费的全过程。
我们来回顾一下:
RocketMQ 消息队列架构主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4个核心部件,基本执行流程如下:

image
  1. NameServer 优先启动。NameServer 是整个 RocketMQ 的“中央大脑” ,作为 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
  2. Broker 启动后,需要将自己注册至NameServer中,并 保持长连接,每 30s 发送一次发送心跳包,来确保Broker是否存活。并将 Broker 信息 ( IP+、端口等信息)以及Broker中存储的Topic信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  3. NameServer 如果检测到Broker 宕机(因为使用心跳机制, 如果检测超120s(两分钟)无响应),则从路由注册表中将其移除。
  4. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(Broker可能是Cluster模式),然后根据负载均衡算法从列表中选择1台Broker ,建立连接通道,进行消息发送。
  5. 消费者在订阅某个topic的消息之前从 NamerServer 获取 Broker 服务器地址列表(Broker可能是Cluster模式),包括关联的全部Topic队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。
  6. 生产者和消费者默认每30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有Topic队列信息,更新到Client本地。
    2 ~ 4 步骤实际上是 Producer、Broker 以及NameServer 之间整个进行数据通信的过程,面对复杂的消息队列系统,一个性能优良,稳定性高的网络通信模块是非常重要的,它体现了RocketMQ集群消息的整体吞吐和负载能力。也是RocketMQ保证高性能、高稳定性的基石。

2 网络通信过程分析

2.1 通信类(rocketmq-remoting )的结构解析

image

通过上图可以看到,在整个RocketMQ队列系统中,rocketmq-remoting 这个module是专门用来负责网络通信职能的。
并且从模块依赖关系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服务) 等模块均依赖了它。

image

通信层是基于 Netty 进行扩展的,并自定义了通信协议,用于将消息传递给 Broker 进行存储。实现Client与Server之间高效的数据请求与接收。

2.2 协议结构设计

因为是基于Netty进行扩展的,所以自定义了RocketMQ的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。
在RocketMQ中,负责这个工作的就是RemotingCommand类,我们来看看这个类的几个重要属性:

字段 类型 Request维度 Response维度
code int 请求操作码,依据不同的请求码做不同的业务处理 应答响应码:0成功,非0标识对应的错误
language LanguageCode 枚举(JAVA、CPP、PYThON、GO等):请求方实现的编码语言 应答方实现的编码语言
version int 请求方程序的版本 应答方版本
opaque int 类似请求ID:reqeustId,唯一识别码,区分每一个独立的请求 response的时候直接返回
flag int 区分是普通还是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 区分是普通还是oneway RPC
remark String 自定义备注信息 自定义备注信息
extFields HashMap<String, String> Request自定义扩展的字段属性 Response自定义扩展的字段属性

2.3 消息内容的组成结构

传输的消息内容主要由一下几个部分组成:

组成部分 说明
消息长度 消息的总长度,int类型,四个字节存储
序列化类型+消息头length int类型,字节1表示序列化类型,字节2~4表示消息头长度
消息头的数据 序列化后的消息头数据
消息主体数据 消息主体数据内容,二进制字节
image

2.4 RocketMQ 消息通信流程

在RocketMQ消息队列中支持通信的模式主要有

  • sync 同步发送模式
  • async 异步发送模式
  • oneway 单向模式,无需关注Response

2.4.1 通信流程说明

下图从 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息发送,以及Handler 处理过程来说明。

image
  • Broker 和 NameServer 启动时同步调用 NettyRemotingServer.start() 方法, 初始化 Netty 服务器
    • 配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组
    • 配置 Channel
    • 添加 NettyServerHandler
    • 调用 serverBootstrap.bind() 监听端口,等待client的connection
  • Producer 和 Consumer 同样需要启动 Netty 的客户端,通过调用NettyRemotingClient.start() 初始化 Netty 客户端
    • 配置客户端 NioEventLoopGroup 线程组
    • 配置 Channel
    • 添加 NettyClientHandler
  • 发送同步消息时,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建用于通信的 Channel 通道。
  • 创建完 Channel 后,生产者 Producer 调用 Channel.writeAndFlush() 发送数据
  • NettyRemotingServer 服务端线程组 处理可读事件,调用 NettyServerHandler 处理数据。
  • 下一步,NettyServerHandler 调用 processMessageReceived方法,接收并处理传送过来的数据。
  • 根据请求码 RequestCode 区别不同的请求,来执行不同的 Processor。
    • 说明:Processor 在服务端初始化的时候,将 RequestCode 添加到 Processor 缓存中。消息的存、查、拉取都是不同的请求码。
  • processMessageReceived 从ResponseTables(key 为 opaque) 缓存中取出 ResponseFuture,并将将返回结果设置到 ResponseFuture。同步模式下执行 responseFuture.putResponse()方法,异步调用执行回调方法。
  • NettyRemotingClient 收到可读事件,调用 NettyClientHandler 读取并处理返回事件。

2.4.2 Reactor多线程设计

上面我们说过了,RocketMQ的通信是采用Netty组件作为底层通信库。同样的,它也遵循Reactor多线程模型,并在此基础上做了一些优化。

image

上面图中四个图形可以大致说明NettyRemotingServer的Reactor 多线程模型,在RocketMQ中的存在形式。

  • M:1个 Reactor 主线程:eventLoopGroupBoss,它的职能是负责监听 TCP网络连接请求,有连接请求过来时候,创建SocketChannel,并注册到selector上。
  • S:RocketMQ的源码中会选择NIO或Epoll,来监听网络数据,当监听到网络数据过来时,读取数据并丢给Worker线程池:eventLoopGroupSelector,Rocket源码中默认设置线程数为3。
  • M1:执行业务之前的各种杂事(SSL认证、空闲检查、网络连接检查、编解码、序列化反序列化 等等),交付给 这些工作交给defaultEventExecutorGroup 去处理,RocketMQ源码中默认线程数设置为8。
  • M2:剩下处理业务的操作,就直接放在业务线程池中执行了。按照之前说的,依据RequestCode去processorTable 本地缓存中找到对应的 processor,并封装成task任务,在丢给对应的业务processor线程池来处理。
线程数标识 线程名 说明
1 NettyBoss Reactor 主线程,默认1
N NettyServerEPOLLSelector Reactor 线程池,默认3
M1 NettyServerCodecThread Worker 线程池,默认8
M2 RemotingExecutorThread Processor线程池,处理业务逻辑

完整的可以参照官网的这张图:

image

上面介绍了 RocketMQ 消息通信的主要内容,我们用几句话总结下:

  • 整个RocketMQ队列系统中,rocketmq-remoting Module是专门用来负责网络通信职能的。
  • 网络通信模块基于Netty进行扩展的,所以自定义了RocketMQ的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。
  • 理解 NettyRemotingServer/NettyRemotingClient 的初始化过程,以及调用 NettyServerHandler/NettyClienthandler 进行处理的执行流程。
  • 同步异步:同步和异步消核心区别是 同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,异步的请求则 SendCallback 相应的方法进行回调处理。
  • 多线程模式下会通过1个Reactor 主线程(监听连接),以及Reactor 线程池(监听数据)、Worker 线程池(处理前置工作)、Processor线程池(处理业务逻辑) 来处理通信过程。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK