This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5f26710 fix pulsar plugin message listener error with multi
partitions topic (#6918)
5f26710 is described below
commit 5f26710d34445cd721878a9473bdba01be7adb98
Author: wallezhang <[email protected]>
AuthorDate: Mon May 10 17:47:05 2021 +0800
fix pulsar plugin message listener error with multi partitions topic (#6918)
---
.../pulsar/ConsumerConstructorInterceptor.java | 1 -
.../plugin/pulsar/ConsumerEnhanceRequiredInfo.java | 13 ---
.../plugin/pulsar/PulsarConsumerInterceptor.java | 18 ++--
.../pulsar/PulsarConsumerListenerInterceptor.java | 2 +-
.../pulsar/TopicMessageConstructorInterceptor.java | 45 +++++++++
.../pulsar/define/MessageInstrumentation.java | 6 +-
...ation.java => TopicMessageInstrumentation.java} | 14 +--
.../src/main/resources/skywalking-plugin.def | 3 +-
.../pulsar/ConsumerConstructorInterceptorTest.java | 9 --
.../pulsar/PulsarConsumerInterceptorTest.java | 1 -
.../PulsarConsumerListenerInterceptorTest.java | 1 -
.../pulsar-scenario/config/expectedData.yaml | 104 ++++++++++++++++++++-
.../scenarios/pulsar-scenario/configuration.yml | 5 +-
.../testcase/pulsar/controller/CaseController.java | 43 +++++++--
.../src/main/resources/application.properties | 2 +-
15 files changed, 209 insertions(+), 58 deletions(-)
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
index d7070f7..173753b 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
@@ -45,7 +45,6 @@ public class ConsumerConstructorInterceptor implements
InstanceConstructorInterc
requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
requireInfo.setTopic(topic);
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
-
requireInfo.setHasMessageListener(consumerConfigurationData.getMessageListener()
!= null);
objInst.setSkyWalkingDynamicField(requireInfo);
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
index 27c3861..25eec64 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
@@ -38,19 +38,6 @@ public class ConsumerEnhanceRequiredInfo {
*/
private String subscriptionName;
- /**
- * whether the consumer has a message listener
- */
- private boolean hasMessageListener;
-
- public boolean isHasMessageListener() {
- return hasMessageListener;
- }
-
- public void setHasMessageListener(boolean hasMessageListener) {
- this.hasMessageListener = hasMessageListener;
- }
-
public String getServiceUrl() {
return serviceUrl;
}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
index 4c4650d..a37629c 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
@@ -77,17 +77,15 @@ public class PulsarConsumerInterceptor implements
InstanceMethodsAroundIntercept
if (allArguments[0] != null) {
final ConsumerEnhanceRequiredInfo requiredInfo =
(ConsumerEnhanceRequiredInfo) objInst
.getSkyWalkingDynamicField();
- if (requiredInfo.isHasMessageListener()) {
- EnhancedInstance msg = (EnhancedInstance) allArguments[0];
- MessageEnhanceRequiredInfo messageEnhanceRequiredInfo =
(MessageEnhanceRequiredInfo) msg
- .getSkyWalkingDynamicField();
- if (messageEnhanceRequiredInfo == null) {
- messageEnhanceRequiredInfo = new
MessageEnhanceRequiredInfo();
- msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
- }
- messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
-
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
+ EnhancedInstance msg = (EnhancedInstance) allArguments[0];
+ MessageEnhanceRequiredInfo messageEnhanceRequiredInfo =
(MessageEnhanceRequiredInfo) msg
+ .getSkyWalkingDynamicField();
+ if (messageEnhanceRequiredInfo == null) {
+ messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
+ msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
}
+ messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
+
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
ContextManager.stopSpan();
}
return ret;
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
index 99da061..60d2f47 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
@@ -58,7 +58,7 @@ public class PulsarConsumerListenerInterceptor implements
InstanceMethodsAroundI
return ret == null ? null : (MessageListener) (consumer, message) -> {
final MessageEnhanceRequiredInfo requiredInfo =
(MessageEnhanceRequiredInfo) ((EnhancedInstance) message)
.getSkyWalkingDynamicField();
- if (requiredInfo == null) {
+ if (requiredInfo == null || requiredInfo.getContextSnapshot() ==
null) {
((MessageListener) ret).received(consumer, message);
} else {
AbstractSpan activeSpan = ContextManager
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
new file mode 100644
index 0000000..d2faf97
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar topic message constructor.
+ * <p>
+ * The interceptor creates the {@link MessageEnhanceRequiredInfo} that is required
for passing the message span across
+ * threads. Another purpose of this interceptor is to make {@link
ClassEnhancePluginDefine} enhance the class
+ * so that it implements the {@link EnhancedInstance} interface, because {@link
ClassEnhancePluginDefine} will not create
+ * the SkyWalkingDynamicField without any constructor or method interceptor.
+ */
+public class TopicMessageConstructorInterceptor implements
InstanceConstructorInterceptor {
+
+ @Override
+ public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+ final Object msgArgument = allArguments[2];
+ if (msgArgument instanceof EnhancedInstance) {
+ objInst.setSkyWalkingDynamicField(((EnhancedInstance)
msgArgument).getSkyWalkingDynamicField());
+ } else {
+ objInst.setSkyWalkingDynamicField(new
MessageEnhanceRequiredInfo());
+ }
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
index 6b61ae7..9f3f89e 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
@@ -26,7 +26,7 @@ import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
-import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Pulsar message instrumentation.
@@ -39,7 +39,7 @@ import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.b
*/
public class MessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
- public static final String ENHANCE_CLASS =
"org.apache.pulsar.client.api.Message";
+ public static final String ENHANCE_CLASS =
"org.apache.pulsar.client.impl.MessageImpl";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
@Override
@@ -66,6 +66,6 @@ public class MessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDef
@Override
protected ClassMatch enhanceClass() {
- return byHierarchyMatch(ENHANCE_CLASS);
+ return byName(ENHANCE_CLASS);
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
similarity index 86%
copy from
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
copy to
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
index 6b61ae7..86b6d8c 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
@@ -26,21 +26,21 @@ import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
-import static
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
- * Pulsar message instrumentation.
+ * Pulsar topic message instrumentation.
* <p>
- * The message enhanced object is only for passing message reception span
across threads.
+ * The topic message enhanced object is only for passing message reception
span across threads.
* <p>
* Enhanced message object will be injected {@link
org.apache.skywalking.apm.plugin.pulsar.MessageEnhanceRequiredInfo}
* after message process method if consumer has a message listener.
* </p>
*/
-public class MessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+public class TopicMessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
- public static final String ENHANCE_CLASS =
"org.apache.pulsar.client.api.Message";
- public static final String CONSTRUCTOR_INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
+ public static final String ENHANCE_CLASS =
"org.apache.pulsar.client.impl.TopicMessageImpl";
+ public static final String CONSTRUCTOR_INTERCEPTOR_CLASS =
"org.apache.skywalking.apm.plugin.pulsar.TopicMessageConstructorInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -66,6 +66,6 @@ public class MessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDef
@Override
protected ClassMatch enhanceClass() {
- return byHierarchyMatch(ENHANCE_CLASS);
+ return byName(ENHANCE_CLASS);
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
index e2384cb..2419fea 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
@@ -18,4 +18,5 @@
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentatio
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerListenerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
-pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
\ No newline at end of file
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.TopicMessageInstrumentation
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
index cf7e9a8..38a1259 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.plugin.pulsar;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -71,13 +70,6 @@ public class ConsumerConstructorInterceptorTest {
when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
when(pulsarClient.getLookup()).thenReturn(lookupService);
when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
-
when(consumerConfigurationData.getMessageListener()).thenReturn((consumer,
message) -> {
- try {
- consumer.acknowledge(message);
- } catch (PulsarClientException e) {
- e.printStackTrace();
- }
- });
constructorInterceptor = new ConsumerConstructorInterceptor();
}
@@ -92,6 +84,5 @@ public class ConsumerConstructorInterceptorTest {
assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
- assertThat(requiredInfo.isHasMessageListener(), is(true));
}
}
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
index 882984f..31b65a6 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
@@ -122,7 +122,6 @@ public class PulsarConsumerInterceptorTest {
@Test
public void testConsumerWithMessageListener() throws Throwable {
- consumerEnhanceRequiredInfo.setHasMessageListener(true);
consumerInterceptor.beforeMethod(consumerInstance, null, new
Object[]{msg}, new Class[0], null);
consumerInterceptor.afterMethod(consumerInstance, null, new
Object[]{msg}, new Class[0], null);
diff --git
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
index 3d86892..235f985 100644
---
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
+++
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
@@ -96,7 +96,6 @@ public class PulsarConsumerListenerInterceptorTest {
consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
- consumerEnhanceRequiredInfo.setHasMessageListener(true);
msg = new MockMessage();
msg.getMessageBuilder()
.addProperties(PulsarApi.KeyValue.newBuilder()
diff --git a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
index a40eb9f..18298fe 100644
--- a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
@@ -15,7 +15,7 @@
# limitations under the License.
segmentItems:
- serviceName: pulsar-scenario
- segmentSize: ge 6
+ segmentSize: ge 9
segments:
- segmentId: not null
spans:
@@ -34,6 +34,21 @@ segmentItems:
- { key: mq.broker, value: not null }
- { key: mq.topic, value: test }
skipAnalysis: 'false'
+ - operationName: Pulsar/testMultiPartition/Producer
+ operationId: 0
+ parentSpanId: 0
+ spanId: 2
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 73
+ isError: false
+ spanType: Exit
+ peer: not null
+ skipAnalysis: false
+ tags:
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: testMultiPartition }
- operationName: /case/pulsar-case
operationId: 0
parentSpanId: -1
@@ -46,7 +61,7 @@ segmentItems:
spanType: Entry
peer: ''
tags:
- - { key: url, value:
'http://localhost:8080/pulsar-scenario/case/pulsar-case' }
+ - { key: url, value:
'http://localhost:8088/pulsar-scenario/case/pulsar-case' }
- { key: http.method, value: GET }
skipAnalysis: 'false'
- segmentId: not null
@@ -134,3 +149,88 @@ segmentItems:
parentSpanId: 0, parentTraceSegmentId: not null,
parentServiceInstance: not null,
parentService: pulsar-scenario, traceId: not null }
skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/Producer/Callback
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 73
+ isError: false
+ spanType: Local
+ peer: ''
+ tags:
+ - { key: mq.topic, value: testMultiPartition }
+ refs:
+ - { parentEndpoint: /case/pulsar-case, networkAddress: '',
refType: CrossThread,
+ parentSpanId: 2, parentTraceSegmentId: not null,
parentServiceInstance: not
+ null,
parentService: pulsar-scenario, traceId: not null }
+ skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/testMultiPartition/Consumer/test
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 74
+ isError: false
+ spanType: Entry
+ peer: ''
+ tags:
+ - { key: transmission.latency, value: not null }
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: testMultiPartition }
+ refs:
+ - { parentEndpoint: /case/pulsar-case, networkAddress: not null,
refType: CrossProcess,
+ parentSpanId: 2, parentTraceSegmentId: not null,
parentServiceInstance: not
+ null,
parentService: pulsar-scenario, traceId: not null }
+ skipAnalysis: 'false'
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/testMultiPartition/Consumer/testWithListener
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 74
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: 'false'
+ tags:
+ - { key: transmission.latency, value: not null }
+ - { key: mq.broker, value: not null }
+ - { key: mq.topic, value: testMultiPartition }
+ refs:
+ - { parentEndpoint: /case/pulsar-case, networkAddress: not null,
+ refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
pulsar-scenario,
+ traceId: not null }
+ - segmentId: not null
+ spans:
+ - operationName: Pulsar/testMultiPartition/Consumer/MessageListener
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 74
+ isError: false
+ spanType: Local
+ peer: ''
+ tags:
+ - { key: mq.topic, value: testMultiPartition }
+ refs:
+ - { parentEndpoint:
Pulsar/testMultiPartition/Consumer/testWithListener, networkAddress: '',
refType: CrossThread,
+ parentSpanId: 0, parentTraceSegmentId: not null,
parentServiceInstance: not null,
+ parentService: pulsar-scenario, traceId: not null }
+ skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/pulsar-scenario/configuration.yml
b/test/plugin/scenarios/pulsar-scenario/configuration.yml
index 6a70cd3..af6848f 100644
--- a/test/plugin/scenarios/pulsar-scenario/configuration.yml
+++ b/test/plugin/scenarios/pulsar-scenario/configuration.yml
@@ -15,8 +15,8 @@
# limitations under the License.
type: jvm
-entryService: http://localhost:8080/pulsar-scenario/case/pulsar-case
-healthCheck: http://localhost:8080/pulsar-scenario/case/healthCheck
+entryService: http://localhost:8088/pulsar-scenario/case/pulsar-case
+healthCheck: http://localhost:8088/pulsar-scenario/case/healthCheck
startScript: ./bin/startup.sh
environment:
- PULSAR_STANDALONE=pulsar-standalone:6650
@@ -29,3 +29,4 @@ dependencies:
startScript: ["bin/pulsar","standalone"]
expose:
- 6650
+ - 8080
diff --git
a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
index f2e62ff..8eb1003 100644
---
a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
+++
b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
@@ -31,6 +31,11 @@ import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -48,10 +53,24 @@ public class CaseController {
@RequestMapping("/pulsar-case")
@ResponseBody
- public String pulsarCase() throws PulsarClientException,
InterruptedException {
+ public String pulsarCase() {
- String topic = "test";
+ String topicOnePartition = "test";
+ String topicMultiPartition = "testMultiPartition";
+ try {
+ doSendAndReceiveMessage(topicOnePartition);
+
+ createTopic(topicMultiPartition, 2);
+ doSendAndReceiveMessage(topicMultiPartition);
+ } catch (IOException e) {
+ LOGGER.error("test error", e);
+ }
+
+ return "Success";
+ }
+
+ private void doSendAndReceiveMessage(String topic) throws
PulsarClientException {
CountDownLatch latch = new CountDownLatch(2);
PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(PULSAR_DOMAIN + serviceUrl).build();
@@ -60,7 +79,7 @@ public class CaseController {
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe();
- pulsarClient.newConsumer().topic(topic)
+ final Consumer<byte[]> consumerWithListener =
pulsarClient.newConsumer().topic(topic)
.subscriptionName("testWithListener")
.messageListener((c, msg) -> {
try {
@@ -70,7 +89,8 @@ public class CaseController {
msg.getProperties()
.forEach((k, v) ->
builder.append(String.format(propertiesFormat, k, v))
.append(", "));
- LOGGER.info("Received message with messageId = {},
key = {}, value = {}, properties = {}",
+ LOGGER.info(
+ "Received message with messageId = {}, key
= {}, value = {}, properties = {}",
msg.getMessageId(), msg
.getKey(), new
String(msg.getValue()), builder.toString());
@@ -113,13 +133,24 @@ public class CaseController {
} catch (InterruptedException e) {
LOGGER.error("Can get message from consumer", e);
t.interrupt();
- throw e;
}
producer.close();
consumer.close();
+ consumerWithListener.close();
+ }
- return "Success";
+ private void createTopic(String topic, int numOfPartitions) throws
IOException {
+ final URL url = new
URL("http://pulsar-standalone:8080/admin/v2/persistent/public/default/" + topic
+ "/partitions");
+ final HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
+ connection.setRequestMethod("PUT");
+ connection.setDoOutput(true);
+
+ final OutputStream outputStream = connection.getOutputStream();
+
outputStream.write(String.valueOf(numOfPartitions).getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+
+ LOGGER.info("Create topic result:{}", connection.getResponseCode());
}
@RequestMapping("/healthCheck")
diff --git
a/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
b/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
index 7247212..3ca5cb3 100644
---
a/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
+++
b/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
@@ -15,5 +15,5 @@
# limitations under the License.
#
#
-server.port=8080
+server.port=8088
server.contextPath=/pulsar-scenario
\ No newline at end of file