[hotfix] Add self rpc gateway registration to TestingSerialRpcService
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12af3b13 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12af3b13 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12af3b13 Branch: refs/heads/flip-6 Commit: 12af3b13c98d9adc1ebc5b9f98ebefb5b2a016e6 Parents: b779d19 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Sep 2 14:51:16 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Sep 21 11:39:17 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/TestingSerialRpcService.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/12af3b13/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 7bdbb99..955edcc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -43,7 +43,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread. + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous + * calls one by one in the calling thread. */ public class TestingSerialRpcService implements RpcService { @@ -52,7 +53,7 @@ public class TestingSerialRpcService implements RpcService { public TestingSerialRpcService() { executorService = new DirectExecutorService(); - this.registeredConnections = new ConcurrentHashMap<>(); + this.registeredConnections = new ConcurrentHashMap<>(16); } @Override @@ -78,14 +79,14 @@ public class TestingSerialRpcService implements RpcService { @Override public void stopServer(RpcGateway selfGateway) { - + registeredConnections.remove(selfGateway.getAddress()); } @Override public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) { final String address = UUID.randomUUID().toString(); - InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint); + InvocationHandler akkaInvocationHandler = new TestingSerialRpcService.TestingSerialInvocationHandler<>(address, rpcEndpoint); ClassLoader classLoader = getClass().getClassLoader(); @SuppressWarnings("unchecked") @@ -99,6 +100,9 @@ public class TestingSerialRpcService implements RpcService { }, akkaInvocationHandler); + // register self + registeredConnections.putIfAbsent(self.getAddress(), self); + return self; } @@ -133,7 +137,7 @@ public class TestingSerialRpcService implements RpcService { } } - private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { + private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable { private final T rpcEndpoint; @@ -197,7 +201,7 @@ public class TestingSerialRpcService implements RpcService { final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); Object result = rpcMethod.invoke(rpcEndpoint, args); - if (result != null && result instanceof Future) { + if (result instanceof Future) { Future<?> future = (Future<?>) result; return Await.result(future, futureTimeout.duration()); } else {