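An Analysis of RocketMQ Message Filtering

Preface

This post gives a brief walkthrough of how RocketMQ implements message filtering, with reference to Ding Wei's book 《RocketMQ技术内幕》.

RocketMQ supports two kinds of filtering: expression filtering and class filtering. Expression filtering is further divided into TAG and SQL92.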

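The MessageFilter interface has two main methods:

  • isMatchedByConsumeQueue(final Long tagsCode, final ConsumeQueueExt.CqExtUnit cqExtUnit);
  • isMatchedByCommitLog(final ByteBuffer msgBuffer, final Map<String, String> properties);

In RocketMQ, message filtering is set up at subscription time: the consumer declares its filter when it subscribes. That raises two questions: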

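  1. When reading the consumer-side consumption logic, I remember it also filters by tag. How does that filter differ from this one?

    Tag filtering: when subscribing, the consumer can specify tags in addition to the topic; to subscribe to several tags, separate them with ||. The consumer builds a SubscriptionData from the subscription and sends a pull request to the broker. Before the broker reads from Store, RocketMQ's file storage layer, it uses this data to build a MessageFilter and passes it to Store. After Store reads an entry from the ConsumeQueue, it filters on the tag hash value recorded in that entry. Because the broker only compares hash codes, it cannot match the original tag string exactly. So after pulling the messages, the consumer compares the original tag string again and discards, without consuming, any message whose tag does not match.
  2. Does this mean the broker has already filtered the messages before handing them to the consumer?

    Yes. With tag filtering, the broker filters on the tag's hashCode.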
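To make the difference between the two stages concrete, here is a minimal sketch of a hash-only check followed by an exact string check. The class and method names are hypothetical and are not RocketMQ's own; the only fact it relies on is that the Java strings "Aa" and "BB" have the same hashCode.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical two-stage tag check; these are not RocketMQ's actual classes. */
final class TwoStageTagFilterSketch {

    /** Broker-side stage: only the hash code stored with the queue entry is compared. */
    static boolean matchesByHash(Set<Integer> subscribedCodes, long tagsCode) {
        return subscribedCodes.contains((int) tagsCode);
    }

    /** Consumer-side stage: the original tag string is compared. */
    static boolean matchesByTag(Set<String> subscribedTags, String tag) {
        return tag != null && subscribedTags.contains(tag);
    }

    public static void main(String[] args) {
        Set<String> tags = Collections.singleton("Aa");
        Set<Integer> codes = new HashSet<>();
        for (String t : tags) {
            codes.add(t.hashCode());
        }

        // "Aa" and "BB" collide under String.hashCode(), so the hash stage lets "BB" through.
        String incoming = "BB";
        System.out.println(matchesByHash(codes, incoming.hashCode())); // true
        System.out.println(matchesByTag(tags, incoming));              // false
    }
}
```

The hash stage is cheap and can run without reading the message body; the string stage is what makes the result exact.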

Tag expression filtering

Initial handling of the filter on the consumer side

Create the consumer and subscribe to a topic with a filter expression

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("syn_consumer_1");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("syn_message", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");

The code above creates a DefaultMQPushConsumer; the part relevant to message filtering is the subscribe() method.

// DefaultMQPushConsumer: delegates to the implementation class
public void subscribe(String topic, String subExpression) throws MQClientException {
    this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}

// DefaultMQPushConsumerImpl: builds the subscription data and registers it
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

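What the code above does: the consumer subscribes to a topic with a filter expression, and the resulting subscription data is stored in rebalanceImpl.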

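Structure of the SubscriptionData class

  classFilterMode   whether class filter mode is on; defaults to false
  topic             name of the topic
  subString         the filter expression; multiple expressions are separated by a double vertical bar (||)
  tagsSet           the set of tags to filter on; used for consumer-side filtering
  codeSet           the set of hash codes of those tags
  subVersion        version number
  expressionType    filter type, SQL92 or TAG; defaults to ExpressionType.TAG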

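Points to note when subscribing:

* If subString is null or *, all messages under the topic are consumed.
* To subscribe to several tags, separate them with ||. Each tag is stored in tagsSet, and each tag.hashCode() is stored in codeSet.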
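The two rules above can be sketched as follows. This is only an illustration of the idea, not the code of FilterAPI.buildSubscriptionData: SubscriptionSketch is a hypothetical stand-in for SubscriptionData, and trimming whitespace around each tag is my own assumption.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for SubscriptionData; only the tag-related fields are modelled. */
final class SubscriptionSketch {
    final String topic;
    final String subString;
    final Set<String> tagsSet = new LinkedHashSet<>();
    final Set<Integer> codeSet = new LinkedHashSet<>();

    private SubscriptionSketch(String topic, String subString) {
        this.topic = topic;
        this.subString = subString;
    }

    static SubscriptionSketch build(String topic, String subExpression) {
        // null or "*" means "subscribe to everything": both sets stay empty.
        boolean subscribeAll = subExpression == null || "*".equals(subExpression);
        SubscriptionSketch data = new SubscriptionSketch(topic, subscribeAll ? "*" : subExpression);
        if (subscribeAll) {
            return data;
        }
        // Tags are separated by "||"; trimming each part is an assumption of this sketch.
        for (String part : subExpression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                data.tagsSet.add(tag);
                data.codeSet.add(tag.hashCode());
            }
        }
        return data;
    }

    public static void main(String[] args) {
        SubscriptionSketch data = build("syn_message", "TagA || TagB");
        System.out.println(data.tagsSet); // [TagA, TagB]
        System.out.println(data.codeSet.contains("TagA".hashCode())); // true
    }
}
```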

DefaultMQPushConsumerImpl.pullMessage(PullRequest pullRequest)

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
......
......
......
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// The null case was already handled above
if (sd != null) {
    // Check whether the subscription should be sent again on every pull
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        subExpression = sd.getSubString();
    }
    classFilter = sd.isClassFilterMode();
}

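The code looks up the subscription data for the current consumer. If there is none, it schedules a delayed retry and returns. Otherwise it checks whether postSubscriptionWhenPull is true; this parameter controls whether the subscription is updated on every pull, that is, whether the filter expression is sent to the broker with each pull request.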

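int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable,    // commitOffset: whether the current consume offset is committed with this pull
    true,                  // suspend: the pull may be suspended on the broker
    subExpression != null, // subscription: whether the subscription is updated
    classFilter            // class filter
);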

This computes the system flag for the pull request.
After a series of further steps, the consumer starts pulling messages.
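Packing several booleans into one int flag can be sketched like this. It is a minimal illustration, not the real PullSysFlag: the class name is hypothetical and the bit positions are assumptions made for the example.

```java
/** Hypothetical bit-flag helper; the bit positions are assumed for illustration. */
final class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 1;      // bit 0
    static final int FLAG_SUSPEND = 1 << 1;       // bit 1
    static final int FLAG_SUBSCRIPTION = 1 << 2;  // bit 2
    static final int FLAG_CLASS_FILTER = 1 << 3;  // bit 3

    /** Sets one bit per enabled option. */
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    /** The receiving side tests a single bit to learn whether the request carries a subscription. */
    static boolean hasSubscriptionFlag(int sysFlag) {
        return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
    }

    public static void main(String[] args) {
        int flag = build(true, true, false, false);
        System.out.println(flag);                      // 3
        System.out.println(hasSubscriptionFlag(flag)); // false
    }
}
```

The broker-side code in the next section reads the same flag back with PullSysFlag.hasSubscriptionFlag to decide how to build the filter information.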

Message filtering on the broker side

Entry point: PullMessageProcessor.processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

Building the consumer's filter information for the topic

Built from the request data
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
if (hasSubscriptionFlag) {
    try {
        subscriptionData = FilterAPI.build(
            requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
        );
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            consumerFilterData = ConsumerFilterManager.build(
                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                requestHeader.getExpressionType(), requestHeader.getSubVersion()
            );
            assert consumerFilterData != null;
        }
    } catch (Exception e) {
        log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
            requestHeader.getConsumerGroup());
        response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
        response.setRemark("parse the consumer's subscription failed");
        return response;
    }
}

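If postSubscriptionWhenPull == true and the consumer is not in class filter mode, the request carries the subscription flag and the broker builds the filter information from the request itself:

* A `SubscriptionData` is built from the request data.
* If the ExpressionType is not tag, a `ConsumerFilterData` is additionally built from the request data.

Built from the registration data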
// Look up the consumer group info by consumer group name
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
// No consumer group info: return an error response
if (null == consumerGroupInfo) {
    log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}
......
......
......
// Get the subscription data from the consumer group info
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
// No subscription for this topic: return an error response
if (null == subscriptionData) {
    log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}
// The broker's subscription version is older than the version in the request: return an error response
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
    log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
        subscriptionData.getSubString());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
    response.setRemark("the consumer's subscription not latest");
    return response;
}
// The subscription's expression type is not TAG
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
    // Look up the consumer filter data by topic and consumer group name
    consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
        requestHeader.getConsumerGroup());
    // Not found: return an error response
    if (consumerFilterData == null) {
        response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
        response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
        return response;
    }
    // The filter data's client version is older than the version in the request: return an error response
    // TODO: what is this check for?
    if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
        log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
        response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
        response.setRemark("the consumer's consumer filter data not latest");
        return response;
    }
}

// Not tag-based filtering, and property filtering is disabled in the broker config: return an error
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
    && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
    return response;
}

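The broker looks up the topic's subscription in the registration data and validates it; see the code comments for the details.

Creating the filter from the filter information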

MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
    messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
} else {
    messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
        this.brokerController.getConsumerFilterManager());
}

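The filterSupportRetry setting decides whether an ExpressionForRetryMessageFilter or an ExpressionMessageFilter is created. The former also supports filtering on retry topics; the latter does not. The setting defaults to false.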

Pulling messages

Entry point: DefaultMessageStore.getMessage()

Filtering by ConsumeQueue
// Read the message's tagsCode
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
boolean extRet = false, isTagsCodeLegal = true;
// TODO: not yet clear what this branch means
if (consumeQueue.isExtAddr(tagsCode)) {
    extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
    if (extRet) {
        tagsCode = cqExtUnit.getTagsCode();
    } else {
        // can't find ext content.Client will filter messages by tag also.
        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
            tagsCode, offsetPy, sizePy, topic, group);
        isTagsCodeLegal = false;
    }
}
if (messageFilter != null
    && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
    if (getResult.getBufferTotalSize() == 0) {
        status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }
    continue;
}

TODO: one open question here: what exactly is ConsumeQueueExt?
If a message does not match, it is simply skipped.
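The scan-and-skip loop can be sketched in isolation as below. Everything here is a simplification under stated assumptions: the class is hypothetical, each ConsumeQueue entry is assumed to be 20 bytes (8-byte CommitLog offset, 4-byte size, 8-byte tagsCode), an empty codeSet is taken to mean "subscribe to all", and the extension-file branch from the code above is left out entirely.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical scan over ConsumeQueue-like entries; the entry layout is an assumption. */
final class ConsumeQueueScanSketch {
    static final int ENTRY_SIZE = 20; // assumed: long offset + int size + long tagsCode

    /** A null tagsCode means "unknown", so the entry is kept and left to the consumer-side check. */
    static boolean isMatchedByConsumeQueue(Set<Integer> codeSet, Long tagsCode) {
        if (tagsCode == null || codeSet.isEmpty()) {
            return true;
        }
        return codeSet.contains(tagsCode.intValue());
    }

    /** Returns the CommitLog offsets of the entries whose tag hash matches the subscription. */
    static List<Long> matchingOffsets(ByteBuffer queue, Set<Integer> codeSet) {
        List<Long> offsets = new ArrayList<>();
        while (queue.remaining() >= ENTRY_SIZE) {
            long offsetPy = queue.getLong();
            queue.getInt(); // sizePy: read to advance the buffer, not needed for matching
            long tagsCode = queue.getLong();
            if (!isMatchedByConsumeQueue(codeSet, tagsCode)) {
                continue; // no match: skip the entry without touching the message body
            }
            offsets.add(offsetPy);
        }
        return offsets;
    }

    public static void main(String[] args) {
        ByteBuffer queue = ByteBuffer.allocate(2 * ENTRY_SIZE);
        queue.putLong(0L).putInt(100).putLong("TagA".hashCode());
        queue.putLong(100L).putInt(80).putLong("TagB".hashCode());
        queue.flip();
        System.out.println(matchingOffsets(queue, Set.copyOf(List.of("TagA".hashCode())))); // [0]
    }
}
```

The point of the design is that a non-matching message is rejected from the small index entry alone, before its body is ever read from the CommitLog.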

Filtering by CommitLog
if (messageFilter != null
    && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
    if (getResult.getBufferTotalSize() == 0) {
        status = GetMessageStatus.NO_MATCHED_MESSAGE;
    }
    // release...
    selectResult.release();
    continue;
}

After filtering by ConsumeQueue, the store filters again by CommitLog. ConsumeQueue filtering is based on the tag's hashCode. CommitLog filtering returns true immediately when the expression type is TAG.

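The consumer filters the received messages again

After the steps above, the broker has pulled and filtered messages according to the consumer's subscription and returned them to the consumer. Because the broker's tag filtering is based on hashCode, it is not exact, so the consumer filters once more after it receives the messages.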

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),
    pullResult, subscriptionData);

// Decode the messages
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

This pass uses the real tag value taken from the message's properties.
This completes tag-based message filtering.
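Two details of the consumer-side pass are easy to miss: it is skipped entirely when tagsSet is empty (a * subscription), and a message without a tag is dropped whenever tagsSet is not empty. The sketch below isolates that logic; the class is hypothetical, and each message is represented only by its tag string, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical consumer-side re-filter; a message is reduced to its tag for brevity. */
final class ClientTagRefilterSketch {

    static List<String> filterAgain(List<String> messageTags, Set<String> tagsSet, boolean classFilterMode) {
        // Empty tagsSet (subscribe to all) or class filter mode: keep the list unchanged.
        if (tagsSet.isEmpty() || classFilterMode) {
            return messageTags;
        }
        List<String> kept = new ArrayList<>(messageTags.size());
        for (String tag : messageTags) {
            // Untagged messages and messages with an unsubscribed tag are both dropped.
            if (tag != null && tagsSet.contains(tag)) {
                kept.add(tag);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> pulled = Arrays.asList("Aa", "BB", null);
        System.out.println(filterAgain(pulled, Collections.singleton("Aa"), false)); // [Aa]
        System.out.println(filterAgain(pulled, Collections.<String>emptySet(), false).size()); // 3
    }
}
```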

SQL92 filtering

TODO:

Class expression filtering

TODO: