This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new bbb177a893 Support tracing for async producing, batch sync consuming, 
and batch async consuming in rocketMQ-client-java-5.x-plugin (#665)
bbb177a893 is described below

commit bbb177a89336267ab13ada252346e26454a6dfc2
Author: Chen Ziyan <[email protected]>
AuthorDate: Sun Jan 21 10:29:39 2024 +0800

    Support tracing for async producing, batch sync consuming, and batch async 
consuming in rocketMQ-client-java-5.x-plugin (#665)
---
 CHANGES.md                                         |   1 +
 .../client/java/v5/MessageListenerInterceptor.java |   4 +-
 ...eptor.java => MessageSendAsyncInterceptor.java} |  83 ++++---
 .../client/java/v5/MessageSendInterceptor.java     |  22 +-
 .../java/v5/RocketMqClientJavaPluginConfig.java    |  37 ++++
 .../v5/SimpleConsumerImplAsyncInterceptor.java     | 126 +++++++++++
 ...tor.java => SimpleConsumerImplInterceptor.java} |  71 +++---
 .../define/ProducerImplAsyncInstrumentation.java   |  68 ++++++
 .../SimpleConsumerImplAsyncInstrumentation.java    |  80 +++++++
 .../define/SimpleConsumerImplInstrumentation.java  |  80 +++++++
 .../src/main/resources/skywalking-plugin.def       |   3 +
 apm-sniffer/config/agent.config                    |   4 +
 .../service-agent/java-agent/configurations.md     |   2 +
 .../rocketmq-5-grpc-scenario/bin/startup.sh        |   2 +-
 .../config/expectedData.yaml                       | 152 +++++++++++--
 .../client/java/controller/CaseController.java     | 148 ++++---------
 .../client/java/controller/MessageService.java     | 240 +++++++++++++++++++++
 .../client/java/controller/ProducerSingleton.java  |  61 ++++++
 .../rocketmq-5-grpc-scenario/support-version.list  |   1 +
 19 files changed, 1001 insertions(+), 184 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5225574210..90ece6d118 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 * Fix re-transform bug when plugin enhanced class proxy parent method.
 * Fix error HTTP status codes not recording as SLA failures in Vert.x plugins. 
 * Support for HttpExchange request tracing
+* Support tracing for async producing, batch sync consuming, and batch async 
consuming in rocketMQ-client-java-5.x-plugin.
 
 #### Documentation
 
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
index 513da2581c..17c4a749b1 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
 import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -37,6 +38,7 @@ import java.lang.reflect.Method;
 public class MessageListenerInterceptor implements 
