asfgit closed pull request #6446: [FLINK-9240] Avoid deprecated Akka methods URL: https://github.com/apache/flink/pull/6446
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 373267ffc34..309010e4bf0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -90,6 +90,7 @@ import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -249,8 +250,8 @@ public boolean isLoaded() { @Override public void close() throws Exception { if (isLoaded()) { - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); actorSystem = null; } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index ca8b6fbabf7..ec9cfc548c1 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -67,6 +67,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + /** * Simple and maybe stupid test to check the {@link ClusterClient} class. */ @@ -114,8 +117,8 @@ public void setUp() throws Exception { public void shutDownActorSystem() { if (jobManagerSystem != null) { try { - jobManagerSystem.shutdown(); - jobManagerSystem.awaitTermination(); + jobManagerSystem.terminate(); + Await.ready(jobManagerSystem.whenTerminated(), Duration.Inf()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 630fa839000..1e0670efc5f 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -57,6 +57,8 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; +import akka.actor.Terminated; +import akka.dispatch.OnComplete; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; @@ -73,10 +75,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext; import static org.apache.flink.util.Preconditions.checkState; /** @@ -374,11 +380,14 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie } if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable tt) { - LOG.error("Error shutting down actor system", tt); - } + actorSystem.terminate().onComplete( + new OnComplete<Terminated>() { + public void onComplete(Throwable failure, Terminated result) { + if (failure != null) { + LOG.error("Error shutting down actor system", failure); + } + } + }, directExecutionContext()); } if (futureExecutor != null) { @@ -412,7 +421,11 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie LOG.info("Mesos JobManager started"); // wait until everything is done - actorSystem.awaitTermination(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Error shutting down actor system", e); + } // if we get here, everything work out jolly all right, and we even exited smoothly if (webMonitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java index 708437ff8cb..38c29bd7156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java @@ -25,6 +25,9 @@ import akka.actor.Address; import org.slf4j.Logger; +import java.util.concurrent.TimeoutException; + +import scala.concurrent.Await; import scala.concurrent.duration.FiniteDuration; /** @@ -65,11 +68,13 @@ public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) { private void shutdownActorSystem(ActorSystem actorSystem) { // shut the actor system down - actorSystem.shutdown(); + actorSystem.terminate(); try { // give it some time to complete the shutdown - actorSystem.awaitTermination(timeout); + Await.ready(actorSystem.whenTerminated(), timeout); + } catch (InterruptedException | TimeoutException e) { + log.error("Exception thrown when terminating the actor system", e); } finally { // now let's crash the JVM System.exit(exitCode); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java index 0b0cbf5cb01..808de222d0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java @@ -43,11 +43,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -154,8 +156,12 @@ public Configuration getConfiguration() { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + exception = e; + } try { highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java index 8bb357648bf..b166e1066ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/MemoryLogger.java @@ -106,7 +106,7 @@ public void shutdown() { @Override public void run() { try { - while (running && (monitored == null || !monitored.isTerminated())) { + while (running && (monitored == null || !monitored.whenTerminated().isCompleted())) { logger.info(getMemoryUsageStatsAsString(memoryBean)); logger.info(getDirectMemoryStatsAsString(directBufferBean)); logger.info(getMemoryPoolStatsAsString(poolBeans)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java index e19cfd70a92..db0a1dcaf2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ProcessShutDownThread.java @@ -20,6 +20,7 @@ import akka.actor.ActorSystem; import org.slf4j.Logger; +import scala.concurrent.Await; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeoutException; @@ -67,7 +68,7 @@ public ProcessShutDownThread( @Override public void run() { try { - actorSystem.awaitTermination(terminationTimeout); + Await.ready(actorSystem.whenTerminated(), terminationTimeout); } catch (Exception e) { if (e instanceof TimeoutException) { log.error("Actor system shut down timed out.", e); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2a8f49267d9..cf831e32340 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -44,6 +44,7 @@ import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager} +import org.apache.flink.runtime.concurrent.Executors.directExecutionContext import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder @@ -1867,7 +1868,10 @@ class JobManager( FiniteDuration(10, SECONDS)).start() // Shutdown and discard all queued messages - context.system.shutdown() + context.system.terminate().onComplete { + case scala.util.Success(_) => + case scala.util.Failure(t) => log.warn("Could not cleanly shut down actor system", t) + }(directExecutionContext()) } private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = { @@ -2046,7 +2050,7 @@ object JobManager { } // block until everything is shut down - jobManagerSystem.awaitTermination() + Await.ready(jobManagerSystem.whenTerminated, Duration.Inf) webMonitorOption.foreach{ webMonitor => @@ -2288,11 +2292,10 @@ object JobManager { catch { case t: Throwable => LOG.error("Error while starting up JobManager", t) - try { - jobManagerSystem.shutdown() - } catch { - case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt) - } + jobManagerSystem.terminate().onComplete { + case scala.util.Success(_) => + case scala.util.Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt) + }(directExecutionContext()) throw t } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 8d9e1ee1e07..4f70a22f9c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent._ +import scala.util.{Failure, Success} /** * Abstract base class for Flink's mini cluster. The mini cluster starts a @@ -479,30 +480,30 @@ abstract class FlinkMiniCluster( if (!useSingleActorSystem) { taskManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } resourceManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } } jobManagerActorSystems foreach { - _ foreach(_.shutdown()) + _ foreach(_.terminate()) } } def awaitTermination(): Unit = { jobManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } resourceManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } taskManagerActorSystems foreach { - _ foreach(_.awaitTermination()) + _ foreach(s => Await.ready(s.whenTerminated, Duration.Inf)) } } @@ -625,7 +626,10 @@ abstract class FlinkMiniCluster( def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = { if(!useSingleActorSystem) { - actorSystem.shutdown() + actorSystem.terminate().onComplete { + case Success(_) => + case Failure(t) => LOG.warn("Could not cleanly shut down the job client actor system.", t) + } } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 9a057ab62b2..8db4c160d04 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -77,6 +77,7 @@ import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.{Failure, Success} /** * The TaskManager is responsible for executing the individual tasks of a Flink job. It is @@ -423,7 +424,7 @@ class TaskManager( futureResponse.mapTo[Boolean].onComplete { // IMPORTANT: In the future callback, we cannot directly modify state // but only send messages to the TaskManager to do those changes - case scala.util.Success(result) => + case Success(result) => if (!result) { self ! decorateMessage( FailTask( @@ -432,7 +433,7 @@ class TaskManager( ) } - case scala.util.Failure(t) => + case Failure(t) => self ! decorateMessage( FailTask( executionID, @@ -839,10 +840,10 @@ class TaskManager( blobCache.get.getTransientBlobService.putTransient(fis) }(context.dispatcher) .onComplete { - case scala.util.Success(value) => + case Success(value) => sender ! value fis.close() - case scala.util.Failure(e) => + case Failure(e) => sender ! akka.actor.Status.Failure(e) fis.close() }(context.dispatcher) @@ -1534,7 +1535,7 @@ class TaskManager( } protected def shutdown(): Unit = { - context.system.shutdown() + context.system.terminate() // Await actor system termination and shut down JVM new ProcessShutDownThread( @@ -1897,15 +1898,14 @@ object TaskManager { } // block until everything is done - taskManagerSystem.awaitTermination() + Await.ready(taskManagerSystem.whenTerminated, Duration.Inf) } catch { case t: Throwable => LOG.error("Error while starting up taskManager", t) - try { - taskManagerSystem.shutdown() - } catch { - case tt: Throwable => LOG.warn("Could not cleanly shut down actor system", tt) - } + taskManagerSystem.terminate().onComplete { + case Success(_) => + case Failure(tt) => LOG.warn("Could not cleanly shut down actor system", tt) + }(Executors.directExecutionContext()) throw t } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java index 37a45477cc3..a998fb0ac8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java @@ -43,7 +43,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -72,10 +75,10 @@ public static void setup() { } @AfterClass - public static void tearDown() { + public static void tearDown() throws InterruptedException, TimeoutException { if (actorSystem1 != null) { - actorSystem1.shutdown(); - actorSystem1.awaitTermination(); + actorSystem1.terminate(); + Await.ready(actorSystem1.whenTerminated(), Duration.Inf()); } } @@ -85,10 +88,10 @@ public void setupTest() { } @After - public void tearDownTest() { + public void tearDownTest() throws InterruptedException, TimeoutException { if (actorSystem2 != null) { - actorSystem2.shutdown(); - actorSystem2.awaitTermination(); + actorSystem2.terminate(); + Await.ready(actorSystem2.whenTerminated(), Duration.Inf()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java index 0b7547df33a..ae8542bf9e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java @@ -321,11 +321,11 @@ public void testClientNonDetachedListeningBehaviour() throws Exception { } if (taskManagerSystem != null) { - taskManagerSystem.shutdown(); + taskManagerSystem.terminate(); } if (testSystem != null) { - testSystem.shutdown(); + testSystem.terminate(); } highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java index 38b843146ed..fc164830904 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java @@ -182,7 +182,7 @@ public void testReapProcessOnFailure() { jmProcess.destroy(); } if (localSystem != null) { - localSystem.shutdown(); + localSystem.terminate(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 873a4f1f4f4..052349cd043 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -132,6 +132,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; @@ -940,7 +941,7 @@ public void testCancelWithSavepoint() throws Exception { assertTrue(savepointFile.exists()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -956,7 +957,7 @@ public void testCancelWithSavepoint() throws Exception { } if (actorSystem != null) { - actorSystem.awaitTermination(TESTING_TIMEOUT()); + Await.result(actorSystem.whenTerminated(), TESTING_TIMEOUT()); } } } @@ -1130,7 +1131,7 @@ public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception { } } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1243,7 +1244,7 @@ public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception assertEquals(1, targetDirectory.listFiles().length); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1259,7 +1260,7 @@ public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception } if (actorSystem != null) { - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.result(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } } } @@ -1416,7 +1417,7 @@ public void testSavepointRestoreSettings() throws Exception { assertTrue("Unexpected response: " + response, response instanceof JobSubmitSuccess); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (archiver != null) { @@ -1432,7 +1433,7 @@ public void testSavepointRestoreSettings() throws Exception { } if (actorSystem != null) { - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } } } @@ -1516,8 +1517,8 @@ public void testResourceManagerConnection() throws TimeoutException, Interrupted } finally { // cleanup the actor system and with it all of the started actors if not already terminated - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index c7d837b00ed..ef493b9330a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -117,7 +117,7 @@ public static void setupJobManager() { @AfterClass public static void teardownJobmanager() throws Exception { if (jobManagerSystem != null) { - jobManagerSystem.shutdown(); + jobManagerSystem.terminate(); } if (highAvailabilityServices != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index db040232f69..3abde88e574 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -154,11 +156,11 @@ protected void run() { Assert.assertFalse(metricRegistry.isShutdown()); // shut down the actors and the actor system - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.result(actorSystem.whenTerminated(), Duration.Inf()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } if (highAvailabilityServices != null) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 1acaf6181bb..3767421b7d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -116,7 +116,7 @@ public long getCount() { testActor.message = null; assertEquals(0, emptyDump.serializedMetrics.length); - s.shutdown(); + s.terminate(); } private static class TestActor extends UntypedActor { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java index 786b0aeae2c..ed98be5d480 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java @@ -66,7 +66,7 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { if (system != null) { - system.shutdown(); + system.terminate(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 07d02443f1a..a32c1f64906 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -49,6 +49,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import scala.concurrent.Await; + public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ @@ -259,8 +261,8 @@ public void testActorTerminationWhenServiceShutdown() throws Exception { terminationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } finally { - rpcActorSystem.shutdown(); - rpcActorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + rpcActorSystem.terminate(); + Await.ready(rpcActorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 828993012b8..9669513a111 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -62,6 +62,8 @@ import java.util.concurrent.TimeUnit; import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertTrue; @@ -205,8 +207,8 @@ protected void run() { jobManager.tell(Kill.getInstance(), ActorRef.noSender()); // shut down the actors and the actor system - actorSystem.shutdown(); - actorSystem.awaitTermination(); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); actorSystem = null; // now that the TaskManager is shut down, the components should be shut down as well @@ -215,9 +217,9 @@ protected void run() { assertTrue(memManager.isShutdown()); } finally { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); - actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT()); + Await.ready(actorSystem.whenTerminated(), TestingUtils.TESTING_TIMEOUT()); } highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java index ae66a0836ad..0b9b951f54f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java @@ -218,7 +218,7 @@ public void testReapProcessOnFailure() throws Exception { taskManagerProcess.destroy(); } if (jmActorSystem != null) { - jmActorSystem.shutdown(); + jmActorSystem.terminate(); } if (highAvailabilityServices != null) { highAvailabilityServices.closeAndCleanupAllData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index ad32a4fdd8d..7ee0921646a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -102,7 +102,7 @@ public static void startActorSystem() { @AfterClass public static void shutdownActorSystem() { if (actorSystem != null) { - actorSystem.shutdown(); + actorSystem.terminate(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java index 94473b9581c..53145504ec8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/AkkaJobManagerRetrieverTest.java @@ -37,10 +37,13 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import scala.concurrent.Await; + /** * Test for the {@link AkkaJobManagerRetriever}. */ @@ -55,10 +58,10 @@ public static void setup() { } @AfterClass - public static void teardown() { + public static void teardown() throws InterruptedException, TimeoutException { if (actorSystem != null) { - actorSystem.shutdown(); - actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + actorSystem.terminate(); + Await.ready(actorSystem.whenTerminated(), FutureUtils.toFiniteDuration(timeout)); actorSystem = null; } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala index 6d7d87cbf2b..947d0291db3 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala @@ -77,7 +77,7 @@ class JobManagerConnectionTest { fail(e.getMessage) } finally { - actorSystem.shutdown() + actorSystem.terminate() } } @@ -116,7 +116,7 @@ class JobManagerConnectionTest { fail(e.getMessage) } finally { - actorSystem.shutdown() + actorSystem.terminate() } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index c2d47f92d80..f722935f90d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -50,6 +50,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages.Alive import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager import org.apache.flink.runtime.testutils.TestingResourceManager +import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, ExecutionContext, Future} @@ -229,8 +230,8 @@ class TestingCluster( Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) if(!singleActorSystem) { - jmActorSystems(index).shutdown() - jmActorSystems(index).awaitTermination() + jmActorSystems(index).terminate() + Await.ready(jmActorSystems(index).whenTerminated, Duration.Inf) } val newJobManagerActorSystem = if(!singleActorSystem) { @@ -274,8 +275,8 @@ class TestingCluster( Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) if(!singleActorSystem) { - tmActorSystems(index).shutdown() - tmActorSystems(index).awaitTermination() + tmActorSystems(index).terminate() + Await.ready(tmActorSystems(index).whenTerminated, Duration.Inf) } val taskManagerActorSystem = if(!singleActorSystem) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index ce750d3b067..a22a8a8c930 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -94,7 +94,9 @@ import scala.Option; import scala.Some; import scala.Tuple2; +import scala.concurrent.Await; import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; @@ -425,8 +427,8 @@ public Object getKey(Long value) throws Exception { miniCluster.awaitTermination(); } - system.shutdown(); - system.awaitTermination(); + system.terminate(); + Await.ready(system.whenTerminated(), Duration.Inf()); testingServer.stop(); testingServer.close(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index aa2f38d2619..00c6865300c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -38,8 +38,11 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeoutException; +import scala.concurrent.Await; import scala.concurrent.ExecutionContext$; +import scala.concurrent.duration.Duration; import scala.concurrent.forkjoin.ForkJoinPool; import scala.concurrent.impl.ExecutionContextImpl; @@ -62,7 +65,7 @@ }; @Test - public void testLocalFlinkMiniClusterWithMultipleTaskManagers() { + public void testLocalFlinkMiniClusterWithMultipleTaskManagers() throws InterruptedException, TimeoutException { final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig()); LocalFlinkMiniCluster miniCluster = null; @@ -117,7 +120,7 @@ protected void run() { } JavaTestKit.shutdownActorSystem(system); - system.awaitTermination(); + Await.ready(system.whenTerminated(), Duration.Inf()); } // shut down the global execution context, to make sure it does not affect this testing diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 497ac87c4a3..e98e174cbdb 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -57,6 +57,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.actor.Terminated; +import akka.dispatch.OnComplete; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -71,11 +73,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import scala.Option; import scala.Some; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import static org.apache.flink.runtime.concurrent.Executors.directExecutionContext; import static org.apache.flink.yarn.Utils.require; /** @@ -401,11 +407,14 @@ protected int runApplicationMaster(Configuration config) { } if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable tt) { - LOG.error("Error shutting down actor system", tt); - } + actorSystem.terminate().onComplete( + new OnComplete<Terminated>() { + public void onComplete(Throwable failure, Terminated result) { + if (failure != null) { + LOG.error("Error shutting down actor system", failure); + } + } + }, directExecutionContext()); } futureExecutor.shutdownNow(); @@ -418,7 +427,11 @@ protected int runApplicationMaster(Configuration config) { LOG.info("YARN Application Master started"); // wait until everything is done - actorSystem.awaitTermination(); + try { + Await.ready(actorSystem.whenTerminated(), Duration.Inf()); + } catch (InterruptedException | TimeoutException e) { + LOG.error("Error shutting down actor system", e); + } // if we get here, everything work out jolly all right, and we even exited smoothly if (webMonitor != null) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services