This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new cf85f03ad5 Polish up rabbitmq-5.x plugin to fix missing broker tag on
consumer side (#362)
cf85f03ad5 is described below
commit cf85f03ad5cb42205832ef35968b643a2e03f1ef
Author: pg.yang <[email protected]>
AuthorDate: Fri Oct 21 21:19:57 2022 +0800
Polish up rabbitmq-5.x plugin to fix missing broker tag on consumer side
(#362)
---
CHANGES.md | 1 +
...or.java => ChannelNConstructorInterceptor.java} | 2 +-
.../rabbitmq/RabbitMQConsumerInterceptor.java | 40 +-----
.../apm/plugin/rabbitmq/TracerConsumer.java | 102 +++++++++++++++
...mentation.java => ChannelNInstrumentation.java} | 28 +++-
.../define/RabbitMQConsumerInstrumentation.java | 82 ------------
.../src/main/resources/skywalking-plugin.def | 3 +-
...ava => ChannelNConstructorInterceptorTest.java} | 8 +-
.../rabbitmq/RabbitMQConsumerInterceptorTest.java | 142 +++++++++++++--------
9 files changed, 228 insertions(+), 180 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 18de5f4aee..a67ef86363 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -18,6 +18,7 @@ Release Notes.
* Polish up nats plugin to unify MQ related tags
* Correct the duration of the transaction span for Neo4J 4.x.
* Plugin-test configuration.yml dependencies support docker service command
field
+* Polish up rabbitmq-5.x plugin to fix missing broker tag on consumer side
#### Documentation
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
similarity index 92%
rename from
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
rename to
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
index 6a385ed23b..4eb70ac709 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptor.java
@@ -22,7 +22,7 @@ import com.rabbitmq.client.Connection;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
-public class RabbitMQProducerAndConsumerConstructorInterceptor implements
InstanceConstructorInterceptor {
+public class ChannelNConstructorInterceptor implements
InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Connection connection = (Connection) allArguments[0];
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
index 8c74a80573..50240c9729 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptor.java
@@ -18,61 +18,29 @@
package org.apache.skywalking.apm.plugin.rabbitmq;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Envelope;
-import org.apache.skywalking.apm.agent.core.context.CarrierItem;
-import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
-import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.tag.Tags;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
-import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import com.rabbitmq.client.Consumer;
+import java.lang.reflect.Method;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
-import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
-
-import java.lang.reflect.Method;
public class RabbitMQConsumerInterceptor implements
InstanceMethodsAroundInterceptor {
- public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
- public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
- ContextCarrier contextCarrier = new ContextCarrier();
- String url = (String) objInst.getSkyWalkingDynamicField();
- Envelope envelope = (Envelope) allArguments[1];
- AMQP.BasicProperties properties = (AMQP.BasicProperties)
allArguments[2];
- AbstractSpan activeSpan =
ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" +
envelope.getExchange() + "Queue/" + envelope
- .getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX,
null).start(System.currentTimeMillis());
- Tags.MQ_BROKER.set(activeSpan, url);
- Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
- Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
- activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
- SpanLayer.asMQ(activeSpan);
- CarrierItem next = contextCarrier.items();
- while (next.hasNext()) {
- next = next.next();
- if (properties.getHeaders() != null &&
properties.getHeaders().get(next.getHeadKey()) != null) {
-
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
- }
- }
- ContextManager.extract(contextCarrier);
-
+ Consumer consumer = (Consumer) allArguments[6];
+ allArguments[6] = new TracerConsumer(consumer, (String)
objInst.getSkyWalkingDynamicField());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
- ContextManager.stopSpan();
return ret;
-
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
- ContextManager.activeSpan().log(t);
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
new file mode 100644
index 0000000000..b78f5384d0
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import java.io.IOException;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+
+public class TracerConsumer implements Consumer {
+
+ private Consumer delegate;
+ private String serverUrl;
+
+ public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
+ public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
+
+ public TracerConsumer(final Consumer delegate, final String serverUrl) {
+ this.delegate = delegate;
+ this.serverUrl = serverUrl;
+ }
+
+ @Override
+ public void handleConsumeOk(final String consumerTag) {
+ this.delegate.handleConsumeOk(consumerTag);
+ }
+
+ @Override
+ public void handleCancelOk(final String consumerTag) {
+ this.delegate.handleRecoverOk(consumerTag);
+ }
+
+ @Override
+ public void handleCancel(final String consumerTag) throws IOException {
+ this.delegate.handleCancel(consumerTag);
+ }
+
+ @Override
+ public void handleShutdownSignal(final String consumerTag, final
ShutdownSignalException sig) {
+ this.delegate.handleShutdownSignal(consumerTag, sig);
+ }
+
+ @Override
+ public void handleRecoverOk(final String consumerTag) {
+ this.delegate.handleRecoverOk(consumerTag);
+ }
+
+ @Override
+ public void handleDelivery(final String consumerTag,
+ final Envelope envelope,
+ final AMQP.BasicProperties properties,
+ final byte[] body) throws IOException {
+
+ ContextCarrier contextCarrier = new ContextCarrier();
+ AbstractSpan activeSpan = ContextManager.createEntrySpan(
+ OPERATE_NAME_PREFIX + "Topic/" + envelope.getExchange() + "Queue/"
+ envelope
+ .getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX,
null).start(System.currentTimeMillis());
+ Tags.MQ_BROKER.set(activeSpan, serverUrl);
+ Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
+ Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
+ activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
+ SpanLayer.asMQ(activeSpan);
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (properties.getHeaders() != null &&
properties.getHeaders().get(next.getHeadKey()) != null) {
+
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
+ }
+ }
+ ContextManager.extract(contextCarrier);
+ try {
+ this.delegate.handleDelivery(consumerTag, envelope, properties,
body);
+ } catch (Exception e) {
+ activeSpan.log(e).errorOccurred();
+ } finally {
+ ContextManager.stopSpan(activeSpan);
+ }
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
similarity index 71%
rename from
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
rename to
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
index 96f0ac6249..5beceafbd4 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQProducerInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/ChannelNInstrumentation.java
@@ -21,19 +21,23 @@ package org.apache.skywalking.apm.plugin.rabbitmq.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.DeclaredInstanceMethodsInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
-public class RabbitMQProducerInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class ChannelNInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerInterceptor";
public static final String ENHANCE_CLASS_PRODUCER =
"com.rabbitmq.client.impl.ChannelN";
- public static final String ENHANCE_METHOD_DISPATCH = "basicPublish";
- public static final String INTERCEPTOR_CONSTRUCTOR =
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
+ public static final String PUBLISH_ENHANCE_METHOD = "basicPublish";
+ public static final String INTERCEPTOR_CONSTRUCTOR =
"org.apache.skywalking.apm.plugin.rabbitmq.ChannelNConstructorInterceptor";
+ public static final String CONSUME_ENHANCE_METHOD = "basicConsume";
+ public static final String CONSUME_INTERCEPTOR_CONSTRUCTOR =
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -58,7 +62,7 @@ public class RabbitMQProducerInstrumentation extends
ClassInstanceMethodsEnhance
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return
named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(4,
"com.rabbitmq.client.AMQP$BasicProperties"));
+ return
named(PUBLISH_ENHANCE_METHOD).and(takesArgumentWithType(4,
"com.rabbitmq.client.AMQP$BasicProperties"));
}
@Override
@@ -66,6 +70,22 @@ public class RabbitMQProducerInstrumentation extends
ClassInstanceMethodsEnhance
return INTERCEPTOR_CLASS;
}
+ @Override
+ public boolean isOverrideArgs() {
+ return true;
+ }
+ },
+ new DeclaredInstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return
named(CONSUME_ENHANCE_METHOD).and(takesArguments(7));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return CONSUME_INTERCEPTOR_CONSTRUCTOR;
+ }
+
@Override
public boolean isOverrideArgs() {
return true;
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
deleted file mode 100644
index 0c9d0791c6..0000000000
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/define/RabbitMQConsumerInstrumentation.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.plugin.rabbitmq.define;
-
-import net.bytebuddy.description.method.MethodDescription;
-import net.bytebuddy.matcher.ElementMatcher;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.DeclaredInstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
-import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
-import org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch;
-
-import static net.bytebuddy.matcher.ElementMatchers.named;
-import static
org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
-
-public class RabbitMQConsumerInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
- public static final String INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
- public static final String ENHANCE_CLASS_PRODUCER =
"com.rabbitmq.client.Consumer";
- public static final String ENHANCE_METHOD_DISPATCH = "handleDelivery";
- public static final String INTERCEPTOR_CONSTRUCTOR =
"org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
-
- @Override
- public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
- return new ConstructorInterceptPoint[] {
- new ConstructorInterceptPoint() {
- @Override
- public ElementMatcher<MethodDescription>
getConstructorMatcher() {
- return takesArgumentWithType(0,
"com.rabbitmq.client.impl.AMQConnection");
- }
-
- @Override
- public String getConstructorInterceptor() {
- return INTERCEPTOR_CONSTRUCTOR;
- }
- }
- };
- }
-
- @Override
- public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
- return new InstanceMethodsInterceptPoint[] {
- new DeclaredInstanceMethodsInterceptPoint() {
- @Override
- public ElementMatcher<MethodDescription> getMethodsMatcher() {
- return
named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(2,
"com.rabbitmq.client.AMQP$BasicProperties"));
- }
-
- @Override
- public String getMethodsInterceptor() {
- return INTERCEPTOR_CLASS;
- }
-
- @Override
- public boolean isOverrideArgs() {
- return false;
- }
- }
- };
- }
-
- @Override
- protected ClassMatch enhanceClass() {
- return HierarchyMatch.byHierarchyMatch(new String[]
{ENHANCE_CLASS_PRODUCER});
- }
-}
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
index e9e83f6dee..c4987531b7 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/resources/skywalking-plugin.def
@@ -14,5 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQProducerInstrumentation
-rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQConsumerInstrumentation
\ No newline at end of file
+rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.ChannelNInstrumentation
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
similarity index 93%
rename from
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
rename to
apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
index 2056dd648a..ccb6955f64 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQProducerAndConsumerConstructorInterceptorTest.java
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/ChannelNConstructorInterceptorTest.java
@@ -43,9 +43,9 @@ import static org.hamcrest.core.Is.is;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
-public class RabbitMQProducerAndConsumerConstructorInterceptorTest {
+public class ChannelNConstructorInterceptorTest {
- private RabbitMQProducerAndConsumerConstructorInterceptor
rabbitMQProducerAndConsumerConstructorInterceptor;
+ private ChannelNConstructorInterceptor channelNConstructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private String test;
@@ -230,8 +230,8 @@ public class
RabbitMQProducerAndConsumerConstructorInterceptorTest {
@Test
public void TestRabbitMQConsumerAndProducerConstructorInterceptor() {
- rabbitMQProducerAndConsumerConstructorInterceptor = new
RabbitMQProducerAndConsumerConstructorInterceptor();
-
rabbitMQProducerAndConsumerConstructorInterceptor.onConstruct(enhancedInstance,
new Object[] {testConnection});
+ channelNConstructorInterceptor = new ChannelNConstructorInterceptor();
+ channelNConstructorInterceptor.onConstruct(enhancedInstance, new
Object[] {testConnection});
assertThat((String) enhancedInstance.getSkyWalkingDynamicField(),
is("127.0.0.1:5672"));
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
index 57c2b40c5d..714e516b61 100644
---
a/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
+++
b/apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rabbitmq/RabbitMQConsumerInterceptorTest.java
@@ -19,7 +19,13 @@
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
@@ -35,10 +41,6 @@ import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import static org.hamcrest.CoreMatchers.is;
@RunWith(PowerMockRunner.class)
@@ -48,75 +50,113 @@ public class RabbitMQConsumerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
+ private RabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
+
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
- private EnhancedInstance enhancedInstance = new EnhancedInstance() {
- @Override
- public Object getSkyWalkingDynamicField() {
- return "127.0.0.1:5272";
- }
-
- @Override
- public void setSkyWalkingDynamicField(Object value) {
- }
- };
-
- private RabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
-
@Before
public void setUp() throws Exception {
rabbitMQConsumerInterceptor = new RabbitMQConsumerInterceptor();
}
@Test
- public void TestRabbitMQConsumerInterceptor() throws Throwable {
- Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(SW8CarrierItem.HEADER_NAME,
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=");
- AMQP.BasicProperties.Builder propsBuilder = new
AMQP.BasicProperties.Builder();
- Object[] arguments = new Object[] {
- 0,
- envelope,
- propsBuilder.headers(headers).build()
+ public void testRabbitMQConsumerInterceptorWithNilHeaders() throws
Throwable {
+ final Object[] args = {
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ getConsumer()
};
-
- rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null,
arguments, null, null);
- rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null,
arguments, null, null);
+ rabbitMQConsumerInterceptor.beforeMethod(getEnhancedInstance(), null,
args, new Class[0], null);
+ rabbitMQConsumerInterceptor.afterMethod(getEnhancedInstance(), null,
args, new Class[0], null);
+ ((Consumer) args[6]).handleDelivery("tag", new Envelope(1L, false,
"exchange", "routerKey"),
+ new AMQP.BasicProperties(), new
byte[0]
+ );
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}
@Test
- public void testRabbitMQConsumerInterceptorWithNilHeaders() throws
Throwable {
- Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
- AMQP.BasicProperties.Builder propsBuilder = new
AMQP.BasicProperties.Builder();
- Object[] arguments = new Object[] {
- 0,
- envelope,
- propsBuilder.headers(null).build()
+ public void testRabbitMQConsumerInterceptor() throws Throwable {
+ final Object[] args = {
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ getConsumer()
};
- rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null,
arguments, null, null);
- rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null,
arguments, null, null);
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(
+ SW8CarrierItem.HEADER_NAME,
+
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
+ );
+ AMQP.BasicProperties.Builder propsBuilder = new
AMQP.BasicProperties.Builder();
+ propsBuilder.headers(headers);
+
+ rabbitMQConsumerInterceptor.beforeMethod(getEnhancedInstance(), null,
args, new Class[0], null);
+ rabbitMQConsumerInterceptor.afterMethod(getEnhancedInstance(), null,
args, new Class[0], null);
+ ((Consumer) args[6]).handleDelivery("tag", new Envelope(1L, false,
"exchange", "routerKey"),
+ propsBuilder.build(), new byte[0]
+ );
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}
- @Test
- public void testRabbitMQConsumerInterceptorWithEmptyHeaders() throws
Throwable {
- Envelope envelope = new Envelope(1111, false, "", "rabbitmq-test");
- Map<String, Object> headers = new HashMap<String, Object>();
- AMQP.BasicProperties.Builder propsBuilder = new
AMQP.BasicProperties.Builder();
- Object[] arguments = new Object[] {
- 0,
- envelope,
- propsBuilder.headers(headers).build()
+ public EnhancedInstance getEnhancedInstance() {
+ return new EnhancedInstance() {
+ @Override
+ public Object getSkyWalkingDynamicField() {
+ return "serverAddr";
+ }
+
+ @Override
+ public void setSkyWalkingDynamicField(final Object value) {
+
+ }
};
+ }
- rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance, null,
arguments, null, null);
- rabbitMQConsumerInterceptor.afterMethod(enhancedInstance, null,
arguments, null, null);
- List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
- Assert.assertThat(traceSegments.size(), is(1));
+ public Consumer getConsumer() {
+ return new Consumer() {
+ @Override
+ public void handleConsumeOk(final String consumerTag) {
+
+ }
+
+ @Override
+ public void handleCancelOk(final String consumerTag) {
+
+ }
+
+ @Override
+ public void handleCancel(final String consumerTag) throws
IOException {
+
+ }
+
+ @Override
+ public void handleShutdownSignal(final String consumerTag, final
ShutdownSignalException sig) {
+
+ }
+
+ @Override
+ public void handleRecoverOk(final String consumerTag) {
+
+ }
+
+ @Override
+ public void handleDelivery(final String consumerTag,
+ final Envelope envelope,
+ final AMQP.BasicProperties properties,
+ final byte[] body) throws IOException {
+
+ }
+ };
}
}