This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 7af3f06713 Fix the thread safety bug of finishing operation for the
span named "SpringCloudGateway/sendRequest" (#555)
7af3f06713 is described below
commit 7af3f0671399bd56ea03ecf2fa6327ff9e8ca459
Author: gzlicanyi <[email protected]>
AuthorDate: Fri Jun 16 22:25:50 2023 +0800
Fix the thread safety bug of finishing operation for the span named
"SpringCloudGateway/sendRequest" (#555)
---
CHANGES.md | 1 +
.../gateway/v20x/HttpClientRequestInterceptor.java | 66 ++++++++++------
...> AbstractGateway200EnhancePluginDefineV2.java} | 22 ++----
.../gateway/v20x/define/EnhanceCacheObject.java | 9 +++
.../v20x/define/HttpClientInstrumentation.java | 13 ++--
.../v20x/HttpClientRequestInterceptorTest.java | 91 ++++++++++++++++++++++
6 files changed, 158 insertions(+), 44 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ccff2d786c..1e32c43a88 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
* Support Jetty 11.x plugin
* Fix the scenario of using the HBase plugin with spring-data-hadoop.
* Add RocketMQ 5.x plugin
+* Fix the thread safety bug of finishing operation for the span named
"SpringCloudGateway/sendRequest"
#### Documentation
diff --git
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
index 00613b6024..0bed218b24 100644
---
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
+++
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.util.function.BiConsumer;
import java.util.function.Function;
+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
@@ -28,8 +29,8 @@ import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
import
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
@@ -38,13 +39,14 @@ import reactor.ipc.netty.http.client.HttpClientResponse;
import static
org.apache.skywalking.apm.network.trace.component.ComponentsDefine.SPRING_CLOUD_GATEWAY;
-public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterceptor {
+public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterceptorV2 {
+
@Override
public void beforeMethod(final EnhancedInstance objInst,
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
- final MethodInterceptResult result) throws
Throwable {
+ final MethodInvocationContext context) throws
Throwable {
/*
In this plug-in, the HttpClientFinalizerSendInterceptor depends on
the NettyRoutingFilterInterceptor
@@ -54,13 +56,13 @@ public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterc
if (!ContextManager.isActive()) {
return;
}
-
+
AbstractSpan span = ContextManager.activeSpan();
URL url = new URL((String) allArguments[1]);
ContextCarrier contextCarrier = new ContextCarrier();
AbstractSpan abstractSpan = ContextManager.createExitSpan(
- "SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
+ "SpringCloudGateway/sendRequest", contextCarrier,
getPeer(url));
abstractSpan.prepareForAsync();
Tags.URL.set(abstractSpan, String.valueOf(allArguments[1]));
abstractSpan.setLayer(SpanLayer.HTTP);
@@ -80,7 +82,7 @@ public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterc
}
};
- objInst.setSkyWalkingDynamicField(new EnhanceCacheObject(span,
abstractSpan));
+ context.setContext(new EnhanceCacheObject(span, abstractSpan));
}
@Override
@@ -88,28 +90,45 @@ public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterc
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
- final Object ret) {
- EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject)
objInst.getSkyWalkingDynamicField();
+ final Object ret,
+ MethodInvocationContext context) {
+ EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject)
context.getContext();
Mono<HttpClientResponse> responseMono = (Mono<HttpClientResponse>) ret;
return responseMono.doAfterSuccessOrError(new
BiConsumer<HttpClientResponse, Throwable>() {
@Override
public void accept(final HttpClientResponse httpClientResponse,
final Throwable throwable) {
+ doAfterSuccessOrError(httpClientResponse, throwable,
enhanceCacheObject);
+ }
+ });
+ }
- AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
- if (abstractSpan != null) {
- if (throwable != null) {
- abstractSpan.log(throwable);
- } else if (httpClientResponse.status().code() > 400) {
- abstractSpan.errorOccurred();
- }
- Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan,
httpClientResponse.status().code());
- abstractSpan.asyncFinish();
- }
+ void doAfterSuccessOrError(HttpClientResponse httpClientResponse,
Throwable throwable, EnhanceCacheObject enhanceCacheObject) {
+ try {
+ //When executing the beforeMethod method, if the ContextManager is
inactive, the enhanceCacheObject will be null.
+ if (enhanceCacheObject == null) {
+ return;
+ }
- objInst.setSkyWalkingDynamicField(null);
- enhanceCacheObject.getFilterSpan().asyncFinish();
+ //The doAfterSuccessOrError method may be executed multiple times.
+ if (enhanceCacheObject.isSpanFinish()) {
+ return;
}
- });
+
+ AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
+ if (throwable != null) {
+ abstractSpan.log(throwable);
+ } else if (httpClientResponse.status().code() > 400) {
+ abstractSpan.errorOccurred();
+ }
+ Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan,
httpClientResponse.status().code());
+
+ abstractSpan.asyncFinish();
+ enhanceCacheObject.getFilterSpan().asyncFinish();
+
+ enhanceCacheObject.setSpanFinish(true);
+ } catch (Throwable e) {
+ //Catch unknown exceptions to avoid interrupting business
processes.
+ }
}
private String getPeer(URL url) {
@@ -121,7 +140,8 @@ public class HttpClientRequestInterceptor implements
InstanceMethodsAroundInterc
final Method method,
final Object[] allArguments,
final Class<?>[] argumentsTypes,
- final Throwable t) {
+ final Throwable t,
+ MethodInvocationContext context) {
}
}
diff --git
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
similarity index 64%
copy from
apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
copy to
apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
index 14964a3053..1f07c0ba89 100644
---
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
+++
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
@@ -17,22 +17,14 @@
package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
-public class EnhanceCacheObject {
- private final AbstractSpan filterSpan;
- private final AbstractSpan sendSpan;
+public abstract class AbstractGateway200EnhancePluginDefineV2 extends
ClassInstanceMethodsEnhancePluginDefineV2 {
- public EnhanceCacheObject(final AbstractSpan filterSpan, final
AbstractSpan sendSpan) {
- this.filterSpan = filterSpan;
- this.sendSpan = sendSpan;
- }
-
- public AbstractSpan getFilterSpan() {
- return filterSpan;
- }
-
- public AbstractSpan getSendSpan() {
- return sendSpan;
+ @Override
+ protected String[] witnessClasses() {
+ return new String[] {
+
"org.springframework.cloud.gateway.config.GatewayAutoConfiguration$1"
+ };
}
}
diff --git
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
index 14964a3053..e5070ab7a3 100644
---
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
+++
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
@@ -22,6 +22,7 @@ import
org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
public class EnhanceCacheObject {
private final AbstractSpan filterSpan;
private final AbstractSpan sendSpan;
+ private volatile boolean spanFinish = false;
public EnhanceCacheObject(final AbstractSpan filterSpan, final
AbstractSpan sendSpan) {
this.filterSpan = filterSpan;
@@ -35,4 +36,12 @@ public class EnhanceCacheObject {
public AbstractSpan getSendSpan() {
return sendSpan;
}
+
+ public boolean isSpanFinish() {
+ return spanFinish;
+ }
+
+ public void setSpanFinish(boolean spanFinish) {
+ this.spanFinish = spanFinish;
+ }
}
diff --git
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
index 04031cfd96..b73eaca3f2 100644
---
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
+++
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
@@ -20,13 +20,13 @@ package
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
-import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import
org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class HttpClientInstrumentation extends
AbstractGateway200EnhancePluginDefine {
+public class HttpClientInstrumentation extends
AbstractGateway200EnhancePluginDefineV2 {
@Override
protected ClassMatch enhanceClass() {
@@ -39,16 +39,16 @@ public class HttpClientInstrumentation extends
AbstractGateway200EnhancePluginDe
}
@Override
- public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints()
{
- return new InstanceMethodsInterceptPoint[] {
- new InstanceMethodsInterceptPoint() {
+ public InstanceMethodsInterceptV2Point[]
getInstanceMethodsInterceptV2Points() {
+ return new InstanceMethodsInterceptV2Point[] {
+ new InstanceMethodsInterceptV2Point() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("request");
}
@Override
- public String getMethodsInterceptor() {
+ public String getMethodsInterceptorV2() {
return Constants.REQUEST_INTERCEPTOR;
}
@@ -59,4 +59,5 @@ public class HttpClientInstrumentation extends
AbstractGateway200EnhancePluginDe
}
};
}
+
}
diff --git
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
new file mode 100644
index 0000000000..5ec786e6af
--- /dev/null
+++
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import reactor.ipc.netty.http.client.HttpClientResponse;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+@RunWith(TracingSegmentRunner.class)
+public class HttpClientRequestInterceptorTest {
+
+ private HttpClientRequestInterceptor httpClientRequestInterceptor = new
HttpClientRequestInterceptor();
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+ @Rule
+ public MockitoRule rule = MockitoJUnit.rule();
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ private HttpClientResponse httpClientResponse;
+
+ @Before
+ public void setUp() throws Exception {
+
+ httpClientResponse = Mockito.mock(HttpClientResponse.class);
+ HttpResponseStatus httpResponseStatus =
Mockito.mock(HttpResponseStatus.class);
+
+ Mockito.when(httpResponseStatus.code()).thenReturn(200);
+
Mockito.when(httpClientResponse.status()).thenReturn(httpResponseStatus);
+ }
+
+ @Test
+ public void testDoAfterSuccessOrError() {
+ AbstractSpan filterSpan =
ContextManager.createLocalSpan("mockFilterSpan");
+ filterSpan.prepareForAsync();
+ ContextManager.stopSpan(filterSpan);
+
+ AbstractSpan sendSpan =
ContextManager.createExitSpan("SpringCloudGateway/sendRequest",
"http://127.0.0.1:80");
+ sendSpan.prepareForAsync();
+ ContextManager.stopSpan(sendSpan);
+
+ EnhanceCacheObject enhanceCacheObject = new
EnhanceCacheObject(filterSpan, sendSpan);
+ enhanceCacheObject = spy(enhanceCacheObject);
+
+ //Test the ContextManager is inactive.
+ httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse,
null, null);
+ verify(enhanceCacheObject, Mockito.times(0)).setSpanFinish(true);
+
+ //Test normal scenario.
+ httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse,
null, enhanceCacheObject);
+ verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
+
+ //Test the doAfterSuccessOrError method is executed multiple times.
+ httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse,
null, enhanceCacheObject);
+ verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
+ }
+
+}
\ No newline at end of file