This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new bbb177a893 Support tracing for async producing, batch sync consuming,
and batch async consuming in rocketMQ-client-java-5.x-plugin (#665)
bbb177a893 is described below
commit bbb177a89336267ab13ada252346e26454a6dfc2
Author: Chen Ziyan <[email protected]>
AuthorDate: Sun Jan 21 10:29:39 2024 +0800
Support tracing for async producing, batch sync consuming, and batch async
consuming in rocketMQ-client-java-5.x-plugin (#665)
---
CHANGES.md | 1 +
.../client/java/v5/MessageListenerInterceptor.java | 4 +-
...eptor.java => MessageSendAsyncInterceptor.java} | 83 ++++---
.../client/java/v5/MessageSendInterceptor.java | 22 +-
.../java/v5/RocketMqClientJavaPluginConfig.java | 37 ++++
.../v5/SimpleConsumerImplAsyncInterceptor.java | 126 +++++++++++
...tor.java => SimpleConsumerImplInterceptor.java} | 71 +++---
.../define/ProducerImplAsyncInstrumentation.java | 68 ++++++
.../SimpleConsumerImplAsyncInstrumentation.java | 80 +++++++
.../define/SimpleConsumerImplInstrumentation.java | 80 +++++++
.../src/main/resources/skywalking-plugin.def | 3 +
apm-sniffer/config/agent.config | 4 +
.../service-agent/java-agent/configurations.md | 2 +
.../rocketmq-5-grpc-scenario/bin/startup.sh | 2 +-
.../config/expectedData.yaml | 152 +++++++++++--
.../client/java/controller/CaseController.java | 148 ++++---------
.../client/java/controller/MessageService.java | 240 +++++++++++++++++++++
.../client/java/controller/ProducerSingleton.java | 61 ++++++
.../rocketmq-5-grpc-scenario/support-version.list | 1 +
19 files changed, 1001 insertions(+), 184 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5225574210..90ece6d118 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
* Fix re-transform bug when plugin enhanced class proxy parent method.
* Fix error HTTP status codes not recording as SLA failures in Vert.x plugins.
* Support for HttpExchange request tracing
+* Support tracing for async producing, batch sync consuming, and batch async
consuming in rocketMQ-client-java-5.x-plugin.
#### Documentation
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
index 513da2581c..17c4a749b1 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -37,6 +38,7 @@ import java.lang.reflect.Method;
public class MessageListenerInterceptor implements
InstanceMethodsAroundInterceptor {
public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+ public static final StringTag MQ_MESSAGE_ID = new
StringTag("mq.message.id");
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
@@ -47,7 +49,7 @@ public class MessageListenerInterceptor implements
InstanceMethodsAroundIntercep
AbstractSpan span =
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX +
messageView.getTopic()
+ "/Consumer", contextCarrier);
Tags.MQ_TOPIC.set(span, messageView.getTopic());
-
+ span.tag(MQ_MESSAGE_ID, messageView.getMessageId().toString());
Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
if (skyWalkingDynamicField != null) {
ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos)
skyWalkingDynamicField;
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
similarity index 59%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
index 258fcfd62d..dcd4cc2874 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
@@ -18,15 +18,20 @@
package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
-import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -37,38 +42,42 @@ import
org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.util.StringUtil;
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
/**
- * {@link MessageSendInterceptor} create exit span when the method {@link
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message)}
- * and {@link
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message,
Transaction)} execute.
+ * {@link MessageSendAsyncInterceptor} creates an exit span when the method {@link
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#sendAsync(org.apache.rocketmq.client.apis.message.Message)}
+ * executes, and finishes that span asynchronously once the returned future completes.
*/
-public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor {
+public class MessageSendAsyncInterceptor implements
InstanceMethodsAroundInterceptor {
public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+ public static final StringTag MQ_MESSAGE_ID = new
StringTag("mq.message.id");
+ public static final StringTag MQ_MESSAGE_KEYS = new
StringTag("mq.message.keys");
+ public static final StringTag MQ_MESSAGE_TAGS = new
StringTag("mq.message.tags");
@Override
- public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ public void beforeMethod(EnhancedInstance objInst,
+ Method method,
+ Object[] allArguments,
+ Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
Message message = (Message) allArguments[0];
ClientImpl producerImpl = (ClientImpl) objInst;
ContextCarrier contextCarrier = new ContextCarrier();
String namingServiceAddress =
producerImpl.getClientConfiguration().getEndpoints();
- AbstractSpan span =
ContextManager.createExitSpan(buildOperationName(message.getTopic()),
contextCarrier, namingServiceAddress);
+ AbstractSpan span = ContextManager.createExitSpan(
+ buildOperationName(message.getTopic()), contextCarrier,
namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, namingServiceAddress);
Tags.MQ_TOPIC.set(span, message.getTopic());
- Collection<String> keys = message.getKeys();
- if (!CollectionUtil.isEmpty(keys)) {
- span.tag(Tags.ofKey("mq.message.keys"),
keys.stream().collect(Collectors.joining(",")));
+ if
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
+ Collection<String> keys = message.getKeys();
+ if (!CollectionUtil.isEmpty(keys)) {
+ span.tag(MQ_MESSAGE_KEYS, String.join(",", keys));
+ }
}
- Optional<String> tag = message.getTag();
- if (tag.isPresent()) {
- span.tag(Tags.ofKey("mq.message.tags"), tag.get());
+ if
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
+ Optional<String> tag = message.getTag();
+ tag.ifPresent(s -> span.tag(MQ_MESSAGE_TAGS, s));
}
contextCarrier.extensionInjector().injectSendingTimestamp();
@@ -99,23 +108,43 @@ public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor
if (message.getDeliveryTimestamp().isPresent()) {
messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
}
- properties.entrySet().forEach(item ->
messageBuilder.addProperty(item.getKey(), item.getValue()));
+
+ properties.forEach(messageBuilder::addProperty);
allArguments[0] = messageBuilder.build();
}
@Override
- public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
- SendReceipt sendReceipt = (SendReceipt) ret;
- if (sendReceipt != null && sendReceipt.getMessageId() != null) {
- AbstractSpan activeSpan = ContextManager.activeSpan();
- activeSpan.tag(Tags.ofKey("mq.message.id"),
sendReceipt.getMessageId().toString());
- }
+ public Object afterMethod(EnhancedInstance objInst,
+ Method method,
+ Object[] allArguments,
+ Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ CompletableFuture<SendReceipt> future =
(CompletableFuture<SendReceipt>) ret;
+ AbstractSpan span = ContextManager.activeSpan();
+ span.prepareForAsync();
ContextManager.stopSpan();
- return ret;
+ return future.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
+ span.log(throwable);
+ span.errorOccurred();
+ span.asyncFinish();
+ return;
+ }
+ if (sendReceipt == null || sendReceipt.getMessageId() == null) {
+ span.asyncFinish();
+ return;
+ }
+ span.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
+ span.asyncFinish();
+ });
}
@Override
- public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ public void handleMethodException(EnhancedInstance objInst,
+ Method method,
+ Object[] allArguments,
+ Class<?>[] argumentsTypes,
+ Throwable t) {
ContextManager.activeSpan().log(t);
}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
index 258fcfd62d..e2e32bedea 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
@@ -27,6 +27,7 @@ import
org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -50,6 +51,9 @@ import java.util.stream.Collectors;
public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor {
public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+ public static final StringTag MQ_MESSAGE_ID = new
StringTag("mq.message.id");
+ public static final StringTag MQ_MESSAGE_KEYS = new
StringTag("mq.message.keys");
+ public static final StringTag MQ_MESSAGE_TAGS = new
StringTag("mq.message.tags");
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
@@ -62,13 +66,17 @@ public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
Tags.MQ_BROKER.set(span, namingServiceAddress);
Tags.MQ_TOPIC.set(span, message.getTopic());
- Collection<String> keys = message.getKeys();
- if (!CollectionUtil.isEmpty(keys)) {
- span.tag(Tags.ofKey("mq.message.keys"),
keys.stream().collect(Collectors.joining(",")));
+ if
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
+ Collection<String> keys = message.getKeys();
+ if (!CollectionUtil.isEmpty(keys)) {
+ span.tag(MQ_MESSAGE_KEYS,
keys.stream().collect(Collectors.joining(",")));
+ }
}
- Optional<String> tag = message.getTag();
- if (tag.isPresent()) {
- span.tag(Tags.ofKey("mq.message.tags"), tag.get());
+ if
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
+ Optional<String> tag = message.getTag();
+ if (tag.isPresent()) {
+ span.tag(MQ_MESSAGE_TAGS, tag.get());
+ }
}
contextCarrier.extensionInjector().injectSendingTimestamp();
@@ -108,7 +116,7 @@ public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor
SendReceipt sendReceipt = (SendReceipt) ret;
if (sendReceipt != null && sendReceipt.getMessageId() != null) {
AbstractSpan activeSpan = ContextManager.activeSpan();
- activeSpan.tag(Tags.ofKey("mq.message.id"),
sendReceipt.getMessageId().toString());
+ activeSpan.tag(MQ_MESSAGE_ID,
sendReceipt.getMessageId().toString());
}
ContextManager.stopSpan();
return ret;
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
new file mode 100644
index 0000000000..a37a0b533f
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
+
+public class RocketMqClientJavaPluginConfig {
+ public static class Plugin {
+ @PluginConfig(root = RocketMqClientJavaPluginConfig.class)
+ public static class Rocketmqclient {
+ /**
+ * This config item controls whether the RocketMqClientJava
plugin should collect the keys of the message.
+ */
+ public static boolean COLLECT_MESSAGE_KEYS = false;
+ /**
+ * This config item controls whether the RocketMqClientJava
plugin should collect the tags of the message.
+ */
+ public static boolean COLLECT_MESSAGE_TAGS = false;
+ }
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
new file mode 100644
index 0000000000..4851780c38
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
+
+/**
+ * {@link SimpleConsumerImplAsyncInterceptor} creates an entry span when the future returned by the
method {@link
+ *
org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl#receiveAsync(int,
Duration)} completes.
+ */
+public class SimpleConsumerImplAsyncInterceptor implements
InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor {
+ public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public final void beforeMethod(EnhancedInstance objInst, Method method,
Object[] allArguments,
+ Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
+
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ CompletableFuture<List<MessageView>> futureList =
(CompletableFuture<List<MessageView>>) ret;
+ ContextSnapshot capture = null;
+ if (ContextManager.isActive()) {
+ capture = ContextManager.capture();
+ }
+ final ContextSnapshot finalCapture = capture;
+ return futureList.whenCompleteAsync((messages, throwable) -> {
+ String topics =
messages.stream().map(MessageView::getTopic).distinct().collect(Collectors.joining(","));
+ AbstractSpan span = ContextManager.createEntrySpan(
+ CONSUMER_OPERATION_NAME_PREFIX + topics + "/Consumer", null);
+ if (finalCapture != null) {
+ ContextManager.continued(finalCapture);
+ }
+ if (null != throwable) {
+ span.log(throwable);
+ span.errorOccurred();
+ ContextManager.stopSpan();
+ return;
+ }
+ if (messages.isEmpty()) {
+ ContextManager.stopSpan();
+ return;
+ }
+ String namesrvAddr = "";
+ Object skyWalkingDynamicField =
objInst.getSkyWalkingDynamicField();
+ if (skyWalkingDynamicField != null) {
+ ConsumerEnhanceInfos consumerEnhanceInfos =
(ConsumerEnhanceInfos) objInst.getSkyWalkingDynamicField();
+ namesrvAddr = consumerEnhanceInfos.getNamesrvAddr();
+ }
+ SpanLayer.asMQ(span);
+ Tags.MQ_BROKER.set(span, namesrvAddr);
+ Tags.MQ_TOPIC.set(span, topics);
+ span.setPeer(namesrvAddr);
+ span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
+
+ for (MessageView messageView : messages) {
+ ContextCarrier contextCarrier =
getContextCarrierFromMessage(messageView);
+ ContextManager.extract(contextCarrier);
+ }
+ ContextManager.stopSpan();
+ });
+ }
+
+ @Override
+ public final void handleMethodException(EnhancedInstance objInst, Method
method, Object[] allArguments,
+ Class<?>[] argumentsTypes,
Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ @Override
+ public void onConstruct(final EnhancedInstance objInst, final Object[]
allArguments) throws Throwable {
+ ClientConfiguration clientConfiguration = (ClientConfiguration)
allArguments[0];
+ String namesrvAddr = clientConfiguration.getEndpoints();
+ ConsumerEnhanceInfos consumerEnhanceInfos = new
ConsumerEnhanceInfos(namesrvAddr);
+ objInst.setSkyWalkingDynamicField(consumerEnhanceInfos);
+ }
+
+ private ContextCarrier getContextCarrierFromMessage(MessageView message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(message.getProperties().get(next.getHeadKey()));
+ }
+
+ return contextCarrier;
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
similarity index 55%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
index 513da2581c..59689ebe96 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
@@ -18,7 +18,10 @@
package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
@@ -27,55 +30,71 @@ import
org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import
org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
-import java.lang.reflect.Method;
-
-public class MessageListenerInterceptor implements
InstanceMethodsAroundInterceptor {
-
+/**
+ * {@link SimpleConsumerImplInterceptor} creates an entry span after the method
{@link
org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl#receive(int,
+ * java.time.Duration)} returns a non-empty list of messages.
+ */
+public class SimpleConsumerImplInterceptor implements
InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor {
public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
@Override
- public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
- MessageView messageView = (MessageView) allArguments[0];
+ public final void beforeMethod(EnhancedInstance objInst, Method method,
Object[] allArguments,
+ Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
- ContextCarrier contextCarrier =
getContextCarrierFromMessage(messageView);
-
- AbstractSpan span =
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX +
messageView.getTopic()
- + "/Consumer", contextCarrier);
- Tags.MQ_TOPIC.set(span, messageView.getTopic());
+ }
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ List<MessageView> list = (List<MessageView>) ret;
+ if (list.isEmpty()) {
+ return ret;
+ }
+ String topics =
list.stream().map(MessageView::getTopic).distinct().collect(Collectors.joining(","));
+ AbstractSpan span =
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + topics
+ + "/Consumer",
null);
+ SpanLayer.asMQ(span);
+ String namesrvAddr = "";
Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
if (skyWalkingDynamicField != null) {
- ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos)
skyWalkingDynamicField;
- Tags.MQ_BROKER.set(span, consumerEnhanceInfos.getNamesrvAddr());
- span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+ ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos)
objInst.getSkyWalkingDynamicField();
+ namesrvAddr = consumerEnhanceInfos.getNamesrvAddr();
}
-
+ Tags.MQ_TOPIC.set(span, topics);
+ Tags.MQ_BROKER.set(span, namesrvAddr);
+ span.setPeer(namesrvAddr);
span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
- SpanLayer.asMQ(span);
- }
- @Override
- public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
- ConsumeResult status = (ConsumeResult) ret;
- if (ConsumeResult.FAILURE.equals(status)) {
- AbstractSpan activeSpan = ContextManager.activeSpan();
- activeSpan.errorOccurred();
- Tags.MQ_STATUS.set(activeSpan, status.name());
+ for (MessageView messageView : list) {
+ ContextCarrier contextCarrier =
getContextCarrierFromMessage(messageView);
+ ContextManager.extract(contextCarrier);
}
+
ContextManager.stopSpan();
+
return ret;
}
@Override
- public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ public final void handleMethodException(EnhancedInstance objInst, Method
method, Object[] allArguments,
+ Class<?>[] argumentsTypes,
Throwable t) {
ContextManager.activeSpan().log(t);
}
+ @Override
+ public void onConstruct(final EnhancedInstance objInst, final Object[]
allArguments) throws Throwable {
+ ClientConfiguration clientConfiguration = (ClientConfiguration)
allArguments[0];
+ String namesrvAddr = clientConfiguration.getEndpoints();
+ ConsumerEnhanceInfos consumerEnhanceInfos = new
ConsumerEnhanceInfos(namesrvAddr);
+ objInst.setSkyWalkingDynamicField(consumerEnhanceInfos);
+ }
+
private ContextCarrier getContextCarrierFromMessage(MessageView message) {
ContextCarrier contextCarrier = new ContextCarrier();
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
new file mode 100644
index 0000000000..7492d3a3ec
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class ProducerImplAsyncInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.java.impl.producer.ProducerImpl";
+ private static final String ENHANCE_METHOD = "sendAsync";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.MessageSendAsyncInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named(ENHANCE_METHOD).and(takesArgumentWithType(0,
"org.apache.rocketmq.client.apis.message.Message"));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return true;
+ }
+ }
+ };
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
new file mode 100644
index 0000000000..9c65f388e6
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class SimpleConsumerImplAsyncInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl";
+ private static final String ENHANCE_METHOD = "receiveAsync";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.SimpleConsumerImplAsyncInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getConstructorMatcher() {
+ return takesArguments(4);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
new file mode 100644
index 0000000000..b461459607
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class SimpleConsumerImplInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl";
+ private static final String ENHANCE_METHOD = "receive";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.SimpleConsumerImplInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[]{
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getConstructorMatcher() {
+ return takesArguments(4);
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+ }
+ };
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
index ed2f03fde6..9c1da6962d 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -17,3 +17,6 @@
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.MessageListenerInstrumentation
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ProducerImplInstrumentation
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.PushConsumerImplInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.SimpleConsumerImplInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ProducerImplAsyncInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.SimpleConsumerImplAsyncInstrumentation
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 6827c0a912..0a18b8873b 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -316,3 +316,7 @@
plugin.nettyhttp.collect_request_body=${SW_PLUGIN_NETTYHTTP_COLLECT_REQUEST_BODY
plugin.nettyhttp.filter_length_limit=${SW_PLUGIN_NETTYHTTP_FILTER_LENGTH_LIMIT:1024}
# When `HTTP_COLLECT_REQUEST_BODY` is enabled and content-type start with HTTP_SUPPORTED_CONTENT_TYPES_PREFIX, collect the body of the request , multiple paths should be separated by `,`
plugin.nettyhttp.supported_content_types_prefix=${SW_PLUGIN_NETTYHTTP_SUPPORTED_CONTENT_TYPES_PREFIX:application/json,text/}
+# If set to true, the keys of messages would be collected by the plugin for RocketMQ Java client.
+plugin.rocketmqclient.collect_message_keys=${SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_KEYS:false}
+# If set to true, the tags of messages would be collected by the plugin for RocketMQ Java client.
+plugin.rocketmqclient.collect_message_tags=${SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_TAGS:false}
diff --git a/docs/en/setup/service-agent/java-agent/configurations.md
b/docs/en/setup/service-agent/java-agent/configurations.md
index 718f08dc76..121c0c3e25 100644
--- a/docs/en/setup/service-agent/java-agent/configurations.md
+++ b/docs/en/setup/service-agent/java-agent/configurations.md
@@ -131,6 +131,8 @@ This is the properties list supported in
`agent/config/agent.config`.
| `plugin.nettyhttp.collect_request_body` | This
config item controls that whether the Netty-http plugin should collect the http
body of the request.
[...]
| `plugin.nettyhttp.filter_length_limit` | When
`COLLECT_REQUEST_BODY` is enabled, how many characters to keep and send to the
OAP backend, use negative values to keep and send the complete body.
[...]
| `plugin.nettyhttp.supported_content_types_prefix` | When
`COLLECT_REQUEST_BODY` is enabled and content-type start with
`HTTP_SUPPORTED_CONTENT_TYPES_PREFIX`, collect the body of the request ,
multiple paths should be separated by `,`
[...]
+| `plugin.rocketmqclient.collect_message_keys` | If set to
true, the keys of messages would be collected by the plugin for RocketMQ Java
client.
+| `plugin.rocketmqclient.collect_message_tags` | If set to
true, the tags of messages would be collected by the plugin for RocketMQ Java
client.
|
# Reset Collection/Map type configurations as empty collection.
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
index cfaffd692b..2ce7a89e81 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
@@ -18,4 +18,4 @@
home="$(cd "$(dirname $0)"; pwd)"
-java -Dendpoints=${ENDPOINTS} -DnameServer=${NAME_SERVER} -jar ${agent_opts} ${home}/../libs/rocketmq-5-grpc-scenario.jar &
+java -Dskywalking.plugin.rocketmqclient.collect_message_keys=true -Dskywalking.plugin.rocketmqclient.collect_message_tags=true -Dendpoints=${ENDPOINTS} -DnameServer=${NAME_SERVER} -jar ${agent_opts} ${home}/../libs/rocketmq-5-grpc-scenario.jar &
diff --git
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
index 3f228e2725..3c258b18c7 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
@@ -15,11 +15,65 @@
# limitations under the License.
segmentItems:
- serviceName: rocketmq-5-grpc-scenario
- segmentSize: ge 2
+ segmentSize: ge 4
segments:
- segmentId: not null
spans:
- - operationName: RocketMQ/TopicTest/Producer
+ - operationName: RocketMQ/ProducerAsyncTopicTest/Consumer
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 39
+ isError: false
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.topic, value: ProducerAsyncTopicTest }
+ - { key: mq.broker, value: not null }
+ - { key: transmission.latency, value: not null }
+ - { key: transmission.latency, value: not null }
+ refs:
+ - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario',
networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 3,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
rocketmq-5-grpc-scenario,
+ traceId: not null }
+ - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario',
networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 2,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
rocketmq-5-grpc-scenario,
+ traceId: not null }
+ - segmentId: not null
+ spans:
+ - operationName: RocketMQ/ConsumerAsyncTopicTest/Consumer
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 39
+ isError: false
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: ConsumerAsyncTopicTest }
+ - { key: transmission.latency, value: not null }
+ - { key: transmission.latency, value: not null }
+ refs:
+ - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario',
networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 4,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
rocketmq-5-grpc-scenario,
+ traceId: not null }
+ - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario',
networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 5,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
rocketmq-5-grpc-scenario,
+ traceId: not null }
+ - segmentId: not null
+ spans:
+ - operationName: RocketMQ/NormalTopicTest/Producer
parentSpanId: 0
spanId: 1
spanLayer: MQ
@@ -29,13 +83,81 @@ segmentItems:
isError: false
spanType: Exit
peer: not null
+ skipAnalysis: false
tags:
- { key: mq.broker, value: not null }
- - { key: mq.topic, value: TopicTest }
- - { key: mq.message.keys, value: KeyA }
- - { key: mq.message.tags, value: TagA }
+ - { key: mq.topic, value: NormalTopicTest }
+ - { key: mq.message.keys, value: not null }
+ - { key: mq.message.tags, value: not null }
+ - { key: mq.message.id, value: not null }
+ - operationName: RocketMQ/ProducerAsyncTopicTest/Producer
+ parentSpanId: 0
+ spanId: 2
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 38
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: ProducerAsyncTopicTest }
+ - { key: mq.message.keys, value: not null }
+ - { key: mq.message.tags, value: not null }
+ - { key: mq.message.id, value: not null }
+ - operationName: RocketMQ/ProducerAsyncTopicTest/Producer
+ parentSpanId: 0
+ spanId: 3
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 38
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: ProducerAsyncTopicTest }
+ - { key: mq.message.keys, value: not null }
+ - { key: mq.message.tags, value: not null }
+ - { key: mq.message.id, value: not null }
+ - operationName: RocketMQ/ConsumerAsyncTopicTest/Producer
+ parentSpanId: 0
+ spanId: 4
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 38
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: ConsumerAsyncTopicTest }
+ - { key: mq.message.keys, value: not null }
+ - { key: mq.message.tags, value: not null }
+ - { key: mq.message.id, value: not null }
+ - operationName: RocketMQ/ConsumerAsyncTopicTest/Producer
+ parentSpanId: 0
+ spanId: 5
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 38
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: ConsumerAsyncTopicTest }
+ - { key: mq.message.keys, value: not null }
+ - { key: mq.message.tags, value: not null }
- { key: mq.message.id, value: not null }
- skipAnalysis: 'false'
- operationName: GET:/case/rocketmq-5-grpc-scenario
parentSpanId: -1
spanId: 0
@@ -46,14 +168,14 @@ segmentItems:
isError: false
spanType: Entry
peer: ''
+ skipAnalysis: false
tags:
- { key: url, value:
'http://localhost:8080/rocketmq-5-grpc-scenario/case/rocketmq-5-grpc-scenario' }
- { key: http.method, value: GET }
- { key: http.status_code, value: '200' }
- skipAnalysis: 'false'
- segmentId: not null
spans:
- - operationName: RocketMQ/TopicTest/Consumer
+ - operationName: RocketMQ/NormalTopicTest/Consumer
parentSpanId: -1
spanId: 0
spanLayer: MQ
@@ -62,13 +184,15 @@ segmentItems:
componentId: 39
isError: false
spanType: Entry
- peer: not blank
+ peer: not null
+ skipAnalysis: false
tags:
- { key: transmission.latency, value: not null }
- - { key: mq.topic, value: TopicTest }
- - { key: mq.broker, value: not blank }
+ - { key: mq.topic, value: NormalTopicTest }
+ - { key: mq.message.id, value: not null }
+ - { key: mq.broker, value: not null }
refs:
- - { parentEndpoint: GET:/case/rocketmq-5-grpc-scenario,
networkAddress: not null,
+ - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario',
networkAddress: not null,
refType: CrossProcess, parentSpanId: 1,
parentTraceSegmentId: not null,
- parentServiceInstance: not null, parentService: not null,
traceId: not null }
- skipAnalysis: 'false'
+ parentServiceInstance: not null, parentService:
rocketmq-5-grpc-scenario,
+ traceId: not null }
\ No newline at end of file
diff --git
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
index ba0e43815d..5219bc8db9 100644
---
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
+++
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
@@ -18,97 +18,65 @@
package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.consumer.FilterExpression;
-import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-
@RestController
@RequestMapping("/case")
@Slf4j
public class CaseController {
private static final String SUCCESS = "Success";
+ private static final String NORMAL_TOPIC = "NormalTopicTest";
+ private static final String ASYNC_PRODUCER_TOPIC =
"ProducerAsyncTopicTest";
+ private static final String ASYNC_CONSUMER_TOPIC =
"ConsumerAsyncTopicTest";
+ private static final String TAG_NOMARL = "Tag:normal";
+ private static final String TAG_ASYNC_PRODUCER = "Tag:async:producer";
+ private static final String TAG_ASYNC_CONSUMER = "Tag:async:consumer";
+ private static final String GROUP = "group1";
@Value("${endpoints}")
private String endpoints;
- @Value("${nameServer}")
- private String nameServer;
-
- static final String TOPIC = "TopicTest";
- static final String TAG = "TagA";
- static final String GROUP = "group1";
-
- Producer producer;
-
- PushConsumer consumer;
+ @Autowired
+ private MessageService messageService;
@RequestMapping("/rocketmq-5-grpc-scenario")
@ResponseBody
public String testcase() {
try {
- ClientServiceProvider provider =
ClientServiceProvider.loadService();
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .enableSsl(false)
- .build();
- // start producer
- if (producer == null) {
- producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- .build();
- }
-
- // send msg
- Message message = provider.newMessageBuilder()
- // Set topic for the current message.
- .setTopic(TOPIC)
- // Message secondary classifier of message besides topic.
- .setTag(TAG)
- // Key(s) of the message, another way to mark message
besides message id.
- .setKeys("KeyA")
- .setBody("This is a normal message for Apache
RocketMQ".getBytes(StandardCharsets.UTF_8))
- .build();
- SendReceipt sendReceipt = producer.send(message);
-
- // start consumer
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- FilterExpression filterExpression = new
FilterExpression(TAG, FilterExpressionType.TAG);
- if (consumer == null) {
- consumer = provider.newPushConsumerBuilder()
-
.setClientConfiguration(clientConfiguration)
- .setConsumerGroup(GROUP)
-
.setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression))
- .setMessageListener(new MyConsumer())
- .build();
- }
- } catch (Exception e) {
- log.error("consumer start error", e);
- }
- }
- });
- thread.start();
+ messageService.sendNormalMessage(NORMAL_TOPIC, TAG_NOMARL, GROUP);
+ Thread t1 = new Thread(() -> messageService.pushConsumes(
+ Collections.singletonList(NORMAL_TOPIC),
+ Collections.singletonList(TAG_NOMARL),
+ GROUP
+ ));
+ t1.start();
+ t1.join();
+
+ messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC,
TAG_ASYNC_PRODUCER, GROUP);
+ messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC,
TAG_ASYNC_PRODUCER, GROUP);
+ Thread t2 = new Thread(() ->
messageService.simpleConsumes(Collections.singletonList(ASYNC_PRODUCER_TOPIC),
+
Collections.singletonList(TAG_ASYNC_PRODUCER), GROUP,
+ 10, 10
+ ));
+ t2.start();
+ t2.join();
+
+ messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC,
TAG_ASYNC_CONSUMER, GROUP);
+ messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC,
TAG_ASYNC_CONSUMER, GROUP);
+ Thread t3 = new Thread(() ->
messageService.simpleConsumeAsync(ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER,
GROUP, 10,
+ 10
+ ));
+ t3.start();
+ t3.join();
} catch (Exception e) {
log.error("testcase error", e);
}
@@ -119,46 +87,10 @@ public class CaseController {
@ResponseBody
public String healthCheck() throws Exception {
System.setProperty(MixAll.ROCKETMQ_HOME_ENV,
this.getClass().getResource("/").getPath());
- String[] subArgs = new String[]{
- "updateTopic",
- "-n",
- nameServer,
- "-c",
- "DefaultCluster",
- "-t",
- "TopicTest"};
- MQAdminStartup.main(subArgs);
-
- subArgs = new String[]{
- "updateSubGroup",
- "-n",
- nameServer,
- "-c",
- "DefaultCluster",
- "-g",
- "group1"};
- MQAdminStartup.main(subArgs);
-
- ClientServiceProvider provider = ClientServiceProvider.loadService();
- ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
- .setEndpoints(endpoints)
- .enableSsl(false)
- .build();
- // start producer
- Producer producer = provider.newProducerBuilder()
- .setClientConfiguration(clientConfiguration)
- .build();
+ messageService.updateNormalTopic(NORMAL_TOPIC);
+ messageService.updateNormalTopic(ASYNC_PRODUCER_TOPIC);
+ messageService.updateNormalTopic(ASYNC_CONSUMER_TOPIC);
+ final Producer producer = ProducerSingleton.getInstance(endpoints,
NORMAL_TOPIC);
return SUCCESS;
}
-
- public static class MyConsumer implements MessageListener {
-
- @Override
- public ConsumeResult consume(MessageView messageView) {
- log.info("Consume message successfully,
messageId={},messageBody={}", messageView.getMessageId(),
- messageView.getBody().toString());
- return ConsumeResult.SUCCESS;
- }
- }
-
}
diff --git
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
new file mode 100644
index 0000000000..0c87f66981
--- /dev/null
+++
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class MessageService {
+ @Value("${endpoints}")
+ private String endpoints;
+
+ @Value("${nameServer}")
+ private String nameServer;
+
+    /**
+     * Builds one message with a fixed text body and a random key, then sends it synchronously through the
+     * producer returned by {@link ProducerSingleton#getInstance}.
+     * <p>
+     * A failure of the send itself is logged and not rethrown.
+     *
+     * @param topic topic the message is published to
+     * @param tag   tag attached to the message
+     * @param group not used by this method
+     * @throws ClientException if the shared producer cannot be obtained
+     */
+    public void sendNormalMessage(String topic, String tag, String group) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final Producer sharedProducer = ProducerSingleton.getInstance(endpoints, topic);
+
+        final byte[] payload = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+        final String messageKey = UUID.randomUUID().toString();
+        final Message outbound = serviceProvider.newMessageBuilder()
+                                                .setTopic(topic)
+                                                .setTag(tag)
+                                                .setKeys(messageKey)
+                                                .setBody(payload)
+                                                .build();
+        try {
+            final SendReceipt receipt = sharedProducer.send(outbound);
+            log.info("Send normal message successfully, messageId={}", receipt.getMessageId());
+        } catch (Throwable sendFailure) {
+            log.error("Failed to send message", sendFailure);
+        }
+    }
+
+    /**
+     * Builds one message with a fixed text body and a random key, submits it with
+     * {@code Producer#sendAsync} and blocks until the returned future completes.
+     * <p>
+     * A failure of the send (or of the wait) is logged and not rethrown.
+     *
+     * @param topic topic the message is published to
+     * @param tag   tag attached to the message
+     * @param group not used by this method
+     * @throws ClientException if the shared producer cannot be obtained
+     */
+    public void sendNormalMessageAsync(String topic, String tag, String group) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final Producer sharedProducer = ProducerSingleton.getInstance(endpoints, topic);
+
+        final byte[] payload = "This is a async message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
+        final String messageKey = UUID.randomUUID().toString();
+        final Message outbound = serviceProvider.newMessageBuilder()
+                                                .setTopic(topic)
+                                                .setTag(tag)
+                                                .setKeys(messageKey)
+                                                .setBody(payload)
+                                                .build();
+        try {
+            // join() keeps the caller on this thread until the asynchronous send has finished.
+            sharedProducer.sendAsync(outbound).join();
+            log.info("Send async message successfully");
+        } catch (Throwable sendFailure) {
+            log.error("Failed to send message", sendFailure);
+        }
+    }
+
+    /**
+     * Builds a {@link PushConsumer} for {@code group} that delivers messages to a new {@link MyConsumer}.
+     * <p>
+     * {@code topics} and {@code tags} are paired by index: the i-th topic is subscribed with the i-th tag
+     * as a TAG filter expression. Any exception raised while preparing or building the consumer is logged
+     * and not rethrown.
+     *
+     * @param topics topics to subscribe to
+     * @param tags   tag filter for the topic at the same index
+     * @param group  consumer group name
+     */
+    public void pushConsumes(List<String> topics, List<String> tags, String group) {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final ClientConfiguration configuration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+        try {
+            final Map<String, FilterExpression> subscriptions = new HashMap<>();
+            for (int index = 0; index < topics.size(); index++) {
+                final String topic = topics.get(index);
+                final FilterExpression byTag = new FilterExpression(tags.get(index), FilterExpressionType.TAG);
+                subscriptions.put(topic, byTag);
+            }
+
+            // The built consumer is neither retained nor closed here.
+            // NOTE(review): presumably it keeps listening for the lifetime of the test application - confirm.
+            serviceProvider.newPushConsumerBuilder()
+                           .setClientConfiguration(configuration)
+                           .setSubscriptionExpressions(subscriptions)
+                           .setConsumerGroup(group)
+                           .setMessageListener(new MyConsumer())
+                           .build();
+        } catch (Exception e) {
+            log.error("consumer start error", e);
+        }
+    }
+
+    /**
+     * Receives one batch of messages with a {@link SimpleConsumer}, logs every message and acknowledges
+     * each of them synchronously.
+     * <p>
+     * {@code topics} and {@code tags} are paired by index: the i-th topic is subscribed with the i-th tag
+     * as a TAG filter expression. The consumer is created with a fixed await duration of 10 seconds and is
+     * closed before this method returns, so repeated calls do not accumulate open clients. Any exception
+     * raised while building, receiving or closing is logged and not rethrown; a failed acknowledgement is
+     * logged per message and does not stop the remaining acknowledgements.
+     *
+     * @param topics        topics to subscribe to
+     * @param tags          tag filter for the topic at the same index
+     * @param group         consumer group name
+     * @param maxMessageNum maximum number of messages requested in the single receive call
+     * @param duration      invisible duration, in seconds, passed to the receive call
+     */
+    public void simpleConsumes(List<String> topics,
+                               List<String> tags,
+                               String group,
+                               Integer maxMessageNum,
+                               Integer duration) {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+
+        try {
+            Map<String, FilterExpression> filterExpressionMap = new HashMap<>();
+            for (int i = 0; i < topics.size(); i++) {
+                FilterExpression filterExpression = new FilterExpression(tags.get(i), FilterExpressionType.TAG);
+                filterExpressionMap.put(topics.get(i), filterExpression);
+            }
+
+            // try-with-resources: the consumer used to be left open on every call.
+            try (SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
+                                                   .setClientConfiguration(clientConfiguration)
+                                                   .setConsumerGroup(group)
+                                                   .setAwaitDuration(Duration.ofSeconds(10))
+                                                   .setSubscriptionExpressions(filterExpressionMap)
+                                                   .build()) {
+                Duration invisibleDuration = Duration.ofSeconds(duration);
+                final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
+                messages.forEach(messageView -> {
+                    log.info("Received message: {}", messageView);
+                });
+                for (MessageView msg : messages) {
+                    final MessageId messageId = msg.getMessageId();
+                    try {
+                        consumer.ack(msg);
+                        log.info("Message is acknowledged successfully, messageId={}", messageId);
+                    } catch (Throwable t) {
+                        log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Also reached when closing the consumer fails.
+            log.error("consumer start error", e);
+        }
+    }
+
+    /**
+     * Receives one batch of messages asynchronously with a {@link SimpleConsumer} and acknowledges every
+     * received message asynchronously.
+     * <p>
+     * The receive callback runs on a dedicated executor and the acknowledgement callbacks on a second one.
+     * Once the receive callback and all acknowledgement callbacks have finished (successfully or not), both
+     * executors are shut down and the consumer is closed, so repeated calls do not accumulate thread pools
+     * or open clients. Failures are logged and never rethrown.
+     *
+     * @param topic         topic to subscribe to
+     * @param tag           tag used as a TAG filter expression for {@code topic}
+     * @param group         consumer group name
+     * @param maxMessageNum maximum number of messages requested in the single receive call
+     * @param duration      invisible duration, in seconds, passed to the receive call
+     */
+    public void simpleConsumeAsync(String topic,
+                                   String tag,
+                                   String group,
+                                   Integer maxMessageNum,
+                                   Integer duration) {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+        // Tracked outside the try block so that a start-up failure can still release them.
+        SimpleConsumer startedConsumer = null;
+        ExecutorService startedReceiveExecutor = null;
+        ExecutorService startedAckExecutor = null;
+        try {
+            Duration awaitDuration = Duration.ofSeconds(10);
+            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+            startedConsumer = provider.newSimpleConsumerBuilder()
+                                      .setClientConfiguration(clientConfiguration)
+                                      .setConsumerGroup(group)
+                                      .setAwaitDuration(awaitDuration)
+                                      .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+                                      .build();
+            Duration invisibleDuration = Duration.ofSeconds(duration);
+            startedReceiveExecutor = Executors.newCachedThreadPool();
+            startedAckExecutor = Executors.newCachedThreadPool();
+
+            // Effectively-final copies for use inside the callbacks.
+            final SimpleConsumer consumer = startedConsumer;
+            final ExecutorService receiveCallbackExecutor = startedReceiveExecutor;
+            final ExecutorService ackCallbackExecutor = startedAckExecutor;
+
+            final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(
+                maxMessageNum,
+                invisibleDuration
+            );
+            // The callback returns a future that completes when every acknowledgement has been logged.
+            final CompletableFuture<CompletableFuture<Void>> ackStage = future0.handleAsync((messages, throwable) -> {
+                if (null != throwable) {
+                    log.error("Failed to receive message from remote", throwable);
+                    return CompletableFuture.<Void>completedFuture(null);
+                }
+                log.info("Received {} message(s)", messages.size());
+                return ackAllAsync(consumer, messages, ackCallbackExecutor);
+            }, receiveCallbackExecutor);
+            // Release on our own executor thread: closing the consumer may block.
+            ackStage.thenCompose(allAcked -> allAcked)
+                    .whenCompleteAsync(
+                        (ignored, failure) ->
+                            releaseAsyncResources(consumer, receiveCallbackExecutor, ackCallbackExecutor),
+                        receiveCallbackExecutor
+                    );
+        } catch (Exception e) {
+            log.error("consumer start error", e);
+            releaseAsyncResources(startedConsumer, startedReceiveExecutor, startedAckExecutor);
+        }
+    }
+
+    /**
+     * Acknowledges every message asynchronously and logs each outcome on {@code ackCallbackExecutor}.
+     *
+     * @return a future that completes after the logging callback of every acknowledgement has run
+     */
+    private CompletableFuture<Void> ackAllAsync(SimpleConsumer consumer,
+                                                List<MessageView> messages,
+                                                ExecutorService ackCallbackExecutor) {
+        final CompletableFuture<?>[] logged = new CompletableFuture<?>[messages.size()];
+        int index = 0;
+        for (MessageView message : messages) {
+            final MessageId messageId = message.getMessageId();
+            logged[index++] = consumer.ackAsync(message).whenCompleteAsync((v, t) -> {
+                if (null != t) {
+                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+                    return;
+                }
+                log.info("Message is acknowledged successfully, messageId={}", messageId);
+            }, ackCallbackExecutor);
+        }
+        return CompletableFuture.allOf(logged);
+    }
+
+    /**
+     * Shuts down the given executors and closes the consumer; {@code null} arguments are skipped.
+     */
+    private void releaseAsyncResources(SimpleConsumer consumer, ExecutorService... executors) {
+        for (ExecutorService executor : executors) {
+            if (executor != null) {
+                // shutdown() does not interrupt the task that is currently performing this release.
+                executor.shutdown();
+            }
+        }
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (Exception e) {
+                log.warn("Failed to close simple consumer", e);
+            }
+        }
+    }
+
+    /**
+     * Runs the RocketMQ admin command {@code updateTopic} for {@code topic} against the configured name
+     * server, on cluster {@code DefaultCluster}, with the attribute {@code +message.type=NORMAL}.
+     *
+     * @param topic name of the topic passed to the admin command
+     */
+    public void updateNormalTopic(String topic) {
+        final String[] adminArgs = {
+            "updateTopic",
+            "-n", nameServer,
+            "-c", "DefaultCluster",
+            "-t", topic,
+            "-a", "+message.type=NORMAL"
+        };
+        MQAdminStartup.main(adminArgs);
+    }
+
+    /**
+     * Message listener that logs the id and body of every delivered message and always reports success.
+     */
+    public static class MyConsumer implements MessageListener {
+
+        /**
+         * Logs the message id and the body decoded as UTF-8 text.
+         *
+         * @param messageView the delivered message
+         * @return always {@link ConsumeResult#SUCCESS}
+         */
+        @Override
+        public ConsumeResult consume(MessageView messageView) {
+            // ByteBuffer#toString() only describes the buffer (position/limit/capacity), so decode the
+            // bytes instead; a duplicate is decoded to leave the returned buffer's position untouched.
+            final String messageBody = StandardCharsets.UTF_8.decode(messageView.getBody().duplicate()).toString();
+            log.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(),
+                     messageBody
+            );
+            return ConsumeResult.SUCCESS;
+        }
+    }
+
+}
diff --git
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
new file mode 100644
index 0000000000..276c08275a
--- /dev/null
+++
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+
+/**
+ * Holder of a single, lazily created {@link Producer} shared by the whole test application.
+ */
+public class ProducerSingleton {
+    private static volatile Producer PRODUCER;
+
+    private ProducerSingleton() {
+    }
+
+    /**
+     * Builds a producer for the given endpoints with SSL disabled.
+     *
+     * @param checker   transaction checker to register, or {@code null} to register none
+     * @param endpoints endpoints handed to the client configuration
+     * @param topics    topics declared on the producer builder
+     * @throws ClientException if the producer cannot be built
+     */
+    private static Producer buildProducer(TransactionChecker checker,
+                                          String endpoints,
+                                          String... topics) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final ClientConfiguration configuration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .enableSsl(false)
+                                                                     .build();
+        final ProducerBuilder producerBuilder = serviceProvider.newProducerBuilder()
+                                                               .setClientConfiguration(configuration)
+                                                               .setTopics(topics);
+        if (null != checker) {
+            producerBuilder.setTransactionChecker(checker);
+        }
+        return producerBuilder.build();
+    }
+
+    /**
+     * Returns the shared producer, creating it on the first call.
+     * <p>
+     * {@code endpoints} and {@code topics} are only used when the producer does not exist yet; later calls
+     * return the already created instance regardless of the arguments.
+     *
+     * @throws ClientException if the producer has to be created and creation fails
+     */
+    public static Producer getInstance(String endpoints, String... topics) throws ClientException {
+        Producer existing = PRODUCER;
+        if (existing != null) {
+            return existing;
+        }
+        synchronized (ProducerSingleton.class) {
+            // Re-read under the lock: another thread may have created the producer in the meantime.
+            existing = PRODUCER;
+            if (existing == null) {
+                existing = buildProducer(null, endpoints, topics);
+                PRODUCER = existing;
+            }
+            return existing;
+        }
+    }
+}
diff --git
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
index a9219d9010..4f8dd365dc 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
@@ -17,3 +17,4 @@
# lists your version here (Contains only the last version number of each minor
version.)
5.1.1
+5.1.4