Exploring the RocketMQ Filter Mechanism

 8 months ago
source link: https://vearne.cc/archives/40031
Copyright notice: original article from this site, published by 萌叔
When reposting, please credit 萌叔 | https://vearne.cc

This article is based on rocketmq-all-4.8.0.

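1. Using filters in the client

When a RocketMQ consumer subscribes to messages, it can specify an expression to filter them. Two expression types are currently supported: Tag and SQL92.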

1.1 Tag

Filtering by tag.
The message must first be given a tag. Note: a message can carry at most one tag.
Producer code example 1

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
    p, err := rocketmq.NewProducer(
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})),
        producer.WithRetry(2),
    )
    if err != nil {
        fmt.Printf("create producer error: %s\n", err.Error())
        os.Exit(1)
    }
    if err = p.Start(); err != nil {
        fmt.Printf("start producer error: %s\n", err.Error())
        os.Exit(1)
    }
    topic := "test"

    msg := &primitive.Message{
        Topic: topic,
        Body:  []byte("Hello RocketMQ Go Client!"),
    }
    // Attach the tag.
    msg.WithTag("tagA")
    // The broker builds an index on the keys and stores it in the indexFile.
    msg.WithKeys([]string{"key1", "key2"})
    // User properties; the SQL92 filter in section 1.2 evaluates these.
    msg.WithProperty("age", "10")
    msg.WithProperty("color", "red")
    res, err := p.SendSync(context.Background(), msg)
    if err != nil {
        fmt.Printf("send message error: %s\n", err.Error())
    } else {
        fmt.Printf("send message success: result=%s\n", res.String())
    }
    // ...
}

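Some important Message properties

Property      Description                                   Notes
KEYS          Used to build an index for later queries
UNIQ_KEY      Unique primary key generated on the client
SHARDING_KEY  Used to select the MessageQueue
TAGS          Used for filtering                            Note: a message can have at most one tag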

Consumer code example 1

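package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    // Buffered so that signal.Notify does not drop the interrupt.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)

    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    if err != nil {
        fmt.Printf("create consumer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Subscribe to messages tagged tagA or tagB.
    selector := consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"}
    // Subscribe to all tags:
    // selector = consumer.MessageSelector{Type: consumer.TAG, Expression: "*"}
    err = c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        // If processing fails and the messages should be retried later,
        // return consumer.ConsumeRetryLater instead.
        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        fmt.Printf("subscribe error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the consumer after subscribing.
    if err = c.Start(); err != nil {
        fmt.Printf("start consumer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Block until interrupted, then shut down.
    <-sig
    _ = c.Shutdown()
}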

For more usage rules, see ExpressionType.java.

1.2 SQL92

For the producer side, see Producer code example 1.

Consumer code example 2

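package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)

    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    if err != nil {
        fmt.Printf("create consumer error: %s\n", err.Error())
        os.Exit(1)
    }
    // Filter on the user properties set by the producer (age, color).
    selector := consumer.MessageSelector{Type: consumer.SQL92, Expression: "age < 20 AND color = 'red' "}
    err = c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        return consumer.ConsumeSuccess, nil
    })
    if err != nil {
        fmt.Printf("subscribe error: %s\n", err.Error())
        os.Exit(1)
    }
    // Start the consumer after subscribing.
    if err = c.Start(); err != nil {
        fmt.Printf("start consumer error: %s\n", err.Error())
        os.Exit(1)
    }
    <-sig
    _ = c.Shutdown()
}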

2. Message filtering by the client and the broker

Let's review how messages are subscribed to and consumed.

b43fcb52-f482-11ed-bfec-5626e1cdcfe2.png

1) The client submits its SubscriptionData to the Broker, telling it what data it wants to subscribe to

class SubscriptionData implements Comparable<SubscriptionData> {
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet;
    private Set<Integer> codeSet;
    private long subVersion;
    private String expressionType;
}

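2) Data is fetched from the ConsumeQueue according to the consumerOffset

During this fetch, a coarse-grained filter is applied:
ExpressionMessageFilter.isMatchedByConsumeQueue()

3) The message is fetched from the CommitLog using the offset stored in the ConsumeQueue

During this fetch, a fine-grained filter is applied:
ExpressionMessageFilter.isMatchedByCommitLog()

4) The message is sent to the client; for the tag type, the client filters once more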

PullAPIWrapper.processPullResult()

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // The message's tag must be in the subscription's tag set.
                    // tagsSet has type Set<String>.
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
    ...
    }
    pullResultExt.setMessageBinary(null);
    return pullResult;
}

Clearly, steps 2 and 3 should filter as thoroughly as possible, so that useless data is not transferred between the Broker and the Client.

2.1 Subscription filtering for the Tag type

2.1.1 isMatchedByConsumeQueue

// Check whether the code set contains the tag's hashCode
subscriptionData.getCodeSet().contains(tagsCode.intValue());

For how codeSet is computed, see
FilterAPI.buildSubscriptionData()

    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);

        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            subscriptionData.getTagsSet().add(trimString);
                            // Note this line: the tag's hashCode goes into codeSet
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }
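
To make the tag branch above concrete, here is a minimal, self-contained sketch. TagSubscriptionSketch and its members are hypothetical names for illustration, not RocketMQ classes; the sketch only assumes what the listing shows (split on "||", trim, store both the tag and its hashCode) plus the "*" subscribe-all convention used in the consumer example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a RocketMQ class: mirrors the tag branch of
// FilterAPI.buildSubscriptionData() and the hash check used in the coarse stage.
class TagSubscriptionSketch {
    final Set<String> tagsSet = new HashSet<>();
    final Set<Integer> codeSet = new HashSet<>();
    boolean subscribeAll = false;

    static TagSubscriptionSketch parse(String expression) {
        TagSubscriptionSketch sub = new TagSubscriptionSketch();
        if (expression == null || expression.isEmpty() || expression.equals("*")) {
            sub.subscribeAll = true;
            return sub;
        }
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                sub.tagsSet.add(trimmed);
                sub.codeSet.add(trimmed.hashCode());
            }
        }
        return sub;
    }

    // Coarse check: at this stage only the tag's hash code is available.
    boolean matchesTagsCode(long tagsCode) {
        return subscribeAll || codeSet.contains((int) tagsCode);
    }

    public static void main(String[] args) {
        TagSubscriptionSketch sub = parse("tagA || tagB");
        System.out.println(sub.matchesTagsCode("tagA".hashCode())); // true
        System.out.println(sub.matchesTagsCode("tagC".hashCode())); // false
    }
}
```

The point of the design is that the coarse stage never needs the tag string itself: an integer lookup in codeSet is enough to discard most non-matching entries.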

2.1.2 isMatchedByCommitLog

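In the current version, this method simply returns true for the Tag type. The ConsumeQueue check compares only hash codes, so a tag whose hashCode collides with a subscribed tag still passes it. 萌叔 (the author) thinks the broker could filter once more here with

subscriptionData.getTagsSet().contains(msg.getTags());

to rule out any useless data being transferred between the Broker and the Client.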
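
A small sketch of why an exact tag comparison still matters after the hash check. TagCollisionSketch is a hypothetical class written for this illustration; the only fact it relies on is that Java's String.hashCode() gives "Aa" and "BB" the same value.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical illustration, not RocketMQ code.
class TagCollisionSketch {
    // Coarse check: compares hash codes only.
    static boolean coarseMatch(Set<Integer> codeSet, String messageTag) {
        return codeSet.contains(messageTag.hashCode());
    }

    // Exact check: compares the tag strings themselves.
    static boolean exactMatch(Set<String> tagsSet, String messageTag) {
        return messageTag != null && tagsSet.contains(messageTag);
    }

    public static void main(String[] args) {
        Set<String> tagsSet = Collections.singleton("Aa");
        Set<Integer> codeSet = Collections.singleton("Aa".hashCode());
        // "Aa" and "BB" are different strings with the same String.hashCode().
        System.out.println(coarseMatch(codeSet, "BB")); // true (false positive)
        System.out.println(exactMatch(tagsSet, "BB"));  // false
    }
}
```

In the flow described in this article, the exact comparison currently happens on the client, in PullAPIWrapper.processPullResult(), so a colliding message is dropped only after it has crossed the network.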

2.2 Subscription filtering for the SQL92 type

2.2.1 isMatchedByConsumeQueue

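Coarse-grained filtering for SQL92 uses a BloomFilter, which is stored in the ConsumeQueueExt file.
This requires a change to the Broker configuration file (the dispatch() code shown further down also returns early unless enableCalcFilterBitMap is enabled):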

enableConsumeQueueExt = true
  • The default storage path of ConsumeQueue files is $HOME/store/consumequeue/{topic}/{queueId}/{fileName}
  • The default storage path of ConsumeQueueExt files is $HOME/store/consumequeue_ext/{topic}/{queueId}/{fileName}
    Its format is

    66051d12-f490-11ed-96c2-5626e1cdcfe2.png

The BitMap here is the BloomFilter. The key hashed into the BitMap is

{consumerGroup} + "#" + {topic}
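  • For every subscriptionData that the message satisfies, the corresponding consumerGroup information is written into the BitMap
  • If the message satisfies the SQL92 expression, the corresponding bits in the BitMap are set to 1.

This is a form of preprocessing: messages that satisfy a Consumer's subscription are marked in advance (in the BitMap), so when the Consumer actually fetches data, far fewer messages need to be checked.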
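
The marking scheme can be sketched as follows. This is an illustration only: FilterBitMapSketch, the sizes M and K, and the double-hashing scheme are assumptions made for this example, and RocketMQ's own BloomFilter and BitsArray classes differ in their details.

```java
import java.util.BitSet;

// Illustrative only: not RocketMQ's BloomFilter or BitsArray.
class FilterBitMapSketch {
    static final int M = 64; // bits per message bitmap (assumed value)
    static final int K = 3;  // bit positions per subscription (assumed value)

    // Derive K bit positions from "consumerGroup#topic" via simple double hashing.
    static int[] positions(String consumerGroup, String topic) {
        String key = consumerGroup + "#" + topic;
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd step so the positions are spread out
        int[] pos = new int[K];
        for (int i = 0; i < K; i++) {
            pos[i] = Math.floorMod(h1 + i * h2, M);
        }
        return pos;
    }

    // Write path: mark the message for a subscription whose expression matched.
    static void mark(BitSet bitMap, int[] positions) {
        for (int p : positions) {
            bitMap.set(p);
        }
    }

    // Read path: a cleared bit proves the message did not match this subscription.
    static boolean mayMatch(BitSet bitMap, int[] positions) {
        for (int p : positions) {
            if (!bitMap.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BitSet bitMap = new BitSet(M);
        int[] groupA = positions("groupA", "test");
        System.out.println(mayMatch(bitMap, groupA)); // false, nothing marked yet
        mark(bitMap, groupA);
        System.out.println(mayMatch(bitMap, groupA)); // true
    }
}
```

As with any Bloom filter, mayMatch can return true for a subscription that was never marked if other subscriptions happened to set the same bits, which is why a fine-grained stage still follows.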

CommitLogDispatcherCalcBitMap.dispatch()

public void dispatch(DispatchRequest request) {
    if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
        return;
    }

    try {
        // Fetch all subscription filter data registered for the current topic
        Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
        ...
        Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
        BitsArray filterBitMap = BitsArray.create(
            this.consumerFilterManager.getBloomFilter().getM()
        );

        long startTime = System.currentTimeMillis();
        while (iterator.hasNext()) {
            ConsumerFilterData filterData = iterator.next();

            ...

            Object ret = null;
            try {
                MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
                // Evaluate the expression (the SQL was compiled in advance into something like an AST)
                ret = filterData.getCompiledExpression().evaluate(context);
            } catch (Throwable e) {
                log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
            }

            ...
            // eval true
            // set the corresponding positions in filterBitMap to 1
            if (ret != null && ret instanceof Boolean && (Boolean) ret) {
                consumerFilterManager.getBloomFilter().hashTo(
                    filterData.getBloomFilterData(), 
                    filterBitMap
                );
            }
        }
        request.setBitMap(filterBitMap.bytes());

    } catch (Throwable e) {

    }
}

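There is one catch: when a message is produced, the consumerGroup may not have sent its subscription request yet, so the BitMap holds no useful information for it. In that case the message can only be treated as a match, and filtering relies entirely on the fine-grained stage.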
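
The fallback just described might look roughly like the sketch below. CoarseFallbackSketch, its parameters, and the timestamp comparison are hypothetical and chosen for illustration; this is not the broker's actual code.

```java
import java.util.BitSet;

// Hypothetical sketch of the "no usable bitmap means pass" rule.
class CoarseFallbackSketch {
    // Returns true when the message must go on to the fine-grained stage.
    static boolean passesCoarseStage(BitSet bitMap, long msgStoreTime,
                                     long subscriptionBornTime, int[] positions) {
        // No bitmap at all, or the message was stored before the subscription
        // existed: the bitmap says nothing about this consumer group, so the
        // message cannot be rejected here.
        if (bitMap == null || msgStoreTime < subscriptionBornTime) {
            return true;
        }
        // Otherwise the bitmap is meaningful: one cleared bit rules the message out.
        for (int p : positions) {
            if (!bitMap.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] positions = {1, 2};
        BitSet empty = new BitSet(8);
        System.out.println(passesCoarseStage(null, 100L, 50L, positions));  // true
        System.out.println(passesCoarseStage(empty, 10L, 50L, positions));  // true
        System.out.println(passesCoarseStage(empty, 100L, 50L, positions)); // false
    }
}
```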

2.2.2 isMatchedByCommitLog

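The compiled expression is evaluated against the message's property values to decide whether the condition holds.
The compilation code lives under org/apache/rocketmq/filter/parser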

7afc4f82-f496-11ed-868a-5626e1cdcfe2.png

The compilation process resembles building a syntax tree, as described in compiler theory.
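
To illustrate what evaluating a compiled expression against message properties means, here is a hand-built tree for the expression used earlier, age < 20 AND color = 'red'. SqlFilterAstSketch and its node types are hypothetical and far simpler than the classes under org/apache/rocketmq/filter: there is no parser, and a missing or non-numeric property simply counts as "no match".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified expression tree; not RocketMQ's filter classes.
class SqlFilterAstSketch {
    // A node of the compiled expression tree.
    interface BoolExpr {
        boolean evaluate(Map<String, String> properties);
    }

    // Leaf: numeric property < constant.
    static BoolExpr lessThan(String property, long constant) {
        return props -> {
            String raw = props.get(property);
            if (raw == null) {
                return false; // simplification: missing property never matches
            }
            try {
                return Long.parseLong(raw) < constant;
            } catch (NumberFormatException e) {
                return false; // simplification: non-numeric value never matches
            }
        };
    }

    // Leaf: string property = constant.
    static BoolExpr equalTo(String property, String constant) {
        return props -> constant.equals(props.get(property));
    }

    // Inner node: logical AND of two sub-expressions.
    static BoolExpr and(BoolExpr left, BoolExpr right) {
        return props -> left.evaluate(props) && right.evaluate(props);
    }

    public static void main(String[] args) {
        // "Compile" once, then evaluate per message.
        BoolExpr compiled = and(lessThan("age", 20), equalTo("color", "red"));

        Map<String, String> props = new HashMap<>();
        props.put("age", "10");
        props.put("color", "red");
        System.out.println(compiled.evaluate(props)); // true

        props.put("color", "blue");
        System.out.println(compiled.evaluate(props)); // false
    }
}
```

Building the tree once and reusing it per message is the reason the expression is compiled ahead of time instead of being parsed on every evaluation.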


