[FLINK-5759] [jobmanager] Set UncaughtExceptionHandlers for JobManager's Future and I/O thread pools
This closes #3290 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef77c254 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef77c254 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef77c254 Branch: refs/heads/master Commit: ef77c254dadbe4c04810681fe765f5ec7d2a7400 Parents: 6630513 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 9 14:04:17 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Feb 10 16:28:30 2017 +0100 ---------------------------------------------------------------------- .../MesosApplicationMasterRunner.java | 10 +- .../flink/runtime/filecache/FileCache.java | 3 +- .../runtime/jobmaster/JobManagerServices.java | 6 +- .../runtime/util/ExecutorThreadFactory.java | 123 ++++++++++++++----- .../flink/runtime/util/NamedThreadFactory.java | 58 --------- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../runtime/minicluster/FlinkMiniCluster.scala | 10 +- .../flink/yarn/YarnApplicationMasterRunner.java | 8 +- 8 files changed, 119 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 5033692..a23c9f6 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -22,10 +22,12 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -52,15 +54,17 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.util.NamedThreadFactory; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; import org.apache.mesos.Protos; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -216,11 +220,11 @@ public class MesosApplicationMasterRunner { futureExecutor = Executors.newScheduledThreadPool( numberProcessors, - new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); + new ExecutorThreadFactory("mesos-jobmanager-future")); ioExecutor = Executors.newFixedThreadPool( numberProcessors, - new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); + new ExecutorThreadFactory("mesos-jobmanager-io")); mesosServices = MesosServicesUtils.createMesosServices(config); http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index 21456de..4f2166f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -99,7 +99,8 @@ public class FileCache { this.shutdownHook = createShutdownHook(this, LOG); this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>(); - this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE); + this.executorService = Executors.newScheduledThreadPool(10, + new ExecutorThreadFactory("flink-file-cache")); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java index 95500e5..8cda0f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -120,8 +120,12 @@ public class JobManagerServices { throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage()); } + final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool( + Hardware.getNumberCPUCores(), + new ExecutorThreadFactory("jobmanager-future")); + return new JobManagerServices( - Executors.newScheduledThreadPool(Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE), + futureExecutor, libraryCacheManager, RestartStrategyFactory.createRestartStrategyFactory(config), Time.of(timeout.length(), timeout.unit())); http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java index 2fb5972..4a79db3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorThreadFactory.java @@ -18,49 +18,114 @@ package org.apache.flink.runtime.util; +import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A thread factory intended for use by critical thread pools. Critical thread pools here + * mean thread pools that support Flink's core coordination and processing work, and which + * must not simply cause unnoticed errors. + * + * <p>The thread factory can be given an {@link UncaughtExceptionHandler} for the threads. + * If no handler is explicitly given, the default handler for uncaught exceptions will log + * the exceptions and kill the process afterwards. That guarantees that critical exceptions are + * not accidentally lost and leave the system running in an inconsistent state. + * + * <p>Threads created by this factory are all called '(pool-name)-thread-n', where + * <i>(pool-name)</i> is configurable, and <i>n</i> is an incrementing number. + * + * <p>All threads created by this factory are daemon threads and have the default (normal) + * priority. + */ public class ExecutorThreadFactory implements ThreadFactory { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutorThreadFactory.class); - - - private static final String THREAD_NAME_PREFIX = "Flink Executor Thread - "; - - private static final AtomicInteger COUNTER = new AtomicInteger(1); - - private static final ThreadGroup THREAD_GROUP = new ThreadGroup("Flink Executor Threads"); - - private static final Thread.UncaughtExceptionHandler EXCEPTION_HANDLER = new LoggingExceptionHander(); - - - public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory(); - - // -------------------------------------------------------------------------------------------- - - private ExecutorThreadFactory() {} - - - public Thread newThread(Runnable target) { - Thread t = new Thread(THREAD_GROUP, target, THREAD_NAME_PREFIX + COUNTER.getAndIncrement()); + + /** The thread pool name used when no explicit pool name has been specified */ + private static final String DEFAULT_POOL_NAME = "flink-executor-pool"; + + private final AtomicInteger threadNumber = new AtomicInteger(1); + + private final ThreadGroup group; + + private final String namePrefix; + + private final UncaughtExceptionHandler exceptionHandler; + + // ------------------------------------------------------------------------ + + /** + * Creates a new thread factory using the default thread pool name ('flink-executor-pool') + * and the default uncaught exception handler (log exception and kill process). + */ + public ExecutorThreadFactory() { + this(DEFAULT_POOL_NAME); + } + + /** + * Creates a new thread factory using the given thread pool name and the default + * uncaught exception handler (log exception and kill process). + * + * @param poolName The pool name, used as the threads' name prefix + */ + public ExecutorThreadFactory(String poolName) { + this(poolName, FatalExitExceptionHandler.INSTANCE); + } + + /** + * Creates a new thread factory using the given thread pool name and the given + * uncaught exception handler. + * + * @param poolName The pool name, used as the threads' name prefix + * @param exceptionHandler The uncaught exception handler for the threads + */ + public ExecutorThreadFactory(String poolName, UncaughtExceptionHandler exceptionHandler) { + checkNotNull(poolName, "poolName"); + + SecurityManager securityManager = System.getSecurityManager(); + this.group = (securityManager != null) ? securityManager.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + + this.namePrefix = poolName + "-thread-"; + this.exceptionHandler = exceptionHandler; + } + + // ------------------------------------------------------------------------ + + @Override + public Thread newThread(Runnable runnable) { + Thread t = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(true); - t.setUncaughtExceptionHandler(EXCEPTION_HANDLER); + + // normalize the priority + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + + // optional handler for uncaught exceptions + if (exceptionHandler != null) { + t.setUncaughtExceptionHandler(exceptionHandler); + } + return t; } - + // -------------------------------------------------------------------------------------------- - - private static final class LoggingExceptionHander implements Thread.UncaughtExceptionHandler { + + private static final class FatalExitExceptionHandler implements UncaughtExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(FatalExitExceptionHandler.class); + + static final FatalExitExceptionHandler INSTANCE = new FatalExitExceptionHandler(); @Override public void uncaughtException(Thread t, Throwable e) { - if (LOG.isErrorEnabled()) { - LOG.error("Thread '" + t.getName() + "' produced an uncaught exception.", e); - } + LOG.error("FATAL: Thread '" + t.getName() + "' produced an uncaught exception. Stopping the process...", e); + System.exit(-17); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java deleted file mode 100644 index bd97963..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NamedThreadFactory.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Thread factory which allows to specify a thread pool name and a thread name. - * - * The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}. - */ -public class NamedThreadFactory implements ThreadFactory { - private static final AtomicInteger poolNumber = new AtomicInteger(1); - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - private final String namePrefix; - - public NamedThreadFactory(final String poolName, final String threadName) { - SecurityManager securityManager = System.getSecurityManager(); - group = (securityManager != null) ? securityManager.getThreadGroup() : - Thread.currentThread().getThreadGroup(); - - namePrefix = poolName + - poolNumber.getAndIncrement() + - threadName; - } - - @Override - public Thread newThread(Runnable runnable) { - Thread t = new Thread(group, runnable, - namePrefix + threadNumber.getAndIncrement(), - 0); - if (t.isDaemon()) { - t.setDaemon(false); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d575f68..a335916 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2023,11 +2023,11 @@ object JobManager { val futureExecutor = Executors.newScheduledThreadPool( numberProcessors, - new NamedThreadFactory("jobmanager-future-", "-thread-")) + new ExecutorThreadFactory("jobmanager-future")) val ioExecutor = Executors.newFixedThreadPool( numberProcessors, - new NamedThreadFactory("jobmanager-io-", "-thread-")) + new ExecutorThreadFactory("jobmanager-io")) val timeout = AkkaUtils.getTimeout(configuration) http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 64cc97d..07fb996 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -18,7 +18,6 @@ package org.apache.flink.runtime.minicluster -import java.net.InetAddress import java.util.UUID import java.util.concurrent.{Executors, TimeUnit} @@ -26,7 +25,7 @@ import akka.pattern.Patterns.gracefulStop import akka.pattern.ask import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config -import org.apache.flink.api.common.time.Time + import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult} import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.AkkaUtils @@ -37,8 +36,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.HighAvailabilityMode import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService} import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager -import org.apache.flink.runtime.util.{Hardware, NamedThreadFactory, ZooKeeperUtils} +import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware, ZooKeeperUtils} import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} + import org.slf4j.LoggerFactory import scala.concurrent.duration.{Duration, FiniteDuration} @@ -109,11 +109,11 @@ abstract class FlinkMiniCluster( val futureExecutor = Executors.newScheduledThreadPool( Hardware.getNumberCPUCores(), - new NamedThreadFactory("mini-cluster-future-", "-thread")) + new ExecutorThreadFactory("mini-cluster-future")) val ioExecutor = Executors.newFixedThreadPool( Hardware.getNumberCPUCores(), - new NamedThreadFactory("mini-cluster-io-", "-thread")) + new ExecutorThreadFactory("mini-cluster-io")) def configuration: Configuration = { if (originalConfiguration.getInteger( http://git-wip-us.apache.org/repos/asf/flink/blob/ef77c254/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 29f1827..5cc51e4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -38,14 +38,14 @@ import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.util.NamedThreadFactory; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; - import org.apache.flink.yarn.cli.FlinkYarnSessionCli; + import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -230,11 +230,11 @@ public class YarnApplicationMasterRunner { final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool( numberProcessors, - new NamedThreadFactory("yarn-jobmanager-future-", "-thread-")); + new ExecutorThreadFactory("yarn-jobmanager-future")); final ExecutorService ioExecutor = Executors.newFixedThreadPool( numberProcessors, - new NamedThreadFactory("yarn-jobmanager-io-", "-thread-")); + new ExecutorThreadFactory("yarn-jobmanager-io")); try { // ------- (1) load and parse / validate all configurations -------