This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 6621f82739 add RocketMQ 5.x plugin (#536)
6621f82739 is described below
commit 6621f82739340bbaaac2338cb06ccbe3d38513a5
Author: weihubeats <[email protected]>
AuthorDate: Wed Jun 7 22:05:08 2023 +0800
add RocketMQ 5.x plugin (#536)
---
CHANGES.md | 1 +
apm-sniffer/apm-sdk-plugin/pom.xml | 1 +
.../v4/define/AbstractRocketMQInstrumentation.java | 29 ++++++
.../apm/plugin/rocketMQ/v4/define/Constants.java | 24 +++++
.../ConsumeMessageConcurrentlyInstrumentation.java | 3 +-
.../ConsumeMessageOrderlyInstrumentation.java | 3 +-
.../DefaultMQPushConsumerInstrumentation.java | 3 +-
.../v4/define/MQClientAPIImplInstrumentation.java | 3 +-
.../v4/define/SendCallbackInstrumentation.java | 3 +-
.../apm-sdk-plugin/rocketMQ-5.x-plugin/pom.xml | 53 ++++++++++
.../v5/AbstractMessageConsumeInterceptor.java | 91 ++++++++++++++++
.../v5/MessageConcurrentlyConsumeInterceptor.java | 49 +++++++++
.../v5/MessageOrderlyConsumeInterceptor.java | 50 +++++++++
.../plugin/rocketMQ/v5/MessageSendInterceptor.java | 114 +++++++++++++++++++++
.../plugin/rocketMQ/v5/OnExceptionInterceptor.java | 70 +++++++++++++
.../plugin/rocketMQ/v5/OnSuccessInterceptor.java | 68 ++++++++++++
.../v5/RegisterMessageListenerInterceptor.java | 51 +++++++++
.../rocketMQ/v5/UpdateNameServerInterceptor.java | 45 ++++++++
.../v5/define/AbstractRocketMQInstrumentation.java | 29 ++++++
.../apm/plugin/rocketMQ/v5/define/Constants.java | 24 +++++
.../ConsumeMessageConcurrentlyInstrumentation.java | 7 +-
.../ConsumeMessageOrderlyInstrumentation.java | 7 +-
.../rocketMQ/v5/define/ConsumerEnhanceInfos.java | 32 ++++++
.../DefaultMQPushConsumerInstrumentation.java | 7 +-
.../v5}/define/MQClientAPIImplInstrumentation.java | 9 +-
.../v5/define/SendCallBackEnhanceInfo.java | 42 ++++++++
.../v5}/define/SendCallbackInstrumentation.java | 9 +-
.../src/main/resources/skywalking-plugin.def | 14 +--
.../setup/service-agent/java-agent/Plugin-list.md | 1 +
.../service-agent/java-agent/Supported-list.md | 2 +-
.../rocketmq-scenario/support-version.list | 1 +
31 files changed, 803 insertions(+), 42 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 984fc42d83..ccff2d786c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
* Support Jdk17 ZGC metric collect
* Support Jetty 11.x plugin
* Fix the scenario of using the HBase plugin with spring-data-hadoop.
+* Add RocketMQ 5.x plugin
#### Documentation
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/pom.xml
index 96a80986b2..0fc2fa7ae3 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -131,6 +131,7 @@
<module>jersey-3.x-plugin</module>
<module>grizzly-2.3.x-4.x-plugin</module>
<module>grizzly-2.3.x-4.x-work-threadpool-plugin</module>
+ <module>rocketMQ-5.x-plugin</module>
</modules>
<packaging>pom</packaging>
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/AbstractRocketMQInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/AbstractRocketMQInstrumentation.java
new file mode 100644
index 0000000000..6d3211fb8e
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/AbstractRocketMQInstrumentation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+
+public abstract class AbstractRocketMQInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[] {Constants.WITNESS_ROCKETMQ_4X_CLASS};
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/Constants.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/Constants.java
new file mode 100644
index 0000000000..9112fae7ff
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/Constants.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+
+public class Constants {
+
+ public static final String WITNESS_ROCKETMQ_4X_CLASS =
"org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader";
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
index ecde176fcd..b5d80c0ce5 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
@@ -22,13 +22,12 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
-public class ConsumeMessageConcurrentlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class ConsumeMessageConcurrentlyInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently";
private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor";
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
index 5d0da38989..51785b331d 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
@@ -22,13 +22,12 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
-public class ConsumeMessageOrderlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class ConsumeMessageOrderlyInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly";
private static final String ENHANCE_METHOD = "consumeMessage";
private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageOrderlyConsumeInterceptor";
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
index d7b8ccdb82..46a8fc96b6 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
@@ -22,13 +22,12 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class DefaultMQPushConsumerInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class DefaultMQPushConsumerInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
private static final String REGISTER_MESSAGE_LISTENER_METHOD_NAME =
"registerMessageListener";
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
index 62d5cb62f7..8e44a73844 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
@@ -22,14 +22,13 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class MQClientAPIImplInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class MQClientAPIImplInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.impl.MQClientAPIImpl";
private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
index 68abd0fb9f..a4a54573d0 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
@@ -22,14 +22,13 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
-public class SendCallbackInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class SendCallbackInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.producer.SendCallback";
private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/pom.xml
new file mode 100644
index 0000000000..c4efff0a95
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>apm-sdk-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>8.17.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>apm-rocketMQ-5.x-plugin</artifactId>
+ <name>rocketMQ-5.x-plugin</name>
+
+ <properties>
+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <rocketmq-client.version>5.1.1</rocketmq-client.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq-client.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/AbstractMessageConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/AbstractMessageConsumeInterceptor.java
new file mode 100644
index 0000000000..f9c990c34e
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/AbstractMessageConsumeInterceptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v5.define.ConsumerEnhanceInfos;
+
+/**
+ * {@link AbstractMessageConsumeInterceptor} creates an entry span when
<code>consumeMessage</code> is invoked on the {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently}
or {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
+ */
+public abstract class AbstractMessageConsumeInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public final void beforeMethod(EnhancedInstance objInst, Method method,
Object[] allArguments,
+ Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ List<MessageExt> msgs = (List<MessageExt>) allArguments[0];
+
+ ContextCarrier contextCarrier =
getContextCarrierFromMessage(msgs.get(0));
+ AbstractSpan span =
ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
+
.getTopic() + "/Consumer", contextCarrier);
+ Tags.MQ_TOPIC.set(span, msgs.get(0).getTopic());
+ if (msgs.get(0).getStoreHost() != null) {
+ String brokerAddress = msgs.get(0).getStoreHost().toString();
+ brokerAddress = StringUtils.removeStart(brokerAddress, "/");
+ Tags.MQ_BROKER.set(span, brokerAddress);
+ }
+ span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
+ SpanLayer.asMQ(span);
+ for (int i = 1; i < msgs.size(); i++) {
+ ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
+ }
+
+ Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
+ if (skyWalkingDynamicField != null) {
+ ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos)
skyWalkingDynamicField;
+ span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
+ }
+ }
+
+ @Override
+ public final void handleMethodException(EnhancedInstance objInst, Method
method, Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(message.getUserProperty(next.getHeadKey()));
+ }
+
+ return contextCarrier;
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageConcurrentlyConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageConcurrentlyConsumeInterceptor.java
new file mode 100644
index 0000000000..79f968777e
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageConcurrentlyConsumeInterceptor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageConcurrentlyConsumeInterceptor} sets the process status after
the {@link
+ *
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)}
method executes.
+ */
+public class MessageConcurrentlyConsumeInterceptor extends
AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus) ret;
+ if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.MQ_STATUS.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+}
+
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageOrderlyConsumeInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageOrderlyConsumeInterceptor.java
new file mode 100644
index 0000000000..23bce4b674
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageOrderlyConsumeInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageOrderlyConsumeInterceptor} sets the process status after the
{@link
+ *
org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method
executes.
+ */
+public class MessageOrderlyConsumeInterceptor extends
AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+
+ ConsumeOrderlyStatus status = (ConsumeOrderlyStatus) ret;
+ if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.MQ_STATUS.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageSendInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageSendInterceptor.java
new file mode 100644
index 0000000000..453a2ba8d8
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/MessageSendInterceptor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v5.define.SendCallBackEnhanceInfo;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import static
org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static
org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
+/**
+ * {@link MessageSendInterceptor} creates an exit span when the method {@link
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, Message,
org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader, long,
+ * org.apache.rocketmq.client.impl.CommunicationMode,
org.apache.rocketmq.client.producer.SendCallback,
+ * org.apache.rocketmq.client.impl.producer.TopicPublishInfo,
org.apache.rocketmq.client.impl.factory.MQClientInstance,
+ * int, org.apache.rocketmq.client.hook.SendMessageContext,
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * executes.
+ */
+public class MessageSendInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ Message message = (Message) allArguments[2];
+ ContextCarrier contextCarrier = new ContextCarrier();
+ String namingServiceAddress =
String.valueOf(objInst.getSkyWalkingDynamicField());
+ AbstractSpan span =
ContextManager.createExitSpan(buildOperationName(message.getTopic()),
contextCarrier, namingServiceAddress);
+ span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
+ Tags.MQ_BROKER.set(span, (String) allArguments[0]);
+ Tags.MQ_TOPIC.set(span, message.getTopic());
+ String keys = message.getKeys();
+ if (StringUtil.isNotBlank(keys)) {
+ span.tag(Tags.ofKey("mq.message.keys"), keys);
+ }
+ String tags = message.getTags();
+ if (StringUtil.isNotBlank(tags)) {
+ span.tag(Tags.ofKey("mq.message.tags"), tags);
+ }
+
+ contextCarrier.extensionInjector().injectSendingTimestamp();
+ SpanLayer.asMQ(span);
+
+ SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)
allArguments[3];
+ StringBuilder properties = new
StringBuilder(requestHeader.getProperties());
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (!StringUtil.isEmpty(next.getHeadValue())) {
+ if (properties.length() > 0 &&
properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) {
+ // adapt for RocketMQ 4.9.x or later
+ properties.append(PROPERTY_SEPARATOR);
+ }
+ properties.append(next.getHeadKey());
+ properties.append(NAME_VALUE_SEPARATOR);
+ properties.append(next.getHeadValue());
+ }
+ }
+ requestHeader.setProperties(properties.toString());
+
+ if (allArguments[6] != null) {
+ ((EnhancedInstance) allArguments[6]).setSkyWalkingDynamicField(new
SendCallBackEnhanceInfo(message.getTopic(), ContextManager
+ .capture()));
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments,
+ Class<?>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+
+ private String buildOperationName(String topicName) {
+ return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnExceptionInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnExceptionInterceptor.java
new file mode 100644
index 0000000000..d674147824
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnExceptionInterceptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v5.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnExceptionInterceptor} creates a local span when the method
+ * {@link org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)} executes,
+ * logs the delivered throwable on it and, when the callback carries a context snapshot,
+ * continues that snapshot.
+ */
+public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+    private static final String DEFAULT_TOPIC = "no_topic";
+
+    /**
+     * Opens the callback span, tags it as a RocketMQ producer span and logs the throwable
+     * passed to the callback as its first argument.
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+        // The SendCallBackEnhanceInfo could be null when there is an internal exception in the client API,
+        // such as MQClientException("no route info of this topic")
+        final SendCallBackEnhanceInfo callbackInfo = (SendCallBackEnhanceInfo) objInst.getSkyWalkingDynamicField();
+        final String topic = callbackInfo == null ? DEFAULT_TOPIC : callbackInfo.getTopicId();
+
+        final AbstractSpan callbackSpan = ContextManager.createLocalSpan(
+            CALLBACK_OPERATION_NAME_PREFIX + topic + "/Producer/Callback");
+        callbackSpan.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
+        callbackSpan.log((Throwable) allArguments[0]);
+
+        // Nothing to continue when no enhance info or no snapshot is available.
+        if (callbackInfo == null || callbackInfo.getContextSnapshot() == null) {
+            return;
+        }
+        ContextManager.continued(callbackInfo.getContextSnapshot());
+    }
+
+    /**
+     * Stops the span and returns the callback's result unchanged.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Logs a throwable raised by the callback itself on the active span.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().log(t);
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnSuccessInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnSuccessInterceptor.java
new file mode 100644
index 0000000000..f18f0f8c0e
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/OnSuccessInterceptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v5.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnSuccessInterceptor} creates a local span when the method
+ * {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)} executes.
+ * The span is marked as an error and tagged with the send status when the status is not
+ * {@link SendStatus#SEND_OK}.
+ */
+public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {
+
+    public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+    private static final String DEFAULT_TOPIC = "no_topic";
+
+    /**
+     * Opens the callback span and, when a context snapshot is stored on the callback,
+     * continues it.
+     *
+     * @param objInst      the enhanced callback; its dynamic field is read as a
+     *                     {@link SendCallBackEnhanceInfo} and may be {@code null}
+     * @param allArguments the callback arguments; index 0 is cast to {@link SendResult}
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+        SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo) objInst.getSkyWalkingDynamicField();
+        // The enhance info may be missing (e.g. the callback is invoked without having gone
+        // through the instrumented send call). Dereferencing it blindly would throw before the
+        // span is created, while afterMethod still calls stopSpan() unconditionally, so fall
+        // back to a default topic the same way OnExceptionInterceptor does.
+        String topicId = enhanceInfo != null ? enhanceInfo.getTopicId() : DEFAULT_TOPIC;
+
+        AbstractSpan activeSpan = ContextManager.createLocalSpan(
+            CALLBACK_OPERATION_NAME_PREFIX + topicId + "/Producer/Callback");
+        activeSpan.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
+
+        SendStatus sendStatus = ((SendResult) allArguments[0]).getSendStatus();
+        if (sendStatus != SendStatus.SEND_OK) {
+            activeSpan.errorOccurred();
+            Tags.MQ_STATUS.set(activeSpan, sendStatus.name());
+        }
+
+        // Only link back to the sending context when a snapshot was actually captured.
+        if (enhanceInfo != null && enhanceInfo.getContextSnapshot() != null) {
+            ContextManager.continued(enhanceInfo.getContextSnapshot());
+        }
+    }
+
+    /**
+     * Stops the span and returns the callback's result unchanged.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Logs a throwable raised by the callback itself on the active span.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().log(t);
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/RegisterMessageListenerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/RegisterMessageListenerInterceptor.java
new file mode 100644
index 0000000000..377a701117
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/RegisterMessageListenerInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import
org.apache.skywalking.apm.plugin.rocketMQ.v5.define.ConsumerEnhanceInfos;
+
+/**
+ * Copies the name server address of the intercepted {@link DefaultMQPushConsumer} onto the
+ * message listener passed as the first argument, wrapped in a {@link ConsumerEnhanceInfos}.
+ * Listeners that are not {@link EnhancedInstance}s are left untouched.
+ */
+public class RegisterMessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+        // Read the address first, exactly as before, regardless of the listener type.
+        final String nameServerAddress = ((DefaultMQPushConsumer) objInst).getNamesrvAddr();
+
+        final Object listener = allArguments[0];
+        if (!(listener instanceof EnhancedInstance)) {
+            return;
+        }
+        ((EnhancedInstance) listener).setSkyWalkingDynamicField(new ConsumerEnhanceInfos(nameServerAddress));
+    }
+
+    /**
+     * No post-processing; the result is returned as is.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        return ret;
+    }
+
+    /**
+     * Intentionally empty: nothing is recorded when the intercepted method throws.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/UpdateNameServerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/UpdateNameServerInterceptor.java
new file mode 100644
index 0000000000..6827d765a7
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/UpdateNameServerInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5;
+
+import java.lang.reflect.Method;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+/**
+ * Stores the first argument of the intercepted method in the dynamic field of the enhanced
+ * instance so it can be read back from that instance later.
+ */
+public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor {
+
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+        final Object nameServerArgument = allArguments[0];
+        objInst.setSkyWalkingDynamicField(nameServerArgument);
+    }
+
+    /**
+     * No post-processing; the result is returned as is.
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        return ret;
+    }
+
+    /**
+     * Intentionally empty: nothing is recorded when the intercepted method throws.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+        Class<?>[] argumentsTypes, Throwable t) {
+
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/AbstractRocketMQInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/AbstractRocketMQInstrumentation.java
new file mode 100644
index 0000000000..a65dec9ad9
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/AbstractRocketMQInstrumentation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+
+/**
+ * Common base of the instrumentation definitions in this package. It declares
+ * {@link Constants#WITNESS_ROCKETMQ_5X_CLASS} as the only witness class.
+ */
+public abstract class AbstractRocketMQInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+    /**
+     * @return a one-element array holding {@link Constants#WITNESS_ROCKETMQ_5X_CLASS}
+     */
+    @Override
+    protected String[] witnessClasses() {
+        final String[] witnesses = new String[1];
+        witnesses[0] = Constants.WITNESS_ROCKETMQ_5X_CLASS;
+        return witnesses;
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/Constants.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/Constants.java
new file mode 100644
index 0000000000..0362f2c460
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/Constants.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
+
+/**
+ * Constants shared by the instrumentation definitions of this package.
+ * This is a pure constants holder: it is final and cannot be instantiated.
+ */
+public final class Constants {
+
+    /**
+     * Fully-qualified name of the class used as the witness class by
+     * {@code AbstractRocketMQInstrumentation#witnessClasses()}.
+     */
+    public static final String WITNESS_ROCKETMQ_5X_CLASS =
+        "org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader";
+
+    private Constants() {
+        // constants holder, never instantiated
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageConcurrentlyInstrumentation.java
similarity index 88%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageConcurrentlyInstrumentation.java
index ecde176fcd..c4e0620c3c 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageConcurrentlyInstrumentation.java
@@ -16,22 +16,21 @@
*
*/
-package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
-public class ConsumeMessageConcurrentlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class ConsumeMessageConcurrentlyInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently";
private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
- private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.MessageConcurrentlyConsumeInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageOrderlyInstrumentation.java
similarity index 88%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageOrderlyInstrumentation.java
index 5d0da38989..aa158e200a 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumeMessageOrderlyInstrumentation.java
@@ -16,22 +16,21 @@
*
*/
-package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
-public class ConsumeMessageOrderlyInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class ConsumeMessageOrderlyInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly";
private static final String ENHANCE_METHOD = "consumeMessage";
- private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageOrderlyConsumeInterceptor";
+ private static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.MessageOrderlyConsumeInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumerEnhanceInfos.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumerEnhanceInfos.java
new file mode 100644
index 0000000000..c1ff9e3866
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/ConsumerEnhanceInfos.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
+
+/**
+ * Immutable holder for the name server address of a consumer. Instances are attached to an
+ * enhanced message listener as its dynamic field.
+ */
+public class ConsumerEnhanceInfos {
+
+    // final: assigned once in the constructor, so the value is safely visible to whichever
+    // thread later reads it from the listener.
+    private final String namesrvAddr;
+
+    /**
+     * @param namesrvAddr name server address of the consumer; stored as given (may be null)
+     */
+    public ConsumerEnhanceInfos(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    /**
+     * @return the name server address passed to the constructor
+     */
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/DefaultMQPushConsumerInstrumentation.java
similarity index 88%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/DefaultMQPushConsumerInstrumentation.java
index d7b8ccdb82..04913aa30a 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/DefaultMQPushConsumerInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/DefaultMQPushConsumerInstrumentation.java
@@ -16,23 +16,22 @@
*
*/
-package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class DefaultMQPushConsumerInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class DefaultMQPushConsumerInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
private static final String REGISTER_MESSAGE_LISTENER_METHOD_NAME =
"registerMessageListener";
- public static final String REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.RegisterMessageListenerInterceptor";
+ public static final String REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.RegisterMessageListenerInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/MQClientAPIImplInstrumentation.java
similarity index 89%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/MQClientAPIImplInstrumentation.java
index 62d5cb62f7..f6f0c55c92 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/MQClientAPIImplInstrumentation.java
@@ -16,25 +16,24 @@
*
*/
-package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class MQClientAPIImplInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class MQClientAPIImplInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.impl.MQClientAPIImpl";
private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
- private static final String ASYNC_METHOD_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor";
- public static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.UpdateNameServerInterceptor";
+ private static final String ASYNC_METHOD_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.MessageSendInterceptor";
+ public static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.UpdateNameServerInterceptor";
public static final String UPDATE_NAME_SERVER_METHOD_NAME =
"updateNameServerAddressList";
@Override
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallBackEnhanceInfo.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallBackEnhanceInfo.java
new file mode 100644
index 0000000000..78e690555c
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallBackEnhanceInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
+
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * {@link SendCallBackEnhanceInfo} saves the topic id and the {@link ContextSnapshot}
+ * instance used for tracing. The holder is immutable: both values are fixed at construction.
+ */
+public class SendCallBackEnhanceInfo {
+    // final: the object is created by the send interceptor and read by the callback
+    // interceptors, so the fields must never change after construction.
+    private final String topicId;
+    private final ContextSnapshot contextSnapshot;
+
+    /**
+     * @param topicId         topic of the message being sent
+     * @param contextSnapshot snapshot of the tracing context captured at send time
+     */
+    public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) {
+        this.topicId = topicId;
+        this.contextSnapshot = contextSnapshot;
+    }
+
+    /**
+     * @return the topic id passed to the constructor
+     */
+    public String getTopicId() {
+        return topicId;
+    }
+
+    /**
+     * @return the context snapshot passed to the constructor
+     */
+    public ContextSnapshot getContextSnapshot() {
+        return contextSnapshot;
+    }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallbackInstrumentation.java
similarity index 89%
copy from
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallbackInstrumentation.java
index 68abd0fb9f..e7573982c5 100644
---
a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v5/define/SendCallbackInstrumentation.java
@@ -16,26 +16,25 @@
*
*/
-package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
+package org.apache.skywalking.apm.plugin.rocketMQ.v5.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
-public class SendCallbackInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class SendCallbackInstrumentation extends
AbstractRocketMQInstrumentation {
private static final String ENHANCE_CLASS =
"org.apache.rocketmq.client.producer.SendCallback";
private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
- private static final String ON_SUCCESS_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor";
+ private static final String ON_SUCCESS_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.OnSuccessInterceptor";
private static final String ON_EXCEPTION_METHOD = "onException";
- private static final String ON_EXCEPTION_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor";
+ private static final String ON_EXCEPTION_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.rocketMQ.v5.OnExceptionInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
diff --git a/test/plugin/scenarios/rocketmq-scenario/support-version.list
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/resources/skywalking-plugin.def
similarity index 61%
copy from test/plugin/scenarios/rocketmq-scenario/support-version.list
copy to
apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/resources/skywalking-plugin.def
index 44c67a363f..d38e2db24e 100644
--- a/test/plugin/scenarios/rocketmq-scenario/support-version.list
+++
b/apm-sniffer/apm-sdk-plugin/rocketMQ-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -14,12 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# lists your version here (Contains only the last version number of each minor
version.)
-
-4.9.4
-4.8.0
-4.7.1
-4.6.0
-4.5.2
-4.4.0
-4.3.1
+rocketMQ-5.x=org.apache.skywalking.apm.plugin.rocketMQ.v5.define.ConsumeMessageConcurrentlyInstrumentation
+rocketMQ-5.x=org.apache.skywalking.apm.plugin.rocketMQ.v5.define.ConsumeMessageOrderlyInstrumentation
+rocketMQ-5.x=org.apache.skywalking.apm.plugin.rocketMQ.v5.define.MQClientAPIImplInstrumentation
+rocketMQ-5.x=org.apache.skywalking.apm.plugin.rocketMQ.v5.define.SendCallbackInstrumentation
+rocketMQ-5.x=org.apache.skywalking.apm.plugin.rocketMQ.v5.define.DefaultMQPushConsumerInstrumentation
diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md
b/docs/en/setup/service-agent/java-agent/Plugin-list.md
index 9043febbf7..3aed87c67e 100644
--- a/docs/en/setup/service-agent/java-agent/Plugin-list.md
+++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md
@@ -83,6 +83,7 @@
- resteasy-server-4.x
- rocketMQ-3.x
- rocketMQ-4.x
+- rocketMQ-5.x
- sentinel-1.x
- servicecomb-2.x
- sharding-sphere-3.x
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md
b/docs/en/setup/service-agent/java-agent/Supported-list.md
index a6ed661e48..e9f7b36dbc 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -69,7 +69,7 @@ metrics based on the tracing data.
* [Apache CXF](https://github.com/apache/cxf) 3.x
* [JSONRPC4J](https://github.com/briandilley/jsonrpc4j) 1.2.0 -> 1.6
* MQ
- * [RocketMQ](https://github.com/apache/rocketmq) 4.x
+ * [RocketMQ](https://github.com/apache/rocketmq) 3.x -> 5.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring
Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended
by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
diff --git a/test/plugin/scenarios/rocketmq-scenario/support-version.list
b/test/plugin/scenarios/rocketmq-scenario/support-version.list
index 44c67a363f..b391ea9d2a 100644
--- a/test/plugin/scenarios/rocketmq-scenario/support-version.list
+++ b/test/plugin/scenarios/rocketmq-scenario/support-version.list
@@ -16,6 +16,7 @@
# lists your version here (Contains only the last version number of each minor
version.)
+5.1.0
4.9.4
4.8.0
4.7.1