InstanceMethodsAroundInterceptor {
 
     public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+    public static final StringTag MQ_MESSAGE_ID = new 
StringTag("mq.message.id");
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws 
Throwable {
@@ -47,7 +49,7 @@ public class MessageListenerInterceptor implements 
InstanceMethodsAroundIntercep
         AbstractSpan span = 
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + 
messageView.getTopic()
                 + "/Consumer", contextCarrier);
         Tags.MQ_TOPIC.set(span, messageView.getTopic());
-
+        span.tag(MQ_MESSAGE_ID, messageView.getMessageId().toString());
         Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
         if (skyWalkingDynamicField != null) {
             ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) 
skyWalkingDynamicField;
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
similarity index 59%
copy from 
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
copy to 
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
index 258fcfd62d..dcd4cc2874 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendAsyncInterceptor.java
@@ -18,15 +18,20 @@
 
 package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
 
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageBuilder;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
-import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
 import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -37,38 +42,42 @@ import 
org.apache.skywalking.apm.agent.core.util.CollectionUtil;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 import org.apache.skywalking.apm.util.StringUtil;
 
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
 /**
- * {@link MessageSendInterceptor} create exit span when the method {@link 
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message)}
- * and {@link 
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#send(Message, 
Transaction)} execute.
+ * {@link MessageSendAsyncInterceptor} creates an exit span when the method {@link 
org.apache.rocketmq.client.java.impl.producer.ProducerImpl#sendAsync(org.apache.rocketmq.client.apis.message.Message)}
+ * executes.
  */
-public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor {
+public class MessageSendAsyncInterceptor implements 
InstanceMethodsAroundInterceptor {
 
     public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+    public static final StringTag MQ_MESSAGE_ID = new 
StringTag("mq.message.id");
+    public static final StringTag MQ_MESSAGE_KEYS = new 
StringTag("mq.message.keys");
+    public static final StringTag MQ_MESSAGE_TAGS = new 
StringTag("mq.message.tags");
 
     @Override
-    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws 
Throwable {
+    public void beforeMethod(EnhancedInstance objInst,
+                             Method method,
+                             Object[] allArguments,
+                             Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) throws Throwable {
         Message message = (Message) allArguments[0];
         ClientImpl producerImpl = (ClientImpl) objInst;
 
         ContextCarrier contextCarrier = new ContextCarrier();
         String namingServiceAddress = 
producerImpl.getClientConfiguration().getEndpoints();
-        AbstractSpan span = 
ContextManager.createExitSpan(buildOperationName(message.getTopic()), 
contextCarrier, namingServiceAddress);
+        AbstractSpan span = ContextManager.createExitSpan(
+            buildOperationName(message.getTopic()), contextCarrier, 
namingServiceAddress);
         span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
         Tags.MQ_BROKER.set(span, namingServiceAddress);
         Tags.MQ_TOPIC.set(span, message.getTopic());
-        Collection<String> keys = message.getKeys();
-        if (!CollectionUtil.isEmpty(keys)) {
-            span.tag(Tags.ofKey("mq.message.keys"), 
keys.stream().collect(Collectors.joining(",")));
+        if 
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
+            Collection<String> keys = message.getKeys();
+            if (!CollectionUtil.isEmpty(keys)) {
+                span.tag(MQ_MESSAGE_KEYS, String.join(",", keys));
+            }
         }
-        Optional<String> tag = message.getTag();
-        if (tag.isPresent()) {
-            span.tag(Tags.ofKey("mq.message.tags"), tag.get());
+        if 
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
+            Optional<String> tag = message.getTag();
+            tag.ifPresent(s -> span.tag(MQ_MESSAGE_TAGS, s));
         }
 
         contextCarrier.extensionInjector().injectSendingTimestamp();
@@ -99,23 +108,43 @@ public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor
         if (message.getDeliveryTimestamp().isPresent()) {
             
messageBuilder.setDeliveryTimestamp(message.getDeliveryTimestamp().get());
         }
-        properties.entrySet().forEach(item -> 
messageBuilder.addProperty(item.getKey(), item.getValue()));
+
+        properties.forEach(messageBuilder::addProperty);
         allArguments[0] = messageBuilder.build();
     }
 
     @Override
-    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
-        SendReceipt sendReceipt = (SendReceipt) ret;
-        if (sendReceipt != null && sendReceipt.getMessageId() != null) {
-            AbstractSpan activeSpan = ContextManager.activeSpan();
-            activeSpan.tag(Tags.ofKey("mq.message.id"), 
sendReceipt.getMessageId().toString());
-        }
+    public Object afterMethod(EnhancedInstance objInst,
+                              Method method,
+                              Object[] allArguments,
+                              Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        CompletableFuture<SendReceipt> future = 
(CompletableFuture<SendReceipt>) ret;
+        AbstractSpan span = ContextManager.activeSpan();
+        span.prepareForAsync();
         ContextManager.stopSpan();
-        return ret;
+        return future.whenCompleteAsync((sendReceipt, throwable) -> {
+            if (null != throwable) {
+                span.log(throwable);
+                span.errorOccurred();
+                span.asyncFinish();
+                return;
+            }
+            if (sendReceipt == null || sendReceipt.getMessageId() == null) {
+                span.asyncFinish();
+                return;
+            }
+            span.tag(MQ_MESSAGE_ID, sendReceipt.getMessageId().toString());
+            span.asyncFinish();
+        });
     }
 
     @Override
-    public void handleMethodException(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+    public void handleMethodException(EnhancedInstance objInst,
+                                      Method method,
+                                      Object[] allArguments,
+                                      Class<?>[] argumentsTypes,
+                                      Throwable t) {
         ContextManager.activeSpan().log(t);
     }
 
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
index 258fcfd62d..e2e32bedea 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageSendInterceptor.java
@@ -27,6 +27,7 @@ import 
org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
 import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -50,6 +51,9 @@ import java.util.stream.Collectors;
 public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor {
 
     public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+    public static final StringTag MQ_MESSAGE_ID = new 
StringTag("mq.message.id");
+    public static final StringTag MQ_MESSAGE_KEYS = new 
StringTag("mq.message.keys");
+    public static final StringTag MQ_MESSAGE_TAGS = new 
StringTag("mq.message.tags");
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws 
Throwable {
@@ -62,13 +66,17 @@ public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor
         span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
         Tags.MQ_BROKER.set(span, namingServiceAddress);
         Tags.MQ_TOPIC.set(span, message.getTopic());
-        Collection<String> keys = message.getKeys();
-        if (!CollectionUtil.isEmpty(keys)) {
-            span.tag(Tags.ofKey("mq.message.keys"), 
keys.stream().collect(Collectors.joining(",")));
+        if 
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_KEYS) {
+            Collection<String> keys = message.getKeys();
+            if (!CollectionUtil.isEmpty(keys)) {
+                span.tag(MQ_MESSAGE_KEYS, 
keys.stream().collect(Collectors.joining(",")));
+            }
         }
-        Optional<String> tag = message.getTag();
-        if (tag.isPresent()) {
-            span.tag(Tags.ofKey("mq.message.tags"), tag.get());
+        if 
(RocketMqClientJavaPluginConfig.Plugin.Rocketmqclient.COLLECT_MESSAGE_TAGS) {
+            Optional<String> tag = message.getTag();
+            if (tag.isPresent()) {
+                span.tag(MQ_MESSAGE_TAGS, tag.get());
+            }
         }
 
         contextCarrier.extensionInjector().injectSendingTimestamp();
@@ -108,7 +116,7 @@ public class MessageSendInterceptor implements 
InstanceMethodsAroundInterceptor
         SendReceipt sendReceipt = (SendReceipt) ret;
         if (sendReceipt != null && sendReceipt.getMessageId() != null) {
             AbstractSpan activeSpan = ContextManager.activeSpan();
-            activeSpan.tag(Tags.ofKey("mq.message.id"), 
sendReceipt.getMessageId().toString());
+            activeSpan.tag(MQ_MESSAGE_ID, 
sendReceipt.getMessageId().toString());
         }
         ContextManager.stopSpan();
         return ret;
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
new file mode 100644
index 0000000000..a37a0b533f
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/RocketMqClientJavaPluginConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
+
+public class RocketMqClientJavaPluginConfig {
+    public static class Plugin {
+        @PluginConfig(root = RocketMqClientJavaPluginConfig.class)
+        public static class Rocketmqclient {
+            /**
+             * This config item controls whether the RocketMqClientJava 
plugin should collect the keys of the message.
+             */
+            public static boolean COLLECT_MESSAGE_KEYS = false;
+            /**
+             * This config item controls whether the RocketMqClientJava 
plugin should collect the tags of the message.
+             */
+            public static boolean COLLECT_MESSAGE_TAGS = false;
+        }
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
new file mode 100644
index 0000000000..4851780c38
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplAsyncInterceptor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
+
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import 
org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
+
+/**
+ * {@link SimpleConsumerImplAsyncInterceptor} creates an entry span when the 
method {@link
+ * 
org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl#receiveAsync(int,
 Duration)} executes.
+ */
+public class SimpleConsumerImplAsyncInterceptor implements 
InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor {
+    public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+    @Override
+    public final void beforeMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments,
+                                   Class<?>[] argumentsTypes, 
MethodInterceptResult result) throws Throwable {
+
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        CompletableFuture<List<MessageView>> futureList = 
(CompletableFuture<List<MessageView>>) ret;
+        ContextSnapshot capture = null;
+        if (ContextManager.isActive()) {
+            capture = ContextManager.capture();
+        }
+        final ContextSnapshot finalCapture = capture;
+        return futureList.whenCompleteAsync((messages, throwable) -> {
+            String topics = 
messages.stream().map(MessageView::getTopic).distinct().collect(Collectors.joining(","));
+            AbstractSpan span = ContextManager.createEntrySpan(
+                CONSUMER_OPERATION_NAME_PREFIX + topics + "/Consumer", null);
+            if (finalCapture != null) {
+                ContextManager.continued(finalCapture);
+            }
+            if (null != throwable) {
+                span.log(throwable);
+                span.errorOccurred();
+                ContextManager.stopSpan();
+                return;
+            }
+            if (messages.isEmpty()) {
+                ContextManager.stopSpan();
+                return;
+            }
+            String namesrvAddr = "";
+            Object skyWalkingDynamicField = 
objInst.getSkyWalkingDynamicField();
+            if (skyWalkingDynamicField != null) {
+                ConsumerEnhanceInfos consumerEnhanceInfos = 
(ConsumerEnhanceInfos) objInst.getSkyWalkingDynamicField();
+                namesrvAddr = consumerEnhanceInfos.getNamesrvAddr();
+            }
+            SpanLayer.asMQ(span);
+            Tags.MQ_BROKER.set(span, namesrvAddr);
+            Tags.MQ_TOPIC.set(span, topics);
+            span.setPeer(namesrvAddr);
+            span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
+
+            for (MessageView messageView : messages) {
+                ContextCarrier contextCarrier = 
getContextCarrierFromMessage(messageView);
+                ContextManager.extract(contextCarrier);
+            }
+            ContextManager.stopSpan();
+        });
+    }
+
+    @Override
+    public final void handleMethodException(EnhancedInstance objInst, Method 
method, Object[] allArguments,
+                                            Class<?>[] argumentsTypes, 
Throwable t) {
+        ContextManager.activeSpan().log(t);
+    }
+
+    @Override
+    public void onConstruct(final EnhancedInstance objInst, final Object[] 
allArguments) throws Throwable {
+        ClientConfiguration clientConfiguration = (ClientConfiguration) 
allArguments[0];
+        String namesrvAddr = clientConfiguration.getEndpoints();
+        ConsumerEnhanceInfos consumerEnhanceInfos = new 
ConsumerEnhanceInfos(namesrvAddr);
+        objInst.setSkyWalkingDynamicField(consumerEnhanceInfos);
+    }
+
+    private ContextCarrier getContextCarrierFromMessage(MessageView message) {
+        ContextCarrier contextCarrier = new ContextCarrier();
+
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            next.setHeadValue(message.getProperties().get(next.getHeadKey()));
+        }
+
+        return contextCarrier;
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
similarity index 55%
copy from 
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
copy to 
apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
index 513da2581c..59689ebe96 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/MessageListenerInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/SimpleConsumerImplInterceptor.java
@@ -18,7 +18,10 @@
 
 package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5;
 
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
@@ -27,55 +30,71 @@ import 
org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 import 
org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ConsumerEnhanceInfos;
 
-import java.lang.reflect.Method;
-
-public class MessageListenerInterceptor implements 
InstanceMethodsAroundInterceptor {
-
+/**
+ * {@link SimpleConsumerImplInterceptor} creates an entry span when the method 
{@link 
org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl#receive(int,
+ * java.time.Duration)} executes.
+ */
+public class SimpleConsumerImplInterceptor implements 
InstanceMethodsAroundInterceptor, InstanceConstructorInterceptor {
     public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
 
     @Override
-    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws 
Throwable {
-        MessageView messageView = (MessageView) allArguments[0];
+    public final void beforeMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments,
+                                   Class<?>[] argumentsTypes, 
MethodInterceptResult result) throws Throwable {
 
-        ContextCarrier contextCarrier = 
getContextCarrierFromMessage(messageView);
-
-        AbstractSpan span = 
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + 
messageView.getTopic()
-                + "/Consumer", contextCarrier);
-        Tags.MQ_TOPIC.set(span, messageView.getTopic());
+    }
 
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
+                              Object ret) throws Throwable {
+        List<MessageView> list = (List<MessageView>) ret;
+        if (list.isEmpty()) {
+            return ret;
+        }
+        String topics = 
list.stream().map(MessageView::getTopic).distinct().collect(Collectors.joining(","));
+        AbstractSpan span = 
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + topics
+                                                               + "/Consumer", 
null);
+        SpanLayer.asMQ(span);
+        String namesrvAddr = "";
         Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
         if (skyWalkingDynamicField != null) {
-            ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) 
skyWalkingDynamicField;
-            Tags.MQ_BROKER.set(span, consumerEnhanceInfos.getNamesrvAddr());
-            span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+            ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) 
objInst.getSkyWalkingDynamicField();
+            namesrvAddr = consumerEnhanceInfos.getNamesrvAddr();
         }
-
+        Tags.MQ_TOPIC.set(span, topics);
+        Tags.MQ_BROKER.set(span, namesrvAddr);
+        span.setPeer(namesrvAddr);
         span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
-        SpanLayer.asMQ(span);
-    }
 
-    @Override
-    public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
-        ConsumeResult status = (ConsumeResult) ret;
-        if (ConsumeResult.FAILURE.equals(status)) {
-            AbstractSpan activeSpan = ContextManager.activeSpan();
-            activeSpan.errorOccurred();
-            Tags.MQ_STATUS.set(activeSpan, status.name());
+        for (MessageView messageView : list) {
+            ContextCarrier contextCarrier = 
getContextCarrierFromMessage(messageView);
+            ContextManager.extract(contextCarrier);
         }
+
         ContextManager.stopSpan();
+
         return ret;
     }
 
     @Override
-    public void handleMethodException(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+    public final void handleMethodException(EnhancedInstance objInst, Method 
method, Object[] allArguments,
+                                            Class<?>[] argumentsTypes, 
Throwable t) {
         ContextManager.activeSpan().log(t);
     }
 
+    @Override
+    public void onConstruct(final EnhancedInstance objInst, final Object[] 
allArguments) throws Throwable {
+        ClientConfiguration clientConfiguration = (ClientConfiguration) 
allArguments[0];
+        String namesrvAddr = clientConfiguration.getEndpoints();
+        ConsumerEnhanceInfos consumerEnhanceInfos = new 
ConsumerEnhanceInfos(namesrvAddr);
+        objInst.setSkyWalkingDynamicField(consumerEnhanceInfos);
+    }
+
     private ContextCarrier getContextCarrierFromMessage(MessageView message) {
         ContextCarrier contextCarrier = new ContextCarrier();
 
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
new file mode 100644
index 0000000000..7492d3a3ec
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/ProducerImplAsyncInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class ProducerImplAsyncInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    private static final String ENHANCE_CLASS = 
"org.apache.rocketmq.client.java.impl.producer.ProducerImpl";
+    private static final String ENHANCE_METHOD = "sendAsync";
+    private static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.MessageSendAsyncInterceptor";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[0];
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
+        return new InstanceMethodsInterceptPoint[]{
+                new InstanceMethodsInterceptPoint() {
+                    @Override
+                    public ElementMatcher<MethodDescription> 
getMethodsMatcher() {
+                        return 
named(ENHANCE_METHOD).and(takesArgumentWithType(0, 
"org.apache.rocketmq.client.apis.message.Message"));
+                    }
+
+                    @Override
+                    public String getMethodsInterceptor() {
+                        return INTERCEPTOR_CLASS;
+                    }
+
+                    @Override
+                    public boolean isOverrideArgs() {
+                        return true;
+                    }
+                }
+        };
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
new file mode 100644
index 0000000000..9c65f388e6
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplAsyncInstrumentation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class SimpleConsumerImplAsyncInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    private static final String ENHANCE_CLASS = 
"org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl";
+    private static final String ENHANCE_METHOD = "receiveAsync";
+    private static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.SimpleConsumerImplAsyncInterceptor";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
+            new ConstructorInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> 
getConstructorMatcher() {
+                    return takesArguments(4);
+                }
+
+                @Override
+                public String getConstructorInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
new file mode 100644
index 0000000000..b461459607
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/client/java/v5/define/SimpleConsumerImplInstrumentation.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class SimpleConsumerImplInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+    private static final String ENHANCE_CLASS = 
"org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerImpl";
+    private static final String ENHANCE_METHOD = "receive";
+    private static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.SimpleConsumerImplInterceptor";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return new ConstructorInterceptPoint[]{
+            new ConstructorInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> 
getConstructorMatcher() {
+                    return takesArguments(4);
+                }
+
+                @Override
+                public String getConstructorInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+            }
+        };
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(ENHANCE_METHOD);
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INTERCEPTOR_CLASS;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return false;
+                }
+            }
+        };
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
index ed2f03fde6..9c1da6962d 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
+++ 
b/apm-sniffer/apm-sdk-plugin/rocketMQ-client-java-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -17,3 +17,6 @@
 
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.MessageListenerInstrumentation
 
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ProducerImplInstrumentation
 
rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.PushConsumerImplInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.SimpleConsumerImplInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.ProducerImplAsyncInstrumentation
+rocketMQ-client-java-5.x=org.apache.skywalking.apm.plugin.rocketMQ.client.java.v5.define.SimpleConsumerImplAsyncInstrumentation
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 6827c0a912..0a18b8873b 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -316,3 +316,7 @@ 
plugin.nettyhttp.collect_request_body=${SW_PLUGIN_NETTYHTTP_COLLECT_REQUEST_BODY
 
plugin.nettyhttp.filter_length_limit=${SW_PLUGIN_NETTYHTTP_FILTER_LENGTH_LIMIT:1024}
 #  When `HTTP_COLLECT_REQUEST_BODY` is enabled and content-type start with 
HTTP_SUPPORTED_CONTENT_TYPES_PREFIX, collect the body of the request , multiple 
paths should be separated by `,`
 
plugin.nettyhttp.supported_content_types_prefix=${SW_PLUGIN_NETTYHTTP_SUPPORTED_CONTENT_TYPES_PREFIX:application/json,text/}
+# If set to true, the keys of messages are collected by the plugin for the RocketMQ Java client.
+plugin.rocketmqclient.collect_message_keys=${SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_KEYS:false}
+# If set to true, the tags of messages are collected by the plugin for the RocketMQ Java client.
+plugin.rocketmqclient.collect_message_tags=${SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_TAGS:false}
diff --git a/docs/en/setup/service-agent/java-agent/configurations.md 
b/docs/en/setup/service-agent/java-agent/configurations.md
index 718f08dc76..121c0c3e25 100644
--- a/docs/en/setup/service-agent/java-agent/configurations.md
+++ b/docs/en/setup/service-agent/java-agent/configurations.md
@@ -131,6 +131,8 @@ This is the properties list supported in 
`agent/config/agent.config`.
 | `plugin.nettyhttp.collect_request_body`                             | This 
config item controls that whether the Netty-http plugin should collect the http 
body of the request.                                                            
                                                                                
                                                                                
                                                                                
                [...]
 | `plugin.nettyhttp.filter_length_limit`                              | When 
`COLLECT_REQUEST_BODY` is enabled, how many characters to keep and send to the 
OAP backend, use negative values to keep and send the complete body.            
                                                                                
                                                                                
                                                                                
                 [...]
 | `plugin.nettyhttp.supported_content_types_prefix`                   | When 
`COLLECT_REQUEST_BODY` is enabled and content-type start with 
`HTTP_SUPPORTED_CONTENT_TYPES_PREFIX`, collect the body of the request , 
multiple paths should be separated by `,`                                       
                                                                                
                                                                                
                                         [...]
+| `plugin.rocketmqclient.collect_message_keys`                    | If set to true, the keys of messages are collected by the plugin for the RocketMQ Java client. | SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_KEYS | `false` |
+| `plugin.rocketmqclient.collect_message_tags`                    | If set to true, the tags of messages are collected by the plugin for the RocketMQ Java client. | SW_PLUGIN_ROCKETMQCLIENT_COLLECT_MESSAGE_TAGS | `false` |
 |
 
 # Reset Collection/Map type configurations as empty collection.
diff --git a/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
index cfaffd692b..2ce7a89e81 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/bin/startup.sh
@@ -18,4 +18,4 @@
 
 home="$(cd "$(dirname $0)"; pwd)"
 
-java -Dendpoints=${ENDPOINTS} -DnameServer=${NAME_SERVER} -jar ${agent_opts} 
${home}/../libs/rocketmq-5-grpc-scenario.jar &
+java -Dskywalking.plugin.rocketmqclient.collect_message_keys=true 
-Dskywalking.plugin.rocketmqclient.collect_message_tags=true 
-Dendpoints=${ENDPOINTS} -DnameServer=${NAME_SERVER} -jar ${agent_opts} 
${home}/../libs/rocketmq-5-grpc-scenario.jar &
diff --git 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
index 3f228e2725..3c258b18c7 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/config/expectedData.yaml
@@ -15,11 +15,65 @@
 # limitations under the License.
 segmentItems:
   - serviceName: rocketmq-5-grpc-scenario
-    segmentSize: ge 2
+    segmentSize: ge 4
     segments:
       - segmentId: not null
         spans:
-          - operationName: RocketMQ/TopicTest/Producer
+          - operationName: RocketMQ/ProducerAsyncTopicTest/Consumer
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 39
+            isError: false
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.topic, value: ProducerAsyncTopicTest }
+              - { key: mq.broker, value: not null }
+              - { key: transmission.latency, value: not null }
+              - { key: transmission.latency, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario', 
networkAddress: not null,
+                  refType: CrossProcess, parentSpanId: 3, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: 
rocketmq-5-grpc-scenario,
+                  traceId: not null }
+              - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario', 
networkAddress: not null,
+                  refType: CrossProcess, parentSpanId: 2, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: 
rocketmq-5-grpc-scenario,
+                  traceId: not null }
+      - segmentId: not null
+        spans:
+          - operationName: RocketMQ/ConsumerAsyncTopicTest/Consumer
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 39
+            isError: false
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: ConsumerAsyncTopicTest }
+              - { key: transmission.latency, value: not null }
+              - { key: transmission.latency, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario', 
networkAddress: not null,
+                  refType: CrossProcess, parentSpanId: 4, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: 
rocketmq-5-grpc-scenario,
+                  traceId: not null }
+              - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario', 
networkAddress: not null,
+                  refType: CrossProcess, parentSpanId: 5, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: 
rocketmq-5-grpc-scenario,
+                  traceId: not null }
+      - segmentId: not null
+        spans:
+          - operationName: RocketMQ/NormalTopicTest/Producer
             parentSpanId: 0
             spanId: 1
             spanLayer: MQ
@@ -29,13 +83,81 @@ segmentItems:
             isError: false
             spanType: Exit
             peer: not null
+            skipAnalysis: false
             tags:
               - { key: mq.broker, value: not null }
-              - { key: mq.topic, value: TopicTest }
-              - { key: mq.message.keys, value: KeyA }
-              - { key: mq.message.tags, value: TagA }
+              - { key: mq.topic, value: NormalTopicTest }
+              - { key: mq.message.keys, value: not null }
+              - { key: mq.message.tags, value: not null }
+              - { key: mq.message.id, value: not null }
+          - operationName: RocketMQ/ProducerAsyncTopicTest/Producer
+            parentSpanId: 0
+            spanId: 2
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 38
+            isError: false
+            spanType: Exit
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: ProducerAsyncTopicTest }
+              - { key: mq.message.keys, value: not null }
+              - { key: mq.message.tags, value: not null }
+              - { key: mq.message.id, value: not null }
+          - operationName: RocketMQ/ProducerAsyncTopicTest/Producer
+            parentSpanId: 0
+            spanId: 3
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 38
+            isError: false
+            spanType: Exit
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: ProducerAsyncTopicTest }
+              - { key: mq.message.keys, value: not null }
+              - { key: mq.message.tags, value: not null }
+              - { key: mq.message.id, value: not null }
+          - operationName: RocketMQ/ConsumerAsyncTopicTest/Producer
+            parentSpanId: 0
+            spanId: 4
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 38
+            isError: false
+            spanType: Exit
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: ConsumerAsyncTopicTest }
+              - { key: mq.message.keys, value: not null }
+              - { key: mq.message.tags, value: not null }
+              - { key: mq.message.id, value: not null }
+          - operationName: RocketMQ/ConsumerAsyncTopicTest/Producer
+            parentSpanId: 0
+            spanId: 5
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 38
+            isError: false
+            spanType: Exit
+            peer: not null
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: ConsumerAsyncTopicTest }
+              - { key: mq.message.keys, value: not null }
+              - { key: mq.message.tags, value: not null }
               - { key: mq.message.id, value: not null }
