[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools

This closes #3290


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254

Branch: refs/heads/master
Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400
Parents: 6630513
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 9 14:04:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 16:28:30 2017 +0100

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |  10 +-
 .../flink/runtime/filecache/FileCache.java      |   3 +-
 .../runtime/jobmaster/JobManagerServices.java   |   6 +-
 .../runtime/util/ExecutorThreadFactory.java     | 123 ++++++++++++++-----
 .../flink/runtime/util/NamedThreadFactory.java  |  58 ---------
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  10 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   8 +-
 8 files changed, 119 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5033692..a23c9f6 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,10 +22,12 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -52,15 +54,17 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.mesos.Protos;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner {
 
                        futureExecutor = Executors.newScheduledThreadPool(
                                numberProcessors,
-                               new 
NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+                               new 
ExecutorThreadFactory("mesos-jobmanager-future"));
 
                        ioExecutor = Executors.newFixedThreadPool(
                                numberProcessors,
-                               new NamedThreadFactory("mesos-jobmanager-io-", 
"-thread-"));
+                               new 
ExecutorThreadFactory("mesos-jobmanager-io"));
 
                        mesosServices = 
MesosServicesUtils.createMesosServices(config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 21456de..4f2166f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -99,7 +99,8 @@ public class FileCache {
                this.shutdownHook = createShutdownHook(this, LOG);
 
                this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, 
File, Path, Future<Path>>>>();
-               this.executorService = Executors.newScheduledThreadPool(10, 
ExecutorThreadFactory.INSTANCE);
+               this.executorService = Executors.newScheduledThreadPool(10, 
+                               new ExecutorThreadFactory("flink-file-cache"));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 95500e5..8cda0f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -120,8 +120,12 @@ public class JobManagerServices {
                        throw new 
IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage());
                }
 
+               final ScheduledExecutorService futureExecutor = 
Executors.newScheduledThreadPool(
+                               Hardware.getNumberCPUCores(),
+                               new ExecutorThreadFactory("jobmanager-future"));
+
                return new JobManagerServices(
-                       
Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), 
ExecutorThreadFactory.INSTANCE),
+                       futureExecutor,
                        libraryCacheManager,
                        
RestartStrategyFactory.createRestartStrategyFactory(config),
                        Time.of(timeout.length(), timeout.unit()));

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
index 2fb5972..4a79db3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java
@@ -18,49 +18,114 @@
 
 package org.apache.flink.runtime.util;
 
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A thread factory intended for use by critical thread pools. Critical thread 
pools here
+ * mean thread pools that support Flink's core coordination and processing 
work, and which
+ * must not simply cause unnoticed errors.
+ * 
+ * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for 
the threads.
+ * If no handler is explicitly given, the default handler for uncaught 
exceptions will log
+ * the exceptions and kill the process afterwards. That guarantees that 
critical exceptions are
+ * not accidentally lost and leave the system running in an inconsistent state.
+ * 
+ * <p>Threads created by this factory are all called '(pool-name)-thread-n', 
where
+ * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number.
+ * 
+ * <p>All threads created by this factory are daemon threads and have the 
default (normal)
+ * priority.
+ */
 public class ExecutorThreadFactory implements ThreadFactory {
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorThreadFactory.class);
-       
-       
-       private static final String THREAD_NAME_PREFIX = "Flink Executor Thread 
- ";
-       
-       private static final AtomicInteger COUNTER = new AtomicInteger(1);
-       
-       private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink 
Executor Threads");
-       
-       private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER 
= new LoggingExceptionHander();
-       
-       
-       public static final ExecutorThreadFactory INSTANCE = new 
ExecutorThreadFactory();
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private ExecutorThreadFactory() {}
-       
-       
-       public Thread newThread(Runnable target) {
-               Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX 
+ COUNTER.getAndIncrement());
+
+       /** The thread pool name used when no explicit pool name has been 
specified */ 
+       private static final String DEFAULT_POOL_NAME = "flink-executor-pool";
+
+       private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+       private final ThreadGroup group;
+
+       private final String namePrefix;
+
+       private final UncaughtExceptionHandler exceptionHandler;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new thread factory using the default thread pool name 
('flink-executor-pool')
+        * and the default uncaught exception handler (log exception and kill 
process).
+        */
+       public ExecutorThreadFactory() {
+               this(DEFAULT_POOL_NAME);
+       }
+
+       /**
+        * Creates a new thread factory using the given thread pool name and 
the default
+        * uncaught exception handler (log exception and kill process).
+        * 
+        * @param poolName The pool name, used as the threads' name prefix
+        */
+       public ExecutorThreadFactory(String poolName) {
+               this(poolName, FatalExitExceptionHandler.INSTANCE);
+       }
+
+       /**
+        * Creates a new thread factory using the given thread pool name and 
the given
+        * uncaught exception handler.
+        * 
+        * @param poolName The pool name, used as the threads' name prefix
+        * @param exceptionHandler The uncaught exception handler for the 
threads
+        */
+       public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler 
exceptionHandler) {
+               checkNotNull(poolName, "poolName");
+
+               SecurityManager securityManager = System.getSecurityManager();
+               this.group = (securityManager != null) ? 
securityManager.getThreadGroup() :
+                               Thread.currentThread().getThreadGroup();
+
+               this.namePrefix = poolName + "-thread-";
+               this.exceptionHandler = exceptionHandler;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public Thread newThread(Runnable runnable) {
+               Thread t = new Thread(group, runnable, namePrefix + 
threadNumber.getAndIncrement());
                t.setDaemon(true);
-               t.setUncaughtExceptionHandler(EXCEPTION_HANDLER);
+
+               // normalize the priority
+               if (t.getPriority() != Thread.NORM_PRIORITY) {
+                       t.setPriority(Thread.NORM_PRIORITY);
+               }
+
+               // optional handler for uncaught exceptions
+               if (exceptionHandler != null) {
+                       t.setUncaughtExceptionHandler(exceptionHandler);
+               }
+
                return t;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       private static final class LoggingExceptionHander implements 
Thread.UncaughtExceptionHandler {
+
+       private static final class FatalExitExceptionHandler implements 
UncaughtExceptionHandler {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(FatalExitExceptionHandler.class);
+
+               static final FatalExitExceptionHandler INSTANCE = new 
FatalExitExceptionHandler(); 
 
                @Override
                public void uncaughtException(Thread t, Throwable e) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Thread '" + t.getName() + "' 
produced an uncaught exception.", e);
-                       }
+                       LOG.error("FATAL: Thread '" + t.getName() + "' produced 
an uncaught exception. Stopping the process...", e);
+                       System.exit(-17);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
deleted file mode 100644
index bd97963..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread factory which allows to specify a thread pool name and a thread name.
- *
- * The code is based on {@link 
java.util.concurrent.Executors.DefaultThreadFactory}.
- */
-public class NamedThreadFactory implements ThreadFactory {
-       private static final AtomicInteger poolNumber = new AtomicInteger(1);
-       private final ThreadGroup group;
-       private final AtomicInteger threadNumber = new AtomicInteger(1);
-       private final String namePrefix;
-
-       public NamedThreadFactory(final String poolName, final String 
threadName) {
-               SecurityManager securityManager = System.getSecurityManager();
-               group = (securityManager != null) ? 
securityManager.getThreadGroup() :
-                       Thread.currentThread().getThreadGroup();
-
-               namePrefix = poolName +
-                       poolNumber.getAndIncrement() +
-                       threadName;
-       }
-
-       @Override
-       public Thread newThread(Runnable runnable) {
-               Thread t = new Thread(group, runnable,
-                       namePrefix + threadNumber.getAndIncrement(),
-                       0);
-               if (t.isDaemon()) {
-                       t.setDaemon(false);
-               }
-               if (t.getPriority() != Thread.NORM_PRIORITY) {
-                       t.setPriority(Thread.NORM_PRIORITY);
-               }
-               return t;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d575f68..a335916 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2023,11 +2023,11 @@ object JobManager {
 
     val futureExecutor = Executors.newScheduledThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-future-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-future"))
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+      new ExecutorThreadFactory("jobmanager-io"))
 
     val timeout = AkkaUtils.getTimeout(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 64cc97d..07fb996 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.minicluster
 
-import java.net.InetAddress
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
@@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
-import org.apache.flink.api.common.time.Time
+
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, 
ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, 
ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -109,11 +109,11 @@ abstract class FlinkMiniCluster(
 
   val futureExecutor = Executors.newScheduledThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-future-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-future"))
 
   val ioExecutor = Executors.newFixedThreadPool(
     Hardware.getNumberCPUCores(),
-    new NamedThreadFactory("mini-cluster-io-", "-thread"))
+    new ExecutorThreadFactory("mini-cluster-io"))
 
   def configuration: Configuration = {
     if (originalConfiguration.getInteger(

http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 29f1827..5cc51e4 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.NamedThreadFactory;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner {
 
                final ScheduledExecutorService futureExecutor = 
Executors.newScheduledThreadPool(
                        numberProcessors,
-                       new NamedThreadFactory("yarn-jobmanager-future-", 
"-thread-"));
+                       new ExecutorThreadFactory("yarn-jobmanager-future"));
 
                final ExecutorService ioExecutor = Executors.newFixedThreadPool(
                        numberProcessors,
-                       new NamedThreadFactory("yarn-jobmanager-io-", 
"-thread-"));
+                       new ExecutorThreadFactory("yarn-jobmanager-io"));
 
                try {
                        // ------- (1) load and parse / validate all 
configurations -------

Reply via email to