This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new cf85f03ad5 Polish up rabbitmq-5.x plugin to fix missing broker tag on 
consumer side (#362)
cf85f03ad5 is described below

commit cf85f03ad5cb42205832ef35968b643a2e03f1ef
Author: pg.yang <[email protected]>
AuthorDate: Fri Oct 21 21:19:57 2022 +0800

    Polish up rabbitmq-5.x plugin to fix missing broker tag on consumer side 
(#362)
---
 CHANGES.md                                         |   1 +
 ...or.java => ChannelNConstructorInterceptor.java} |   2 +-
 .../rabbitmq/RabbitMQConsumerInterceptor.java      |  40 +-----
 .../apm/plugin/rabbitmq/TracerConsumer.java        | 102 +++++++++++++++
 ...mentation.java => ChannelNInstrumentation.java} |  28 +++-
 .../define/RabbitMQConsumerInstrumentation.java    |  82 ------------
 .../src/main/resources/skywalking-plugin.def       |   3 +-
 ...ava => ChannelNConstructorInterceptorTest.java} |   8 +-
 .../rabbitmq/RabbitMQConsumerInterceptorTest.java  | 142 +++++++++++++--------
 9 files changed, 228 insertions(+), 180 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 18de5f4aee..a67ef86363 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -18,6 +18,7 @@ Release Notes.
 * Polish up nats plugin to unify MQ related tags  
 * Correct the duration of the transaction span for Neo4J 4.x.
 * Plugin-test configuration.yml dependencies support docker service command 
field
+* Polish up rabbitmq-5.x plugin to fix missing broker tag on consumer side
 
 #### Documentation
 
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
similarity index 92%
rename from 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
rename to 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
index 6a385ed23b..4eb70ac709 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
@@ -22,7 +22,7 @@ import com.rabbitmq.client.Connection;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
 