-            skipAnalysis: 'false'
           - operationName: GET:/case/rocketmq-5-grpc-scenario
             parentSpanId: -1
             spanId: 0
@@ -46,14 +168,14 @@ segmentItems:
             isError: false
             spanType: Entry
             peer: ''
+            skipAnalysis: false
             tags:
               - { key: url, value: 
'http://localhost:8080/rocketmq-5-grpc-scenario/case/rocketmq-5-grpc-scenario' }
               - { key: http.method, value: GET }
               - { key: http.status_code, value: '200' }
-            skipAnalysis: 'false'
       - segmentId: not null
         spans:
-          - operationName: RocketMQ/TopicTest/Consumer
+          - operationName: RocketMQ/NormalTopicTest/Consumer
             parentSpanId: -1
             spanId: 0
             spanLayer: MQ
@@ -62,13 +184,15 @@ segmentItems:
             componentId: 39
             isError: false
             spanType: Entry
-            peer: not blank
+            peer: not null
+            skipAnalysis: false
             tags:
               - { key: transmission.latency, value: not null }
-              - { key: mq.topic, value: TopicTest }
-              - { key: mq.broker, value: not blank }
+              - { key: mq.topic, value: NormalTopicTest }
+              - { key: mq.message.id, value: not null }
+              - { key: mq.broker, value: not null }
             refs:
-              - { parentEndpoint: GET:/case/rocketmq-5-grpc-scenario, 
networkAddress: not null,
+              - { parentEndpoint: 'GET:/case/rocketmq-5-grpc-scenario', 
networkAddress: not null,
                   refType: CrossProcess, parentSpanId: 1, 
parentTraceSegmentId: not null,
-                  parentServiceInstance: not null, parentService: not null, 
traceId: not null }
-            skipAnalysis: 'false'
+                  parentServiceInstance: not null, parentService: 
rocketmq-5-grpc-scenario,
+                  traceId: not null }
\ No newline at end of file
diff --git 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
index ba0e43815d..5219bc8db9 100644
--- 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
+++ 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java
@@ -18,97 +18,65 @@
 
 package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
 
+import java.util.Collections;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.client.apis.ClientConfiguration;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.consumer.FilterExpression;
-import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.PushConsumer;
-import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.apis.producer.Producer;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-
 @RestController
 @RequestMapping("/case")
 @Slf4j
 public class CaseController {
 
     private static final String SUCCESS = "Success";
+    private static final String NORMAL_TOPIC = "NormalTopicTest";
+    private static final String ASYNC_PRODUCER_TOPIC = 
"ProducerAsyncTopicTest";
+    private static final String ASYNC_CONSUMER_TOPIC = 
"ConsumerAsyncTopicTest";
+    private static final String TAG_NOMARL = "Tag:normal";
+    private static final String TAG_ASYNC_PRODUCER = "Tag:async:producer";
+    private static final String TAG_ASYNC_CONSUMER = "Tag:async:consumer";
+    private static final String GROUP = "group1";
 
     @Value("${endpoints}")
     private String endpoints;
 
-    @Value("${nameServer}")
-    private String nameServer;
-
-    static final String TOPIC = "TopicTest";
-    static final String TAG = "TagA";
-    static final String GROUP = "group1";
-
-    Producer producer;
-
-    PushConsumer consumer;
+    @Autowired
+    private MessageService messageService;
 
     @RequestMapping("/rocketmq-5-grpc-scenario")
     @ResponseBody
     public String testcase() {
         try {
-            ClientServiceProvider provider = 
ClientServiceProvider.loadService();
-            ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-                    .setEndpoints(endpoints)
-                    .enableSsl(false)
-                    .build();
-            // start producer
-            if (producer == null) {
-                producer = provider.newProducerBuilder()
-                        .setClientConfiguration(clientConfiguration)
-                        .build();
-            }
-
-            // send msg
-            Message message = provider.newMessageBuilder()
-                    // Set topic for the current message.
-                    .setTopic(TOPIC)
-                    // Message secondary classifier of message besides topic.
-                    .setTag(TAG)
-                    // Key(s) of the message, another way to mark message 
besides message id.
-                    .setKeys("KeyA")
-                    .setBody("This is a normal message for Apache 
RocketMQ".getBytes(StandardCharsets.UTF_8))
-                    .build();
-            SendReceipt sendReceipt = producer.send(message);
-
-            // start consumer
-            Thread thread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        FilterExpression filterExpression = new 
FilterExpression(TAG, FilterExpressionType.TAG);
-                        if (consumer == null) {
-                            consumer = provider.newPushConsumerBuilder()
-                                    
.setClientConfiguration(clientConfiguration)
-                                    .setConsumerGroup(GROUP)
-                                    
.setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression))
-                                    .setMessageListener(new MyConsumer())
-                                    .build();
-                        }
-                    } catch (Exception e) {
-                        log.error("consumer start error", e);
-                    }
-                }
-            });
-            thread.start();
+            messageService.sendNormalMessage(NORMAL_TOPIC, TAG_NOMARL, GROUP);
+            Thread t1 = new Thread(() -> messageService.pushConsumes(
+                Collections.singletonList(NORMAL_TOPIC),
+                Collections.singletonList(TAG_NOMARL),
+                GROUP
+            ));
+            t1.start();
+            t1.join();
+
+            messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC, 
TAG_ASYNC_PRODUCER, GROUP);
+            messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC, 
TAG_ASYNC_PRODUCER, GROUP);
+            Thread t2 = new Thread(() -> 
messageService.simpleConsumes(Collections.singletonList(ASYNC_PRODUCER_TOPIC),
+                                                           
Collections.singletonList(TAG_ASYNC_PRODUCER), GROUP,
+                                                           10, 10
+            ));
+            t2.start();
+            t2.join();
+
+            messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC, 
TAG_ASYNC_CONSUMER, GROUP);
+            messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC, 
TAG_ASYNC_CONSUMER, GROUP);
+            Thread t3 = new Thread(() -> 
messageService.simpleConsumeAsync(ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER, 
GROUP, 10,
+                                                               10
+            ));
+            t3.start();
+            t3.join();
         } catch (Exception e) {
             log.error("testcase error", e);
         }
@@ -119,46 +87,10 @@ public class CaseController {
     @ResponseBody
     public String healthCheck() throws Exception {
         System.setProperty(MixAll.ROCKETMQ_HOME_ENV, 
this.getClass().getResource("/").getPath());
-        String[] subArgs = new String[]{
-                "updateTopic",
-                "-n",
-                nameServer,
-                "-c",
-                "DefaultCluster",
-                "-t",
-                "TopicTest"};
-        MQAdminStartup.main(subArgs);
-
-        subArgs = new String[]{
-                "updateSubGroup",
-                "-n",
-                nameServer,
-                "-c",
-                "DefaultCluster",
-                "-g",
-                "group1"};
-        MQAdminStartup.main(subArgs);
-
-        ClientServiceProvider provider = ClientServiceProvider.loadService();
-        ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
-                .setEndpoints(endpoints)
-                .enableSsl(false)
-                .build();
-        // start producer
-        Producer producer = provider.newProducerBuilder()
-                .setClientConfiguration(clientConfiguration)
-                .build();
+        messageService.updateNormalTopic(NORMAL_TOPIC);
+        messageService.updateNormalTopic(ASYNC_PRODUCER_TOPIC);
+        messageService.updateNormalTopic(ASYNC_CONSUMER_TOPIC);
+        final Producer producer = ProducerSingleton.getInstance(endpoints, 
NORMAL_TOPIC);
         return SUCCESS;
     }
-
-    public static class MyConsumer implements MessageListener {
-
-        @Override
-        public ConsumeResult consume(MessageView messageView) {
-            log.info("Consume message successfully, 
messageId={},messageBody={}", messageView.getMessageId(),
-                    messageView.getBody().toString());
-            return ConsumeResult.SUCCESS;
-        }
-    }
-
 }
diff --git 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
new file mode 100644
index 0000000000..0c87f66981
--- /dev/null
+++ 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.MessageListener;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.tools.command.MQAdminStartup;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class MessageService {
+    @Value("${endpoints}")
+    private String endpoints;
+
+    @Value("${nameServer}")
+    private String nameServer;
+
+    /**
+     * Builds one message for {@code topic} and hands it to the shared producer's {@code send} call.
+     *
+     * <p>Anything thrown by the send itself is caught and logged instead of being propagated.
+     *
+     * @param topic topic the message is published to; also passed to {@link ProducerSingleton#getInstance}
+     * @param tag   tag set on the message
+     * @param group not used by this method
+     * @throws ClientException declared by {@link ProducerSingleton#getInstance}
+     */
+    public void sendNormalMessage(String topic, String tag, String group) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final Producer sharedProducer = ProducerSingleton.getInstance(endpoints, topic);
+        // A random UUID is used as the message key.
+        final Message outgoing = serviceProvider
+            .newMessageBuilder()
+            .setTopic(topic)
+            .setTag(tag)
+            .setKeys(UUID.randomUUID().toString())
+            .setBody("This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8))
+            .build();
+        try {
+            final MessageId sentId = sharedProducer.send(outgoing).getMessageId();
+            log.info("Send normal message successfully, messageId={}", sentId);
+        } catch (Throwable failure) {
+            log.error("Failed to send message", failure);
+        }
+    }
+
+    /**
+     * Builds one message for {@code topic}, submits it with the shared producer's {@code sendAsync}
+     * call and then waits on the returned future before logging success.
+     *
+     * <p>Anything thrown while submitting or waiting is caught and logged instead of being propagated.
+     *
+     * @param topic topic the message is published to; also passed to {@link ProducerSingleton#getInstance}
+     * @param tag   tag set on the message
+     * @param group not used by this method
+     * @throws ClientException declared by {@link ProducerSingleton#getInstance}
+     */
+    public void sendNormalMessageAsync(String topic, String tag, String group) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final Producer sharedProducer = ProducerSingleton.getInstance(endpoints, topic);
+        // A random UUID is used as the message key.
+        final Message outgoing = serviceProvider
+            .newMessageBuilder()
+            .setTopic(topic)
+            .setTag(tag)
+            .setKeys(UUID.randomUUID().toString())
+            .setBody("This is a async message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8))
+            .build();
+        try {
+            // join() makes this method return only after the asynchronous send has finished.
+            sharedProducer.sendAsync(outgoing).join();
+            log.info("Send async message successfully");
+        } catch (Throwable failure) {
+            log.error("Failed to send message", failure);
+        }
+    }
+
+    /**
+     * Builds a push consumer in {@code group} that subscribes to every entry of {@code topics},
+     * filtered by the tag at the same index of {@code tags}, and delivers messages to a new
+     * {@link MyConsumer}.
+     *
+     * <p>The built consumer is neither kept nor closed by this method. Any exception raised while
+     * preparing the subscriptions or building the consumer is logged and not rethrown.
+     *
+     * @param topics topics to subscribe to
+     * @param tags   tag expressions, read positionally for each topic
+     * @param group  consumer group name
+     */
+    public void pushConsumes(List<String> topics, List<String> tags, String group) {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final ClientConfiguration configuration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+        try {
+            final Map<String, FilterExpression> subscriptions = new HashMap<>();
+            int position = 0;
+            for (String topic : topics) {
+                final String tag = tags.get(position++);
+                subscriptions.put(topic, new FilterExpression(tag, FilterExpressionType.TAG));
+            }
+
+            serviceProvider.newPushConsumerBuilder()
+                           .setClientConfiguration(configuration)
+                           .setSubscriptionExpressions(subscriptions)
+                           .setConsumerGroup(group)
+                           .setMessageListener(new MyConsumer())
+                           .build();
+        } catch (Exception e) {
+            log.error("consumer start error", e);
+        }
+    }
+
+    /**
+     * Builds a simple consumer in {@code group}, performs one synchronous {@code receive} and
+     * acknowledges every message that came back, then closes the consumer.
+     *
+     * <p>Each entry of {@code topics} is subscribed with the tag at the same index of {@code tags}.
+     * A failed acknowledgement is logged per message and does not stop the remaining ones. Any
+     * other exception (including one raised while closing the consumer) is logged and not rethrown.
+     *
+     * @param topics        topics to subscribe to
+     * @param tags          tag expressions, read positionally for each topic
+     * @param group         consumer group name
+     * @param maxMessageNum first argument passed to {@code receive}
+     * @param duration      invisible duration in seconds, passed to {@code receive}
+     */
+    public void simpleConsumes(List<String> topics,
+                               List<String> tags,
+                               String group,
+                               Integer maxMessageNum,
+                               Integer duration) {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+
+        try {
+            Map<String, FilterExpression> filterExpressionMap = new HashMap<>();
+            for (int i = 0; i < topics.size(); i++) {
+                FilterExpression filterExpression = new FilterExpression(tags.get(i), FilterExpressionType.TAG);
+                filterExpressionMap.put(topics.get(i), filterExpression);
+            }
+            // Converted before the consumer exists so that a null duration fails without creating one.
+            Duration invisibleDuration = Duration.ofSeconds(duration);
+
+            // The consumer was previously never closed, leaking one client per invocation;
+            // try-with-resources releases it once receiving and acknowledging are done.
+            try (SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
+                                                   .setClientConfiguration(clientConfiguration)
+                                                   .setConsumerGroup(group)
+                                                   .setAwaitDuration(Duration.ofSeconds(10))
+                                                   .setSubscriptionExpressions(filterExpressionMap)
+                                                   .build()) {
+                final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
+                messages.forEach(messageView -> {
+                    log.info("Received message: {}", messageView);
+                });
+                for (MessageView msg : messages) {
+                    final MessageId messageId = msg.getMessageId();
+                    try {
+                        consumer.ack(msg);
+                        log.info("Message is acknowledged successfully, messageId={}", messageId);
+                    } catch (Throwable t) {
+                        log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("consumer start error", e);
+        }
+    }
+
+    /**
+     * Builds a simple consumer for {@code topic}/{@code tag} in {@code group}, issues one
+     * {@code receiveAsync} and acknowledges every received message with {@code ackAsync}.
+     *
+     * <p>The method returns right after the receive has been submitted; receive and ack results are
+     * handled on two dedicated callback executors. Those executors and the consumer are released
+     * once the receive has failed, or once every ack callback has finished. Earlier versions never
+     * released them, so each call leaked two thread pools and a client.
+     *
+     * <p>Exceptions raised while setting up are logged and not rethrown.
+     *
+     * @param topic         topic to subscribe to
+     * @param tag           tag expression used to filter the topic
+     * @param group         consumer group name
+     * @param maxMessageNum first argument passed to {@code receiveAsync}
+     * @param duration      invisible duration in seconds, passed to {@code receiveAsync}
+     */
+    public void simpleConsumeAsync(String topic,
+                                   String tag,
+                                   String group,
+                                   Integer maxMessageNum,
+                                   Integer duration) {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .build();
+        try {
+            Duration awaitDuration = Duration.ofSeconds(10);
+            // Converted before any resource exists so that a null duration fails without leaking.
+            Duration invisibleDuration = Duration.ofSeconds(duration);
+            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+            final SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
+                                                    .setClientConfiguration(clientConfiguration)
+                                                    .setConsumerGroup(group)
+                                                    .setAwaitDuration(awaitDuration)
+                                                    .setSubscriptionExpressions(
+                                                        Collections.singletonMap(topic, filterExpression))
+                                                    .build();
+            final ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+            final ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
+            // shutdown() only stops accepting new tasks, so it is safe to call from a pool thread.
+            final Runnable releaseResources = () -> {
+                receiveCallbackExecutor.shutdown();
+                ackCallbackExecutor.shutdown();
+                try {
+                    consumer.close();
+                } catch (Exception closeFailure) {
+                    log.error("Failed to close simple consumer", closeFailure);
+                }
+            };
+            try {
+                final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(
+                    maxMessageNum,
+                    invisibleDuration
+                );
+                future0.whenCompleteAsync((messages, throwable) -> {
+                    if (null != throwable) {
+                        log.error("Failed to receive message from remote", throwable);
+                        releaseResources.run();
+                        return;
+                    }
+                    try {
+                        log.info("Received {} message(s)", messages.size());
+                        final Map<MessageView, CompletableFuture<Void>> map =
+                            messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
+                        final CompletableFuture<?>[] ackCallbacks = new CompletableFuture<?>[map.size()];
+                        int index = 0;
+                        for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
+                            final MessageId messageId = entry.getKey().getMessageId();
+                            final CompletableFuture<Void> future = entry.getValue();
+                            ackCallbacks[index++] = future.whenCompleteAsync((v, t) -> {
+                                if (null != t) {
+                                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+                                    return;
+                                }
+                                log.info("Message is acknowledged successfully, messageId={}", messageId);
+                            }, ackCallbackExecutor);
+                        }
+                        // Completes immediately for zero messages, otherwise after the last ack callback.
+                        CompletableFuture.allOf(ackCallbacks)
+                                         .whenComplete((done, ackFailure) -> releaseResources.run());
+                    } catch (RuntimeException callbackFailure) {
+                        // Would otherwise vanish inside the unobserved future returned by whenCompleteAsync.
+                        log.error("Failed to acknowledge received messages", callbackFailure);
+                        releaseResources.run();
+                    }
+                }, receiveCallbackExecutor);
+            } catch (Exception submitFailure) {
+                // No callback will run in this case, so clean up here before reporting the error.
+                releaseResources.run();
+                throw submitFailure;
+            }
+        } catch (Exception e) {
+            log.error("consumer start error", e);
+        }
+    }
+
+    /**
+     * Runs the RocketMQ admin tool's {@code updateTopic} command for {@code topic} against the
+     * configured name server and the {@code DefaultCluster} cluster, passing
+     * {@code +message.type=NORMAL} as the {@code -a} argument.
+     *
+     * @param topic name of the topic passed as the {@code -t} argument
+     */
+    public void updateNormalTopic(String topic) {
+        MQAdminStartup.main(new String[] {
+            "updateTopic",
+            "-n", nameServer,
+            "-c", "DefaultCluster",
+            "-t", topic,
+            "-a", "+message.type=NORMAL"
+        });
+    }
+
+    /**
+     * Message listener that logs the id and body of each delivered message and always reports
+     * {@link ConsumeResult#SUCCESS}.
+     */
+    public static class MyConsumer implements MessageListener {
+
+        @Override
+        public ConsumeResult consume(MessageView messageView) {
+            // getBody() yields a ByteBuffer, whose toString() describes the buffer rather than its
+            // content. Decode the bytes instead; duplicate() leaves the original buffer position alone.
+            final String messageBody = StandardCharsets.UTF_8.decode(messageView.getBody().duplicate()).toString();
+            log.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(),
+                     messageBody
+            );
+            return ConsumeResult.SUCCESS;
+        }
+    }
+
+}
diff --git 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
new file mode 100644
index 0000000000..276c08275a
--- /dev/null
+++ 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/ProducerSingleton.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test.apache.skywalking.apm.testcase.rocketmq.client.java.controller;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
+import org.apache.rocketmq.client.apis.producer.TransactionChecker;
+
+/**
+ * Holder for a single, lazily created {@link Producer}.
+ *
+ * <p>The producer is built by the first {@link #getInstance} call and the same instance is
+ * returned afterwards, so the {@code endpoints} and {@code topics} of later calls are not used.
+ */
+public class ProducerSingleton {
+    /** Published once under the class lock; volatile so the unlocked read sees the assignment. */
+    private static volatile Producer sharedProducer;
+
+    private ProducerSingleton() {
+    }
+
+    /**
+     * Returns the shared producer, building it first if no producer exists yet.
+     *
+     * @param endpoints endpoints used when the producer has to be built
+     * @param topics    topics registered on the producer when it has to be built
+     * @return the shared producer instance
+     * @throws ClientException if building the producer fails
+     */
+    public static Producer getInstance(String endpoints, String... topics) throws ClientException {
+        Producer current = sharedProducer;
+        if (current != null) {
+            return current;
+        }
+        synchronized (ProducerSingleton.class) {
+            // Re-read under the lock: another thread may have built the producer in the meantime.
+            current = sharedProducer;
+            if (current == null) {
+                current = buildProducer(null, endpoints, topics);
+                sharedProducer = current;
+            }
+            return current;
+        }
+    }
+
+    /**
+     * Builds a producer with SSL disabled for the given endpoints and topics.
+     *
+     * @param checker   optional transaction checker; skipped when {@code null}
+     * @param endpoints endpoints set on the client configuration
+     * @param topics    topics set on the producer builder
+     */
+    private static Producer buildProducer(TransactionChecker checker,
+                                          String endpoints,
+                                          String... topics) throws ClientException {
+        final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();
+        final ClientConfiguration configuration = ClientConfiguration.newBuilder()
+                                                                     .setEndpoints(endpoints)
+                                                                     .enableSsl(false)
+                                                                     .build();
+        final ProducerBuilder producerBuilder = serviceProvider.newProducerBuilder()
+                                                               .setClientConfiguration(configuration)
+                                                               .setTopics(topics);
+        if (null == checker) {
+            return producerBuilder.build();
+        }
+        producerBuilder.setTransactionChecker(checker);
+        return producerBuilder.build();
+    }
+}
diff --git 
a/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list 
b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
index a9219d9010..4f8dd365dc 100644
--- a/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
+++ b/test/plugin/scenarios/rocketmq-5-grpc-scenario/support-version.list
@@ -17,3 +17,4 @@
 # lists your version here (Contains only the last version number of each minor 
version.)
 
 5.1.1
+5.1.4

Reply via email to