[hotfix] Add self rpc gateway registration to TestingSerialRpcService

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12af3b13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12af3b13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12af3b13

Branch: refs/heads/flip-6
Commit: 12af3b13c98d9adc1ebc5b9f98ebefb5b2a016e6
Parents: b779d19
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Sep 2 14:51:16 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:17 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/TestingSerialRpcService.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12af3b13/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 7bdbb99..955edcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -43,7 +43,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 
 /**
- * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
+ * An RPC Service implementation for testing. This RPC service directly executes all asynchronous
+ * calls one by one in the calling thread.
  */
 public class TestingSerialRpcService implements RpcService {
 
@@ -52,7 +53,7 @@ public class TestingSerialRpcService implements RpcService {
 
        public TestingSerialRpcService() {
                executorService = new DirectExecutorService();
-               this.registeredConnections = new ConcurrentHashMap<>();
+               this.registeredConnections = new ConcurrentHashMap<>(16);
        }
 
        @Override
@@ -78,14 +79,14 @@ public class TestingSerialRpcService implements RpcService {
 
        @Override
        public void stopServer(RpcGateway selfGateway) {
-
+               registeredConnections.remove(selfGateway.getAddress());
        }
 
        @Override
        public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
                final String address = UUID.randomUUID().toString();
 
-               InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
+               InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint);
                ClassLoader classLoader = getClass().getClassLoader();
 
                @SuppressWarnings("unchecked")
@@ -99,6 +100,9 @@ public class TestingSerialRpcService implements RpcService {
                        },
                        akkaInvocationHandler);
 
+               // register self
+               registeredConnections.putIfAbsent(self.getAddress(), self);
+
                return self;
        }
 
@@ -133,7 +137,7 @@ public class TestingSerialRpcService implements RpcService {
                }
        }
 
-       private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+       private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
 
                private final T rpcEndpoint;
 
@@ -197,7 +201,7 @@ public class TestingSerialRpcService implements RpcService {
                        final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
                        Object result = rpcMethod.invoke(rpcEndpoint, args);
 
-                       if (result != null && result instanceof Future) {
+                       if (result instanceof Future) {
                                Future<?> future = (Future<?>) result;
                                return Await.result(future, futureTimeout.duration());
                        } else {

Reply via email to