This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 75db422  Implement the async dubbo invoking for alibaba-dubbo (#1857)
75db422 is described below

commit 75db422f35c2fc56f965c15a88b76513cc66342c
Author: Spance <[email protected]>
AuthorDate: Fri Aug 6 17:40:21 2021 +0800

    Implement the async dubbo invoking for alibaba-dubbo (#1857)
    
    * Implement the async dubbo invoking
    
    * Obey the code style
---
 .../plugin/alibaba/dubbo/AlibabaDubboPlugin.java   | 40 +++++++++++++++++-----
 .../dubbo/cache/ApplicationConfigCache.java        |  1 +
 .../dubbo/proxy/AlibabaDubboProxyService.java      | 16 ++++++---
 .../alibaba/dubbo/AlibabaDubboPluginTest.java      |  7 ++--
 .../dubbo/proxy/AlibabaDubboProxyServiceTest.java  | 30 +++++++++++-----
 5 files changed, 69 insertions(+), 25 deletions(-)

diff --git a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
index 65c3e55..14bf663 100644
--- a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
+++ b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPlugin.java
@@ -17,9 +17,11 @@
 
 package org.apache.shenyu.plugin.alibaba.dubbo;
 
+import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.Result;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
 import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.RuleData;
@@ -27,6 +29,7 @@ import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.enums.ResultEnum;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.api.context.ShenyuContext;
 import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
@@ -74,14 +77,33 @@ public class AlibabaDubboPlugin extends AbstractShenyuPlugin {
             Object error = ShenyuResultWrap.error(ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), ShenyuResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
             return WebFluxResultUtils.result(exchange, error);
         }
-        Object result = alibabaDubboProxyService.genericInvoker(param, metaData);
-        if (Objects.nonNull(result)) {
-            exchange.getAttributes().put(Constants.RPC_RESULT, result);
-        } else {
-            exchange.getAttributes().put(Constants.RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY);
-        }
-        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
-        return chain.execute(exchange);
+
+        return Mono.create(monoSink -> {
+            ResponseFuture future = alibabaDubboProxyService.genericInvoker(param, metaData);
+            future.setCallback(new ResponseCallback() {
+
+                @Override
+                public void done(final Object resultObj) {
+                    assert resultObj instanceof Result;
+                    Result result = (Result) resultObj;
+                    if (result.hasException()) {
+                        this.caught(result.getException());
+                        return;
+                    }
+                    monoSink.success(result.getValue());
+                }
+
+                @Override
+                public void caught(final Throwable ex) {
+                    log.error("dubbo failed using async genericInvoker() metaData={} param={}", metaData, param, ex);
+                    monoSink.error(ex);
+                }
+            });
+        }).flatMap(response -> {
+            exchange.getAttributes().put(Constants.RPC_RESULT, Objects.nonNull(response) ? response : Constants.DUBBO_RPC_RESULT_EMPTY);
+            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
+            return chain.execute(exchange);
+        });
     }
 
     /**
diff --git a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
index 78409d2..ad6ef9b 100644
--- a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
+++ b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/cache/ApplicationConfigCache.java
@@ -150,6 +150,7 @@ public final class ApplicationConfigCache {
         reference.setRegistry(registryConfig);
         reference.setInterface(metaData.getServiceName());
         reference.setProtocol("dubbo");
+        reference.setAsync(true);
         String rpcExt = metaData.getRpcExt();
         DubboParamExtInfo dubboParamExtInfo = GsonUtils.getInstance().fromJson(rpcExt, DubboParamExtInfo.class);
         if (Objects.nonNull(dubboParamExtInfo)) {
diff --git a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
index bd090f4..ee715bf 100644
--- a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
+++ b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/main/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyService.java
@@ -18,6 +18,9 @@
 package org.apache.shenyu.plugin.alibaba.dubbo.proxy;
 
 import com.alibaba.dubbo.config.ReferenceConfig;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
 import com.alibaba.dubbo.rpc.service.GenericException;
 import com.alibaba.dubbo.rpc.service.GenericService;
 import lombok.extern.slf4j.Slf4j;
@@ -37,9 +40,9 @@ import java.util.Objects;
  */
 @Slf4j
 public class AlibabaDubboProxyService {
-    
+
     private final BodyParamResolveService bodyParamResolveService;
-    
+
     /**
      * Instantiates a new Dubbo proxy service.
      *
@@ -48,7 +51,7 @@ public class AlibabaDubboProxyService {
     public AlibabaDubboProxyService(final BodyParamResolveService bodyParamResolveService) {
         this.bodyParamResolveService = bodyParamResolveService;
     }
-    
+
     /**
      * Generic invoker object.
      *
@@ -57,7 +60,7 @@ public class AlibabaDubboProxyService {
      * @return the object
      * @throws ShenyuException the shenyu exception
      */
-    public Object genericInvoker(final String body, final MetaData metaData) throws ShenyuException {
+    public ResponseFuture genericInvoker(final String body, final MetaData metaData) throws ShenyuException {
         ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
         if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
             ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
@@ -71,10 +74,13 @@ public class AlibabaDubboProxyService {
             } else {
                 pair = bodyParamResolveService.buildParameter(body, metaData.getParameterTypes());
             }
-            return genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
+            genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());
         } catch (GenericException e) {
             log.error("dubbo invoker have exception", e);
             throw new ShenyuException(e.getExceptionMessage());
         }
+
+        FutureAdapter<?> adapter = (FutureAdapter<?>) RpcContext.getContext().getFuture();
+        return adapter.getFuture();
     }
 }
