[FLINK-3363] [jobmanager] Properly shut down executor thread pool when 
JobManager shuts down


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a277543c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a277543c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a277543c

Branch: refs/heads/master
Commit: a277543c57f7c633e0f8b610b241eac5a95aa7cc
Parents: af3e689
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 8 13:18:50 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 8 16:57:57 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  42 +++++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |   5 +-
 .../JobManagerLeaderElectionTest.java           |  16 ++-
 .../testingUtils/TestingJobManager.scala        |  17 +---
 .../LocalFlinkMiniClusterITCase.java            | 101 ++++++++++++++++---
 .../flink/yarn/TestingYarnJobManager.scala      |   9 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  14 ++-
 7 files changed, 149 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index bc7c134..d96575f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{File, IOException}
 import java.net.{BindException, ServerSocket, UnknownHostException, 
InetAddress, InetSocketAddress}
 import java.util.UUID
+import java.util.concurrent.ExecutorService
 
 import akka.actor.Status.Failure
 import akka.actor._
@@ -90,7 +91,7 @@ import scala.language.postfixOps
  *  is indicated by [[CancellationSuccess]] and a failure by 
[[CancellationFailure]]
  *
  * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state 
of an
-     ExecutionVertex contained in the [[ExecutionGraph]].
+  * ExecutionVertex contained in the [[ExecutionGraph]].
  * A successful update is acknowledged by true and otherwise false.
  *
  * - [[RequestNextInputSplit]] requests the next input split for a running 
task on a
@@ -102,7 +103,7 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executionContext: ExecutionContext,
+    protected val executorService: ExecutorService,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -121,6 +122,15 @@ class JobManager(
 
   override val log = Logger(getClass)
 
+  /** The extra execution context, for futures, with a custom logging reporter 
*/
+  protected val executionContext: ExecutionContext = 
ExecutionContext.fromExecutor(
+    executorService,
+    (t: Throwable) => {
+      if (!context.system.isTerminated) {
+        log.error("Executor could not execute task", t)
+      }
+    })
+  
   /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, 
(ExecutionGraph, JobInfo)]()
 
@@ -246,6 +256,9 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the 
library cache manager.", e)
     }
 
+    // shut down the extra thread pool for futures
+    executorService.shutdown()
+    
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -1503,7 +1516,8 @@ class JobManager(
   
   /**
    * Updates the accumulators reported from a task manager via the Heartbeat 
message.
-   * @param accumulators list of accumulator snapshots
+    *
+    * @param accumulators list of accumulator snapshots
    */
   private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
     accumulators foreach {
@@ -2016,7 +2030,7 @@ object JobManager {
   def createJobManagerComponents(
       configuration: Configuration,
       leaderElectionServiceOption: Option[LeaderElectionService]) :
-    (ExecutionContext,
+    (ExecutorService,
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2064,17 +2078,19 @@ object JobManager {
           }
       }
 
-    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+    
 
     var blobServer: BlobServer = null
     var instanceManager: InstanceManager = null
     var scheduler: FlinkScheduler = null
     var libraryCacheManager: BlobLibraryCacheManager = null
 
+    val executorService: ExecutorService = new ForkJoinPool()
+    
     try {
       blobServer = new BlobServer(configuration)
       instanceManager = new InstanceManager()
-      scheduler = new FlinkScheduler(executionContext)
+      scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(executorService))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
 
       instanceManager.addInstanceListener(scheduler)
@@ -2093,6 +2109,8 @@ object JobManager {
         if (blobServer != null) {
           blobServer.shutdown()
         }
+        executorService.shutdownNow()
+        
         throw t
     }
 
@@ -2122,7 +2140,7 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
-    (executionContext,
+    (executorService,
       instanceManager,
       scheduler,
       libraryCacheManager,
@@ -2143,8 +2161,7 @@ object JobManager {
    * @param actorSystem The actor system running the JobManager
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   *
-   * @return A tuple of references (JobManager Ref, Archiver Ref)
+    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
@@ -2174,8 +2191,7 @@ object JobManager {
    *                          the actor will have the name generated by the 
actor system.
    * @param jobManagerClass The class of the JobManager to be started
    * @param archiveClass The class of the MemoryArchivist to be started
-   *
-   * @return A tuple of references (JobManager Ref, Archiver Ref)
+    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(
       configuration: Configuration,
@@ -2186,7 +2202,7 @@ object JobManager {
       archiveClass: Class[_ <: MemoryArchivist])
     : (ActorRef, ActorRef) = {
 
-    val (executionContext,
+    val (executorService: ExecutorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -2211,7 +2227,7 @@ object JobManager {
     val jobManagerProps = Props(
       jobManagerClass,
       configuration,
-      executionContext,
+      executorService,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bdb62ba..4cdda3f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent._
+import scala.concurrent.forkjoin.ForkJoinPool
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -82,7 +83,7 @@ abstract class FlinkMiniCluster(
 
   /** Future lock */
   val futureLock = new Object()
-
+  
   implicit val executionContext = ExecutionContext.global
 
   implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
@@ -320,8 +321,6 @@ abstract class FlinkMiniCluster(
       _.map(gracefulStop(_, timeout))
     } getOrElse(Seq())
 
-    implicit val executionContext = ExecutionContext.global
-
     Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)
 
     if (!useSingleActorSystem) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 47255fc..f50a0a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -52,7 +52,9 @@ import org.junit.rules.TemporaryFolder;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.forkjoin.ForkJoinPool;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class JobManagerLeaderElectionTest extends TestLogger {
@@ -62,14 +64,16 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
 
        private static ActorSystem actorSystem;
        private static TestingServer testingServer;
+       private static ExecutorService executor;
+       
        private static Timeout timeout = new 
Timeout(TestingUtils.TESTING_DURATION());
        private static FiniteDuration duration = new FiniteDuration(5, 
TimeUnit.MINUTES);
 
        @BeforeClass
        public static void setup() throws Exception {
                actorSystem = ActorSystem.create("TestingActorSystem");
-
                testingServer = new TestingServer();
+               executor = new ForkJoinPool();
        }
 
        @AfterClass
@@ -78,9 +82,13 @@ public class JobManagerLeaderElectionTest extends TestLogger 
{
                        JavaTestKit.shutdownActorSystem(actorSystem);
                }
 
-               if(testingServer != null) {
+               if (testingServer != null) {
                        testingServer.stop();
                }
+               
+               if (executor != null) {
+                       executor.shutdownNow();
+               }
        }
 
        /**
@@ -175,10 +183,10 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                return Props.create(
                                TestingJobManager.class,
                                configuration,
-                               TestingUtils.defaultExecutionContext(),
+                               executor,
                                new InstanceManager(),
                                new 
Scheduler(TestingUtils.defaultExecutionContext()),
-                               new BlobLibraryCacheManager(new 
BlobServer(configuration), 10l),
+                               new BlobLibraryCacheManager(new 
BlobServer(configuration), 10L),
                                ActorRef.noSender(),
                                1,
                                1L,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 7cbff48..0c0ca40 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -27,25 +28,17 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import java.util.concurrent.ExecutorService
+
 /** JobManager implementation extended by testing messages
   *
-  * @param flinkConfiguration
-  * @param executionContext
-  * @param instanceManager
-  * @param scheduler
-  * @param libraryCacheManager
-  * @param archive
-  * @param defaultExecutionRetries
-  * @param delayBetweenRetries
-  * @param timeout
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -58,7 +51,7 @@ class TestingJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
-    executionContext,
+      executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
index 8e87143..440cfff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.runtime.minicluster;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,33 +29,42 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+
 import org.junit.Test;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.forkjoin.ForkJoinPool;
+import scala.concurrent.impl.ExecutionContextImpl;
 
-public class LocalFlinkMiniClusterITCase {
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
-       static ActorSystem system;
+import static org.junit.Assert.fail;
 
-       @BeforeClass
-       public static void setup() {
-               system = ActorSystem.create("Testkit", 
AkkaUtils.getDefaultAkkaConfig());
-       }
+public class LocalFlinkMiniClusterITCase {
 
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-               system = null;
-       }
+       private static String[] ALLOWED_THREAD_PREFIXES = { };
 
        @Test
        public void testLocalFlinkMiniClusterWithMultipleTaskManagers() {
+               
+               final ActorSystem system = ActorSystem.create("Testkit", 
AkkaUtils.getDefaultAkkaConfig());
                LocalFlinkMiniCluster miniCluster = null;
 
                final int numTMs = 3;
                final int numSlots = 14;
 
-               try{
+               // gather the threads that already exist
+               final Set<Thread> threadsBefore = new HashSet<>();
+               {
+                       final Thread[] allThreads = new 
Thread[Thread.activeCount()];
+                       Thread.enumerate(allThreads);
+                       threadsBefore.addAll(Arrays.asList(allThreads));
+               }
+               
+               
+               try {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
@@ -90,7 +100,70 @@ public class LocalFlinkMiniClusterITCase {
                } finally {
                        if (miniCluster != null) {
                                miniCluster.stop();
+                               miniCluster.awaitTermination();
                        }
+
+                       JavaTestKit.shutdownActorSystem(system);
+                       system.awaitTermination();
+               }
+
+               // shut down the global execution context, to make sure it does 
not affect this testing
+               try {
+                       Field f = 
ExecutionContextImpl.class.getDeclaredField("executor");
+                       f.setAccessible(true);
+                       
+                       Object exec = ExecutionContext$.MODULE$.global();
+                       ForkJoinPool executor = (ForkJoinPool) f.get(exec);
+                       executor.shutdownNow();
+               }
+               catch (Exception e) {
+                       System.err.println("Cannot test proper thread shutdown 
for local execution.");
+                       return;
+               }
+               
+               // check for remaining threads
+               // we need to check repeatedly for a while, because some 
threads shut down slowly
+               
+               long deadline = System.currentTimeMillis() + 30000;
+               boolean foundThreads = true;
+               String threadName = "";
+               
+               while (System.currentTimeMillis() < deadline) {
+                       // check that no additional threads remain
+                       final Thread[] threadsAfter = new 
Thread[Thread.activeCount()];
+                       Thread.enumerate(threadsAfter);
+
+                       foundThreads = false;
+                       for (Thread t : threadsAfter) {
+                               if (t.isAlive() && !threadsBefore.contains(t)) {
+                                       // this thread was not there before. 
check if it is allowed
+                                       boolean allowed = false;
+                                       for (String prefix : 
ALLOWED_THREAD_PREFIXES) {
+                                               if 
(t.getName().startsWith(prefix)) {
+                                                       allowed = true;
+                                                       break;
+                                               }
+                                       }
+                                       
+                                       if (!allowed) {
+                                               foundThreads = true;
+                                               threadName = t.toString();
+                                               break;
+                                       }
+                               }
+                       }
+                       
+                       if (foundThreads) {
+                               try {
+                                       Thread.sleep(500);
+                               } catch (InterruptedException ignored) {}
+                       } else {
+                               break;
+                       }
+               }
+               
+               if (foundThreads) {
+                       fail("Thread " + threadName + " was started by the mini 
cluster, but not shut down");
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index a7f21e5..2d50407 100644
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn
 
+import java.util.concurrent.ExecutorService
+
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
@@ -28,7 +30,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
 
 /** [[YarnJobManager]] implementation which mixes in the 
[[TestingJobManagerLike]] mixin.
@@ -37,7 +38,7 @@ import scala.concurrent.duration.FiniteDuration
   * instead of an anonymous class with the respective mixin to obtain a more 
readable logger name.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executionContext Execution context which is used to execute 
concurrent tasks in the
+  * @param executorService Executor service which is used to execute 
concurrent tasks in the
   *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -51,7 +52,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -64,7 +65,7 @@ class TestingYarnJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends YarnJobManager(
     flinkConfiguration,
-    executionContext,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/a277543c/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 708c1d8..8dfa22d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,10 +22,13 @@ import java.io.File
 import java.lang.reflect.Method
 import java.nio.ByteBuffer
 import java.util.Collections
+import java.util.concurrent.ExecutorService
 import java.util.{List => JavaList}
 
 import akka.actor.ActorRef
+
 import grizzled.slf4j.Logger
+
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, 
ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -40,6 +43,7 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.yarn.YarnMessages._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
@@ -55,7 +59,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.YarnException
 import org.apache.hadoop.yarn.util.Records
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Try
@@ -64,7 +67,7 @@ import scala.util.Try
   * to start/administer/stop the Yarn session.
   *
   * @param flinkConfiguration Configuration object for the actor
-  * @param executionContext Execution context which is used to execute 
concurrent tasks in the
+  * @param executorService Executor service which is used to execute 
concurrent tasks in the
   *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
@@ -78,7 +81,7 @@ import scala.util.Try
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    executionContext: ExecutionContext,
+    executorService: ExecutorService,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -91,7 +94,7 @@ class YarnJobManager(
     checkpointRecoveryFactory : CheckpointRecoveryFactory)
   extends JobManager(
     flinkConfiguration,
-    executionContext,
+    executorService,
     instanceManager,
     scheduler,
     libraryCacheManager,
@@ -587,7 +590,8 @@ class YarnJobManager(
 
   /**
    * Calculate the correct JVM heap memory limit.
-   * @param memoryLimit The maximum memory in megabytes.
+    *
+    * @param memoryLimit The maximum memory in megabytes.
    * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
    */
   private def calculateMemoryLimits(memoryLimit: Long): Long = {

Reply via email to