This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new a751e324f6 Support across thread tracing for SOFA-RPC (#675)
a751e324f6 is described below

commit a751e324f6669f5a1b0507d61b4fef020fe2b264
Author: 张呈熹 <orezsile...@163.com>
AuthorDate: Mon Mar 25 10:53:40 2024 +0800

    Support across thread tracing for SOFA-RPC (#675)
---
 CHANGES.md                                         |   1 +
 .../apm/plugin/sofarpc/InvokeCallbackWrapper.java  |  88 +++++++++++
 .../sofarpc/SofaBoltCallbackInstrumentation.java   |  70 +++++++++
 .../sofarpc/SofaBoltCallbackInvokeInterceptor.java |  55 +++++++
 .../src/main/resources/skywalking-plugin.def       |   1 +
 .../plugin/sofarpc/InvokeCallbackWrapperTest.java  | 169 +++++++++++++++++++++
 .../SofaBoltCallbackInvokeInterceptorTest.java     |  79 ++++++++++
 .../sofarpc/SofaRpcConsumerInterceptorTest.java    |  40 +++--
 .../sofarpc/SofaRpcProviderInterceptorTest.java    |  19 +--
 .../sofarpc-scenario/config/expectedData.yaml      |  48 ++++++
 .../apm/testcase/sofarpc/SofaRpcApplication.java   |  14 +-
 .../TestCallback.java}                             |  27 +++-
 .../sofarpc/controller/CaseController.java         |  13 ++
 .../sofarpc/interfaces/SofaRpcDemoService.java     |   2 +
 .../sofarpc/service/SofaRpcDemoServiceImpl.java    |   5 +
 15 files changed, 600 insertions(+), 31 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 11dcc86166..5627ea6ad0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -18,6 +18,7 @@ Release Notes.
 * Archive the expired plugins `impala-jdbc-2.6.x-plugin`.
 * Fix a bug in Spring Cloud Gateway if HttpClientFinalizer#send does not invoke, the span created at NettyRoutingFilterInterceptor can not stop.
 * Fix not tracing in HttpClient v5 when HttpHost(arg[0]) is null but `RoutingSupport#determineHost` works.
+* Support across thread tracing for SOFA-RPC.
 
 #### Documentation
 * Update docs to describe `expired-plugins`.
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java
new file mode 100644
index 0000000000..cf806cb592
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.sofarpc;
+
+import com.alipay.remoting.InvokeCallback;
+import java.util.concurrent.Executor;
+import lombok.AccessLevel;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+
+public class InvokeCallbackWrapper implements InvokeCallback {
+
+    @Getter(AccessLevel.PACKAGE)
+    private ContextSnapshot contextSnapshot;
+    @Getter(AccessLevel.PACKAGE)
+    private final InvokeCallback invokeCallback;
+
+    public InvokeCallbackWrapper(InvokeCallback invokeCallback) {
+        if (ContextManager.isActive()) {
+            this.contextSnapshot = ContextManager.capture();
+        }
+        this.invokeCallback = invokeCallback;
+    }
+
+    @Override
+    public void onResponse(final Object o) {
+        ContextManager.createLocalSpan("Thread/" + 
invokeCallback.getClass().getName() + "/onResponse");
+        if (contextSnapshot != null) {
+            ContextManager.continued(contextSnapshot);
+        }
+        try {
+            invokeCallback.onResponse(o);
+        } catch (Throwable t) {
+            ContextManager.activeSpan().log(t);
+            throw t;
+        } finally {
+            contextSnapshot = null;
+            ContextManager.stopSpan();
+        }
+
+    }
+
+    @Override
+    public void onException(final Throwable throwable) {
+        ContextManager.createLocalSpan("Thread/" + 
invokeCallback.getClass().getName() + "/onException");
+        if (contextSnapshot != null) {
+            ContextManager.continued(contextSnapshot);
+        }
+        if (throwable != null) {
+            AbstractSpan abstractSpan = ContextManager.activeSpan();
+            if (abstractSpan != null) {
+                abstractSpan.log(throwable);
+            }
+        }
+        try {
+            invokeCallback.onException(throwable);
+        } catch (Throwable t) {
+            ContextManager.activeSpan().log(t);
+            throw t;
+        } finally {
+            contextSnapshot = null;
+            ContextManager.stopSpan();
+        }
+    }
+
+    @Override
+    public Executor getExecutor() {
+        return invokeCallback.getExecutor();
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java
new file mode 100644
index 0000000000..6547042bc6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInstrumentation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.sofarpc;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.agent.core.plugin.match.NameMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class SofaBoltCallbackInstrumentation extends 
ClassInstanceMethodsEnhancePluginDefine {
+
+    private static final String ENHANCE_CLASS = 
"com.alipay.remoting.BaseRemoting";
+    private static final String INVOKE_METHOD_INTERCEPTOR = 
"org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInvokeInterceptor";
+    private static final String INVOKE_METHOD = "invokeWithCallback";
+
+    @Override
+    protected ClassMatch enhanceClass() {
+        return NameMatch.byName(ENHANCE_CLASS);
+    }
+
+    @Override
+    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+        return null;
+    }
+
+    @Override
+    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
+        return new InstanceMethodsInterceptPoint[] {
+            new InstanceMethodsInterceptPoint() {
+                @Override
+                public ElementMatcher<MethodDescription> getMethodsMatcher() {
+                    return named(INVOKE_METHOD).and(
+                        takesArguments(4));
+                }
+
+                @Override
+                public String getMethodsInterceptor() {
+                    return INVOKE_METHOD_INTERCEPTOR;
+                }
+
+                @Override
+                public boolean isOverrideArgs() {
+                    return true;
+                }
+            }
+        };
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java
new file mode 100644
index 0000000000..c890b4a90d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.sofarpc;
+
+import com.alipay.remoting.InvokeCallback;
+import java.lang.reflect.Method;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class SofaBoltCallbackInvokeInterceptor implements 
InstanceMethodsAroundInterceptor {
+    @Override
+    public void beforeMethod(EnhancedInstance objInst,
+                             Method method,
+                             Object[] allArguments,
+                             Class<?>[] argumentsTypes,
+                             MethodInterceptResult result) {
+        if (allArguments[2] instanceof InvokeCallback) {
+            allArguments[2] = new InvokeCallbackWrapper((InvokeCallback) 
allArguments[2]);
+        }
+    }
+
+    @Override
+    public Object afterMethod(EnhancedInstance objInst,
+                              Method method,
+                              Object[] allArguments,
+                              Class<?>[] argumentsTypes,
+                              Object ret) {
+        return ret;
+    }
+
+    @Override
+    public void handleMethodException(EnhancedInstance objInst,
+                                      Method method,
+                                      Object[] allArguments,
+                                      Class<?>[] argumentsTypes,
+                                      Throwable t) {
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def
index 9850487d56..72682ac2bc 100644
--- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/main/resources/skywalking-plugin.def
@@ -16,3 +16,4 @@
 
 sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcConsumerInstrumentation
 sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaRpcProviderInstrumentation
+sofarpc=org.apache.skywalking.apm.plugin.sofarpc.SofaBoltCallbackInstrumentation
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java
new file mode 100644
index 0000000000..ef9c9f3cb0
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/InvokeCallbackWrapperTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.sofarpc;
+
+import com.alipay.remoting.InvokeCallback;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@RunWith(TracingSegmentRunner.class)
+public class InvokeCallbackWrapperTest {
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    private Executor executor = Executors.newFixedThreadPool(1);
+
+    @Rule
+    public AgentServiceRule agentServiceRule = new AgentServiceRule();
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    private InvokeCallback callback;
+
+    @Before
+    public void before() {
+        callback = new InvokeCallback() {
+            @Override
+            public void onResponse(final Object o) {
+            }
+
+            @Override
+            public void onException(final Throwable throwable) {
+            }
+
+            @Override
+            public Executor getExecutor() {
+                return null;
+            }
+        };
+    }
+
+    static class WrapperWrapper implements InvokeCallback {
+
+        private InvokeCallback callback;
+
+        private CountDownLatch countDownLatch;
+
+        public CountDownLatch getCountDownLatch() {
+            return countDownLatch;
+        }
+
+        public WrapperWrapper(InvokeCallback callback) {
+            this.countDownLatch = new CountDownLatch(1);
+            this.callback = callback;
+        }
+
+        @Override
+        public void onResponse(final Object o) {
+            callback.onResponse(o);
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void onException(final Throwable throwable) {
+            callback.onException(throwable);
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public Executor getExecutor() {
+            return null;
+        }
+    }
+
+    @Test
+    public void testConstruct() {
+        InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback);
+        Assert.assertSame(callback, wrapper.getInvokeCallback());
+        Assert.assertNull(wrapper.getContextSnapshot());
+
+        ContextManager.createEntrySpan("sofarpc", null);
+        wrapper = new InvokeCallbackWrapper(callback);
+        Assert.assertSame(callback, wrapper.getInvokeCallback());
+        Assert.assertEquals(ContextManager.getGlobalTraceId(), 
wrapper.getContextSnapshot().getTraceId().getId());
+        Assert.assertEquals("sofarpc", 
wrapper.getContextSnapshot().getParentEndpoint());
+        ContextManager.stopSpan();
+    }
+
+    @Test
+    public void testOnResponse() throws InterruptedException {
+        ContextManager.createEntrySpan("sofarpc", null);
+        InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback);
+        final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper);
+        executor.execute(() -> wrapperWrapper.onResponse(null));
+        ContextManager.stopSpan();
+        wrapperWrapper.getCountDownLatch().await();
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(2));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1);
+        List<AbstractTracingSpan> spans2 = 
SegmentHelper.getSpans(traceSegment2);
+        assertThat(spans2.size(), is(1));
+        assertEquals("sofarpc", traceSegment2.getRef().getParentEndpoint());
+    }
+
+    @Test
+    public void testOnException() throws InterruptedException {
+        ContextManager.createEntrySpan("sofarpc", null);
+        InvokeCallbackWrapper wrapper = new InvokeCallbackWrapper(callback);
+        final WrapperWrapper wrapperWrapper = new WrapperWrapper(wrapper);
+        final Throwable throwable = new Throwable();
+        executor.execute(() -> wrapperWrapper.onException(throwable));
+        ContextManager.stopSpan();
+        wrapperWrapper.getCountDownLatch().await();
+
+        assertThat(segmentStorage.getTraceSegments().size(), is(2));
+        TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+        List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
+        assertThat(spans.size(), is(1));
+
+        TraceSegment traceSegment2 = segmentStorage.getTraceSegments().get(1);
+        List<AbstractTracingSpan> spans2 = 
SegmentHelper.getSpans(traceSegment2);
+        assertThat(spans2.size(), is(1));
+        assertThat(SpanHelper.getLogs(spans2.get(0)).size(), is(1));
+
+    }
+
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java
new file mode 100644
index 0000000000..bc082d7d2b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaBoltCallbackInvokeInterceptorTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.sofarpc;
+
+import com.alipay.remoting.InvokeCallback;
+import java.util.concurrent.Executor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SofaBoltCallbackInvokeInterceptorTest {
+    InvokeCallback callback;
+    Object obj;
+    Object[] matchArgs;
+    Object[] mismatchArgs;
+
+    @Before
+    public void before() {
+        callback = new InvokeCallback() {
+            @Override
+            public void onResponse(final Object o) {
+
+            }
+
+            @Override
+            public void onException(final Throwable throwable) {
+
+            }
+
+            @Override
+            public Executor getExecutor() {
+                return null;
+            }
+        };
+
+        obj = new Object();
+
+        matchArgs = new Object[] {
+            null,
+            null,
+            callback,
+            null
+        };
+        mismatchArgs = new Object[] {
+            null,
+            null,
+            obj,
+            null
+        };
+    }
+
+    @Test
+    public void testOverrideArguments() {
+        final SofaBoltCallbackInvokeInterceptor interceptor = new 
SofaBoltCallbackInvokeInterceptor();
+        interceptor.beforeMethod(null, null, matchArgs, null, null);
+        Assert.assertTrue(matchArgs[2] instanceof InvokeCallbackWrapper);
+        Assert.assertSame(callback, ((InvokeCallbackWrapper) 
matchArgs[2]).getInvokeCallback());
+
+        interceptor.beforeMethod(null, null, mismatchArgs, null, null);
+        Assert.assertSame(obj, mismatchArgs[2]);
+    }
+
+}
\ No newline at end of file
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java
index 3dc0903ec0..c0c3420966 100644
--- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcConsumerInterceptorTest.java
@@ -18,11 +18,11 @@
 
 package org.apache.skywalking.apm.plugin.sofarpc;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
+import com.alipay.sofa.rpc.client.ProviderInfo;
+import com.alipay.sofa.rpc.context.RpcInternalContext;
+import com.alipay.sofa.rpc.core.request.SofaRequest;
+import com.alipay.sofa.rpc.core.response.SofaResponse;
+import com.alipay.sofa.rpc.filter.ConsumerInvoker;
 import java.util.List;
 import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
@@ -50,11 +50,12 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
-import com.alipay.sofa.rpc.client.ProviderInfo;
-import com.alipay.sofa.rpc.context.RpcInternalContext;
-import com.alipay.sofa.rpc.core.request.SofaRequest;
-import com.alipay.sofa.rpc.core.response.SofaResponse;
-import com.alipay.sofa.rpc.filter.ConsumerInvoker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
 
 @RunWith(TracingSegmentRunner.class)
 public class SofaRpcConsumerInterceptorTest {
@@ -121,7 +122,8 @@ public class SofaRpcConsumerInterceptorTest {
 
     @Test
     public void testConsumerWithAttachment() throws Throwable {
-        sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, 
allArguments, argumentTypes, methodInterceptResult);
+        sofaRpcConsumerInterceptor.beforeMethod(
+            enhancedInstance, null, allArguments, argumentTypes, 
methodInterceptResult);
         sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, 
allArguments, argumentTypes, sofaResponse);
 
         assertThat(segmentStorage.getTraceSegments().size(), is(1));
@@ -133,8 +135,10 @@ public class SofaRpcConsumerInterceptorTest {
 
     @Test
     public void testConsumerWithException() throws Throwable {
-        sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, 
allArguments, argumentTypes, methodInterceptResult);
-        sofaRpcConsumerInterceptor.handleMethodException(enhancedInstance, 
null, allArguments, argumentTypes, new RuntimeException());
+        sofaRpcConsumerInterceptor.beforeMethod(
+            enhancedInstance, null, allArguments, argumentTypes, 
methodInterceptResult);
+        sofaRpcConsumerInterceptor.handleMethodException(
+            enhancedInstance, null, allArguments, argumentTypes, new 
RuntimeException());
         sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, 
allArguments, argumentTypes, sofaResponse);
         assertThat(segmentStorage.getTraceSegments().size(), is(1));
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
@@ -146,7 +150,8 @@ public class SofaRpcConsumerInterceptorTest {
         when(sofaResponse.isError()).thenReturn(true);
         when(sofaResponse.getAppResponse()).thenReturn(new RuntimeException());
 
-        sofaRpcConsumerInterceptor.beforeMethod(enhancedInstance, null, 
allArguments, argumentTypes, methodInterceptResult);
+        sofaRpcConsumerInterceptor.beforeMethod(
+            enhancedInstance, null, allArguments, argumentTypes, 
methodInterceptResult);
         sofaRpcConsumerInterceptor.afterMethod(enhancedInstance, null, 
allArguments, argumentTypes, sofaResponse);
 
         assertThat(segmentStorage.getTraceSegments().size(), is(1));
@@ -180,8 +185,11 @@ public class SofaRpcConsumerInterceptorTest {
         assertThat(tags.size(), is(1));
         assertThat(SpanHelper.getLayer(span), 
CoreMatchers.is(SpanLayer.RPC_FRAMEWORK));
         assertThat(SpanHelper.getComponentId(span), is(43));
-        assertThat(tags.get(0)
-                       .getValue(), 
is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)"));
+        assertThat(
+            tags.get(0)
+                .getValue(),
+            
is("bolt://127.0.0.1:12200/org.apache.skywalking.apm.test.TestSofaRpcService.test(String)")
+        );
         assertThat(span.getOperationName(), 
is("org.apache.skywalking.apm.test.TestSofaRpcService.test(String)"));
     }
 }
diff --git a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java
index 43c2938e5e..80edc8016a 100644
--- a/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/sofarpc-plugin/src/test/java/org/apache/skywalking/apm/plugin/sofarpc/SofaRpcProviderInterceptorTest.java
@@ -18,10 +18,11 @@
 
 package org.apache.skywalking.apm.plugin.sofarpc;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
+import com.alipay.sofa.rpc.client.ProviderInfo;
+import com.alipay.sofa.rpc.context.RpcInternalContext;
+import com.alipay.sofa.rpc.core.request.SofaRequest;
+import com.alipay.sofa.rpc.core.response.SofaResponse;
+import com.alipay.sofa.rpc.filter.ProviderInvoker;
 import java.util.List;
 import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.context.SW8CarrierItem;
@@ -51,11 +52,11 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
-import com.alipay.sofa.rpc.client.ProviderInfo;
-import com.alipay.sofa.rpc.context.RpcInternalContext;
-import com.alipay.sofa.rpc.core.request.SofaRequest;
-import com.alipay.sofa.rpc.core.response.SofaResponse;
-import com.alipay.sofa.rpc.filter.ProviderInvoker;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
 
 @RunWith(TracingSegmentRunner.class)
 public class SofaRpcProviderInterceptorTest {
diff --git a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml
index f8d0f4f807..c6655d281b 100644
--- a/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/sofarpc-scenario/config/expectedData.yaml
@@ -34,6 +34,24 @@ segmentItems:
         refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not 
null, parentServiceInstance: not
           null, parentService: sofarpc-scenario, traceId: not null}
       skipAnalysis: 'false'
+  - segmentId: not null
+    spans:
+    - operationName: 
Thread/com.alipay.sofa.rpc.message.bolt.BoltInvokerCallback/onResponse
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: Unknown
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 0
+      isError: false
+      spanType: Local
+      peer: ''
+      skipAnalysis: false
+      refs:
+      - {parentEndpoint: 'GET:/sofarpc-scenario/case/sofarpc', networkAddress: 
'',
+        refType: CrossThread, parentSpanId: 2, parentTraceSegmentId: not null,
+        parentServiceInstance: not null, parentService: sofarpc-scenario,
+        traceId: not null}
   - segmentId: not null
     spans:
     - operationName: 
org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String)
@@ -49,6 +67,19 @@ segmentItems:
       tags:
       - {key: url, value: 
'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.hello(java.lang.String)'}
       skipAnalysis: 'false'
+    - operationName: 
org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String)
+      parentSpanId: 0
+      spanId: 2
+      spanLayer: RPCFramework
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 43
+      isError: false
+      spanType: Exit
+      peer: 127.0.0.1:12200
+      tags:
+        - { key: url, value: 
'bolt://127.0.0.1:12200/org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String)'
 }
+      skipAnalysis: 'false'
     - operationName: GET:/sofarpc-scenario/case/sofarpc
       parentSpanId: -1
       spanId: 0
@@ -64,3 +95,20 @@ segmentItems:
       - {key: http.method, value: GET}
       - {key: http.status_code, value: '200'}
       skipAnalysis: 'false'
+  - segmentId: not null
+    spans:
+    - operationName: 
org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService.callback(java.lang.String)
+      parentSpanId: -1
+      spanId: 0
+      spanLayer: RPCFramework
+      startTime: nq 0
+      endTime: nq 0
+      componentId: 43
+      isError: false
+      spanType: Entry
+      peer: ''
+      refs:
+        - { parentEndpoint: GET:/sofarpc-scenario/case/sofarpc, 
networkAddress: '127.0.0.1:12200',
+          refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not 
null, parentServiceInstance: not
+              null, parentService: sofarpc-scenario, traceId: not null }
+      skipAnalysis: 'false'
diff --git 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java
 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java
index faa8bb8639..609a0e5266 100644
--- 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java
+++ 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/SofaRpcApplication.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.testcase.sofarpc;
 
+import com.alipay.sofa.rpc.common.RpcConstants;
 import com.alipay.sofa.rpc.config.ConsumerConfig;
 import com.alipay.sofa.rpc.config.ProviderConfig;
 import com.alipay.sofa.rpc.config.ServerConfig;
@@ -42,8 +43,9 @@ public class SofaRpcApplication {
         public ProviderConfig provider() {
             ServerConfig config = new 
ServerConfig().setProtocol("bolt").setPort(12200).setDaemon(true);
 
-            ProviderConfig<SofaRpcDemoService> providerConfig = new 
ProviderConfig<SofaRpcDemoService>().setInterfaceId(SofaRpcDemoService.class
-                .getName()).setRef(new 
SofaRpcDemoServiceImpl()).setServer(config);
+            ProviderConfig<SofaRpcDemoService> providerConfig = new 
ProviderConfig<SofaRpcDemoService>().setInterfaceId(
+                SofaRpcDemoService.class
+                    .getName()).setRef(new 
SofaRpcDemoServiceImpl()).setServer(config);
 
             providerConfig.export();
             return providerConfig;
@@ -55,5 +57,17 @@ public class SofaRpcApplication {
                                                            .setProtocol("bolt")
                                                            .setDirectUrl("bolt://127.0.0.1:12200");
         }
+
+        /**
+         * Consumer of {@link SofaRpcDemoService} configured with the callback invoke type,
+         * connecting directly to the bolt endpoint at 127.0.0.1:12200.
+         */
+        @Bean
+        public ConsumerConfig<SofaRpcDemoService> callbackConsumer() {
+            return new ConsumerConfig<SofaRpcDemoService>().setInterfaceId(SofaRpcDemoService.class.getName())
+                                                           .setProtocol("bolt")
+                                                           .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
+                                                           .setDirectUrl("bolt://127.0.0.1:12200");
+        }
     }
 }
diff --git 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java
similarity index 53%
copy from 
test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
copy to 
test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java
index 9fed6ddc3d..81da5ba588 100644
--- 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
+++ 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/callback/TestCallback.java
@@ -16,13 +16,38 @@
  *
  */
 
-package org.apache.skywalking.apm.testcase.sofarpc.service;
+package org.apache.skywalking.apm.testcase.sofarpc.callback;
 
+import com.alipay.sofa.rpc.core.exception.SofaRpcException;
+import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
+import com.alipay.sofa.rpc.core.request.RequestBase;
 import org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService;
 
-public class SofaRpcDemoServiceImpl implements SofaRpcDemoService {
+/**
+ * Response callback used by the sofarpc test scenario. All three handlers are
+ * deliberately empty: the callback does nothing with the response or the error.
+ */
+public class TestCallback implements SofaResponseCallback {
+
+    /** Service reference handed in by the caller; stored but not read by the handlers below. */
+    private final SofaRpcDemoService service;
+
+    public TestCallback(final SofaRpcDemoService service) {
+        this.service = service;
+    }
+
+    @Override
+    public void onAppResponse(final Object o, final String s, final RequestBase requestBase) {
+        // Intentionally empty.
+    }
+
+    @Override
+    public void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) {
+        // Intentionally empty.
+    }
+
     @Override
-    public String hello(String name) {
-        return "hello, " + name;
+    public void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) {
+        // Intentionally empty.
     }
 }
diff --git 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java
 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java
index ca9320001e..fff7b5ac34 100644
--- 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java
+++ 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/controller/CaseController.java
@@ -19,8 +19,11 @@
 package org.apache.skywalking.apm.testcase.sofarpc.controller;
 
 import com.alipay.sofa.rpc.config.ConsumerConfig;
+import com.alipay.sofa.rpc.context.RpcInvokeContext;
+import org.apache.skywalking.apm.testcase.sofarpc.callback.TestCallback;
 import 
org.apache.skywalking.apm.testcase.sofarpc.interfaces.SofaRpcDemoService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
@@ -32,8 +35,13 @@ public class CaseController {
     private static final String SUCCESS = "Success";
 
     @Autowired
+    @Qualifier("consumer")
     private ConsumerConfig<SofaRpcDemoService> consumerConfig;
 
+    @Autowired
+    @Qualifier("callbackConsumer")
+    private ConsumerConfig<SofaRpcDemoService> callbackConsumer;
+
     @RequestMapping("/healthCheck")
     @ResponseBody
     public String healthCheck() {
@@ -45,6 +53,14 @@ public class CaseController {
     public String sofarpc() {
         SofaRpcDemoService service = consumerConfig.refer();
         service.hello("sofarpc");
+
+        // Second call goes through the callback-type consumer: register the response
+        // callback on this thread's RpcInvokeContext, then invoke the proxy.
+        SofaRpcDemoService callbackService = callbackConsumer.refer();
+        // getContext() creates the context when the thread has none; peekContext() may return null.
+        RpcInvokeContext invokeCtx = RpcInvokeContext.getContext();
+        invokeCtx.setResponseCallback(new TestCallback(service));
+        callbackService.callback("sofarpc");
         return SUCCESS;
     }
 }
diff --git 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java
 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java
index cd541109b7..f2d0f93e9e 100644
--- 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java
+++ 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/interfaces/SofaRpcDemoService.java
@@ -21,4 +21,6 @@ package org.apache.skywalking.apm.testcase.sofarpc.interfaces;
 public interface SofaRpcDemoService {
 
     String hello(String name);
+
+    String callback(String name); // invoked by the scenario through the callback-type consumer
 }
diff --git 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
index 9fed6ddc3d..b946932117 100644
--- 
a/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
+++ 
b/test/plugin/scenarios/sofarpc-scenario/src/main/java/org/apache/skywalking/apm/testcase/sofarpc/service/SofaRpcDemoServiceImpl.java
@@ -25,4 +25,9 @@ public class SofaRpcDemoServiceImpl implements 
SofaRpcDemoService {
     public String hello(String name) {
         return "hello, " + name;
     }
+
+    @Override
+    public String callback(String name) {
+        return "hello, " + name; // same greeting format as hello(String)
+    }
 }


Reply via email to