diff --git a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
index e4cfe07..5aa7866 100644
--- a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
+++ b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/AlibabaDubboPluginTest.java
@@ -17,13 +17,15 @@
 
 package org.apache.shenyu.plugin.alibaba.dubbo;
 
-import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
+import com.alibaba.dubbo.remoting.exchange.support.SimpleFuture;
+import com.alibaba.dubbo.rpc.RpcResult;
 import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
+import org.apache.shenyu.plugin.alibaba.dubbo.proxy.AlibabaDubboProxyService;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.api.context.ShenyuContext;
 import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
@@ -67,7 +69,8 @@ public final class AlibabaDubboPluginTest {
         metaData.setServiceName("org.apache.shenyu.test.dubbo.api.service.DubboTestService");
         metaData.setMethodName("findAll");
         metaData.setRpcType(RpcTypeEnum.DUBBO.getName());
-        when(mockAlibabaDubboProxyService.genericInvoker(null, metaData)).thenReturn(Mono.empty());
+        when(mockAlibabaDubboProxyService.genericInvoker(null, metaData))
+                .thenReturn(new SimpleFuture(new RpcResult(metaData.getId())));
         alibabaDubboPluginUnderTest = new AlibabaDubboPlugin(mockAlibabaDubboProxyService);
     }
 
diff --git a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
index a5ed1ee..8b9aa03 100644
--- a/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
+++ b/shenyu-plugin/shenyu-plugin-alibaba-dubbo/src/test/java/org/apache/shenyu/plugin/alibaba/dubbo/proxy/AlibabaDubboProxyServiceTest.java
@@ -18,16 +18,16 @@
 package org.apache.shenyu.plugin.alibaba.dubbo.proxy;
 
 import com.alibaba.dubbo.config.ReferenceConfig;
+import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
+import com.alibaba.dubbo.remoting.exchange.support.SimpleFuture;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.RpcResult;
+import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
 import com.alibaba.dubbo.rpc.service.GenericService;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.enums.RpcTypeEnum;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
-
 import org.apache.shenyu.plugin.alibaba.dubbo.cache.ApplicationConfigCache;
 import org.apache.shenyu.plugin.api.param.BodyParamResolveService;
 import org.junit.After;
@@ -38,6 +38,10 @@ import org.junit.runner.RunWith;
 import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
 /**
  * AlibabaDubboProxyServiceTest.
  */
@@ -70,18 +74,26 @@ public final class AlibabaDubboProxyServiceTest {
     }
 
     @Test
-    public void testGenericInvoker() {
+    public void testGenericInvoker() throws Exception {
         ReferenceConfig referenceConfig = mock(ReferenceConfig.class);
         GenericService genericService = mock(GenericService.class);
+        String sample = String.format("%x", System.nanoTime());
         when(referenceConfig.get()).thenReturn(genericService);
-        when(genericService.$invoke(METHOD_NAME, LEFT, RIGHT)).thenReturn(null);
+        when(genericService.$invoke(METHOD_NAME, LEFT, RIGHT))
+                .then(invocationOnMock -> {
+                    RpcContext.getContext().setFuture(new FutureAdapter<>(new SimpleFuture(new RpcResult(sample))));
+                    return sample;
+                });
         try (MockedStatic<ApplicationConfigCache> applicationConfigCacheMockedStatic = mockStatic(ApplicationConfigCache.class)) {
             ApplicationConfigCache applicationConfigCache = mock(ApplicationConfigCache.class);
-            applicationConfigCacheMockedStatic.when(() -> ApplicationConfigCache.getInstance()).thenReturn(applicationConfigCache);
+            applicationConfigCacheMockedStatic.when(ApplicationConfigCache::getInstance).thenReturn(applicationConfigCache);
             when(applicationConfigCache.initRef(metaData)).thenReturn(referenceConfig);
 
             AlibabaDubboProxyService alibabaDubboProxyService = new AlibabaDubboProxyService(new BodyParamResolveServiceImpl());
-            Assert.assertNull(alibabaDubboProxyService.genericInvoker("", metaData));
+
+            ResponseFuture responseFuture = alibabaDubboProxyService.genericInvoker("", metaData);
+            Assert.assertNotNull(responseFuture);
+            Assert.assertEquals(sample, RpcContext.getContext().getFuture().get());
         }
     }
 

Reply via email to