This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f26710  fix pulsar plugin message listener error with multi 
partitions topic (#6918)
5f26710 is described below

commit 5f26710d34445cd721878a9473bdba01be7adb98
Author: wallezhang <[email protected]>
AuthorDate: Mon May 10 17:47:05 2021 +0800

    fix pulsar plugin message listener error with multi partitions topic (#6918)
---
 .../pulsar/ConsumerConstructorInterceptor.java     |   1 -
 .../plugin/pulsar/ConsumerEnhanceRequiredInfo.java |  13 ---
 .../plugin/pulsar/PulsarConsumerInterceptor.java   |  18 ++--
 .../pulsar/PulsarConsumerListenerInterceptor.java  |   2 +-
 .../pulsar/TopicMessageConstructorInterceptor.java |  45 +++++++++
 .../pulsar/define/MessageInstrumentation.java      |   6 +-
 ...ation.java => TopicMessageInstrumentation.java} |  14 +--
 .../src/main/resources/skywalking-plugin.def       |   3 +-
 .../pulsar/ConsumerConstructorInterceptorTest.java |   9 --
 .../pulsar/PulsarConsumerInterceptorTest.java      |   1 -
 .../PulsarConsumerListenerInterceptorTest.java     |   1 -
 .../pulsar-scenario/config/expectedData.yaml       | 104 ++++++++++++++++++++-
 .../scenarios/pulsar-scenario/configuration.yml    |   5 +-
 .../testcase/pulsar/controller/CaseController.java |  43 +++++++--
 .../src/main/resources/application.properties      |   2 +-
 15 files changed, 209 insertions(+), 58 deletions(-)

diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
index d7070f7..173753b 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
@@ -45,7 +45,6 @@ public class ConsumerConstructorInterceptor implements 
InstanceConstructorInterc
         requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
         requireInfo.setTopic(topic);
         
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
-        
requireInfo.setHasMessageListener(consumerConfigurationData.getMessageListener()
 != null);
         objInst.setSkyWalkingDynamicField(requireInfo);
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
index 27c3861..25eec64 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
@@ -38,19 +38,6 @@ public class ConsumerEnhanceRequiredInfo {
      */
     private String subscriptionName;
 
-    /**
-     * whether the consumer has a message listener
-     */
-    private boolean hasMessageListener;
-
-    public boolean isHasMessageListener() {
-        return hasMessageListener;
-    }
-
-    public void setHasMessageListener(boolean hasMessageListener) {
-        this.hasMessageListener = hasMessageListener;
-    }
-
     public String getServiceUrl() {
         return serviceUrl;
     }
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
index 4c4650d..a37629c 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
@@ -77,17 +77,15 @@ public class PulsarConsumerInterceptor implements 
InstanceMethodsAroundIntercept
         if (allArguments[0] != null) {
             final ConsumerEnhanceRequiredInfo requiredInfo = 
(ConsumerEnhanceRequiredInfo) objInst
                     .getSkyWalkingDynamicField();
-            if (requiredInfo.isHasMessageListener()) {
-                EnhancedInstance msg = (EnhancedInstance) allArguments[0];
-                MessageEnhanceRequiredInfo messageEnhanceRequiredInfo = 
(MessageEnhanceRequiredInfo) msg
-                        .getSkyWalkingDynamicField();
-                if (messageEnhanceRequiredInfo == null) {
-                    messageEnhanceRequiredInfo = new 
MessageEnhanceRequiredInfo();
-                    msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
-                }
-                messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
-                
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
+            EnhancedInstance msg = (EnhancedInstance) allArguments[0];
+            MessageEnhanceRequiredInfo messageEnhanceRequiredInfo = 
(MessageEnhanceRequiredInfo) msg
+                    .getSkyWalkingDynamicField();
+            if (messageEnhanceRequiredInfo == null) {
+                messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
+                msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
             }
+            messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
+            
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
             ContextManager.stopSpan();
         }
         return ret;
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
index 99da061..60d2f47 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
@@ -58,7 +58,7 @@ public class PulsarConsumerListenerInterceptor implements 
InstanceMethodsAroundI
         return ret == null ? null : (MessageListener) (consumer, message) -> {
             final MessageEnhanceRequiredInfo requiredInfo = 
(MessageEnhanceRequiredInfo) ((EnhancedInstance) message)
                     .getSkyWalkingDynamicField();
-            if (requiredInfo == null) {
+            if (requiredInfo == null || requiredInfo.getContextSnapshot() == 
null) {
                 ((MessageListener) ret).received(consumer, message);
             } else {
                 AbstractSpan activeSpan = ContextManager
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
new file mode 100644
index 0000000..d2faf97
--- /dev/null
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/TopicMessageConstructorInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.pulsar;
+
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+
+/**
+ * Interceptor of pulsar topic message constructor.
+ * <p>
+ * The interceptor create {@link MessageEnhanceRequiredInfo} which is required 
by passing message span across
+ * threads. Another purpose of this interceptor is to make {@link 
ClassEnhancePluginDefine} enable enhanced class
+ * to implement {@link EnhancedInstance} interface. Because if {@link 
ClassEnhancePluginDefine} class will not create
+ * SkyWalkingDynamicField without any constructor or method interceptor.
+ */
+public class TopicMessageConstructorInterceptor implements 
InstanceConstructorInterceptor {
+
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        final Object msgArgument = allArguments[2];
+        if (msgArgument instanceof EnhancedInstance) {
+            objInst.setSkyWalkingDynamicField(((EnhancedInstance) 
msgArgument).getSkyWalkingDynamicField());
+        } else {
+            objInst.setSkyWalkingDynamicField(new 
MessageEnhanceRequiredInfo());
+        }
+
+    }
+}
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
index 6b61ae7..9f3f89e 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
@@ -26,7 +26,7 @@ import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 
-import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
 
 /**
  * Pulsar message instrumentation.
@@ -39,7 +39,7 @@ import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.b
  */
 public class MessageInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
 
-    public static final String ENHANCE_CLASS = 
"org.apache.pulsar.client.api.Message";
+    public static final String ENHANCE_CLASS = 
"org.apache.pulsar.client.impl.MessageImpl";
     public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
 
     @Override
@@ -66,6 +66,6 @@ public class MessageInstrumentation extends 
ClassInstanceMethodsEnhancePluginDef
 
     @Override
     protected ClassMatch enhanceClass() {
-        return byHierarchyMatch(ENHANCE_CLASS);
+        return byName(ENHANCE_CLASS);
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
similarity index 86%
copy from 
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
copy to 
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
index 6b61ae7..86b6d8c 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/TopicMessageInstrumentation.java
@@ -26,21 +26,21 @@ import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 
-import static 
org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
 
 /**
- * Pulsar message instrumentation.
+ * Pulsar topic message instrumentation.
  * <p>
- * The message enhanced object is only for passing message reception span 
across threads.
+ * The topic message enhanced object is only for passing message reception 
span across threads.
  * <p>
  * Enhanced message object will be injected {@link 
org.apache.skywalking.apm.plugin.pulsar.MessageEnhanceRequiredInfo}
  * after message process method if consumer has a message listener.
  * </p>
  */
-public class MessageInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+public class TopicMessageInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
 
-    public static final String ENHANCE_CLASS = 
"org.apache.pulsar.client.api.Message";
-    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
+    public static final String ENHANCE_CLASS = 
"org.apache.pulsar.client.impl.TopicMessageImpl";
+    public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = 
"org.apache.skywalking.apm.plugin.pulsar.TopicMessageConstructorInterceptor";
 
     @Override
     public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -66,6 +66,6 @@ public class MessageInstrumentation extends 
ClassInstanceMethodsEnhancePluginDef
 
     @Override
     protected ClassMatch enhanceClass() {
-        return byHierarchyMatch(ENHANCE_CLASS);
+        return byName(ENHANCE_CLASS);
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
index e2384cb..2419fea 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
@@ -18,4 +18,5 @@ 
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentatio
 
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
 
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerListenerInstrumentation
 
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
-pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
\ No newline at end of file
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
+pulsar=org.apache.skywalking.apm.plugin.pulsar.define.TopicMessageInstrumentation
\ No newline at end of file
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
index cf7e9a8..38a1259 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.apm.plugin.pulsar;
 
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -71,13 +70,6 @@ public class ConsumerConstructorInterceptorTest {
         when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
         when(pulsarClient.getLookup()).thenReturn(lookupService);
         
when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
-        
when(consumerConfigurationData.getMessageListener()).thenReturn((consumer, 
message) -> {
-            try {
-                consumer.acknowledge(message);
-            } catch (PulsarClientException e) {
-                e.printStackTrace();
-            }
-        });
         constructorInterceptor = new ConsumerConstructorInterceptor();
     }
 
@@ -92,6 +84,5 @@ public class ConsumerConstructorInterceptorTest {
         assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
         assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
         assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
-        assertThat(requiredInfo.isHasMessageListener(), is(true));
     }
 }
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
index 882984f..31b65a6 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
@@ -122,7 +122,6 @@ public class PulsarConsumerInterceptorTest {
 
     @Test
     public void testConsumerWithMessageListener() throws Throwable {
-        consumerEnhanceRequiredInfo.setHasMessageListener(true);
         consumerInterceptor.beforeMethod(consumerInstance, null, new 
Object[]{msg}, new Class[0], null);
         consumerInterceptor.afterMethod(consumerInstance, null, new 
Object[]{msg}, new Class[0], null);
 
diff --git 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
index 3d86892..235f985 100644
--- 
a/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
+++ 
b/apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
@@ -96,7 +96,6 @@ public class PulsarConsumerListenerInterceptorTest {
         
consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
         consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
         consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
-        consumerEnhanceRequiredInfo.setHasMessageListener(true);
         msg = new MockMessage();
         msg.getMessageBuilder()
                 .addProperties(PulsarApi.KeyValue.newBuilder()
diff --git a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml 
b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
index a40eb9f..18298fe 100644
--- a/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
@@ -15,7 +15,7 @@
 # limitations under the License.
 segmentItems:
 - serviceName: pulsar-scenario
-  segmentSize: ge 6
+  segmentSize: ge 9
   segments:
     - segmentId: not null
       spans:
@@ -34,6 +34,21 @@ segmentItems:
             - { key: mq.broker, value: not null }
             - { key: mq.topic, value: test }
           skipAnalysis: 'false'
+        - operationName: Pulsar/testMultiPartition/Producer
+          operationId: 0
+          parentSpanId: 0
+          spanId: 2
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 73
+          isError: false
+          spanType: Exit
+          peer: not null
+          skipAnalysis: false
+          tags:
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: testMultiPartition }
         - operationName: /case/pulsar-case
           operationId: 0
           parentSpanId: -1
@@ -46,7 +61,7 @@ segmentItems:
           spanType: Entry
           peer: ''
           tags:
-            - { key: url, value: 
'http://localhost:8080/pulsar-scenario/case/pulsar-case' }
+            - { key: url, value: 
'http://localhost:8088/pulsar-scenario/case/pulsar-case' }
             - { key: http.method, value: GET }
           skipAnalysis: 'false'
     - segmentId: not null
@@ -134,3 +149,88 @@ segmentItems:
                 parentSpanId: 0, parentTraceSegmentId: not null, 
parentServiceInstance: not null,
                 parentService: pulsar-scenario, traceId: not null }
           skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/Producer/Callback
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 73
+          isError: false
+          spanType: Local
+          peer: ''
+          tags:
+            - { key: mq.topic, value: testMultiPartition }
+          refs:
+            - { parentEndpoint: /case/pulsar-case, networkAddress: '', 
refType: CrossThread,
+              parentSpanId: 2, parentTraceSegmentId: not null, 
parentServiceInstance: not
+                                                                 null, 
parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/testMultiPartition/Consumer/test
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Entry
+          peer: ''
+          tags:
+            - { key: transmission.latency, value: not null }
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: testMultiPartition }
+          refs:
+            - { parentEndpoint: /case/pulsar-case, networkAddress: not null, 
refType: CrossProcess,
+              parentSpanId: 2, parentTraceSegmentId: not null, 
parentServiceInstance: not
+                                                                 null, 
parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/testMultiPartition/Consumer/testWithListener
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Entry
+          peer: ''
+          skipAnalysis: 'false'
+          tags:
+            - { key: transmission.latency, value: not null }
+            - { key: mq.broker, value: not null }
+            - { key: mq.topic, value: testMultiPartition }
+          refs:
+            - { parentEndpoint: /case/pulsar-case, networkAddress: not null,
+                refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: 
not null,
+                parentServiceInstance: not null, parentService: 
pulsar-scenario,
+                traceId: not null }
+    - segmentId: not null
+      spans:
+        - operationName: Pulsar/testMultiPartition/Consumer/MessageListener
+          operationId: 0
+          parentSpanId: -1
+          spanId: 0
+          spanLayer: MQ
+          startTime: nq 0
+          endTime: nq 0
+          componentId: 74
+          isError: false
+          spanType: Local
+          peer: ''
+          tags:
+            - { key: mq.topic, value: testMultiPartition }
+          refs:
+            - { parentEndpoint: 
Pulsar/testMultiPartition/Consumer/testWithListener, networkAddress: '', 
refType: CrossThread,
+                parentSpanId: 0, parentTraceSegmentId: not null, 
parentServiceInstance: not null,
+                parentService: pulsar-scenario, traceId: not null }
+          skipAnalysis: 'false'
diff --git a/test/plugin/scenarios/pulsar-scenario/configuration.yml 
b/test/plugin/scenarios/pulsar-scenario/configuration.yml
index 6a70cd3..af6848f 100644
--- a/test/plugin/scenarios/pulsar-scenario/configuration.yml
+++ b/test/plugin/scenarios/pulsar-scenario/configuration.yml
@@ -15,8 +15,8 @@
 # limitations under the License.
 
 type: jvm
-entryService: http://localhost:8080/pulsar-scenario/case/pulsar-case
-healthCheck: http://localhost:8080/pulsar-scenario/case/healthCheck
+entryService: http://localhost:8088/pulsar-scenario/case/pulsar-case
+healthCheck: http://localhost:8088/pulsar-scenario/case/healthCheck
 startScript: ./bin/startup.sh
 environment:
   - PULSAR_STANDALONE=pulsar-standalone:6650
@@ -29,3 +29,4 @@ dependencies:
     startScript: ["bin/pulsar","standalone"]
     expose:
       - 6650
+      - 8080
diff --git 
a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
 
b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
index f2e62ff..8eb1003 100644
--- 
a/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
+++ 
b/test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
@@ -31,6 +31,11 @@ import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -48,10 +53,24 @@ public class CaseController {
 
     @RequestMapping("/pulsar-case")
     @ResponseBody
-    public String pulsarCase() throws PulsarClientException, 
InterruptedException {
+    public String pulsarCase() {
 
-        String topic = "test";
+        String topicOnePartition = "test";
+        String topicMultiPartition = "testMultiPartition";
 
+        try {
+            doSendAndReceiveMessage(topicOnePartition);
+
+            createTopic(topicMultiPartition, 2);
+            doSendAndReceiveMessage(topicMultiPartition);
+        } catch (IOException e) {
+            LOGGER.error("test error", e);
+        }
+
+        return "Success";
+    }
+
+    private void doSendAndReceiveMessage(String topic) throws 
PulsarClientException {
         CountDownLatch latch = new CountDownLatch(2);
 
         PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(PULSAR_DOMAIN + serviceUrl).build();
@@ -60,7 +79,7 @@ public class CaseController {
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe();
 
-        pulsarClient.newConsumer().topic(topic)
+        final Consumer<byte[]> consumerWithListener = 
pulsarClient.newConsumer().topic(topic)
                 .subscriptionName("testWithListener")
                 .messageListener((c, msg) -> {
                     try {
@@ -70,7 +89,8 @@ public class CaseController {
                             msg.getProperties()
                                     .forEach((k, v) -> 
builder.append(String.format(propertiesFormat, k, v))
                                             .append(", "));
-                            LOGGER.info("Received message with messageId = {}, 
key = {}, value = {}, properties = {}",
+                            LOGGER.info(
+                                    "Received message with messageId = {}, key 
= {}, value = {}, properties = {}",
                                     msg.getMessageId(), msg
                                             .getKey(), new 
String(msg.getValue()), builder.toString());
 
@@ -113,13 +133,24 @@ public class CaseController {
         } catch (InterruptedException e) {
             LOGGER.error("Can get message from consumer", e);
             t.interrupt();
-            throw e;
         }
 
         producer.close();
         consumer.close();
+        consumerWithListener.close();
+    }
 
-        return "Success";
+    private void createTopic(String topic, int numOfPartitions) throws 
IOException {
+        final URL url = new 
URL("http://pulsar-standalone:8080/admin/v2/persistent/public/default/"; + topic 
+ "/partitions");
+        final HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
+        connection.setRequestMethod("PUT");
+        connection.setDoOutput(true);
+
+        final OutputStream outputStream = connection.getOutputStream();
+        
outputStream.write(String.valueOf(numOfPartitions).getBytes(StandardCharsets.UTF_8));
+        outputStream.flush();
+
+        LOGGER.info("Create topic result:{}", connection.getResponseCode());
     }
 
     @RequestMapping("/healthCheck")
diff --git 
a/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
 
b/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
index 7247212..3ca5cb3 100644
--- 
a/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
+++ 
b/test/plugin/scenarios/pulsar-scenario/src/main/resources/application.properties
@@ -15,5 +15,5 @@
 # limitations under the License.
 #
 #
-server.port=8080
+server.port=8088
 server.contextPath=/pulsar-scenario
\ No newline at end of file

Reply via email to