This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 92665b8ad Add plugin to support NATS Java client (#298)
92665b8ad is described below
commit 92665b8add7020f26f67519a6e6c7fef49e42643
Author: pg.yang <[email protected]>
AuthorDate: Mon Aug 29 19:44:20 2022 +0800
Add plugin to support NATS Java client (#298)
---
.github/workflows/plugins-test.3.yaml | 1 +
CHANGES.md | 1 +
.../network/trace/component/ComponentsDefine.java | 2 +
.../nats-2.14.x-2.15.x-plugin/pom.xml | 47 ++
.../nats/client/CreateDispatcherInterceptor.java | 45 ++
.../plugin/nats/client/CreateSubInterceptor.java | 46 ++
.../nats/client/DeliverReplyInterceptor.java | 50 ++
.../apm/plugin/nats/client/NatsCommons.java | 116 +++
.../plugin/nats/client/NatsMessageInterceptor.java | 41 ++
.../client/SocketDataPortConnectInterceptor.java | 43 ++
.../client/SubscriptionNextMsgInterceptor.java | 54 ++
.../plugin/nats/client/WriterQueueInterceptor.java | 68 ++
.../client/WriterSendMessageBatchInterceptor.java | 115 +++
.../define/NatsConnectionInstrumentation.java | 85 +++
.../NatsConnectionWriterInstrumentation.java | 85 +++
.../define/NatsJetStreamInstrumentation.java | 68 ++
.../client/define/NatsMessageInstrumentation.java | 74 ++
.../define/NatsSubscriptionInstrumentation.java | 67 ++
.../define/SocketDataPortInstrumentation.java | 67 ++
.../src/main/resources/skywalking-plugin.def | 22 +
apm-sniffer/apm-sdk-plugin/pom.xml | 1 +
.../setup/service-agent/java-agent/Plugin-list.md | 1 +
.../service-agent/java-agent/Supported-list.md | 1 +
pom.xml | 2 -
.../nats-2.14.x-2.15.x-scenario/bin/startup.sh | 24 +
.../config/expectedData.yaml | 804 +++++++++++++++++++++
.../nats-2.14.x-2.15.x-scenario/configuration.yml | 35 +
.../scenarios/nats-2.14.x-2.15.x-scenario/pom.xml | 115 +++
.../src/main/assembly/assembly.xml | 41 ++
.../apm/testcase/nats/client/Application.java | 30 +
.../nats/client/controller/StartController.java | 96 +++
.../nats/client/publisher/JetStreamPublisher.java | 46 ++
.../publisher/JetStreamPublisherFetcher.java | 46 ++
.../nats/client/publisher/NormalPublisher.java | 38 +
.../testcase/nats/client/publisher/Publisher.java | 35 +
.../nats/client/publisher/ReqReplyPublisher.java | 38 +
.../testcase/nats/client/subscriber/Consumer.java | 26 +
.../subscriber/JetStreamFetcherConsumer.java | 66 ++
.../subscriber/JetStreamHandlerConsumer.java | 56 ++
.../nats/client/subscriber/NextMsgConsumer.java | 55 ++
.../nats/client/subscriber/ReqReplyConsumer.java | 46 ++
.../apm/testcase/nats/client/work/StopSignal.java | 31 +
.../apm/testcase/nats/client/work/StreamUtil.java | 57 ++
.../nats/client/work/TrackedConnection.java | 61 ++
.../apm/testcase/nats/client/work/WorkBuilder.java | 72 ++
.../support-version.list | 18 +
46 files changed, 2936 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/plugins-test.3.yaml
b/.github/workflows/plugins-test.3.yaml
index 0629de603..96204abd1 100644
--- a/.github/workflows/plugins-test.3.yaml
+++ b/.github/workflows/plugins-test.3.yaml
@@ -69,6 +69,7 @@ jobs:
- vertx-web-3.6plus-scenario
- mariadb-scenario
- micronaut-http-scenario
+ - nats-2.14.x-2.15.x-scenario
- quasar-scenario
- baidu-brpc-scenario
- retransform-class-scenario
diff --git a/CHANGES.md b/CHANGES.md
index 3cf412f66..07b4b131d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -18,6 +18,7 @@ Release Notes.
* Force the injected high-priority classes in order to avoid
NoClassDefFoundError.
* Plugin to support xxl-job 2.3.x.
* Add plugin to support Micronaut(HTTP Client/Server) 3.2.x-3.6.x
+* Add plugin to support NATS Java client 2.14.x-2.15.x
#### Documentation
diff --git
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
index 23d7f9f1d..52930e987 100755
---
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
+++
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -227,4 +227,6 @@ public class ComponentsDefine {
public static final OfficialComponent MICRONAUT = new
OfficialComponent(131, "Micronaut");
+ public static final OfficialComponent NATS = new OfficialComponent(132,
"Nats");
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/pom.xml
new file mode 100644
index 000000000..ece81e35b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>apm-sdk-plugin</artifactId>
+ <groupId>org.apache.skywalking</groupId>
+ <version>8.12.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nats-2.14.x-2.15.x-plugin</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>jnats</artifactId>
+ <version>2.15.6</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java
new file mode 100644
index 000000000..e4686d256
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateDispatcherInterceptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.MessageHandler;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.buildTraceMsgHandler;
+
+public class CreateDispatcherInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ allArguments[0] = buildTraceMsgHandler((MessageHandler)
allArguments[0]);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java
new file mode 100644
index 000000000..268b20422
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/CreateSubInterceptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.MessageHandler;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.buildTraceMsgHandler;
+
+public class CreateSubInterceptor implements InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ allArguments[3] = buildTraceMsgHandler((MessageHandler)
allArguments[3]);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[]
+ argumentsTypes, Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[]
+ argumentsTypes, Throwable t) {
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java
new file mode 100644
index 000000000..5ff7f4b6d
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.Message;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.createEntrySpan;
+
+public class DeliverReplyInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ createEntrySpan((Message) allArguments[0]);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ AbstractSpan span = ContextManager.activeSpan();
+ span.log(t).errorOccurred();
+ ContextManager.stopSpan(span);
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java
new file mode 100644
index 000000000..9d29df7d1
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.Message;
+import io.nats.client.MessageHandler;
+import org.apache.skywalking.apm.agent.core.context.CarrierItem;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.IntegerTag;
+import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.util.StringUtil;
+
+import java.util.Optional;
+
+public class NatsCommons {
+
+ private static final String SID = "sid";
+ private static final String REPLY_TO = "reply_to";
+ private static final String MSG_STATE = "state";
+ private static final String MSG = "message";
+
+ static boolean skipTrace(Object msg) {
+ // include null
+ if (!(msg instanceof Message)) {
+ return true;
+ }
+ Message natsMsg = (Message) msg;
+ return StringUtil.isBlank(natsMsg.getSubject()) ||
natsMsg.isStatusMessage();
+ }
+
+ static AbstractSpan createEntrySpan(Message message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (StringUtil.isNotEmpty(next.getHeadKey())) {
+
next.setHeadValue(message.getHeaders().getFirst(next.getHeadKey()));
+ }
+ }
+ AbstractSpan span = ContextManager.createEntrySpan("Nats/Sub/" +
message.getSubject(), contextCarrier);
+ addCommonTag(span, message);
+ return span;
+ }
+
+ static void injectCarrier(Message message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+ ContextManager.inject(contextCarrier);
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (StringUtil.isNotEmpty(next.getHeadKey())
+ && StringUtil.isNotEmpty(next.getHeadValue())) {
+ message.getHeaders().add(next.getHeadKey(),
next.getHeadValue());
+ }
+ }
+ }
+
+ static void addCommonTag(AbstractSpan span, Message message) {
+ Optional.ofNullable(message.getReplyTo()).ifPresent(v -> span.tag(new
StringTag(REPLY_TO), v));
+ Optional.ofNullable(message.getSID()).ifPresent(v -> span.tag(new
StringTag(SID), v));
+ span.setComponent(ComponentsDefine.NATS);
+ SpanLayer.asMQ(span);
+ if (message.getStatus() != null) {
+ int code = message.getStatus().getCode();
+ String statusMsg = message.getStatus().getMessage();
+ span.tag(new IntegerTag(MSG_STATE), String.valueOf(code));
+ if (StringUtil.isNotBlank(statusMsg)) {
+ span.tag(new StringTag(MSG), statusMsg);
+ }
+ if (code != 0) {
+ span.errorOccurred();
+ }
+ }
+ }
+
+ static MessageHandler buildTraceMsgHandler(MessageHandler msgHandler) {
+ if (msgHandler == null) {
+ return null;
+ }
+ return msg -> {
+ if (skipTrace(msg) || msg.getHeaders() == null) {
+ msgHandler.onMessage(msg);
+ return;
+ }
+ AbstractSpan span = NatsCommons.createEntrySpan(msg);
+ try {
+ msgHandler.onMessage(msg);
+ } catch (Exception e) {
+ span.log(e).errorOccurred();
+ throw e;
+ } finally {
+ ContextManager.stopSpan(span);
+ }
+ };
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java
new file mode 100644
index 000000000..128ba969b
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsMessageInterceptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+public class NatsMessageInterceptor implements
InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SocketDataPortConnectInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SocketDataPortConnectInterceptor.java
new file mode 100644
index 000000000..4d3be6a5f
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SocketDataPortConnectInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+public class SocketDataPortConnectInterceptor implements
InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ //Hold current serverURI
+ objInst.setSkyWalkingDynamicField(allArguments[0]);
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java
new file mode 100644
index 000000000..e2a207307
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.Message;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.createEntrySpan;
+
+public class SubscriptionNextMsgInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ if (ret == null) {
+ return null;
+ }
+
+ Message msg = (Message) ret;
+ AbstractSpan span = createEntrySpan(msg);
+ // Close the span immediately , as no chance to trace what user want
to do
+ ContextManager.stopSpan(span);
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ //do nothing since we can't capture any message
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java
new file mode 100644
index 000000000..e931d20c9
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterQueueInterceptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.Message;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.addCommonTag;
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.skipTrace;
+
+public class WriterQueueInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ if (skipTrace(allArguments[0])) {
+ return;
+ }
+ Message message = (Message) allArguments[0];
+ EnhancedInstance enhancedMsg = (EnhancedInstance) allArguments[0];
+ AbstractSpan span = ContextManager.createLocalSpan("Nats/Pub/Enqueue/"
+ message.getSubject());
+ addCommonTag(span, message);
+ enhancedMsg.setSkyWalkingDynamicField(ContextManager.capture());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ if (skipTrace(allArguments[0])) {
+ return ret;
+ }
+ AbstractSpan span = ContextManager.activeSpan();
+ if (!(Boolean) ret) {
+ Map<String, String> eventMap = new HashMap<String, String>();
+ eventMap.put("enqueue", "failed");
+ span.errorOccurred().log(System.currentTimeMillis(), eventMap);
+ }
+ ContextManager.stopSpan(span);
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ AbstractSpan span = ContextManager.activeSpan().errorOccurred().log(t);
+ ContextManager.stopSpan(span);
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java
new file mode 100644
index 000000000..802bfd637
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/WriterSendMessageBatchInterceptor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client;
+
+import io.nats.client.Message;
+import io.nats.client.impl.NatsMessage;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.PluginException;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Optional;
+
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.addCommonTag;
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.injectCarrier;
+import static
org.apache.skywalking.apm.plugin.nats.client.NatsCommons.skipTrace;
+
+public class WriterSendMessageBatchInterceptor implements
InstanceMethodsAroundInterceptor {
+
+ private static final Field NEXT_FIELD;
+
+ static {
+ Field field;
+ try {
+ field = NatsMessage.class.getDeclaredField("next");
+ field.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ field = null;
+ }
+ NEXT_FIELD = field;
+ }
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[]
allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws
Throwable {
+ Object next = allArguments[0];
+ String serverURI = (String) ((EnhancedInstance)
allArguments[1]).getSkyWalkingDynamicField();
+ while (next != null) {
+ if (!skipTrace(next)) {
+ Message message = (Message) next;
+ EnhancedInstance enhanced = (EnhancedInstance) next;
+ AbstractSpan span = ContextManager.createExitSpan("Nats/Pub/"
+ message.getSubject(), serverURI);
+ addCommonTag(span, message);
+ Optional.ofNullable(enhanced.getSkyWalkingDynamicField())
+ .ifPresent(snapshot ->
ContextManager.continued((ContextSnapshot) snapshot));
+ injectCarrier(message);
+ //escape from message's lifecycle ahead of time always correct
+ enhanced.setSkyWalkingDynamicField(null);
+ }
+ next = next(next);
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
+ while (ContextManager.isActive()) {
+ AbstractSpan abstractSpan = ContextManager.activeSpan();
+ ContextManager.stopSpan(abstractSpan);
+ }
+ return ret;
+ }
+
+ @Override
+ public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
+ while (ContextManager.isActive()) {
+ AbstractSpan span = ContextManager.activeSpan();
+ span.log(t);
+ span.errorOccurred();
+ ContextManager.stopSpan(span);
+ }
+ Object next = allArguments[0];
+ try {
+ while (next != null) {
+ if (!skipTrace(next)) {
+ EnhancedInstance enhanced = (EnhancedInstance) next;
+ //escape from message's lifecycle ahead of time always
correct
+ enhanced.setSkyWalkingDynamicField(null);
+ }
+ next = next(next);
+ }
+ } catch (IllegalAccessException e) {
+ throw new PluginException("nats plugin error", e);
+ }
+ }
+
+ private NatsMessage next(Object message) throws IllegalAccessException {
+ if (NEXT_FIELD == null) {
+ return null;
+ }
+ if (!(message instanceof NatsMessage)) {
+ return null;
+ }
+ return (NatsMessage) NEXT_FIELD.get(message);
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java
new file mode 100644
index 000000000..1822732b1
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionInstrumentation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class NatsConnectionInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.NatsConnection";
+
+ private static final String CREATE_DISPATCHER_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.CreateDispatcherInterceptor";
+ private static final String REPLY_MSG_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.DeliverReplyInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named("createDispatcher").and(takesArguments(1));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return CREATE_DISPATCHER_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return true;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named("deliverReply").and(takesArguments(1));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return REPLY_MSG_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java
new file mode 100644
index 000000000..ec2bd9c0b
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsConnectionWriterInstrumentation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class NatsConnectionWriterInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.NatsConnectionWriter";
+ private static final String PUBLISH_INTERCEPTOR_CLASS_NAME =
"org.apache.skywalking.apm.plugin.nats.client.WriterSendMessageBatchInterceptor";
+ private static final String QUEUE_INTERCEPTOR_CLASS_NAME =
"org.apache.skywalking.apm.plugin.nats.client.WriterQueueInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named("queue").and(takesArguments(1));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return QUEUE_INTERCEPTOR_CLASS_NAME;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named("sendMessageBatch").and(takesArguments(3));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return PUBLISH_INTERCEPTOR_CLASS_NAME;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java
new file mode 100644
index 000000000..88dc357e1
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsJetStreamInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class NatsJetStreamInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.NatsJetStream";
+ private static final String CREATE_SUB_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.CreateSubInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named("createSubscription").and(takesArguments(7));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return CREATE_SUB_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return true;
+ }
+ }
+ };
+ }
+
+}
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java
new file mode 100644
index 000000000..79d061a39
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsMessageInstrumentation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/*
+ * Enhance NatsMessage for adding Skywalking dynamic field , that used to hold
ContextSnapshot
+ * See WriterQueueInterceptor
+ * See WriterSendMessageBatchInterceptor
+ *
+ * BTW , ACK is done by publishing a message , So we needn't enhance ACK method
+ */
+public class NatsMessageInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.NatsMessage";
+ private static final String PUBLISH_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.NatsMessageInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named("toString").and(takesArguments(0));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return PUBLISH_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java
new file mode 100644
index 000000000..abbd8bb73
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/NatsSubscriptionInstrumentation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class NatsSubscriptionInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.NatsSubscription";
+ private static final String NEXT_MSG_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.SubscriptionNextMsgInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return
named("nextMessageInternal").and(takesArguments(1));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return NEXT_MSG_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/SocketDataPortInstrumentation.java
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/SocketDataPortInstrumentation.java
new file mode 100644
index 000000000..2dcdfe1fd
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/define/SocketDataPortInstrumentation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.nats.client.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+public class SocketDataPortInstrumentation extends
ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS =
"io.nats.client.impl.SocketDataPort";
+ private static final String CONSTRUCTOR_INTERCEPTOR =
"org.apache.skywalking.apm.plugin.nats.client.SocketDataPortConnectInterceptor";
+
+ @Override
+ protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+
+ @Override
+ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override
+ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
+ return new InstanceMethodsInterceptPoint[]{
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription>
getMethodsMatcher() {
+ return named("connect").and(takesArguments(3));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return CONSTRUCTOR_INTERCEPTOR;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git
a/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/resources/skywalking-plugin.def
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 000000000..9f8a26701
--- /dev/null
+++
b/apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.NatsMessageInstrumentation
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.NatsConnectionInstrumentation
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.NatsConnectionWriterInstrumentation
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.NatsSubscriptionInstrumentation
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.NatsJetStreamInstrumentation
+nats-client-2.14.x-2.15.x=org.apache.skywalking.apm.plugin.nats.client.define.SocketDataPortInstrumentation
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml
b/apm-sniffer/apm-sdk-plugin/pom.xml
index 538162693..6362b47fa 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -123,6 +123,7 @@
<module>guava-eventbus-plugin</module>
<module>hutool-plugins</module>
<module>micronaut-plugins</module>
+ <module>nats-2.14.x-2.15.x-plugin</module>
</modules>
<packaging>pom</packaging>
diff --git a/docs/en/setup/service-agent/java-agent/Plugin-list.md
b/docs/en/setup/service-agent/java-agent/Plugin-list.md
index 60b9c2d10..b559e65df 100644
--- a/docs/en/setup/service-agent/java-agent/Plugin-list.md
+++ b/docs/en/setup/service-agent/java-agent/Plugin-list.md
@@ -143,3 +143,4 @@
- hutool-http-5.x
- micronaut-http-client-3.2.x-3.6.x
- micronaut-http-server-3.2.x-3.6.x
+- nats-client-2.14.x-2.15.x
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md
b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 7ac7c1826..a84d53772 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -70,6 +70,7 @@ metrics based on the tracing data.
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 5.x
* [Pulsar](http://pulsar.apache.org) 2.2.x -> 2.9.x
+ * [NATS](https://github.com/nats-io/nats.java) 2.14.x -> 2.15.x
* Aliyun ONS 1.x (Optional¹)
* NoSQL
* Redis
diff --git a/pom.xml b/pom.xml
index 3273392e3..30b1803ba 100755
--- a/pom.xml
+++ b/pom.xml
@@ -391,8 +391,6 @@
<sourceDirectories>
<sourceDirectory>${project.build.sourceDirectory}</sourceDirectory>
<sourceDirectory>${project.build.testSourceDirectory}</sourceDirectory>
-<sourceDirectory>scenarios/okhttp-scenario</sourceDirectory>
-<sourceDirectory>scenarios/spring-4.3.x-scenario</sourceDirectory>
</sourceDirectories>
<resourceIncludes>
**/*.properties,
diff --git a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/bin/startup.sh
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/bin/startup.sh
new file mode 100644
index 000000000..d2237e1e8
--- /dev/null
+++ b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/bin/startup.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+home="$(cd "$(dirname $0)"; pwd)"
+
+java -jar ${agent_opts} -Dserver.port=8080 \
+ -Dskywalking.agent.service_name=scenario-8080- \
+ -Dnats.server=nats-server \
+ ${home}/../libs/nats-2.14.x-2.15.x-scenario.jar &
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/config/expectedData.yaml
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/config/expectedData.yaml
new file mode 100644
index 000000000..2ce382794
--- /dev/null
+++ b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/config/expectedData.yaml
@@ -0,0 +1,804 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+segmentItems:
+ - serviceName: scenario-8080-
+ segmentSize: ge 32
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.CONSUMER.CREATE.scenario-8080-test-stream-3,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.STREAM.CREATE.scenario-8080-test-stream-4,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/$JS.API.STREAM.NAMES
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: Nats/Pub/Enqueue/$JS.API.STREAM.NAMES,
networkAddress: '',
+ refType: CrossThread, parentSpanId: 0, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/Enqueue/$JS.API.STREAM.NAMES
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.CONSUMER.INFO.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.CONSUMER.DURABLE.CREATE.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint:
Nats/Pub/Enqueue/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable,
+ networkAddress: '', refType: CrossThread, parentSpanId: 0,
parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/Enqueue/$JS.API.CONSUMER.MSG.NEXT.scenario-8080-test-stream-4.scenario-8080-test-stream-4-durable
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - segmentId: not null
+ spans:
+ - operationName: HEAD:/nats/check
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 1
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: url, value: 'http://localhost:8080/nats/check'}
+ - {key: http.method, value: HEAD}
+ - {key: http.status_code, value: '200'}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/scenario-8080-subject-1
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 1, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Sub/scenario-8080-subject-1
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: sid, value: '1'}
+ refs:
+ - {parentEndpoint: Nats/Pub/scenario-8080-subject-1,
networkAddress: 'nats://nats-server:4222',
+ refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/scenario-8080-subject-2
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 2, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: not null
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+
+ refs:
+ - {parentEndpoint: Nats/Sub/scenario-8080-subject-2,
networkAddress: '', refType: CrossThread,
+ parentSpanId: 1, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: not null
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+
+ - operationName: Nats/Sub/scenario-8080-subject-2
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+ - {key: sid, value: '1'}
+ refs:
+ - {parentEndpoint: Nats/Pub/scenario-8080-subject-2,
networkAddress: 'nats://nats-server:4222',
+ refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: not null
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: sid, value: '1'}
+ refs:
+ - {parentEndpoint: not null,
+ networkAddress: 'nats://nats-server:4222', refType:
CrossProcess, parentSpanId: 0,
+ parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 3, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: not null
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+
+ - operationName: Nats/Sub/scenario-8080-subject-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+ - {key: sid, value: '2'}
+ refs:
+ - {parentEndpoint: Nats/Pub/scenario-8080-subject-3,
networkAddress: 'nats://nats-server:4222',
+ refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: not null
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+
+ refs:
+ - {parentEndpoint: Nats/Sub/scenario-8080-subject-3,
networkAddress: '', refType: CrossThread,
+ parentSpanId: 1, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/scenario-8080-subject-3
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 4, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/scenario-8080-subject-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 6, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Sub/scenario-8080-subject-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+ - {key: sid, value: '2'}
+ refs:
+ - {parentEndpoint: Nats/Pub/scenario-8080-subject-4,
networkAddress: 'nats://nats-server:4222',
+ refType: CrossProcess, parentSpanId: 0, parentTraceSegmentId:
not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
+ - segmentId: not null
+ spans:
+ - operationName: Nats/Pub/Enqueue/scenario-8080-subject-1
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+
+ - operationName: Nats/Pub/Enqueue/scenario-8080-subject-2
+ operationId: 0
+ parentSpanId: 0
+ spanId: 2
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-3
+ operationId: 0
+ parentSpanId: 0
+ spanId: 3
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - operationName: Nats/Pub/Enqueue/scenario-8080-subject-3
+ operationId: 0
+ parentSpanId: 0
+ spanId: 4
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - operationName:
Nats/Pub/Enqueue/$JS.API.STREAM.INFO.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: 0
+ spanId: 5
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - operationName: Nats/Pub/Enqueue/scenario-8080-subject-4
+ operationId: 0
+ parentSpanId: 0
+ spanId: 6
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Local
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ - operationName: GET:/nats/start
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 1
+ isError: false
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
+ tags:
+ - {key: url, value: 'http://localhost:8080/nats/start'}
+ - {key: http.method, value: GET}
+ - {key: http.status_code, value: '200'}
+ - segmentId: not null
+ spans:
+ - operationName:
Nats/Pub/$JS.API.STREAM.INFO.scenario-8080-test-stream-4
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 132
+ isError: false
+ spanType: Exit
+ peer: nats://nats-server:4222
+ skipAnalysis: false
+ tags:
+ - {key: reply_to, value: not null}
+
+ refs:
+ - {parentEndpoint: 'GET:/nats/start', networkAddress: '',
refType: CrossThread,
+ parentSpanId: 5, parentTraceSegmentId: not null,
+ parentServiceInstance: not null, parentService:
scenario-8080-,
+ traceId: not null}
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/configuration.yml
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/configuration.yml
new file mode 100644
index 000000000..48b10fdb9
--- /dev/null
+++ b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/configuration.yml
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+type: jvm
+entryService: http://localhost:8080/nats/start
+healthCheck: http://localhost:8080/nats/check
+startScript: ./bin/startup.sh
+depends_on:
+ - nats-server
+dependencies:
+ nats-server:
+ image: nats:2.8.4-alpine3.15
+ hostname: nats-server
+ environment:
+ - nats.server=nats-server
+ expose:
+ - "4222"
+ entrypoint:
+ - nats-server
+ - "--auth"
+ - "abcdefgh"
+ - "--jetstream"
\ No newline at end of file
diff --git a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/pom.xml
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/pom.xml
new file mode 100644
index 000000000..5234f29e9
--- /dev/null
+++ b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/pom.xml
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <groupId>org.apache.skywalking.apm.testcase</groupId>
+ <artifactId>nats-2.14.x-2.15.x-scenario</artifactId>
+ <version>1.0.0</version>
+ <packaging>jar</packaging>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <spring.boot.version>2.7.3</spring.boot.version>
+ </properties>
+
+ <name>skywalking-nats-scenario</name>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring.boot.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.nats</groupId>
+ <artifactId>jnats</artifactId>
+ <version>${test.framework.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <finalName>nats-2.14.x-2.15.x-scenario</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>2.7.3</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>assemble</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+
<descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ <outputDirectory>./target/</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/assembly/assembly.xml
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/assembly/assembly.xml
new file mode 100644
index 000000000..098c888d8
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/assembly/assembly.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+<assembly
+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>./bin</directory>
+ <fileMode>0775</fileMode>
+ </fileSet>
+ </fileSets>
+
+ <files>
+ <file>
+
<source>${project.build.directory}/nats-2.14.x-2.15.x-scenario.jar</source>
+ <outputDirectory>./libs</outputDirectory>
+ <fileMode>0775</fileMode>
+ </file>
+ </files>
+</assembly>
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java
new file mode 100644
index 000000000..54de1f2df
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/Application.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java
new file mode 100644
index 000000000..2564daed2
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/controller/StartController.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.controller;
+
+import
org.apache.skywalking.apm.testcase.nats.client.publisher.JetStreamPublisher;
+import
org.apache.skywalking.apm.testcase.nats.client.publisher.JetStreamPublisherFetcher;
+import
org.apache.skywalking.apm.testcase.nats.client.publisher.NormalPublisher;
+import
org.apache.skywalking.apm.testcase.nats.client.publisher.ReqReplyPublisher;
+import
org.apache.skywalking.apm.testcase.nats.client.subscriber.JetStreamFetcherConsumer;
+import
org.apache.skywalking.apm.testcase.nats.client.subscriber.JetStreamHandlerConsumer;
+import
org.apache.skywalking.apm.testcase.nats.client.subscriber.NextMsgConsumer;
+import
org.apache.skywalking.apm.testcase.nats.client.subscriber.ReqReplyConsumer;
+import org.apache.skywalking.apm.testcase.nats.client.work.StopSignal;
+import org.apache.skywalking.apm.testcase.nats.client.work.WorkBuilder;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.List;
+
+@RestController
+@RequestMapping("nats")
+public class StartController {
+
+ private final List<WorkBuilder.Work> works = new ArrayList<>();
+
+ private final StopSignal stopSignal = new StopSignal();
+
+ @GetMapping(value = "check")
+ public String check() throws Exception {
+ return "success";
+ }
+
+ @GetMapping(value = "check/remote")
+ public String check2() throws Exception {
+ return "success";
+ }
+
+ @GetMapping(value = "start", produces = MediaType.TEXT_HTML_VALUE)
+ public String start() throws Exception {
+ works.forEach(WorkBuilder.Work::publish);
+ return "success";
+ }
+
+ @PostConstruct
+ public void init() {
+ String server = System.getProperty("nats.server");
+ String subjectPrefix =
System.getProperty("skywalking.agent.service_name", "");
+ if (server == null) {
+ throw new RuntimeException("missing property : nats.server");
+ }
+ // test normal message
+ works.add(new WorkBuilder("message-subject-1", subjectPrefix +
"subject-1", server)
+ .build(new NormalPublisher(), new
NextMsgConsumer(stopSignal)));
+
+ // test request-reply message
+ works.add(new WorkBuilder("request-reply-subject-2", subjectPrefix +
"subject-2", server)
+ .build(new ReqReplyPublisher(), new ReqReplyConsumer()));
+
+ // test handle message and ack
+ works.add(new WorkBuilder("stream-subject-3", subjectPrefix +
"subject-3", server)
+ .build(new JetStreamPublisher(subjectPrefix + "test-stream-3"),
+ new JetStreamHandlerConsumer(subjectPrefix +
"test-stream-3")));
+
+ // test stream message and pull message
+ String stream = subjectPrefix + "test-stream-4";
+ works.add(new WorkBuilder("request-stream-subject-4", subjectPrefix +
"subject-4", server)
+ .build(new JetStreamPublisherFetcher(stream), new
JetStreamFetcherConsumer(stream)));
+
+ works.forEach(WorkBuilder.Work::subscribe);
+ }
+
+ @PreDestroy
+ public void stop() {
+ stopSignal.stop();
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java
new file mode 100644
index 000000000..5cd7033a8
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.publisher;
+
+import io.nats.client.Connection;
+import io.nats.client.JetStream;
+import io.nats.client.api.PublishAck;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil;
+
+@Slf4j
+public class JetStreamPublisher implements Publisher {
+ private String stream;
+
+ public JetStreamPublisher(String stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public void publish(Connection connection, String msg, String subject) {
+ try {
+ JetStream js = connection.jetStream();
+ StreamUtil.initStream(connection, subject, this.stream);
+ log.info("send message : {} to {}:{}", msg, this.stream, subject);
+ PublishAck pa = js.publish(buildMsg(subject, msg));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java
new file mode 100644
index 000000000..76f90990e
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/JetStreamPublisherFetcher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.publisher;
+
+import io.nats.client.Connection;
+import io.nats.client.JetStream;
+import io.nats.client.api.PublishAck;
+import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil;
+
+public class JetStreamPublisherFetcher implements Publisher {
+
+ private final String stream;
+
+ public JetStreamPublisherFetcher(String stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public void publish(Connection connection, String msg, String subject) {
+
+ try {
+ JetStream js = connection.jetStream();
+ StreamUtil.initStream(connection, subject, stream);
+ PublishAck pa = js.publish(buildMsg(subject, msg));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java
new file mode 100644
index 000000000..c1d6c325c
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/NormalPublisher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.publisher;
+
+import io.nats.client.Connection;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+@Slf4j
+public class NormalPublisher implements Publisher {
+
+ public void publish(Connection connection, String msg, String subject) {
+ try {
+ connection.publish(buildMsg(subject, msg));
+ log.info("send message : {} to {}", msg, subject);
+ connection.flush(Duration.ofSeconds(5));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java
new file mode 100644
index 000000000..8eed818fa
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/Publisher.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.publisher;
+
+import io.nats.client.Connection;
+import io.nats.client.impl.NatsMessage;
+
+import java.nio.charset.StandardCharsets;
+
+public interface Publisher {
+
+ void publish(Connection connection, String msg, String subject);
+
+ default NatsMessage buildMsg(String subject, String msg) {
+ return NatsMessage.builder()
+ .data(msg, StandardCharsets.UTF_8)
+ .subject(subject).build();
+
+ }
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java
new file mode 100644
index 000000000..200a851e4
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/publisher/ReqReplyPublisher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.publisher;
+
+import io.nats.client.Connection;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+@Slf4j
+public class ReqReplyPublisher implements Publisher {
+
+ @Override
+ public void publish(Connection connection, String msg, String subject) {
+ try {
+ log.info("send message : {} to {}", msg, subject);
+ connection.request(buildMsg(subject, msg), Duration.ofMinutes(5));
+ log.info("receive reply message : {} from {}'reply", msg, subject);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java
new file mode 100644
index 000000000..496c533bc
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/Consumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.subscriber;
+
+import io.nats.client.Connection;
+
+public interface Consumer {
+
+ void subscribe(Connection connection, String subject);
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java
new file mode 100644
index 000000000..4b1ef70c4
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamFetcherConsumer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.subscriber;
+
+import io.nats.client.Connection;
+import io.nats.client.JetStream;
+import io.nats.client.JetStreamSubscription;
+import io.nats.client.Message;
+import io.nats.client.PullSubscribeOptions;
+import io.nats.client.api.ConsumerConfiguration;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil;
+
+import java.time.Duration;
+import java.util.List;
+
+@Slf4j
+public class JetStreamFetcherConsumer implements Consumer {
+
+ private final String stream;
+
+ public JetStreamFetcherConsumer(String stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public void subscribe(Connection connection, String subject) {
+ new Thread(() -> {
+ try {
+ ConsumerConfiguration cc = ConsumerConfiguration.builder()
+ .ackWait(Duration.ofMillis(100))
+ .build();
+ PullSubscribeOptions pullOptions =
PullSubscribeOptions.builder()
+ .durable(stream + "-durable")
+ .configuration(cc)
+ .build();
+ StreamUtil.initStream(connection, subject, stream);
+ JetStream js = connection.jetStream();
+ JetStreamSubscription subscribe = js.subscribe(subject,
pullOptions);
+ List<Message> messages = subscribe.fetch(1,
Duration.ofHours(1));
+ if (messages != null) {
+ messages.forEach(msg -> log.info("received message : {} ",
msg));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }).start();
+
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java
new file mode 100644
index 000000000..a8481dba4
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/JetStreamHandlerConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.subscriber;
+
+import io.nats.client.Connection;
+import io.nats.client.JetStream;
+import io.nats.client.MessageHandler;
+import io.nats.client.PushSubscribeOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.testcase.nats.client.work.StreamUtil;
+
+@Slf4j
+public class JetStreamHandlerConsumer implements Consumer {
+
+ private String stream;
+
+ private PushSubscribeOptions so;
+
+ public JetStreamHandlerConsumer(String stream) {
+ this.stream = stream;
+ this.so = PushSubscribeOptions.builder()
+ .stream(this.stream)
+ .build();
+ }
+
+ @Override
+ public void subscribe(Connection connection, String subject) {
+ try {
+ JetStream js = connection.jetStream();
+ StreamUtil.initStream(connection, subject, this.stream);
+ MessageHandler handler = msg -> {
+ log.info("receive : {}, from :{} ,and will ack", subject, msg);
+ msg.ack();
+ };
+ js.subscribe(subject, connection.createDispatcher(), handler,
true, so);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+}
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java
new file mode 100644
index 000000000..831278f48
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/NextMsgConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.subscriber;
+
+import io.nats.client.Connection;
+import io.nats.client.Message;
+import io.nats.client.Subscription;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.testcase.nats.client.work.StopSignal;
+
+import java.time.Duration;
+
+@Slf4j
+public class NextMsgConsumer implements Consumer {
+
+ private final StopSignal stopSignal;
+
+ public NextMsgConsumer(StopSignal stopSignal) {
+ this.stopSignal = stopSignal;
+ }
+
+ @Override
+ public void subscribe(Connection connection, String subject) {
+ Subscription sub = connection.subscribe(subject);
+ new Thread(() -> {
+ while (!stopSignal.stopped()) {
+ try {
+ Message msg = sub.nextMessage(Duration.ofMinutes(50));
+ if (msg != null) {
+ msg.ack();
+ log.info("receive : {}, from :{} ", subject, msg);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+ }
+
+}
\ No newline at end of file
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java
new file mode 100644
index 000000000..6a10e81f2
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/subscriber/ReqReplyConsumer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.subscriber;
+
+import io.nats.client.Connection;
+import io.nats.client.Dispatcher;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+
+@Slf4j
+public class ReqReplyConsumer implements Consumer {
+
+ @Override
+ public void subscribe(Connection connection, String subject) {
+
+ Dispatcher d = connection.createDispatcher(msg -> {
+ log.info("receive : {}, from :{} and will reply", subject, msg);
+ connection.publish(msg.getReplyTo(), "Have received
msg".getBytes(StandardCharsets.UTF_8));
+ try {
+ connection.flush(Duration.ofSeconds(5));
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ d.subscribe(subject);
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java
new file mode 100644
index 000000000..fc73dfe66
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StopSignal.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.work;
+
+public class StopSignal {
+ private volatile int stop = 1;
+
+ public void stop() {
+ this.stop = 0;
+ }
+
+ public boolean stopped() {
+ return this.stop == 0;
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java
new file mode 100644
index 000000000..9a10c9c3d
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/StreamUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.work;
+
+import io.nats.client.Connection;
+import io.nats.client.JetStreamApiException;
+import io.nats.client.JetStreamManagement;
+import io.nats.client.api.StorageType;
+import io.nats.client.api.StreamConfiguration;
+import io.nats.client.api.StreamInfo;
+
+import java.util.List;
+
+public class StreamUtil {
+
+ public static void initStream(Connection connection, String subject,
String stream) throws Exception {
+
+ JetStreamManagement jetStreamManagement =
connection.jetStreamManagement();
+ StreamInfo streamInfo;
+ try {
+ streamInfo = jetStreamManagement.getStreamInfo(stream);
+ } catch (JetStreamApiException e) {
+ streamInfo = null;
+ }
+
+ if (streamInfo == null) {
+ StreamConfiguration sc = StreamConfiguration.builder()
+ .name(stream)
+ .storageType(StorageType.Memory)
+ .subjects(subject)
+ .build();
+ jetStreamManagement.addStream(sc);
+ } else {
+ List<String> subjects =
streamInfo.getConfiguration().getSubjects();
+ if (!subjects.contains(subject)) {
+ subjects.add(subject);
+
jetStreamManagement.updateStream(streamInfo.getConfiguration());
+ }
+ }
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java
new file mode 100644
index 000000000..31b4ee0c1
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/TrackedConnection.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.work;
+
+import io.nats.client.Connection;
+import io.nats.client.ErrorListener;
+import io.nats.client.Nats;
+import io.nats.client.Options;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.time.Duration;
+
+@Slf4j
+public class TrackedConnection {
+
+ public static Connection newConnection(String url) throws IOException,
InterruptedException {
+ return Nats.connect(createOptions(url));
+ }
+
+ public static Options createOptions(String server) {
+ Options.Builder builder = new Options.Builder().
+ server(server).
+ connectionTimeout(Duration.ofSeconds(5)).
+ pingInterval(Duration.ofSeconds(10)).
+ reconnectWait(Duration.ofSeconds(1)).
+ maxReconnects(-1).
+ traceConnection()
+ .token("abcdefgh".toCharArray());
+
+ builder = builder.connectionListener((conn, type) -> log.info("Status
change " + type));
+ builder = builder.errorListener(new ErrorListener() {
+ @Override
+ public void exceptionOccurred(Connection conn, Exception exp) {
+ log.info("ATS connection exception occurred");
+ exp.printStackTrace();
+ }
+
+ @Override
+ public void errorOccurred(Connection conn, String error) {
+ log.info("NATS connection error occurred " + error);
+ }
+ });
+
+ return builder.build();
+ }
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java
new file mode 100644
index 000000000..dd85f9852
--- /dev/null
+++
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/nats/client/work/WorkBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.testcase.nats.client.work;
+
+import io.nats.client.Connection;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.testcase.nats.client.publisher.Publisher;
+import org.apache.skywalking.apm.testcase.nats.client.subscriber.Consumer;
+
+@Slf4j
+public class WorkBuilder {
+ private final String message;
+ private final String subject;
+ private final String url;
+
+ public WorkBuilder(String message, String subject, String url) {
+ this.message = message;
+ this.subject = subject;
+ this.url = url;
+ }
+
+ public Work build(Publisher publisher, Consumer consumer) {
+
+ return new Work() {
+ @Override
+ public void subscribe() {
+ Connection consumerCon = WorkBuilder.this.createConnection();
+ consumer.subscribe(consumerCon, WorkBuilder.this.subject);
+ }
+
+ @Override
+ public void publish() {
+ Connection connection = WorkBuilder.this.createConnection();
+ publisher.publish(connection, message,
WorkBuilder.this.subject);
+ }
+ };
+ }
+
+ private Connection createConnection() {
+ Connection connection;
+ try {
+ connection = TrackedConnection.newConnection(WorkBuilder.this.url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return connection;
+ }
+
+ public interface Work {
+ // first subscribe
+ void subscribe();
+
+ // then publish message
+ void publish();
+ }
+
+}
diff --git
a/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/support-version.list
b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/support-version.list
new file mode 100644
index 000000000..9798fc44d
--- /dev/null
+++ b/test/plugin/scenarios/nats-2.14.x-2.15.x-scenario/support-version.list
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+2.15.6
+2.14.2
\ No newline at end of file