This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c85af4aa1afb91e7aeac1d29d3c69b3f575e4eff
Author: Jeffrey Chung <chun...@users.noreply.github.com>
AuthorDate: Sat Jul 28 14:54:01 2018 -0400

    [FLINK-9240] Avoid deprecated Akka methods
    
    Use static imports
    
    Use the org.apache.flink.runtime.concurrent.Executors import
    
    This closes #6446.
---
 .../apache/flink/client/program/ClusterClient.java |  5 +++--
 .../apache/flink/client/program/ClientTest.java    |  7 ++++--
 .../MesosApplicationMasterRunner.java              | 25 ++++++++++++++++------
 .../runtime/akka/DefaultQuarantineHandler.java     |  9 ++++++--
 .../runtime/minicluster/StandaloneMiniCluster.java | 10 +++++++--
 .../flink/runtime/taskmanager/MemoryLogger.java    |  2 +-
 .../flink/runtime/util/ProcessShutDownThread.java  |  3 ++-
 .../flink/runtime/jobmanager/JobManager.scala      | 17 +++++++++------
 .../runtime/minicluster/FlinkMiniCluster.scala     | 18 ++++++++++------
 .../flink/runtime/taskmanager/TaskManager.scala    | 22 +++++++++----------
 .../flink/runtime/akka/QuarantineMonitorTest.java  | 15 +++++++------
 .../JobManagerHAJobGraphRecoveryITCase.java        |  4 ++--
 .../jobmanager/JobManagerProcessReapingTest.java   |  2 +-
 .../flink/runtime/jobmanager/JobManagerTest.java   | 19 ++++++++--------
 .../flink/runtime/jobmanager/JobSubmitTest.java    |  2 +-
 .../runtime/metrics/TaskManagerMetricsTest.java    |  8 ++++---
 .../metrics/dump/MetricQueryServiceTest.java       |  2 +-
 .../StackTraceSampleCoordinatorTest.java           |  2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   |  6 ++++--
 .../TaskManagerComponentsStartupShutdownTest.java  | 10 +++++----
 .../TaskManagerProcessReapingTestBase.java         |  2 +-
 .../taskmanager/TaskManagerRegistrationTest.java   |  2 +-
 .../impl/AkkaJobManagerRetrieverTest.java          |  9 +++++---
 .../jobmanager/JobManagerConnectionTest.scala      |  4 ++--
 .../runtime/testingUtils/TestingCluster.scala      |  9 ++++----
 .../JobManagerHACheckpointRecoveryITCase.java      |  6 ++++--
 .../minicluster/LocalFlinkMiniClusterITCase.java   |  7 ++++--
 .../flink/yarn/YarnApplicationMasterRunner.java    | 25 ++++++++++++++++------
 28 files changed, 160 insertions(+), 92 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index d85033a..51541be 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit;
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -249,8 +250,8 @@ public abstract class ClusterClient<T> {
                @Override
                public void close() throws Exception {
                        if (isLoaded()) {
-                               actorSystem.shutdown();
-                               actorSystem.awaitTermination();
+                               actorSystem.terminate();
+                               Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
                                actorSystem = null;
                        }
                }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ca8b6fb..ec9cfc5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -67,6 +67,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
+
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
@@ -114,8 +117,8 @@ public class ClientTest extends TestLogger {
        public void shutDownActorSystem() {
                if (jobManagerSystem != null) {
                        try {
-                               jobManagerSystem.shutdown();
-                               jobManagerSystem.awaitTermination();
+                               jobManagerSystem.terminate();
+                               Await.ready(jobManagerSystem.whenTerminated(), 
Duration.Inf());
                        } catch (Exception e) {
                                e.printStackTrace();
                                fail(e.getMessage());
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 630fa83..bbecf6a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -57,6 +57,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
@@ -73,10 +75,14 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static 
org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -374,11 +380,14 @@ public class MesosApplicationMasterRunner {
                        }
 
                        if (actorSystem != null) {
-                               try {
-                                       actorSystem.shutdown();
-                               } catch (Throwable tt) {
-                                       LOG.error("Error shutting down actor 
system", tt);
-                               }
+                               actorSystem.terminate().onComplete(
+                                       new OnComplete<Terminated>() {
+                                               public void 
onComplete(Throwable failure, Terminated success) {
+                                                       if (failure != null) {
+                                                               
LOG.error("Error shutting down actor system", failure);
+                                                       }
+                                               }
+                                       }, directExecutionContext());
                        }
 
                        if (futureExecutor != null) {
@@ -412,7 +421,11 @@ public class MesosApplicationMasterRunner {
                LOG.info("Mesos JobManager started");
 
                // wait until everything is done
-               actorSystem.awaitTermination();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       LOG.error("Error shutting down actor system", e);
+               }
 
                // if we get here, everything work out jolly all right, and we 
even exited smoothly
                if (webMonitor != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 708437f..38c29bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -25,6 +25,9 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import org.slf4j.Logger;
 
+import java.util.concurrent.TimeoutException;
+
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -65,11 +68,13 @@ public class DefaultQuarantineHandler implements QuarantineHandler {
 
        private void shutdownActorSystem(ActorSystem actorSystem) {
                // shut the actor system down
-               actorSystem.shutdown();
+               actorSystem.terminate();
 
                try {
                        // give it some time to complete the shutdown
-                       actorSystem.awaitTermination(timeout);
+                       Await.ready(actorSystem.whenTerminated(), timeout);
+               } catch (InterruptedException | TimeoutException e) {
+                       log.error("Exception thrown when terminating the actor 
system", e);
                } finally {
                        // now let's crash the JVM
                        System.exit(exitCode);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 0b0cbf5..808de22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -43,11 +43,13 @@ import akka.pattern.Patterns;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -154,8 +156,12 @@ public class StandaloneMiniCluster implements AutoCloseableAsync {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
-               actorSystem.shutdown();
-               actorSystem.awaitTermination();
+               actorSystem.terminate();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       exception = e;
+               }
 
                try {
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
index 366e5fa..91849d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java
@@ -125,7 +125,7 @@ public class MemoryLogger extends Thread {
        @Override
        public void run() {
                try {
-                       while (running && (monitored == null || 
!monitored.isTerminated())) {
+                       while (running && (monitored == null || 
!monitored.whenTerminated().isCompleted())) {
                                
logger.info(getMemoryUsageStatsAsString(memoryBean));
                                
logger.info(getDirectMemoryStatsAsString(directBufferBean));
                                
logger.info(getMemoryPoolStatsAsString(poolBeans));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
index e19cfd7..db0a1dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.util;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeoutException;
@@ -67,7 +68,7 @@ public class ProcessShutDownThread extends Thread {
        @Override
        public void run() {
                try {
-                       actorSystem.awaitTermination(terminationTimeout);
+                       Await.ready(actorSystem.whenTerminated(), 
terminationTimeout);
                } catch (Exception e) {
                        if (e instanceof TimeoutException) {
                                log.error("Actor system shut down timed out.", 
e);
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0855991..afecae2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.clusterframework.{BootstrapTools, 
FlinkResourceManager}
+import org.apache.flink.runtime.concurrent.Executors.directExecutionContext
 import org.apache.flink.runtime.concurrent.{FutureUtils, 
ScheduledExecutorServiceAdapter}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
@@ -1867,7 +1868,10 @@ class JobManager(
       FiniteDuration(10, SECONDS)).start()
 
     // Shutdown and discard all queued messages
-    context.system.shutdown()
+    context.system.terminate().onComplete {
+      case scala.util.Success(_) =>
+      case scala.util.Failure(t) => log.warn("Could not cleanly shut down 
actor system", t)
+    }(directExecutionContext())
   }
 
   private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
@@ -2046,7 +2050,7 @@ object JobManager {
     }
 
     // block until everything is shut down
-    jobManagerSystem.awaitTermination()
+    Await.ready(jobManagerSystem.whenTerminated, Duration.Inf)
 
     webMonitorOption.foreach{
       webMonitor =>
@@ -2288,11 +2292,10 @@ object JobManager {
     catch {
       case t: Throwable =>
         LOG.error("Error while starting up JobManager", t)
-        try {
-          jobManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor 
system", tt)
-        }
+        jobManagerSystem.terminate().onComplete {
+          case scala.util.Success(_) =>
+          case scala.util.Failure(tt) => LOG.warn("Could not cleanly shut down 
actor system", tt)
+        }(directExecutionContext())
         throw t
     }
   }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 8d9e1ee..4f70a22 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
+import scala.util.{Failure, Success}
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -479,30 +480,30 @@ abstract class FlinkMiniCluster(
 
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
 
       resourceManagerActorSystems foreach {
-        _ foreach(_.shutdown())
+        _ foreach(_.terminate())
       }
     }
 
     jobManagerActorSystems foreach {
-      _ foreach(_.shutdown())
+      _ foreach(_.terminate())
     }
   }
 
   def awaitTermination(): Unit = {
     jobManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     resourceManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
 
     taskManagerActorSystems foreach {
-      _ foreach(_.awaitTermination())
+      _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf))
     }
   }
 
@@ -625,7 +626,10 @@ abstract class FlinkMiniCluster(
 
   def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = {
     if(!useSingleActorSystem) {
-      actorSystem.shutdown()
+      actorSystem.terminate().onComplete {
+        case Success(_) =>
+        case Failure(t) => LOG.warn("Could not cleanly shut down the job 
client actor system.", t)
+      }
     }
   }
 
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1de4848..c04084c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -77,6 +77,7 @@ import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a 
Flink job. It is
@@ -423,7 +424,7 @@ class TaskManager(
               futureResponse.mapTo[Boolean].onComplete {
                 // IMPORTANT: In the future callback, we cannot directly 
modify state
                 //            but only send messages to the TaskManager to do 
those changes
-                case scala.util.Success(result) =>
+                case Success(result) =>
                   if (!result) {
                   self ! decorateMessage(
                     FailTask(
@@ -432,7 +433,7 @@ class TaskManager(
                     )
                   }
 
-                case scala.util.Failure(t) =>
+                case Failure(t) =>
                 self ! decorateMessage(
                   FailTask(
                     executionID,
@@ -839,10 +840,10 @@ class TaskManager(
             blobCache.get.getTransientBlobService.putTransient(fis)
           }(context.dispatcher)
             .onComplete {
-              case scala.util.Success(value) =>
+              case Success(value) =>
                 sender ! value
                 fis.close()
-              case scala.util.Failure(e) =>
+              case Failure(e) =>
                 sender ! akka.actor.Status.Failure(e)
                 fis.close()
             }(context.dispatcher)
@@ -1534,7 +1535,7 @@ class TaskManager(
   }
 
   protected def shutdown(): Unit = {
-    context.system.shutdown()
+    context.system.terminate()
 
     // Await actor system termination and shut down JVM
     new ProcessShutDownThread(
@@ -1885,15 +1886,14 @@ object TaskManager {
       MemoryLogger.startIfConfigured(LOG.logger, configuration, 
taskManagerSystem)
 
       // block until everything is done
-      taskManagerSystem.awaitTermination()
+      Await.ready(taskManagerSystem.whenTerminated, Duration.Inf)
     } catch {
       case t: Throwable =>
         LOG.error("Error while starting up taskManager", t)
-        try {
-          taskManagerSystem.shutdown()
-        } catch {
-          case tt: Throwable => LOG.warn("Could not cleanly shut down actor 
system", tt)
-        }
+        taskManagerSystem.terminate().onComplete {
+          case Success(_) =>
+          case Failure(tt) => LOG.warn("Could not cleanly shut down actor 
system", tt)
+        }(Executors.directExecutionContext())
         throw t
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 37a4547..a998fb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -43,7 +43,10 @@ import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -72,10 +75,10 @@ public class QuarantineMonitorTest extends TestLogger {
        }
 
        @AfterClass
-       public static void tearDown() {
+       public static void tearDown() throws InterruptedException, 
TimeoutException {
                if (actorSystem1 != null) {
-                       actorSystem1.shutdown();
-                       actorSystem1.awaitTermination();
+                       actorSystem1.terminate();
+                       Await.ready(actorSystem1.whenTerminated(), 
Duration.Inf());
                }
        }
 
@@ -85,10 +88,10 @@ public class QuarantineMonitorTest extends TestLogger {
        }
 
        @After
-       public void tearDownTest() {
+       public void tearDownTest() throws InterruptedException, 
TimeoutException {
                if (actorSystem2 != null) {
-                       actorSystem2.shutdown();
-                       actorSystem2.awaitTermination();
+                       actorSystem2.terminate();
+                       Await.ready(actorSystem2.whenTerminated(), 
Duration.Inf());
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index 0b7547d..ae8542b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -321,11 +321,11 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
                        }
 
                        if (taskManagerSystem != null) {
-                               taskManagerSystem.shutdown();
+                               taskManagerSystem.terminate();
                        }
 
                        if (testSystem != null) {
-                               testSystem.shutdown();
+                               testSystem.terminate();
                        }
 
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 38b8431..fc16483 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -182,7 +182,7 @@ public class JobManagerProcessReapingTest extends TestLogger {
                                jmProcess.destroy();
                        }
                        if (localSystem != null) {
-                               localSystem.shutdown();
+                               localSystem.terminate();
                        }
                }
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 873a4f1..052349c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -132,6 +132,7 @@ import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
@@ -940,7 +941,7 @@ public class JobManagerTest extends TestLogger {
                        assertTrue(savepointFile.exists());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -956,7 +957,7 @@ public class JobManagerTest extends TestLogger {
                        }
 
                        if (actorSystem != null) {
-                               actorSystem.awaitTermination(TESTING_TIMEOUT());
+                               Await.result(actorSystem.whenTerminated(), 
TESTING_TIMEOUT());
                        }
                }
        }
@@ -1130,7 +1131,7 @@ public class JobManagerTest extends TestLogger {
                        }
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1243,7 +1244,7 @@ public class JobManagerTest extends TestLogger {
                        assertEquals(1, targetDirectory.listFiles().length);
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1259,7 +1260,7 @@ public class JobManagerTest extends TestLogger {
                        }
 
                        if (actorSystem != null) {
-                               
actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.result(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
                        }
                }
        }
@@ -1416,7 +1417,7 @@ public class JobManagerTest extends TestLogger {
                        assertTrue("Unexpected response: " + response, response 
instanceof JobSubmitSuccess);
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (archiver != null) {
@@ -1432,7 +1433,7 @@ public class JobManagerTest extends TestLogger {
                        }
 
                        if (actorSystem != null) {
-                               
actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.ready(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
                        }
                }
        }
@@ -1516,8 +1517,8 @@ public class JobManagerTest extends TestLogger {
 
                } finally {
                        // cleanup the actor system and with it all of the 
started actors if not already terminated
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index c7d837b..ef493b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -117,7 +117,7 @@ public class JobSubmitTest {
        @AfterClass
        public static void teardownJobmanager() throws Exception {
                if (jobManagerSystem != null) {
-                       jobManagerSystem.shutdown();
+                       jobManagerSystem.terminate();
                }
 
                if (highAvailabilityServices != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 4d18060..aa86100 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -49,6 +49,8 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -155,11 +157,11 @@ public class TaskManagerMetricsTest extends TestLogger {
                        Assert.assertFalse(metricRegistry.isShutdown());
 
                        // shut down the actors and the actor system
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.result(actorSystem.whenTerminated(), 
Duration.Inf());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
                        }
 
                        if (highAvailabilityServices != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 1acaf61..3767421 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -116,7 +116,7 @@ public class MetricQueryServiceTest extends TestLogger {
                testActor.message = null;
                assertEquals(0, emptyDump.serializedMetrics.length);
 
-               s.shutdown();
+               s.terminate();
        }
 
        private static class TestActor extends UntypedActor {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
index 786b0ae..ed98be5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -66,7 +66,7 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
        @AfterClass
        public static void tearDown() throws Exception {
                if (system != null) {
-                       system.shutdown();
+                       system.terminate();
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 07d0244..a32c1f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -49,6 +49,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import scala.concurrent.Await;
+
 public class AkkaRpcActorTest extends TestLogger {
 
        // ------------------------------------------------------------------------
@@ -259,8 +261,8 @@ public class AkkaRpcActorTest extends TestLogger {
 
                        terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                } finally {
-                       rpcActorSystem.shutdown();
-                       rpcActorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+                       rpcActorSystem.terminate();
+                       Await.ready(rpcActorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 8289930..9669513 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -62,6 +62,8 @@ import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertTrue;
@@ -205,8 +207,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
                        jobManager.tell(Kill.getInstance(), ActorRef.noSender());
 
                        // shut down the actors and the actor system
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), Duration.Inf());
                        actorSystem = null;
 
                        // now that the TaskManager is shut down, the components should be shut down as well
@@ -215,9 +217,9 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
                        assertTrue(memManager.isShutdown());
                } finally {
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystem.terminate();
 
-                               actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
+                               Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT());
                        }
 
                        highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index ae66a08..0b9b951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -218,7 +218,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
                                taskManagerProcess.destroy();
                        }
                        if (jmActorSystem != null) {
-                               jmActorSystem.shutdown();
+                               jmActorSystem.terminate();
                        }
                        if (highAvailabilityServices != null) {
                                highAvailabilityServices.closeAndCleanupAllData();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index ad32a4f..7ee0921 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -102,7 +102,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
        @AfterClass
        public static void shutdownActorSystem() {
                if (actorSystem != null) {
-                       actorSystem.shutdown();
+                       actorSystem.terminate();
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
index 94473b9..5314550 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java
@@ -37,10 +37,13 @@ import org.junit.Test;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
+import scala.concurrent.Await;
+
 /**
  * Test for the {@link AkkaJobManagerRetriever}.
  */
@@ -55,10 +58,10 @@ public class AkkaJobManagerRetrieverTest extends TestLogger {
        }
 
        @AfterClass
-       public static void teardown() {
+       public static void teardown() throws InterruptedException, TimeoutException {
                if (actorSystem != null) {
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+                       actorSystem.terminate();
+                       Await.ready(actorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout));
 
                        actorSystem = null;
                }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 6d7d87c..947d029 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -77,7 +77,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
@@ -116,7 +116,7 @@ class JobManagerConnectionTest {
         fail(e.getMessage)
     }
     finally {
-      actorSystem.shutdown()
+      actorSystem.terminate()
     }
   }
 
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c2d47f9..f722935 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
 
+import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 
@@ -229,8 +230,8 @@ class TestingCluster(
           Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
           if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
+            jmActorSystems(index).terminate()
+            Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf)
           }
 
           val newJobManagerActorSystem = if(!singleActorSystem) {
@@ -274,8 +275,8 @@ class TestingCluster(
         Await.result(stopped, TestingCluster.MAX_RESTART_DURATION)
 
         if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
+          tmActorSystems(index).terminate()
+          Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf)
         }
 
         val taskManagerActorSystem  = if(!singleActorSystem) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index ce750d3..a22a8a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -94,7 +94,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
 import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
@@ -425,8 +427,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
                                miniCluster.awaitTermination();
                        }
 
-                       system.shutdown();
-                       system.awaitTermination();
+                       system.terminate();
+                       Await.ready(system.whenTerminated(), Duration.Inf());
 
                        testingServer.stop();
                        testingServer.close();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index aa2f38d..00c6865 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -38,8 +38,11 @@ import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.forkjoin.ForkJoinPool;
 import scala.concurrent.impl.ExecutionContextImpl;
 
@@ -62,7 +65,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
        };
 
        @Test
-       public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+       public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException {
 
                final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig());
                LocalFlinkMiniCluster miniCluster = null;
@@ -117,7 +120,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger {
                        }
 
                        JavaTestKit.shutdownActorSystem(system);
-                       system.awaitTermination();
+                       Await.ready(system.whenTerminated(), Duration.Inf());
                }
 
                // shut down the global execution context, to make sure it does not affect this testing
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 497ac87..e98e174 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -57,6 +57,8 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.dispatch.OnComplete;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -71,11 +73,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext;
 import static org.apache.flink.yarn.Utils.require;
 
 /**
@@ -401,11 +407,14 @@ public class YarnApplicationMasterRunner {
                        }
 
                        if (actorSystem != null) {
-                               try {
-                                       actorSystem.shutdown();
-                               } catch (Throwable tt) {
-                                       LOG.error("Error shutting down actor system", tt);
-                               }
+                               actorSystem.terminate().onComplete(
+                                       new OnComplete<Terminated>() {
+                                               public void onComplete(Throwable failure, Terminated result) {
+                                                       if (failure != null) {
+                                                               LOG.error("Error shutting down actor system", failure);
+                                                       }
+                                               }
+                                       }, directExecutionContext());
                        }
 
                        futureExecutor.shutdownNow();
@@ -418,7 +427,11 @@ public class YarnApplicationMasterRunner {
                LOG.info("YARN Application Master started");
 
                // wait until everything is done
-               actorSystem.awaitTermination();
+               try {
+                       Await.ready(actorSystem.whenTerminated(), Duration.Inf());
+               } catch (InterruptedException | TimeoutException e) {
+                       LOG.error("Error shutting down actor system", e);
+               }
 
                // if we get here, everything work out jolly all right, and we even exited smoothly
                if (webMonitor != null) {

Reply via email to