-public class RabbitMQProducerAndConsumerConstructorInterceptor implements 
InstanceConstructorInterceptor {
+public class ChannelNConstructorInterceptor implements 
InstanceConstructorInterceptor {
     @Override
     public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
         Connection connection = (Connection) allArguments[0];
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
index 8c74a80573..50240c9729 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
@@ -18,61 +18,29 @@
 
 package org.apache.skywalking.apm.plugin.rabbitmq;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Envelope;
-import org.apache.skywalking.apm.agent.core.context.CarrierItem;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
-import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.tag.Tags;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import com.rabbitmq.client.Consumer;
+import java.lang.reflect.Method;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
-import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
-
-import java.lang.reflect.Method;
 
 public class RabbitMQConsumerInterceptor implements 
InstanceMethodsAroundInterceptor {
-    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
-    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
 
     @Override
     public void beforeMethod(EnhancedInstance objInst, Method method, Object[] 
allArguments, Class<?>[] argumentsTypes,
         MethodInterceptResult result) throws Throwable {
-        ContextCarrier contextCarrier = new ContextCarrier();
-        String url = (String) objInst.getSkyWalkingDynamicField();
-        Envelope envelope = (Envelope) allArguments[1];
-        AMQP.BasicProperties properties = (AMQP.BasicProperties) 
allArguments[2];
-        AbstractSpan activeSpan = 
ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" + 
envelope.getExchange() + "Queue/" + envelope
-            .getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX, 
null).start(System.currentTimeMillis());
-        Tags.MQ_BROKER.set(activeSpan, url);
-        Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
-        Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
-        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
-        SpanLayer.asMQ(activeSpan);
-        CarrierItem next = contextCarrier.items();
-        while (next.hasNext()) {
-            next = next.next();
-            if (properties.getHeaders() != null && 
properties.getHeaders().get(next.getHeadKey()) != null) {
-                
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
-            }
-        }
-        ContextManager.extract(contextCarrier);
-
+        Consumer consumer = (Consumer) allArguments[6];
+        allArguments[6] = new TracerConsumer(consumer, (String) 
objInst.getSkyWalkingDynamicField());
     }
 
     @Override
     public Object afterMethod(EnhancedInstance objInst, Method method, 
Object[] allArguments, Class<?>[] argumentsTypes,
         Object ret) throws Throwable {
-        ContextManager.stopSpan();
         return ret;
-
     }
 
     @Override
     public void handleMethodException(EnhancedInstance objInst, Method method, 
Object[] allArguments,
         Class<?>[] argumentsTypes, Throwable t) {
-        ContextManager.activeSpan().log(t);
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
new file mode 100644
index 0000000000..b78f5384d0
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
@@ -0,0 +1,102 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one or more
+ *   contributor license agreements.  See the NOTICE file distributed with
+ *   this work for additional information regarding copyright ownership.
+ *   The ASF licenses this file to You under the Apache License, Version 2.0
+ *   (the "License"); you may not use this file except in compliance with
+ *   the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import java.io.IOException;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+public class TracerConsumer implements Consumer {
+
+    private Consumer delegate;
+    private String serverUrl;
+
+    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
+    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+
+    public TracerConsumer(final Consumer delegate, final String serverUrl) {
+        this.delegate = delegate;
+        this.serverUrl = serverUrl;
+    }
+
+    @Override
+    public void handleConsumeOk(final String consumerTag) {
+        this.delegate.handleConsumeOk(consumerTag);
+    }
+
+    @Override
+    public void handleCancelOk(final String consumerTag) {
+        this.delegate.handleRecoverOk(consumerTag);
+    }
+
+    @Override
+    public void handleCancel(final String consumerTag) throws IOException {
+        this.delegate.handleCancel(consumerTag);
+    }
+
+    @Override
+    public void handleShutdownSignal(final String consumerTag, final 
ShutdownSignalException sig) {
+        this.delegate.handleShutdownSignal(consumerTag, sig);
+    }
+
+    @Override
+    public void handleRecoverOk(final String consumerTag) {
+        this.delegate.handleRecoverOk(consumerTag);
+    }
+
+    @Override
+    public void handleDelivery(final String consumerTag,
+                               final Envelope envelope,
+                               final AMQP.BasicProperties properties,
+                               final byte[] body) throws IOException {
+
+        ContextCarrier contextCarrier = new ContextCarrier();
+        AbstractSpan activeSpan = ContextManager.createEntrySpan(
+            OPERATE_NAME_PREFIX + "Topic/" + envelope.getExchange() + "Queue/" 
+ envelope
+                .getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX, 
null).start(System.currentTimeMillis());
+        Tags.MQ_BROKER.set(activeSpan, serverUrl);
+        Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
+        Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
+        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
+        SpanLayer.asMQ(activeSpan);
+        CarrierItem next = contextCarrier.items();
+        while (next.hasNext()) {
+            next = next.next();
+            if (properties.getHeaders() != null && 
properties.getHeaders().get(next.getHeadKey()) != null) {
+                
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
+            }
+        }
+        ContextManager.extract(contextCarrier);
+        try {
+            this.delegate.handleDelivery(consumerTag, envelope, properties, 
body);
+        } catch (Exception e) {
+            activeSpan.log(e).errorOccurred();
+        } finally {
+            ContextManager.stopSpan(activeSpan);
+        }
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
similarity index 71%
rename from 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
rename to 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
index 96f0ac6249..5beceafbd4 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
@@ -21,19 +21,23 @@ package org.apache.skywalking.apm.plugin.rabbitmq.define;
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.DeclaredInstanceMethodsInterceptPoint;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static 
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
 
-public class RabbitMQProducerInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+public class ChannelNInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
     public static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerInterceptor";
     public static final String ENHANCE_CLASS_PRODUCER = 
"com.rabbitmq.client.impl.ChannelN";
-    public static final String ENHANCE_METHOD_DISPATCH = "basicPublish";
-    public static final String INTERCEPTOR_CONSTRUCTOR = 
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
+    public static final String PUBLISH_ENHANCE_METHOD = "basicPublish";
+    public static final String INTERCEPTOR_CONSTRUCTOR = 
"org.apache.skywalking.apm.plugin.rabbitmq.ChannelNConstructorInterceptor";
+    public static final String CONSUME_ENHANCE_METHOD = "basicConsume";
+    public static final String CONSUME_INTERCEPTOR_CONSTRUCTOR = 
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
 
     @Override
     public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -58,7 +62,7 @@ public class RabbitMQProducerInstrumentation extends 
ClassInstanceMethodsEnhance
             new InstanceMethodsInterceptPoint() {
                 @Override
                 public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return 
named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(4, 
"com.rabbitmq.client.AMQP$BasicProperties"));
+                    return 
named(PUBLISH_ENHANCE_METHOD).and(takesArgumentWithType(4, 
"com.rabbitmq.client.AMQP$BasicProperties"));
                 }
 
                 @Override
@@ -66,6 +70,22 @@ public class RabbitMQProducerInstrumentation extends 
ClassInstanceMethodsEnhance
                     return INTERCEPTOR_CLASS;
                 }
 
+                @Override
+                public boolean isOverrideArgs() {
+                    return true;
+                }
+            },
+            new DeclaredInstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return 
named(CONSUME_ENHANCE_METHOD).and(takesArguments(7));
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return CONSUME_INTERCEPTOR_CONSTRUCTOR;
+                }
+
                 @Override
                 public boolean isOverrideArgs() {
                     return true;
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
deleted file mode 100644
index 0c9d0791c6..0000000000
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.plugin.rabbitmq.define;
-
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.matcher.ElementMatcher;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.DeclaredInstanceMethodsInterceptPoint;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
-import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
-import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
-
-import static net.bytebuddy.matcher.ElementMatchers.named;
-import static 
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
-
-public class RabbitMQConsumerInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
-    public static final String INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
-    public static final String ENHANCE_CLASS_PRODUCER = 
"com.rabbitmq.client.Consumer";
-    public static final String ENHANCE_METHOD_DISPATCH = "handleDelivery";
-    public static final String INTERCEPTOR_CONSTRUCTOR = 
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
-
-    @Override
-    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
-        return new ConstructorInterceptPoint[] {
-            new ConstructorInterceptPoint() {
-                @Override
-                public ElementMatcher<MethodDescription> 
getConstructorMatcher() {
-                    return takesArgumentWithType(0, 
"com.rabbitmq.client.impl.AMQConnection");
-                }
-
-                @Override
-                public String getConstructorInterceptor() {
-                    return INTERCEPTOR_CONSTRUCTOR;
-                }
-            }
-        };
-    }
-
-    @Override
-    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
-        return new InstanceMethodsInterceptPoint[] {
-            new DeclaredInstanceMethodsInterceptPoint() {
-                @Override
-                public ElementMatcher<MethodDescription> getMethodsMatcher() {
-                    return 
named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(2, 
"com.rabbitmq.client.AMQP$BasicProperties"));
-                }
-
-                @Override
-                public String getMethodsInterceptor() {
-                    return INTERCEPTOR_CLASS;
-                }
-
-                @Override
-                public boolean isOverrideArgs() {
-                    return false;
-                }
-            }
-        };
-    }
-
-    @Override
-    protected ClassMatch enhanceClass() {
-        return HierarchyMatch.byHierarchyMatch(new String[] 
{ENHANCE_CLASS_PRODUCER});
-    }
-}
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
index e9e83f6dee..c4987531b7 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -14,5 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQProducerInstrumentation
-rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQConsumerInstrumentation
\ No newline at end of file
+rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.ChannelNInstrumentation
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
similarity index 93%
rename from 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
rename to 
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
index 2056dd648a..ccb6955f64 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
@@ -43,9 +43,9 @@ import static org.hamcrest.core.Is.is;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(TracingSegmentRunner.class)
-public class RabbitMQProducerAndConsumerConstructorInterceptorTest {
+public class ChannelNConstructorInterceptorTest {
 
-    private RabbitMQProducerAndConsumerConstructorInterceptor 
rabbitMQProducerAndConsumerConstructorInterceptor;
+    private ChannelNConstructorInterceptor channelNConstructorInterceptor;
 
     private EnhancedInstance enhancedInstance = new EnhancedInstance() {
         private String test;
@@ -230,8 +230,8 @@ public class 
RabbitMQProducerAndConsumerConstructorInterceptorTest {
 
     @Test
     public void TestRabbitMQConsumerAndProducerConstructorInterceptor() {
-        rabbitMQProducerAndConsumerConstructorInterceptor = new 
RabbitMQProducerAndConsumerConstructorInterceptor();
-        
rabbitMQProducerAndConsumerConstructorInterceptor.onConstruct(enhancedInstance, 
new Object[] {testConnection});
+        channelNConstructorInterceptor = new ChannelNConstructorInterceptor();
+        channelNConstructorInterceptor.onConstruct(enhancedInstance, new 
Object[] {testConnection});
         assertThat((String) enhancedInstance.getSkyWalkingDynamicField(), 
is("127.0.0.1:5672"));
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
index 57c2b40c5d..714e516b61 100644
--- 
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
+++ 
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
@@ -19,7 +19,13 @@
 package org.apache.skywalking.apm.plugin.rabbitmq;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
 import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
@@ -35,10 +41,6 @@ import org.junit.runner.RunWith;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import static org.hamcrest.CoreMatchers.is;
 
 @RunWith(PowerMockRunner.class)
@@ -48,75 +50,113 @@ public class RabbitMQConsumerInterceptorTest {
     @SegmentStoragePoint
     private SegmentStorage segmentStorage;
 
+    private RabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
+
     @Rule
     public AgentServiceRule serviceRule = new AgentServiceRule();
 
-    private EnhancedInstance enhancedInstance = new EnhancedInstance() {
-        @Override
-        public Object getSkyWalkingDynamicField() {
-            return "127.0.0.1:5272";
-        }
-
-        @Override
-        public void setSkyWalkingDynamicField(Object value) {
-        }
-    };
-
-    private RabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
-
     @Before
     public void setUp() throws Exception {
         rabbitMQConsumerInterceptor = new RabbitMQConsumerInterceptor();
     }
 
     @Test
-    public void TestRabbitMQConsumerInterceptor() throws Throwable {
-        Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
-        Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put(SW8CarrierItem.HEADER_NAME, 
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
-        AMQP.BasicProperties.Builder propsBuilder = new 
AMQP.BasicProperties.Builder();
-        Object[] arguments = new Object[] {
-            0,
-            envelope,
-            propsBuilder.headers(headers).build()
+    public void testRabbitMQConsumerInterceptorWithNilHeaders() throws 
Throwable {
+        final Object[] args = {
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            getConsumer()
         };
-
-        rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, 
arguments, null, null);
-        rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, 
arguments, null, null);
+        rabbitMQConsumerInterceptor.beforeMethod(getEnhancedInstance(), null, 
args, new Class[0], null);
+        rabbitMQConsumerInterceptor.afterMethod(getEnhancedInstance(), null, 
args, new Class[0], null);
+        ((Consumer) args[6]).handleDelivery("tag", new Envelope(1L, false, 
"exchange", "routerKey"),
+                                            new AMQP.BasicProperties(), new 
byte[0]
+        );
         List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
         Assert.assertThat(traceSegments.size(), is(1));
     }
 
     @Test
-    public void testRabbitMQConsumerInterceptorWithNilHeaders() throws 
Throwable {
-        Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
-        AMQP.BasicProperties.Builder propsBuilder = new 
AMQP.BasicProperties.Builder();
-        Object[] arguments = new Object[] {
-            0,
-            envelope,
-            propsBuilder.headers(null).build()
+    public void testRabbitMQConsumerInterceptor() throws Throwable {
+        final Object[] args = {
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            getConsumer()
         };
 
-        rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, 
arguments, null, null);
-        rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, 
arguments, null, null);
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(
+            SW8CarrierItem.HEADER_NAME,
+            
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
+        );
+        AMQP.BasicProperties.Builder propsBuilder = new 
AMQP.BasicProperties.Builder();
+        propsBuilder.headers(headers);
+
+        rabbitMQConsumerInterceptor.beforeMethod(getEnhancedInstance(), null, 
args, new Class[0], null);
+        rabbitMQConsumerInterceptor.afterMethod(getEnhancedInstance(), null, 
args, new Class[0], null);
+        ((Consumer) args[6]).handleDelivery("tag", new Envelope(1L, false, 
"exchange", "routerKey"),
+                                            propsBuilder.build(), new byte[0]
+        );
         List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
         Assert.assertThat(traceSegments.size(), is(1));
     }
 
-    @Test
-    public void testRabbitMQConsumerInterceptorWithEmptyHeaders() throws 
Throwable {
-        Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
-        Map<String, Object> headers = new HashMap<String, Object>();
-        AMQP.BasicProperties.Builder propsBuilder = new 
AMQP.BasicProperties.Builder();
-        Object[] arguments = new Object[] {
-            0,
-            envelope,
-            propsBuilder.headers(headers).build()
+    public EnhancedInstance getEnhancedInstance() {
+        return new EnhancedInstance() {
+            @Override
+            public Object getSkyWalkingDynamicField() {
+                return "serverAddr";
+            }
+
+            @Override
+            public void setSkyWalkingDynamicField(final Object value) {
+
+            }
         };
+    }
 
-        rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null, 
arguments, null, null);
-        rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null, 
arguments, null, null);
-        List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
-        Assert.assertThat(traceSegments.size(), is(1));
+    public Consumer getConsumer() {
+        return new Consumer() {
+            @Override
+            public void handleConsumeOk(final String consumerTag) {
+
+            }
+
+            @Override
+            public void handleCancelOk(final String consumerTag) {
+
+            }
+
+            @Override
+            public void handleCancel(final String consumerTag) throws 
IOException {
+
+            }
+
+            @Override
+            public void handleShutdownSignal(final String consumerTag, final 
ShutdownSignalException sig) {
+
+            }
+
+            @Override
+            public void handleRecoverOk(final String consumerTag) {
+
+            }
+
+            @Override
+            public void handleDelivery(final String consumerTag,
+                                       final Envelope envelope,
+                                       final AMQP.BasicProperties properties,
+                                       final byte[] body) throws IOException {
+
+            }
+        };
     }
 }

Reply via email to