This is an automated email from the ASF dual-hosted git repository.

victory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 92cafe9  register callback once for the full filter chain. (#4127)
92cafe9 is described below

commit 92cafe962d8fd3df034c16629954425f4640f343
Author: ken.lj <ken.lj...@gmail.com>
AuthorDate: Fri May 24 14:49:25 2019 +0800

    register callback once for the full filter chain. (#4127)
---
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 18 ++----
 .../dubbo/rpc/protocol/ProtocolFilterWrapper.java  | 73 +++++++++++++++++-----
 .../dubbo/internal/org.apache.dubbo.rpc.Filter     |  3 +-
 3 files changed, 65 insertions(+), 29 deletions(-)

diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 9546e2a..fc26247 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -137,20 +137,10 @@ public class AsyncRpcResult extends AbstractResult {
     }
 
     public Result thenApplyWithContext(Function<Result, Result> fn) {
-        CompletableFuture<Result> future = 
this.thenApply(fn.compose(beforeContext).andThen(afterContext));
-        AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
-        nextAsyncRpcResult.subscribeTo(future);
-        return nextAsyncRpcResult;
-    }
-
-    public void subscribeTo(CompletableFuture<?> future) {
-        future.whenComplete((obj, t) -> {
-            if (t != null) {
-                this.completeExceptionally(t);
-            } else {
-                this.complete((Result) obj);
-            }
-        });
+        this.thenApply(fn.compose(beforeContext).andThen(afterContext));
+        // You may need to return a new Result instance representing the next 
async stage,
+        // like thenApply will return a new CompletableFuture.
+        return this;
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
index 9ee7d3e..f54d076 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
@@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcException;
 import java.util.List;
 
 import static 
org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
-
 import static org.apache.dubbo.rpc.Constants.REFERENCE_FILTER_KEY;
 import static org.apache.dubbo.rpc.Constants.SERVICE_FILTER_KEY;
 
@@ -49,9 +48,12 @@ public class ProtocolFilterWrapper implements Protocol {
         this.protocol = protocol;
     }
 
+
+
     private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, 
String key, String group) {
         Invoker<T> last = invoker;
         List<Filter> filters = 
ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),
 key, group);
+
         if (!filters.isEmpty()) {
             for (int i = filters.size() - 1; i >= 0; i--) {
                 final Filter filter = filters.get(i);
@@ -88,18 +90,7 @@ public class ProtocolFilterWrapper implements Protocol {
                             }
                             throw e;
                         }
-                        return asyncResult.thenApplyWithContext(r -> {
-                            // onResponse callback
-                            if (filter instanceof ListenableFilter) {
-                                Filter.Listener listener = ((ListenableFilter) 
filter).listener();
-                                if (listener != null) {
-                                    listener.onResponse(r, invoker, 
invocation);
-                                }
-                            } else {
-                                filter.onResponse(r, invoker, invocation);
-                            }
-                            return r;
-                        });
+                        return asyncResult;
                     }
 
                     @Override
@@ -114,7 +105,8 @@ public class ProtocolFilterWrapper implements Protocol {
                 };
             }
         }
-        return last;
+
+        return new CallbackRegistrationInvoker<>(last, filters);
     }
 
     @Override
@@ -143,4 +135,57 @@ public class ProtocolFilterWrapper implements Protocol {
         protocol.destroy();
     }
 
+    static class CallbackRegistrationInvoker<T> implements Invoker<T> {
+
+        private final Invoker<T> filterInvoker;
+        private final List<Filter> filters;
+
+        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, 
List<Filter> filters) {
+            this.filterInvoker = filterInvoker;
+            this.filters = filters;
+        }
+
+        @Override
+        public Result invoke(Invocation invocation) throws RpcException {
+            Result asyncResult = filterInvoker.invoke(invocation);
+
+            asyncResult.thenApplyWithContext(r -> {
+                for (int i = filters.size() - 1; i >= 0; i--) {
+                    Filter filter = filters.get(i);
+                    // onResponse callback
+                    if (filter instanceof ListenableFilter) {
+                        Filter.Listener listener = ((ListenableFilter) 
filter).listener();
+                        if (listener != null) {
+                            listener.onResponse(r, filterInvoker, invocation);
+                        }
+                    } else {
+                        filter.onResponse(r, filterInvoker, invocation);
+                    }
+                }
+                return r;
+            });
+
+            return asyncResult;
+        }
+
+        @Override
+        public Class<T> getInterface() {
+            return filterInvoker.getInterface();
+        }
+
+        @Override
+        public URL getUrl() {
+            return filterInvoker.getUrl();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return filterInvoker.isAvailable();
+        }
+
+        @Override
+        public void destroy() {
+            filterInvoker.destroy();
+        }
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
 
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 376f966..2406521 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -11,4 +11,5 @@ exception=org.apache.dubbo.rpc.filter.ExceptionFilter
 executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
 deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
 compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
-timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
\ No newline at end of file
+timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
+callback-registration=org.apache.dubbo.rpc.filter.CallbackRegistrationFilter
\ No newline at end of file

Reply via email to