[FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService

The termination futures can be used to wait for the termination of the 
respective component.

This closes #2558.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9dfaf457
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9dfaf457
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9dfaf457

Branch: refs/heads/flip-6
Commit: 9dfaf457fcc282fb01a1ee11950416e6a0b51171
Parents: 5915613
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Sep 27 18:17:42 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:43 2016 +0200

----------------------------------------------------------------------
 .../concurrent/impl/FlinkCompletableFuture.java | 11 +++---
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  9 +++++
 .../apache/flink/runtime/rpc/RpcService.java    |  7 ++++
 .../apache/flink/runtime/rpc/SelfGateway.java   | 34 ++++++++++++++++++
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 22 ++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 17 ++++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 32 +++++++++++++++--
 .../runtime/rpc/TestingSerialRpcService.java    | 10 +++++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 36 ++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 29 ++++++++++++++++
 10 files changed, 193 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
index e648a71..14686d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.concurrent.impl;
 
 import akka.dispatch.Futures;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.util.Preconditions;
 import scala.concurrent.Promise;
 import scala.concurrent.Promise$;
 
@@ -52,8 +51,6 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> 
implements Complet
 
        @Override
        public boolean complete(T value) {
-               Preconditions.checkNotNull(value);
-
                try {
                        promise.success(value);
 
@@ -65,10 +62,12 @@ public class FlinkCompletableFuture<T> extends 
FlinkFuture<T> implements Complet
 
        @Override
        public boolean completeExceptionally(Throwable t) {
-               Preconditions.checkNotNull(t);
-
                try {
-                       promise.failure(t);
+                       if (t == null) {
+                               promise.failure(new 
NullPointerException("Throwable was null."));
+                       } else {
+                               promise.failure(t);
+                       }
 
                        return true;
                } catch (IllegalStateException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 79961f7..f93a2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -173,6 +173,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                return rpcService;
        }
 
+       /**
+        * Return a future which is completed when the rpc endpoint has been terminated.
+        *
+        * @return Future which is completed when the rpc endpoint has been terminated.
+        */
+       public Future<Void> getTerminationFuture() {
+               return ((SelfGateway)self).getTerminationFuture();
+       }
+
        // 
------------------------------------------------------------------------
        //  Asynchronous executions
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 96844ed..2052f98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -77,6 +77,13 @@ public interface RpcService {
        void stopService();
 
        /**
+        * Returns a future indicating when the RPC service has been shut down.
+        *
+        * @return Termination future
+        */
+       Future<Void> getTerminationFuture();
+
+       /**
         * Gets the executor, provided by this RPC service. This executor can 
be used for example for
         * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods 
of futures.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
new file mode 100644
index 0000000..ed8ef9d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for self gateways, which expose the termination future of their rpc endpoint.
+ */
+public interface SelfGateway {
+
+       /**
+        * Return a future which is completed when the rpc endpoint has been terminated.
+        *
+        * @return Future indicating when the rpc endpoint has been terminated
+        */
+       Future<Void> getTerminationFuture();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 8f4deff..709ff92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
@@ -52,7 +53,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutable, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutable, StartStoppable, SelfGateway {
        private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
 
        private final String address;
@@ -67,12 +68,22 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
        private final long maximumFramesize;
 
-       AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time 
timeout, long maximumFramesize) {
+       // null if this handler serves a plain gateway; non-null (the endpoint's termination future) if it serves a self gateway
+       private final Future<Void> terminationFuture;
+
+       AkkaInvocationHandler(
+                       String address,
+                       ActorRef rpcEndpoint,
+                       Time timeout,
+                       long maximumFramesize,
+                       Future<Void> terminationFuture) {
+
                this.address = Preconditions.checkNotNull(address);
                this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
                this.isLocal = 
this.rpcEndpoint.path().address().hasLocalScope();
                this.timeout = Preconditions.checkNotNull(timeout);
                this.maximumFramesize = maximumFramesize;
+               this.terminationFuture = terminationFuture;
        }
 
        @Override
@@ -83,7 +94,7 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
                if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutable.class) ||
                        declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
-                       declaringClass.equals(RpcGateway.class)) {
+                       declaringClass.equals(RpcGateway.class) || 
declaringClass.equals(SelfGateway.class)) {
                        result = method.invoke(this, args);
                } else {
                        String methodName = method.getName();
@@ -300,4 +311,9 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
        public String getAddress() {
                return address;
        }
+
+       @Override
+       public Future<Void> getTerminationFuture() {
+               return terminationFuture;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 1b456a7..c21383a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -24,6 +24,7 @@ import akka.actor.UntypedActorWithStash;
 import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
@@ -76,9 +77,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
        /** the helper that tracks whether calls come from the main thread */
        private final MainThreadValidatorUtil mainThreadValidator;
 
-       AkkaRpcActor(final T rpcEndpoint) {
+       private final CompletableFuture<Void> terminationFuture;
+
+       AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> 
terminationFuture) {
                this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
                this.mainThreadValidator = new 
MainThreadValidatorUtil(rpcEndpoint);
+               this.terminationFuture = checkNotNull(terminationFuture);
+       }
+
+       @Override
+       public void postStop() {
+               super.postStop();
+
+               // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
+               // we would complete the future and let the actor system restart the actor with a completed
+               // future.
+               // Complete the termination future so that others know that we've stopped.
+               terminationFuture.complete(null);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index fb7896a..44719c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -32,9 +32,12 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -131,7 +134,12 @@ public class AkkaRpcService implements RpcService {
 
                                        final String address = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
 
-                                       InvocationHandler akkaInvocationHandler 
= new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+                                       InvocationHandler akkaInvocationHandler 
= new AkkaInvocationHandler(
+                                               address,
+                                               actorRef,
+                                               timeout,
+                                               maximumFramesize,
+                                               null);
 
                                        // Rather than using the System 
ClassLoader directly, we derive the ClassLoader
                                        // from this class . That works better 
in cases where Flink runs embedded and all Flink
@@ -156,7 +164,8 @@ public class AkkaRpcService implements RpcService {
        public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint) {
                checkNotNull(rpcEndpoint, "rpc endpoint");
 
-               Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, 
rpcEndpoint);
+               CompletableFuture<Void> terminationFuture = new 
FlinkCompletableFuture<>();
+               Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, 
rpcEndpoint, terminationFuture);
                ActorRef actorRef;
 
                synchronized (lock) {
@@ -169,7 +178,12 @@ public class AkkaRpcService implements RpcService {
 
                final String address = AkkaUtils.getAkkaURL(actorSystem, 
actorRef);
 
-               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(
+                       address,
+                       actorRef,
+                       timeout,
+                       maximumFramesize,
+                       terminationFuture);
 
                // Rather than using the System ClassLoader directly, we derive 
the ClassLoader
                // from this class . That works better in cases where Flink 
runs embedded and all Flink
@@ -181,6 +195,7 @@ public class AkkaRpcService implements RpcService {
                        classLoader,
                        new Class<?>[]{
                                rpcEndpoint.getSelfGatewayType(),
+                               SelfGateway.class,
                                MainThreadExecutable.class,
                                StartStoppable.class,
                                AkkaGateway.class},
@@ -231,6 +246,17 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
+       public Future<Void> getTerminationFuture() {
+               return FlinkFuture.supplyAsync(new Callable<Void>(){
+                       @Override
+                       public Void call() throws Exception {
+                               actorSystem.awaitTermination();
+                               return null;
+                       }
+               }, getExecutor());
+       }
+
+       @Override
        public Executor getExecutor() {
                return actorSystem.dispatcher();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 2a004c5..88906a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-
 /**
  * An RPC Service implementation for testing. This RPC service directly 
executes all asynchronous
  * calls one by one in the calling thread.
@@ -48,10 +48,12 @@ public class TestingSerialRpcService implements RpcService {
 
        private final DirectExecutorService executorService;
        private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
+       private final CompletableFuture<Void> terminationFuture;
 
        public TestingSerialRpcService() {
                executorService = new DirectExecutorService();
                this.registeredConnections = new ConcurrentHashMap<>(16);
+               this.terminationFuture = new FlinkCompletableFuture<>();
        }
 
        @Override
@@ -89,6 +91,12 @@ public class TestingSerialRpcService implements RpcService {
        public void stopService() {
                executorService.shutdown();
                registeredConnections.clear();
+               terminationFuture.complete(null);
+       }
+
+       @Override
+       public Future<Void> getTerminationFuture() {
+               return terminationFuture;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5d76024..ba8eb11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,8 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,9 +34,15 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -148,6 +156,34 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that we can wait for a RpcEndpoint to terminate.
+        *
+        * @throws ExecutionException
+        * @throws InterruptedException
+        */
+       @Test(timeout=1000)
+       public void testRpcEndpointTerminationFuture() throws 
ExecutionException, InterruptedException {
+               final DummyRpcEndpoint rpcEndpoint = new 
DummyRpcEndpoint(akkaRpcService);
+               rpcEndpoint.start();
+
+               Future<Void> terminationFuture = 
rpcEndpoint.getTerminationFuture();
+
+               assertFalse(terminationFuture.isDone());
+
+               FlinkFuture.supplyAsync(new Callable<Void>() {
+                       @Override
+                       public Void call() throws Exception {
+                               rpcEndpoint.shutDown();
+
+                               return null;
+                       }
+               }, actorSystem.dispatcher());
+
+               // wait until the rpc endpoint has terminated
+               terminationFuture.get();
+       }
+
        private interface DummyRpcGateway extends RpcGateway {
                Future<Integer> foobar();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9dfaf457/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 3388011..7c8defa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
@@ -120,4 +122,31 @@ public class AkkaRpcServiceTest extends TestLogger {
        public void testGetAddress() {
                assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), 
akkaRpcService.getAddress());
        }
+
+       /**
+        * Tests that we can wait for the termination of the rpc service
+        *
+        * @throws ExecutionException
+        * @throws InterruptedException
+        */
+       @Test(timeout = 1000)
+       public void testTerminationFuture() throws ExecutionException, 
InterruptedException {
+               final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+               final AkkaRpcService rpcService = new 
AkkaRpcService(actorSystem, Time.milliseconds(1000));
+
+               Future<Void> terminationFuture = 
rpcService.getTerminationFuture();
+
+               assertFalse(terminationFuture.isDone());
+
+               FlinkFuture.supplyAsync(new Callable<Void>() {
+                       @Override
+                       public Void call() throws Exception {
+                               rpcService.stopService();
+
+                               return null;
+                       }
+               }, actorSystem.dispatcher());
+
+               terminationFuture.get();
+       }
 }

Reply via email to