

RocketMQ Filter机制探秘
source link: https://vearne.cc/archives/40031
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
本文基于rocketmq-all-4.8.0
1. client使用Filter
RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型–Tag
和SQL92
。
1.1 Tag
按照Tag进行过滤.
首先需要给消息打上tag,注意:1条消息只能包含最多1个tag生产者代码示例1
package main
import (
"context"
"fmt"
"os"
"strconv"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// Package main implements a simple producer to send message.
func main() {
p, _ := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})),
producer.WithRetry(2),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
topic := "test"
msg := &primitive.Message{
Topic: topic,
Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
}
// 打tag
msg.WithTag("tagA")
// broker会针对key,建立索引存储在indexFile中
msg.WithKeys([]string{"key1", "key2"})
msg.WithProperty("age", "10")
msg.WithProperty("color", "red")
res, err := p.SendSync(context.Background(), msg)
...
}
Message中一些重要的属性
属性 | 说明 | 备注 |
---|---|---|
KEYS | 可以用来建立索引,用于后期查询 | |
UNIQ_KEY | 在客户端生成的唯一主键 | |
SHARDING_KEY | 用于选择MessageQueue | |
TAGS | 用于过滤使用 | 注意:一个消息只能对应最多一个tag |
消费者代码示例1
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup2"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
)
// 订阅包含tagA或者tagB的消息
selector := consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"}
// 订阅所有tag
// selector = consumer.MessageSelector{Type: consumer.TAG, Expression: "*"}
err := c.Subscribe("test", selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
// 如果执行出错,想稍后再次执行
// 可以返回consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
}
更多使用规则,可以参考ExpressionType.java
1.2 SQL92
生产者代码示例见 生产者代码示例1
消费者代码示例2
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
)
func main() {
sig := make(chan os.Signal)
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup2"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
)
selector = consumer.MessageSelector{Type: consumer.SQL92, Expression: "age < 20 AND color = 'red' "}
err := c.Subscribe("test", selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
})
}
2. client和broker对消息的过滤
让我们再回顾一下订阅消息和消费的过程

1)client
提交SubscriptionData
给Broker
,告知需要订阅什么样的数据
class SubscriptionData implements Comparable<SubscriptionData> {
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<String> tagsSet;
private Set<Integer> codeSet;
private long subVersion;
private String expressionType;
}
2) 根据consumerOffset从ConsumeQueue
中提取数据
提取的过程中,会做粗粒度的筛选
ExpressionMessageFilter.isMatchedByConsumeQueue()
3) 根据ConsumeQueue
中存储的Offset从CommitLog
中获取消息
提取的过程中,会做细粒度的过滤
ExpressionMessageFilter.isMatchedByCommitLog()
4)消息被发给Client
,对于tag类型,client
还会再做一次过滤
PullAPIWrapper.processPullResult()
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// 消息的tag必须包含在订阅数据的tag集合中
// tagsSet的类型是Set<String>
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
...
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
显然应该尽量让第2步和第3步过滤的足够干净,这样可以避免无效数据在Broker
和Client
之间传输
2.1 针对Tag类型的订阅过滤
2.1.1 isMatchedByConsumeQueue
// 判断是否包含tag的hashCode
subscriptionData.getCodeSet().contains(tagsCode.intValue());
codeSet的计算可以参看
FilterAPI.buildSubscriptionData()
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
// 注意这里
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
2.1.2 isMatchedByCommitLog
当前版本的处理是直接返回true,萌叔认为这里可以
subscriptionData.getTagsSet().contains(msg.getTags());
再过滤一次,杜绝无效数据在Broker
和Client
之间传输的可能
2.2 针对SQL92类型的订阅过滤
2.2.1 isMatchedByConsumeQueue
SQL92的粗粒度过滤使用的是BloomFilter
,这个BloomFilter
存储在ConsumeQueueExt
文件中
需要修改Broker
配置文件
enableConsumeQueueExt = true
Consumequeue
文件的存储路径默认为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}-
ConsumeQueueExt
文件的存储路径默认为$HOME/store/consumequeue_ext/{topic}/{queueId}/{fileName}
其格式为
这里的BitMap就是BloomFilter
,BitMap中存储的是
{consumerGroup} + "#" + {topic}
- 消息满足的每一个
subscriptionData
对应的consumerGroup
信息都会被写入BitMap - 如果消息满足SQL92,则BitMap对应的bit会被置为1。
这里相当于做了预处理,满足Consumer订阅要求的消息已经提前被打上了标记(BitMap), 等到Consumer实际取用数据的时候,需要检查的范围就大大缩小了。
CommitLogDispatcherCalcBitMap.dispatch()
public void dispatch(DispatchRequest request) {
if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
return;
}
try {
// 获取当前Topic下,所有的订阅请求信息
Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
...
Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
BitsArray filterBitMap = BitsArray.create(
this.consumerFilterManager.getBloomFilter().getM()
);
long startTime = System.currentTimeMillis();
while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();
...
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
// 执行判断(SQL已经提前编译过了,类似于的AST树)
ret = filterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
}
...
// eval true
// filterBitMap对应的位置为1
if (ret != null && ret instanceof Boolean && (Boolean) ret) {
consumerFilterManager.getBloomFilter().hashTo(
filterData.getBloomFilterData(),
filterBitMap
);
}
}
request.setBitMap(filterBitMap.bytes());
} catch (Throwable e) {
}
}
这里有一个问题,可能消息产生的时候,consumerGroup还没有发出订阅请求,所以BitMap中不包含任何有效信息,这种情况下只能认为这个消息是有效的。只能完全依赖细粒度阶段的过滤了
2.2.2 isMatchedByCommitLog
使用编译好的表达式,带入消息的属性值,判断是否满足条件。
编译代码都在 org/apache/rocketmq/filter/parser中

编译的过程类似于编译原理中构建语法树的过程。

Recommend
-
65
首先 长连接:一个连接上可以连续发送多个数据包,在连接期间,如果没有数据包发送,需要双方发链路检查包。 TCP/IP:TCP/IP属于传输层,主要解决数据在网络中的传输问题,只管传输数据。但是那样对传输的数据没有一个规范的封装、解析等处理,使得传输的数据就很...
-
28
-
40
我们知道 RocketMQ 是一款高性能、高可靠的分布式消息中间件,高性能和高可靠是很难兼得的。因为要保证高可靠,那么数据就必须持久化到磁盘上,将数据持久化到磁盘,那么可能就不能保证高性能了。 RocketMQ 在兼容这两方面做的...
-
8
上一篇文章讲了如何设计和实现高并发高性能的应用,从根本上说明了一些道理。且以rocketmq的mappedFile的实现作为一个突破点,讲解了rocketmq是如何具体实现高性能的。从中我们也知道,mappedFile只是其利用的操作系统的一个特性小点。
-
9
RocketMQ高可用设计之消息重试机制 原创 周杰伦本人 2022-03-11 12:36:30...
-
4
【RocketMQ】消息的刷盘机制 CommitLog的...
-
4
三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
-
7
亚马逊工作方法探秘:“用户至上”背后的机制 徐霄鹏 2023-03-21 0 评论...
-
8
RocketMQ 顺序消费机制 顺序消息是指对于一个指定...
-
10
探秘 Kafka 的内部机制原理 作者:码哥 2023-06-07 15:25:19 关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touc...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK