14

RocketMQ Filter机制探秘

 1 year ago
source link: https://vearne.cc/archives/40031
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

本文基于rocketmq-all-4.8.0

1. client使用Filter

RocketMQ消费者订阅消息时,可以指定以某种表达式进行过滤,目前支持2种类型–TagSQL92

1.1 Tag

按照Tag进行过滤.
首先需要给消息打上tag,注意:1条消息只能包含最多1个tag
生产者代码示例1

package main

import (
    "context"
    "fmt"
    "os"
    "strconv"

    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "github.com/apache/rocketmq-client-go/v2/producer"
)

// Package main implements a simple producer to send message.
func main() {
    p, _ := rocketmq.NewProducer(
        producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.12.100:9876"})),
        producer.WithRetry(2),
    )
    err := p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s", err.Error())
        os.Exit(1)
    }
    topic := "test"

    msg := &primitive.Message{
        Topic: topic,
        Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
    }
    // 打tag
    msg.WithTag("tagA")
    // broker会针对key,建立索引存储在indexFile中
    msg.WithKeys([]string{"key1", "key2"})
    msg.WithProperty("age", "10")
    msg.WithProperty("color", "red")
    res, err := p.SendSync(context.Background(), msg)
    ...
}

Message中一些重要的属性

属性 说明 备注
KEYS 可以用来建立索引,用于后期查询
UNIQ_KEY 在客户端生成的唯一主键
SHARDING_KEY 用于选择MessageQueue
TAGS 用于过滤使用 注意:一个消息只能对应最多一个tag

消费者代码示例1

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "os"
)

func main() {
    sig := make(chan os.Signal)
    c, _ := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    // 订阅包含tagA或者tagB的消息
    selector := consumer.MessageSelector{Type: consumer.TAG, Expression: "tagA || tagB"}
    // 订阅所有tag
    // selector = consumer.MessageSelector{Type: consumer.TAG, Expression: "*"} 
    err := c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        // 如果执行出错,想稍后再次执行
        // 可以返回consumer.ConsumeRetryLater
        return consumer.ConsumeSuccess, nil
    })
}

更多使用规则,可以参考ExpressionType.java

1.2 SQL92

生产者代码示例见 生产者代码示例1

消费者代码示例2

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-client-go/v2"
    "github.com/apache/rocketmq-client-go/v2/consumer"
    "github.com/apache/rocketmq-client-go/v2/primitive"
    "os"
)

func main() {
    sig := make(chan os.Signal)
    c, _ := rocketmq.NewPushConsumer(
        consumer.WithGroupName("testGroup2"),
        consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.1.100:9876"})),
    )
    selector = consumer.MessageSelector{Type: consumer.SQL92, Expression: "age < 20 AND color = 'red' "}
    err := c.Subscribe("test", selector, func(ctx context.Context,
        msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range msgs {
            fmt.Printf("subscribe callback: %v \n", msgs[i])
        }
        return consumer.ConsumeSuccess, nil
    })
}

2. client和broker对消息的过滤

让我们再回顾一下订阅消息和消费的过程

b43fcb52-f482-11ed-bfec-5626e1cdcfe2.png

1)client提交SubscriptionDataBroker,告知需要订阅什么样的数据

class SubscriptionData implements Comparable<SubscriptionData> {
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet;
    private Set<Integer> codeSet;
    private long subVersion;
    private String expressionType;
}

2) 根据consumerOffset从ConsumeQueue中提取数据

提取的过程中,会做粗粒度的筛选
ExpressionMessageFilter.isMatchedByConsumeQueue()

3) 根据ConsumeQueue中存储的Offset从CommitLog中获取消息

提取的过程中,会做细粒度的过滤
ExpressionMessageFilter.isMatchedByCommitLog()

4)消息被发给Client,对于tag类型,client 还会再做一次过滤

PullAPIWrapper.processPullResult()

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    // 消息的tag必须包含在订阅数据的tag集合中
                    // tagsSet的类型是Set<String>
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
    ...
    }
    pullResultExt.setMessageBinary(null);
    return pullResult;
}

显然应该尽量让第2步和第3步过滤的足够干净,这样可以避免无效数据在BrokerClient之间传输

2.1 针对Tag类型的订阅过滤

2.1.1 isMatchedByConsumeQueue

// 判断是否包含tag的hashCode
subscriptionData.getCodeSet().contains(tagsCode.intValue());

codeSet的计算可以参看
FilterAPI.buildSubscriptionData()

    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
        String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);

        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            subscriptionData.getTagsSet().add(trimString);
                            // 注意这里
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }

2.1.2 isMatchedByCommitLog

当前版本的处理是直接返回true,萌叔认为这里可以

subscriptionData.getTagsSet().contains(msg.getTags());

再过滤一次,杜绝无效数据在BrokerClient之间传输的可能

2.2 针对SQL92类型的订阅过滤

2.2.1 isMatchedByConsumeQueue

SQL92的粗粒度过滤使用的是BloomFilter,这个BloomFilter存储在ConsumeQueueExt文件中
需要修改Broker配置文件

enableConsumeQueueExt = true
  • Consumequeue文件的存储路径默认为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
  • ConsumeQueueExt文件的存储路径默认为$HOME/store/consumequeue_ext/{topic}/{queueId}/{fileName}
    其格式为

    66051d12-f490-11ed-96c2-5626e1cdcfe2.png

这里的BitMap就是BloomFilter,BitMap中存储的是

{consumerGroup} + "#" + {topic}
  • 消息满足的每一个subscriptionData对应的consumerGroup信息都会被写入BitMap
  • 如果消息满足SQL92,则BitMap对应的bit会被置为1。

这里相当于做了预处理,满足Consumer订阅要求的消息已经提前被打上了标记(BitMap), 等到Consumer实际取用数据的时候,需要检查的范围就大大缩小了。

CommitLogDispatcherCalcBitMap.dispatch()

public void dispatch(DispatchRequest request) {
    if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
        return;
    }

    try {
        // 获取当前Topic下,所有的订阅请求信息
        Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
        ...
        Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
        BitsArray filterBitMap = BitsArray.create(
            this.consumerFilterManager.getBloomFilter().getM()
        );

        long startTime = System.currentTimeMillis();
        while (iterator.hasNext()) {
            ConsumerFilterData filterData = iterator.next();

            ...

            Object ret = null;
            try {
                MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
                // 执行判断(SQL已经提前编译过了,类似于的AST树)
                ret = filterData.getCompiledExpression().evaluate(context);
            } catch (Throwable e) {
                log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
            }

            ...
            // eval true
            // filterBitMap对应的位置为1
            if (ret != null && ret instanceof Boolean && (Boolean) ret) {
                consumerFilterManager.getBloomFilter().hashTo(
                    filterData.getBloomFilterData(), 
                    filterBitMap
                );
            }
        }
        request.setBitMap(filterBitMap.bytes());

    } catch (Throwable e) {

    }
}

这里有一个问题,可能消息产生的时候,consumerGroup还没有发出订阅请求,所以BitMap中不包含任何有效信息,这种情况下只能认为这个消息是有效的。只能完全依赖细粒度阶段的过滤了

2.2.2 isMatchedByCommitLog

使用编译好的表达式,带入消息的属性值,判断是否满足条件。
编译代码都在 org/apache/rocketmq/filter/parser

7afc4f82-f496-11ed-868a-5626e1cdcfe2.png

编译的过程类似于编译原理中构建语法树的过程。

1.深入了解 RocketMQ 之过滤器


微信公众号

Recommend

  • 65
    • 掘金 juejin.im 7 years ago
    • Cache

    WebSocket探秘

    首先 长连接:一个连接上可以连续发送多个数据包,在连接期间,如果没有数据包发送,需要双方发链路检查包。 TCP/IP:TCP/IP属于传输层,主要解决数据在网络中的传输问题,只管传输数据。但是那样对传输的数据没有一个规范的封装、解析等处理,使得传输的数据就很...

  • 28

  • 40
    • www.cnblogs.com 5 years ago
    • Cache

    探秘 RocketMQ 消息持久化机制

    我们知道 RocketMQ 是一款高性能、高可靠的分布式消息中间件,高性能和高可靠是很难兼得的。因为要保证高可靠,那么数据就必须持久化到磁盘上,将数据持久化到磁盘,那么可能就不能保证高性能了。 RocketMQ 在兼容这两方面做的...

  • 8

    上一篇文章讲了如何设计和实现高并发高性能的应用,从根本上说明了一些道理。且以rocketmq的mappedFile的实现作为一个突破点,讲解了rocketmq是如何具体实现高性能的。从中我们也知道,mappedFile只是其利用的操作系统的一个特性小点。

  • 9

    RocketMQ高可用设计之消息重试机制 原创 周杰伦本人 2022-03-11 12:36:30...

  • 4

    【RocketMQ】消息的刷盘机制 CommitLog的...

  • 4

    三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。

  • 7

    亚马逊工作方法探秘:“用户至上”背后的机制 徐霄鹏 2023-03-21 0 评论...

  • 8

    RocketMQ 顺序消费机制 顺序消息是指对于一个指定...

  • 10
    • www.51cto.com 1 year ago
    • Cache

    探秘 Kafka 的内部机制原理

    探秘 Kafka 的内部机制原理 作者:码哥 2023-06-07 15:25:19 关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touc...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK