This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c85af4aa1afb91e7aeac1d29d3c69b3f575e4eff Author: Jeffrey Chung <chun...@users.noreply.github.com> AuthorDate: Sat Jul 28 14:54:01 2018 -0400 [FLINK-9240] Avoid deprecated Akka methods Use static imports Use the org.apache.flink.runtime.concurrent.Executors import This closes #6446. --- .../apache/flink/client/program/ClusterClient.java | 5 +++-- .../apache/flink/client/program/ClientTest.java | 7 ++++-- .../MesosApplicationMasterRunner.java | 25 ++++++++++++++++------ .../runtime/akka/DefaultQuarantineHandler.java | 9 ++++++-- .../runtime/minicluster/StandaloneMiniCluster.java | 10 +++++++-- .../flink/runtime/taskmanager/MemoryLogger.java | 2 +- .../flink/runtime/util/ProcessShutDownThread.java | 3 ++- .../flink/runtime/jobmanager/JobManager.scala | 17 +++++++++------ .../runtime/minicluster/FlinkMiniCluster.scala | 18 ++++++++++------ .../flink/runtime/taskmanager/TaskManager.scala | 22 +++++++++---------- .../flink/runtime/akka/QuarantineMonitorTest.java | 15 +++++++------ .../JobManagerHAJobGraphRecoveryITCase.java | 4 ++-- .../jobmanager/JobManagerProcessReapingTest.java | 2 +- .../flink/runtime/jobmanager/JobManagerTest.java | 19 ++++++++-------- .../flink/runtime/jobmanager/JobSubmitTest.java | 2 +- .../runtime/metrics/TaskManagerMetricsTest.java | 8 ++++--- .../metrics/dump/MetricQueryServiceTest.java | 2 +- .../StackTraceSampleCoordinatorTest.java | 2 +- .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 6 ++++-- .../TaskManagerComponentsStartupShutdownTest.java | 10 +++++---- .../TaskManagerProcessReapingTestBase.java | 2 +- .../taskmanager/TaskManagerRegistrationTest.java | 2 +- .../impl/AkkaJobManagerRetrieverTest.java | 9 +++++--- .../jobmanager/JobManagerConnectionTest.scala | 4 ++-- .../runtime/testingUtils/TestingCluster.scala | 9 ++++---- .../JobManagerHACheckpointRecoveryITCase.java | 6 ++++-- .../minicluster/LocalFlinkMiniClusterITCase.java | 7 ++++-- .../flink/yarn/YarnApplicationMasterRunner.java | 25 ++++++++++++++++------ 28 files changed, 160 insertions(+), 92 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index d85033a..51541be 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -90,6 +90,7 @@ import java.util.concurrent.TimeUnit; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -249,8 +250,8 @@ public abstract class ClusterClient<T> { @Override public void close() throws Exception { if (isLoaded()) { - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); actorSystem = null; } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index ca8b6fb..ec9cfc5 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -67,6 +67,9 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + /** * Simple and maybe stupid test to check the {@link ClusterClient} class. */ @@ -114,8 +117,8 @@ public class ClientTest extends TestLogger { public void shutDownActorSystem() { if (jobManagerSystem != null) { try { - jobManagerSystem.shutdown(); - jobManagerSystem.awaitTermination(); + jobManagerSystem.terminate(); + Await.ready(jobManagerSystem.whenTerminated(), Duration.Inf()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 630fa83..bbecf6a 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -57,6 +57,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; +import akka.actor.Terminated; +import akka.dispatch.OnComplete; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; @@ -73,10 +75,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext; import static org.apache.flink.util.Preconditions.checkState; /** @@ -374,11 +380,14 @@ public class MesosApplicationMasterRunner { } if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable tt) { - LOG.error("Error shutting down actor system", tt); - } + actorSystem.terminate().onComplete( + new OnComplete<Terminated>() { + public void onComplete(Throwable failure, Terminated success) { + if (failure != null) { + LOG.error("Error shutting down actor system", failure); + } + } + }, directExecutionContext()); } if (futureExecutor != null) { @@ -412,7 +421,11 @@ public class MesosApplicationMasterRunner { LOG.info("Mesos JobManager started"); // wait until everything is done - actorSystem.awaitTermination(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Error shutting down actor system", e); + } // if we get here, everything work out jolly all right, and we even exited smoothly if (webMonitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java index 708437f..38c29bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java @@ -25,6 +25,9 @@ import akka.actor.ActorSystem; import akka.actor.Address; import org.slf4j.Logger; +import java.util.concurrent.TimeoutException; + +import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; /** @@ -65,11 +68,13 @@ public class DefaultQuarantineHandler implements QuarantineHandler { private void shutdownActorSystem(ActorSystem actorSystem) { // shut the actor system down - actorSystem.shutdown(); + actorSystem.terminate(); try { // give it some time to complete the shutdown - actorSystem.awaitTermination(timeout); + Await.ready(actorSystem.whenTerminated(), timeout); + } catch (InterruptedException | TimeoutException e) { + log.error("Exception thrown when terminating the actor system", e); } finally { // now let's crash the JVM System.exit(exitCode); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java index 0b0cbf5..808de22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java @@ -43,11 +43,13 @@ import akka.pattern.Patterns; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -154,8 +156,12 @@ public class StandaloneMiniCluster implements AutoCloseableAsync { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + exception = e; + } try { highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java index 366e5fa..91849d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java @@ -125,7 +125,7 @@ public class MemoryLogger extends Thread { @Override public void run() { try { - while (running && (monitored == null || !monitored.isTerminated())) { + while (running && (monitored == null || !monitored.whenTerminated().isCompleted())) { logger.info(getMemoryUsageStatsAsString(memoryBean)); logger.info(getDirectMemoryStatsAsString(directBufferBean)); logger.info(getMemoryPoolStatsAsString(poolBeans)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java index e19cfd7..db0a1dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.util; import akka.actor.ActorSystem; import org.slf4j.Logger; +import scala.concurrent.Await; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeoutException; @@ -67,7 +68,7 @@ public class ProcessShutDownThread extends Thread { @Override public void run() { try { - actorSystem.awaitTermination(terminationTimeout); + Await.ready(actorSystem.whenTerminated(), terminationTimeout); } catch (Exception e) { if (e instanceof TimeoutException) { log.error("Actor system shut down timed out.", e); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0855991..afecae2 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -44,6 +44,7 @@ import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager} +import org.apache.flink.runtime.concurrent.Executors.directExecutionContext import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder @@ -1867,7 +1868,10 @@ class JobManager( FiniteDuration(10, SECONDS)).start() // Shutdown and discard all queued messages - context.system.shutdown() + context.system.terminate().onComplete { + case scala.util.Success(_) => + case scala.util.Failure(t) => log.warn("Could not cleanly shut down actor system", t) + }(directExecutionContext()) } private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = { @@ -2046,7 +2050,7 @@ object JobManager { } // block until everything is shut down - jobManagerSystem.awaitTermination() + Await.ready(jobManagerSystem.whenTerminated, Duration.Inf) webMonitorOption.foreach{ webMonitor => @@ -2288,11 +2292,10 @@ object JobManager { catch { case t: Throwable => LOG.error("Error while starting up JobManager", t) - try { - jobManagerSystem.shutdown() - } catch { - case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt) - } + jobManagerSystem.terminate().onComplete { + case scala.util.Success(_) => + case scala.util.Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt) + }(directExecutionContext()) throw t } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 8d9e1ee..4f70a22 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent._ +import scala.util.{Failure, Success} /** * Abstract base class for Flink's mini cluster. The mini cluster starts a @@ -479,30 +480,30 @@ abstract class FlinkMiniCluster( if (!useSingleActorSystem) { taskManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } resourceManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } } jobManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } } def awaitTermination(): Unit = { jobManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } resourceManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } taskManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } } @@ -625,7 +626,10 @@ abstract class FlinkMiniCluster( def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = { if(!useSingleActorSystem) { - actorSystem.shutdown() + actorSystem.terminate().onComplete { + case Success(_) => + case Failure(t) => LOG.warn("Could not cleanly shut down the job client actor system.", t) + } } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 1de4848..c04084c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -77,6 +77,7 @@ import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.{Failure, Success} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -423,7 +424,7 @@ class TaskManager( futureResponse.mapTo[Boolean].onComplete { // IMPORTANT: In the future callback, we cannot directly modify state // but only send messages to the TaskManager to do those changes - case scala.util.Success(result) => + case Success(result) => if (!result) { self ! decorateMessage( FailTask( @@ -432,7 +433,7 @@ class TaskManager( ) } - case scala.util.Failure(t) => + case Failure(t) => self ! decorateMessage( FailTask( executionID, @@ -839,10 +840,10 @@ class TaskManager( blobCache.get.getTransientBlobService.putTransient(fis) }(context.dispatcher) .onComplete { - case scala.util.Success(value) => + case Success(value) => sender ! value fis.close() - case scala.util.Failure(e) => + case Failure(e) => sender ! akka.actor.Status.Failure(e) fis.close() }(context.dispatcher) @@ -1534,7 +1535,7 @@ class TaskManager( } protected def shutdown(): Unit = { - context.system.shutdown() + context.system.terminate() // Await actor system termination and shut down JVM new ProcessShutDownThread( @@ -1885,15 +1886,14 @@ object TaskManager { MemoryLogger.startIfConfigured(LOG.logger, configuration, taskManagerSystem) // block until everything is done - taskManagerSystem.awaitTermination() + Await.ready(taskManagerSystem.whenTerminated, Duration.Inf) } catch { case t: Throwable => LOG.error("Error while starting up taskManager", t) - try { - taskManagerSystem.shutdown() - } catch { - case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt) - } + taskManagerSystem.terminate().onComplete { + case Success(_) => + case Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt) + }(Executors.directExecutionContext()) throw t } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java index 37a4547..a998fb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java @@ -43,7 +43,10 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -72,10 +75,10 @@ public class QuarantineMonitorTest extends TestLogger { } @AfterClass - public static void tearDown() { + public static void tearDown() throws InterruptedException, TimeoutException { if (actorSystem1 != null) { - actorSystem1.shutdown(); - actorSystem1.awaitTermination(); + actorSystem1.terminate(); + Await.ready(actorSystem1.whenTerminated(), Duration.Inf()); } } @@ -85,10 +88,10 @@ public class QuarantineMonitorTest extends TestLogger { } @After - public void tearDownTest() { + public void tearDownTest() throws InterruptedException, TimeoutException { if (actorSystem2 != null) { - actorSystem2.shutdown(); - actorSystem2.awaitTermination(); + actorSystem2.terminate(); + Await.ready(actorSystem2.whenTerminated(), Duration.Inf()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java index 0b7547d..ae8542b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java @@ -321,11 +321,11 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { } if (taskManagerSystem != null) { - taskManagerSystem.shutdown(); + taskManagerSystem.terminate(); } if (testSystem != null) { - testSystem.shutdown(); + testSystem.terminate(); } highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java index 38b8431..fc16483 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java @@ -182,7 +182,7 @@ public class JobManagerProcessReapingTest extends TestLogger { jmProcess.destroy(); } if (localSystem != null) { - localSystem.shutdown(); + localSystem.terminate(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 873a4f1..052349c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -132,6 +132,7 @@ import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; @@ -940,7 +941,7 @@ public class JobManagerTest extends TestLogger { assertTrue(savepointFile.exists()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -956,7 +957,7 @@ public class JobManagerTest extends TestLogger { } if (actorSystem != null) { - actorSystem.awaitTermination(TESTING_TIMEOUT()); + Await.result(actorSystem.whenTerminated(), TESTING_TIMEOUT()); } } } @@ -1130,7 +1131,7 @@ public class JobManagerTest extends TestLogger { } } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1243,7 +1244,7 @@ public class JobManagerTest extends TestLogger { assertEquals(1, targetDirectory.listFiles().length); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1259,7 +1260,7 @@ public class JobManagerTest extends TestLogger { } if (actorSystem != null) { - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.result(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } } } @@ -1416,7 +1417,7 @@ public class JobManagerTest extends TestLogger { assertTrue("Unexpected response: " + response, response instanceof JobSubmitSuccess); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1432,7 +1433,7 @@ public class JobManagerTest extends TestLogger { } if (actorSystem != null) { - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } } } @@ -1516,8 +1517,8 @@ public class JobManagerTest extends TestLogger { } finally { // cleanup the actor system and with it all of the started actors if not already terminated - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index c7d837b..ef493b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -117,7 +117,7 @@ public class JobSubmitTest { @AfterClass public static void teardownJobmanager() throws Exception { if (jobManagerSystem != null) { - jobManagerSystem.shutdown(); + jobManagerSystem.terminate(); } if (highAvailabilityServices != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 4d18060..aa86100 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -49,6 +49,8 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -155,11 +157,11 @@ public class TaskManagerMetricsTest extends TestLogger { Assert.assertFalse(metricRegistry.isShutdown()); // shut down the actors and the actor system - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.result(actorSystem.whenTerminated(), Duration.Inf()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (highAvailabilityServices != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 1acaf61..3767421 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -116,7 +116,7 @@ public class MetricQueryServiceTest extends TestLogger { testActor.message = null; assertEquals(0, emptyDump.serializedMetrics.length); - s.shutdown(); + s.terminate(); } private static class TestActor extends UntypedActor { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java index 786b0ae..ed98be5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java @@ -66,7 +66,7 @@ public class StackTraceSampleCoordinatorTest extends TestLogger { @AfterClass public static void tearDown() throws Exception { if (system != null) { - system.shutdown(); + system.terminate(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 07d0244..a32c1f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -49,6 +49,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import scala.concurrent.Await; + public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ @@ -259,8 +261,8 @@ public class AkkaRpcActorTest extends TestLogger { terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } finally { - rpcActorSystem.shutdown(); - rpcActorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + rpcActorSystem.terminate(); + Await.ready(rpcActorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 8289930..9669513 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -62,6 +62,8 @@ import java.net.InetAddress; import java.util.concurrent.TimeUnit; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertTrue; @@ -205,8 +207,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { jobManager.tell(Kill.getInstance(), ActorRef.noSender()); // shut down the actors and the actor system - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); actorSystem = null; // now that the TaskManager is shut down, the components should be shut down as well @@ -215,9 +217,9 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { assertTrue(memManager.isShutdown()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index ae66a08..0b9b951 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -218,7 +218,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger { taskManagerProcess.destroy(); } if (jmActorSystem != null) { - jmActorSystem.shutdown(); + jmActorSystem.terminate(); } if (highAvailabilityServices != null) { highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index ad32a4f..7ee0921 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -102,7 +102,7 @@ public class TaskManagerRegistrationTest extends TestLogger { @AfterClass public static void shutdownActorSystem() { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index 94473b9..5314550 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java @@ -37,10 +37,13 @@ import org.junit.Test; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import scala.concurrent.Await; + /** * Test for the {@link AkkaJobManagerRetriever}. */ @@ -55,10 +58,10 @@ public class AkkaJobManagerRetrieverTest extends TestLogger { } @AfterClass - public static void teardown() { + public static void teardown() throws InterruptedException, TimeoutException { if (actorSystem != null) { - actorSystem.shutdown(); - actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout)); actorSystem = null; } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala index 6d7d87c..947d029 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala @@ -77,7 +77,7 @@ class JobManagerConnectionTest { fail(e.getMessage) } finally { - actorSystem.shutdown() + actorSystem.terminate() } } @@ -116,7 +116,7 @@ class JobManagerConnectionTest { fail(e.getMessage) } finally { - actorSystem.shutdown() + actorSystem.terminate() } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index c2d47f9..f722935 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -50,6 +50,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages.Alive import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.testutils.TestingResourceManager +import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} @@ -229,8 +230,8 @@ class TestingCluster( Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) if(!singleActorSystem) { - jmActorSystems(index).shutdown() - jmActorSystems(index).awaitTermination() + jmActorSystems(index).terminate() + Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf) } val newJobManagerActorSystem = if(!singleActorSystem) { @@ -274,8 +275,8 @@ class TestingCluster( Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) if(!singleActorSystem) { - tmActorSystems(index).shutdown() - tmActorSystems(index).awaitTermination() + tmActorSystems(index).terminate() + Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf) } val taskManagerActorSystem = if(!singleActorSystem) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index ce750d3..a22a8a8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -94,7 +94,9 @@ import java.util.concurrent.atomic.AtomicReference; import scala.Option; import scala.Some; import scala.Tuple2; +import scala.concurrent.Await; import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; @@ -425,8 +427,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { miniCluster.awaitTermination(); } - system.shutdown(); - system.awaitTermination(); + system.terminate(); + Await.ready(system.whenTerminated(), Duration.Inf()); testingServer.stop(); testingServer.close(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index aa2f38d..00c6865 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -38,8 +38,11 @@ import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeoutException; +import scala.concurrent.Await; import scala.concurrent.ExecutionContext$; +import scala.concurrent.duration.Duration; import scala.concurrent.forkjoin.ForkJoinPool; import scala.concurrent.impl.ExecutionContextImpl; @@ -62,7 +65,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger { }; @Test - public void testLocalFlinkMiniClusterWithMultipleTaskManagers() { + public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException { final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig()); LocalFlinkMiniCluster miniCluster = null; @@ -117,7 +120,7 @@ public class LocalFlinkMiniClusterITCase extends TestLogger { } JavaTestKit.shutdownActorSystem(system); - system.awaitTermination(); + Await.ready(system.whenTerminated(), Duration.Inf()); } // shut down the global execution context, to make sure it does not affect this testing diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 497ac87..e98e174 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -57,6 +57,8 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Terminated; +import akka.dispatch.OnComplete; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -71,11 +73,15 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import scala.Option; import scala.Some; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext; import static org.apache.flink.yarn.Utils.require; /** @@ -401,11 +407,14 @@ public class YarnApplicationMasterRunner { } if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable tt) { - LOG.error("Error shutting down actor system", tt); - } + actorSystem.terminate().onComplete( + new OnComplete<Terminated>() { + public void onComplete(Throwable failure, Terminated result) { + if (failure != null) { + LOG.error("Error shutting down actor system", failure); + } + } + }, directExecutionContext()); } futureExecutor.shutdownNow(); @@ -418,7 +427,11 @@ public class YarnApplicationMasterRunner { LOG.info("YARN Application Master started"); // wait until everything is done - actorSystem.awaitTermination(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Error shutting down actor system", e); + } // if we get here, everything work out jolly all right, and we even exited smoothly if (webMonitor != null) {