3

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web...

 1 year ago
source link: https://blog.51cto.com/alex4dream/5800932
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

任何先进的技术均与魔法无异

【​ ​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】

什么是消息过滤

Message Filter,可不是web容器的过滤器哦,可以发生在服务端也可以发生在客户端,消息过滤是指消息生产者向Topic中发送消息时,设置消息属性对消息进行分类,消费者订阅Topic时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。

RocketMQ消息过滤

  RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的。

  • Consumer端订阅消息是需要通过ConsumeQueue这个消息的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。
  • ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag哈希值,基于Tag消息过滤正式基于这个字段值的。
【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_sql

主要支持如下2种的过滤方式:简单消息过滤(通过Tag方式实现)和高级过滤消息(通过Filter去实现)

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_自定义属性_02

消费者订阅Topic时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic中的所有消息都将被投递到消费端进行消费。

RocketMQ支持的消息过滤方式有两种,Tag过滤和SQL92过滤。

【Tag过滤方式】

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用'||'分隔。其中Consumer端会将这个订阅请求构建成一个SubscriptionData,发送一个Pull消息的请求给Broker端。

Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对如果不同,则丢弃该消息,不进行消息消费。

过滤消息样例

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息,例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TOPIC_TEST");
consumer.subscribe("TOPIC", "A || B || C");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。

【Filter(SQL92)过滤方式】

大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的SQL expression的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子。

消息可以未被过滤,符合条件
------------
| messageA |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
消息可以会被过滤,不符合条件
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
SQL92基本语法

RocketMQ只定义了一些基本语法来支持这个特性。

  • 数值比较的运算符:>,>=,<,<=,BETWEEN,=;
  • 字符比较的运算符:\=,<>,IN;
  • IS NULL或者IS NOT NULL;
  • 逻辑符号AND,OR,NOT;
常量支持类型为
  • 数值类型:123,3.1415;
  • 字符串类型:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)
过滤消息样例
生产者样例

发送消息时,你能通过​​putUserProperty​​来设置消息的属性

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消费者样例

用MessageSelector.bySql来使用sql筛选消息。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();

消息过滤实际案例(参考官方案例)

以图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:

  • 订单消息A
  • 支付消息B
  • 物流消息C

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 支付系统:只需订阅支付消息。
  • 物流系统:只需订阅物流消息。
  • 实时计算系统:需要订阅所有和交易相关的消息。
  • 交易成功率分析系统:需订阅订单和支付消息。

过滤示意图如下所示

参考官方原图:​ ​https://rocketmq.apache.org/zh/assets/images/Tag%E8%BF%87%E6%BB%A4-844cfe6dd033746c7134bde843021ad6.png​

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_sql_03
Tag的案例代码

对于物流系统和支付系统来说,它们都只订阅单个Tag,此时只需要在调用subcribe接口时明确标明Tag即可。

consumer.subscribe("TagFilterTest", "TagA");

对于实时计算系统来说,它订阅交易Topic下所有的消息,Tag用星号(*)表示即可。

consumer.subscribe("TagFilterTest", "*");

对于交易成功率分析系统来说,它订阅了订单和支付两个Tag的消息,在多个Tag之间用两个竖线(||)分隔即可。

consumer.subscribe("TagFilterTest", "TagA || TagB");

这里需要注意的是,如果同一个消费者多次订阅某个Topic下的Tag,以最后一次订阅为准。

//如下错误代码中,Consumer只能订阅到TagFilterTest下TagB的消息,而不能订阅TagA的消息。
consumer.subscribe("TagFilterTest", "TagA");
consumer.subscribe("TagFilterTest", "TagB");
SQL92的案例代码

SQL92过滤是在消息发送时设置消息的Tag或自定义属性,消费者订阅时使用SQL语法设置过滤表达式,根据自定义属性或Tag过滤消息。

Tag属于一种特殊的消息属性,在SQL语法中,Tag的属性值为TAGS。 开启属性过滤首先要在Broker端设置配置enablePropertyFilter=true,该值默认为false。

以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:

  • 物流消息且地域为杭州
  • 物流消息且地域为上海

这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:

  • 物流系统1:只需订阅物流消息且消息地域为杭州。
  • 物流系统2:只需订阅物流消息且消息地域为杭州或上海。
  • 订单跟踪系统:只需订阅订单消息。

参考官方案例:​ ​https://rocketmq.apache.org/zh/assets/images/SQL92%E8%BF%87%E6%BB%A4-716732acb1aad27fc8e7a9e218ebaa65.png​

【RocketMQ入门到精通】— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦_字符串_04

地域将作为自定义属性设置在消息中。

消息发送端

设置消息的自定义属性。

Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// 设置自定义属性A,属性值为1。
msg.putUserProperties("a", "1");
消息消费端

使用SQL语法设置过滤表达式,并根据自定义属性过滤消息。

consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK