[FLINK-3363] [jobmanager] Properly shut down executor thread pool when JobManager shuts down
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a277543c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a277543c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a277543c Branch: refs/heads/master Commit: a277543c57f7c633e0f8b610b241eac5a95aa7cc Parents: af3e689 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 8 13:18:50 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 8 16:57:57 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 42 +++++--- .../runtime/minicluster/FlinkMiniCluster.scala | 5 +- .../JobManagerLeaderElectionTest.java | 16 ++- .../testingUtils/TestingJobManager.scala | 17 +--- .../LocalFlinkMiniClusterITCase.java | 101 ++++++++++++++++--- .../flink/yarn/TestingYarnJobManager.scala | 9 +- .../org/apache/flink/yarn/YarnJobManager.scala | 14 ++- 7 files changed, 149 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index bc7c134..d96575f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} import java.net.{BindException, ServerSocket, UnknownHostException, InetAddress, InetSocketAddress} import java.util.UUID +import java.util.concurrent.ExecutorService import akka.actor.Status.Failure import 
akka.actor._ @@ -90,7 +91,7 @@ import scala.language.postfixOps * is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]] * * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an - ExecutionVertex contained in the [[ExecutionGraph]]. + * ExecutionVertex contained in the [[ExecutionGraph]]. * A successful update is acknowledged by true and otherwise false. * * - [[RequestNextInputSplit]] requests the next input split for a running task on a @@ -102,7 +103,7 @@ import scala.language.postfixOps */ class JobManager( protected val flinkConfiguration: Configuration, - protected val executionContext: ExecutionContext, + protected val executorService: ExecutorService, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, @@ -121,6 +122,15 @@ class JobManager( override val log = Logger(getClass) + /** The extra execution context, for futures, with a custom logging reporter */ + protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor( + executorService, + (t: Throwable) => { + if (!context.system.isTerminated) { + log.error("Executor could not execute task", t) + } + }) + /** Either running or not yet archived jobs (session hasn't been ended). */ protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() @@ -246,6 +256,9 @@ class JobManager( case e: IOException => log.error("Could not properly shutdown the library cache manager.", e) } + // shut down the extra thread pool for futures + executorService.shutdown() + log.debug(s"Job manager ${self.path} is completely stopped.") } @@ -1503,7 +1516,8 @@ class JobManager( /** * Updates the accumulators reported from a task manager via the Heartbeat message. 
- * @param accumulators list of accumulator snapshots + * + * @param accumulators list of accumulator snapshots */ private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = { accumulators foreach { @@ -2016,7 +2030,7 @@ object JobManager { def createJobManagerComponents( configuration: Configuration, leaderElectionServiceOption: Option[LeaderElectionService]) : - (ExecutionContext, + (ExecutorService, InstanceManager, FlinkScheduler, BlobLibraryCacheManager, @@ -2064,17 +2078,19 @@ object JobManager { } } - val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool()) + var blobServer: BlobServer = null var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null var libraryCacheManager: BlobLibraryCacheManager = null + val executorService: ExecutorService = new ForkJoinPool() + try { blobServer = new BlobServer(configuration) instanceManager = new InstanceManager() - scheduler = new FlinkScheduler(executionContext) + scheduler = new FlinkScheduler(ExecutionContext.fromExecutor(executorService)) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) instanceManager.addInstanceListener(scheduler) @@ -2093,6 +2109,8 @@ object JobManager { if (blobServer != null) { blobServer.shutdown() } + executorService.shutdownNow() + throw t } @@ -2122,7 +2140,7 @@ object JobManager { new ZooKeeperCheckpointRecoveryFactory(client, configuration)) } - (executionContext, + (executorService, instanceManager, scheduler, libraryCacheManager, @@ -2143,8 +2161,7 @@ object JobManager { * @param actorSystem The actor system running the JobManager * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the MemoryArchivist to be started - * - * @return A tuple of references (JobManager Ref, Archiver Ref) + * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors( configuration: Configuration, @@ -2174,8 +2191,7 @@ object JobManager { * the 
actor will have the name generated by the actor system. * @param jobManagerClass The class of the JobManager to be started * @param archiveClass The class of the MemoryArchivist to be started - * - * @return A tuple of references (JobManager Ref, Archiver Ref) + * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors( configuration: Configuration, @@ -2186,7 +2202,7 @@ object JobManager { archiveClass: Class[_ <: MemoryArchivist]) : (ActorRef, ActorRef) = { - val (executionContext, + val (executorService: ExecutorService, instanceManager, scheduler, libraryCacheManager, @@ -2211,7 +2227,7 @@ object JobManager { val jobManagerProps = Props( jobManagerClass, configuration, - executionContext, + executorService, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index bdb62ba..4cdda3f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory import scala.concurrent.duration.FiniteDuration import scala.concurrent._ +import scala.concurrent.forkjoin.ForkJoinPool /** * Abstract base class for Flink's mini cluster. 
The mini cluster starts a @@ -82,7 +83,7 @@ abstract class FlinkMiniCluster( /** Future lock */ val futureLock = new Object() - + implicit val executionContext = ExecutionContext.global implicit val timeout = AkkaUtils.getTimeout(userConfiguration) @@ -320,8 +321,6 @@ abstract class FlinkMiniCluster( _.map(gracefulStop(_, timeout)) } getOrElse(Seq()) - implicit val executionContext = ExecutionContext.global - Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout) if (!useSingleActorSystem) { http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index 47255fc..f50a0a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -52,7 +52,9 @@ import org.junit.rules.TemporaryFolder; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.forkjoin.ForkJoinPool; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class JobManagerLeaderElectionTest extends TestLogger { @@ -62,14 +64,16 @@ public class JobManagerLeaderElectionTest extends TestLogger { private static ActorSystem actorSystem; private static TestingServer testingServer; + private static ExecutorService executor; + private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION()); private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES); @BeforeClass public static void setup() throws Exception { 
actorSystem = ActorSystem.create("TestingActorSystem"); - testingServer = new TestingServer(); + executor = new ForkJoinPool(); } @AfterClass @@ -78,9 +82,13 @@ public class JobManagerLeaderElectionTest extends TestLogger { JavaTestKit.shutdownActorSystem(actorSystem); } - if(testingServer != null) { + if (testingServer != null) { testingServer.stop(); } + + if (executor != null) { + executor.shutdownNow(); + } } /** @@ -175,10 +183,10 @@ public class JobManagerLeaderElectionTest extends TestLogger { return Props.create( TestingJobManager.class, configuration, - TestingUtils.defaultExecutionContext(), + executor, new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration), 10l), + new BlobLibraryCacheManager(new BlobServer(configuration), 10L), ActorRef.noSender(), 1, 1L, http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 7cbff48..0c0ca40 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -19,6 +19,7 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef + import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -27,25 +28,17 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import 
org.apache.flink.runtime.leaderelection.LeaderElectionService -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps +import java.util.concurrent.ExecutorService + /** JobManager implementation extended by testing messages * - * @param flinkConfiguration - * @param executionContext - * @param instanceManager - * @param scheduler - * @param libraryCacheManager - * @param archive - * @param defaultExecutionRetries - * @param delayBetweenRetries - * @param timeout */ class TestingJobManager( flinkConfiguration: Configuration, - executionContext: ExecutionContext, + executorService: ExecutorService, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -58,7 +51,7 @@ class TestingJobManager( checkpointRecoveryFactory : CheckpointRecoveryFactory) extends JobManager( flinkConfiguration, - executionContext, + executorService, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 8e87143..440cfff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -28,33 +29,42 @@ import 
org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; + import org.junit.Test; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.forkjoin.ForkJoinPool; +import scala.concurrent.impl.ExecutionContextImpl; -public class LocalFlinkMiniClusterITCase { +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; - static ActorSystem system; +import static org.junit.Assert.fail; - @BeforeClass - public static void setup() { - system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig()); - } +public class LocalFlinkMiniClusterITCase { - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } + private static String[] ALLOWED_THREAD_PREFIXES = { }; @Test public void testLocalFlinkMiniClusterWithMultipleTaskManagers() { + + final ActorSystem system = ActorSystem.create("Testkit", AkkaUtils.getDefaultAkkaConfig()); LocalFlinkMiniCluster miniCluster = null; final int numTMs = 3; final int numSlots = 14; - try{ + // gather the threads that already exist + final Set<Thread> threadsBefore = new HashSet<>(); + { + final Thread[] allThreads = new Thread[Thread.activeCount()]; + Thread.enumerate(allThreads); + threadsBefore.addAll(Arrays.asList(allThreads)); + } + + + try { Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); @@ -90,7 +100,70 @@ public class LocalFlinkMiniClusterITCase { } finally { if (miniCluster != null) { miniCluster.stop(); + miniCluster.awaitTermination(); } + + JavaTestKit.shutdownActorSystem(system); + system.awaitTermination(); + } + + // shut down the 
global execution context, to make sure it does not affect this testing + try { + Field f = ExecutionContextImpl.class.getDeclaredField("executor"); + f.setAccessible(true); + + Object exec = ExecutionContext$.MODULE$.global(); + ForkJoinPool executor = (ForkJoinPool) f.get(exec); + executor.shutdownNow(); + } + catch (Exception e) { + System.err.println("Cannot test proper thread shutdown for local execution."); + return; + } + + // check for remaining threads + // we need to check repeatedly for a while, because some threads shut down slowly + + long deadline = System.currentTimeMillis() + 30000; + boolean foundThreads = true; + String threadName = ""; + + while (System.currentTimeMillis() < deadline) { + // check that no additional threads remain + final Thread[] threadsAfter = new Thread[Thread.activeCount()]; + Thread.enumerate(threadsAfter); + + foundThreads = false; + for (Thread t : threadsAfter) { + if (t.isAlive() && !threadsBefore.contains(t)) { + // this thread was not there before. 
check if it is allowed + boolean allowed = false; + for (String prefix : ALLOWED_THREAD_PREFIXES) { + if (t.getName().startsWith(prefix)) { + allowed = true; + break; + } + } + + if (!allowed) { + foundThreads = true; + threadName = t.toString(); + break; + } + } + } + + if (foundThreads) { + try { + Thread.sleep(500); + } catch (InterruptedException ignored) {} + } else { + break; + } + } + + if (foundThreads) { + fail("Thread " + threadName + " was started by the mini cluster, but not shut down"); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index a7f21e5..2d50407 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -18,6 +18,8 @@ package org.apache.flink.yarn +import java.util.concurrent.ExecutorService + import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory @@ -28,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.testingUtils.TestingJobManagerLike -import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration /** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin. @@ -37,7 +38,7 @@ import scala.concurrent.duration.FiniteDuration * instead of an anonymous class with the respective mixin to obtain a more readable logger name. 
* * @param flinkConfiguration Configuration object for the actor - * @param executionContext Execution context which is used to execute concurrent tasks in the + * @param executorService Executor service which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -51,7 +52,7 @@ import scala.concurrent.duration.FiniteDuration */ class TestingYarnJobManager( flinkConfiguration: Configuration, - executionContext: ExecutionContext, + executorService: ExecutorService, instanceManager: InstanceManager, scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -64,7 +65,7 @@ class TestingYarnJobManager( checkpointRecoveryFactory : CheckpointRecoveryFactory) extends YarnJobManager( flinkConfiguration, - executionContext, + executorService, instanceManager, scheduler, libraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 708c1d8..8dfa22d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -22,10 +22,13 @@ import java.io.File import java.lang.reflect.Method import java.nio.ByteBuffer import java.util.Collections +import java.util.concurrent.ExecutorService import java.util.{List => JavaList} import akka.actor.ActorRef + import grizzled.slf4j.Logger + import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils @@ -40,6 +43,7 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.yarn.YarnMessages._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.DataOutputBuffer @@ -55,7 +59,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.YarnException import org.apache.hadoop.yarn.util.Records -import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Try @@ -64,7 +67,7 @@ import scala.util.Try * to start/administer/stop the Yarn session. * * @param flinkConfiguration Configuration object for the actor - * @param executionContext Execution context which is used to execute concurrent tasks in the + * @param executorService Executor service which is used to execute concurrent tasks in the * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] * @param instanceManager Instance manager to manage the registered * [[org.apache.flink.runtime.taskmanager.TaskManager]] @@ -78,7 +81,7 @@ import scala.util.Try */ class YarnJobManager( flinkConfiguration: FlinkConfiguration, - executionContext: ExecutionContext, + executorService: ExecutorService, instanceManager: InstanceManager, scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, @@ -91,7 +94,7 @@ class YarnJobManager( checkpointRecoveryFactory : CheckpointRecoveryFactory) extends JobManager( flinkConfiguration, - executionContext, + executorService, instanceManager, scheduler, libraryCacheManager, @@ -587,7 +590,8 @@ class YarnJobManager( /** * Calculate the correct JVM heap memory limit. - * @param memoryLimit The maximum memory in megabytes. + * + * @param memoryLimit The maximum memory in megabytes. 
* @return The JVM heap memory limit in megabytes. */ private def calculateMemoryLimits(memoryLimit: Long): Long = {