This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 75db422 Implement the async dubbo invoking for alibaba-dubbo (#1857)
75db422 is described below
commit 75db422f35c2fc56f965c15a88b76513cc66342c
Author: Spance <[email protected]>
AuthorDate: Fri Aug 6 17:40:21 2021 +0800
Implement the async dubbo invoking for alibaba-dubbo (#1857)
* Implement the async dubbo invoking
* Obey the code style
---
.../plugin/alibaba/dubbo/AlibabaDubboPlugin.java | 40 +++++++++++++++++-----
.../dubbo/cache/ApplicationConfigCache.java | 1 +
.../dubbo/proxy/AlibabaDubboProxyService.java | 16 ++++++---
.../alibaba/dubbo/AlibabaDubboPluginTest.java | 7 ++--
.../dubbo/proxy/AlibabaDubboProxyServiceTest.java | 30 +++++++++++-----
5 files changed, 69 insertions(+), 25 deletions(-)
diff --git
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
index 65c3e55..14bf663 100644
---
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
+++
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
@@ -17,9 +17,11 @@
package org.apache.shenyu.plugin.alibaba.dubbo;
+import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.Result;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.RuleData;
@@ -27,6 +29,7 @@ import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.enums.ResultEnum;
import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
@@ -74,14 +77,33 @@ public class AlibabaDubboPlugin extends
AbstractShenyuPlugin {
Object error =
ShenyuResultWrap.error(ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(),
ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
- Object result = alibabaDubboProxyService.genericInvoker(param,
metaData);
- if (Objects.nonNull(result)) {
- exchange.getAttributes().put(Constants.RPC_RESULT, result);
- } else {
- exchange.getAttributes().put(Constants.RPC_RESULT,
Constants.DUBBO_RPC_RESULT_EMPTY);
- }
- exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE,
ResultEnum.SUCCESS.getName());
- return chain.execute(exchange);
+
+ return Mono.create(monoSink -> {
+ ResponseFuture future =
alibabaDubboProxyService.genericInvoker(param, metaData);
+ future.setCallback(new ResponseCallback() {
+
+ @Override
+ public void done(final Object resultObj) {
+ assert resultObj instanceof Result;
+ Result result = (Result) resultObj;
+ if (result.hasException()) {
+ this.caught(result.getException());
+ return;
+ }
+ monoSink.success(result.getValue());
+ }
+
+ @Override
+ public void caught(final Throwable ex) {
+ log.error("dubbo failed using async genericInvoker()
metaData={} param={}", metaData, param, ex);
+ monoSink.error(ex);
+ }
+ });
+ }).flatMap(response -> {
+ exchange.getAttributes().put(Constants.RPC_RESULT,
Objects.nonNull(response) ? response : Constants.DUBBO_RPC_RESULT_EMPTY);
+
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE,
ResultEnum.SUCCESS.getName());
+ return chain.execute(exchange);
+ });
}
/**
diff --git
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
index 78409d2..ad6ef9b 100644
---
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
+++
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
@@ -150,6 +150,7 @@ public final class ApplicationConfigCache {
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
+ reference.setAsync(true);
String rpcExt = metaData.getRpcExt();
DubboParamExtInfo dubboParamExtInfo =
GsonUtils.getInstance().fromJson(rpcExt, DubboParamExtInfo.class);
if (Objects.nonNull(dubboParamExtInfo)) {
diff --git
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
index bd090f4..ee715bf 100644
---
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
+++
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
@@ -18,6 +18,9 @@
package org.apache.shenyu.plugin.alibaba.dubbo.proxy;
import com.alibaba.dubbo.config.ReferenceConfig;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.service.GenericException;
import com.alibaba.dubbo.rpc.service.GenericService;
import lombok.extern.slf4j.Slf4j;
@@ -37,9 +40,9 @@ import java.util.Objects;
*/
@Slf4j
public class AlibabaDubboProxyService {
-
+
private final BodyParamResolveService bodyParamResolveService;
-
+
/**
* Instantiates a new Dubbo proxy service.
*
@@ -48,7 +51,7 @@ public class AlibabaDubboProxyService {
public AlibabaDubboProxyService(final BodyParamResolveService
bodyParamResolveService) {
this.bodyParamResolveService = bodyParamResolveService;
}
-
+
/**
* Generic invoker object.
*
@@ -57,7 +60,7 @@ public class AlibabaDubboProxyService {
* @return the object
* @throws ShenyuException the shenyu exception
*/
- public Object genericInvoker(final String body, final MetaData metaData)
throws ShenyuException {
+ public ResponseFuture genericInvoker(final String body, final MetaData
metaData) throws ShenyuException {
ReferenceConfig<GenericService> reference =
ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) ||
StringUtils.isEmpty(reference.getInterface())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
@@ -71,10 +74,13 @@ public class AlibabaDubboProxyService {
} else {
pair = bodyParamResolveService.buildParameter(body,
metaData.getParameterTypes());
}
- return genericService.$invoke(metaData.getMethodName(),
pair.getLeft(), pair.getRight());
+ genericService.$invoke(metaData.getMethodName(), pair.getLeft(),
pair.getRight());
} catch (GenericException e) {
log.error("dubbo invoker have exception", e);
throw new ShenyuException(e.getExceptionMessage());
}
+
+ FutureAdapter<?> adapter = (FutureAdapter<?>)
RpcContext.getContext().getFuture();
+ return adapter.getFuture();
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
index e4cfe07..5aa7866 100644
---
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
+++
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
@@ -17,13 +17,15 @@
package org.apache.shenyu.plugin.alibaba.dubbo;
-import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
+import com.alibaba.dubbo.remoting.exchange.support.SimpleFuture;
+import com.alibaba.dubbo.rpc.RpcResult;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.context.ShenyuContext;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
@@ -67,7 +69,8 @@ public final class AlibabaDubboPluginTest {
metaData.setServiceName("org.apache.shenyu.test.dubbo.api.service.DubboTestService");
metaData.setMethodName("findAll");
metaData.setRpcType(RpcTypeEnum.DUBBO.getName());
- when(mockAlibabaDubboProxyService.genericInvoker(null,
metaData)).thenReturn(Mono.empty());
+ when(mockAlibabaDubboProxyService.genericInvoker(null, metaData))
+ .thenReturn(new SimpleFuture(new RpcResult(metaData.getId())));
alibabaDubboPluginUnderTest = new
AlibabaDubboPlugin(mockAlibabaDubboProxyService);
}
diff --git
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
index a5ed1ee..8b9aa03 100644
---
a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
+++
b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
@@ -18,16 +18,16 @@
package org.apache.shenyu.plugin.alibaba.dubbo.proxy;
import com.alibaba.dubbo.config.ReferenceConfig;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.remoting.exchange.support.SimpleFuture;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.RpcResult;
+import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.service.GenericService;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.enums.RpcTypeEnum;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
import org.apache.shenyu.plugin.alibaba.dubbo.cache.ApplicationConfigCache;
import org.apache.shenyu.plugin.api.param.BodyParamResolveService;
import org.junit.After;
@@ -38,6 +38,10 @@ import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
/**
* AlibabaDubboProxyServiceTest.
*/
@@ -70,18 +74,26 @@ public final class AlibabaDubboProxyServiceTest {
}
@Test
- public void testGenericInvoker() {
+ public void testGenericInvoker() throws Exception {
ReferenceConfig referenceConfig = mock(ReferenceConfig.class);
GenericService genericService = mock(GenericService.class);
+ String sample = String.format("%x", System.nanoTime());
when(referenceConfig.get()).thenReturn(genericService);
- when(genericService.$invoke(METHOD_NAME, LEFT,
RIGHT)).thenReturn(null);
+ when(genericService.$invoke(METHOD_NAME, LEFT, RIGHT))
+ .then(invocationOnMock -> {
+ RpcContext.getContext().setFuture(new FutureAdapter<>(new
SimpleFuture(new RpcResult(sample))));
+ return sample;
+ });
try (MockedStatic<ApplicationConfigCache>
applicationConfigCacheMockedStatic = mockStatic(ApplicationConfigCache.class)) {
ApplicationConfigCache applicationConfigCache =
mock(ApplicationConfigCache.class);
- applicationConfigCacheMockedStatic.when(() ->
ApplicationConfigCache.getInstance()).thenReturn(applicationConfigCache);
+
applicationConfigCacheMockedStatic.when(ApplicationConfigCache::getInstance).thenReturn(applicationConfigCache);
when(applicationConfigCache.initRef(metaData)).thenReturn(referenceConfig);
AlibabaDubboProxyService alibabaDubboProxyService = new
AlibabaDubboProxyService(new BodyParamResolveServiceImpl());
- Assert.assertNull(alibabaDubboProxyService.genericInvoker("",
metaData));
+
+ ResponseFuture responseFuture =
alibabaDubboProxyService.genericInvoker("", metaData);
+ Assert.assertNotNull(responseFuture);
+ Assert.assertEquals(sample,
RpcContext.getContext().getFuture().get());
}
}