This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 17a294d [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. 17a294d is described below commit 17a294daacdc67b1939607a41e67e56af2fa6888 Author: Stephan Ewen <se...@apache.org> AuthorDate: Tue Jul 13 14:36:32 2021 +0200 [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread. The check was meant as a safeguard to prevent re-instantiation after fatal errors killed a previous thread. But the check was susceptible to thread termination due to idleness in the executor. This updates the check to only fail if there is in fact an instantiation next to a running thread, or after a previously crashed thread. --- .../coordinator/SourceCoordinatorProvider.java | 33 +++++++++++++++++----- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java index 563ed28..1660027 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator; import org.apache.flink.runtime.util.FatalExitExceptionHandler; +import javax.annotation.Nullable; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,13 +87,16 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> } /** A thread factory class that provides some helper methods. */ - public static class CoordinatorExecutorThreadFactory implements ThreadFactory { + public static class CoordinatorExecutorThreadFactory + implements ThreadFactory, Thread.UncaughtExceptionHandler { private final String coordinatorThreadName; private final ClassLoader cl; private final Thread.UncaughtExceptionHandler errorHandler; - private Thread t; + @Nullable private Thread t; + + @Nullable private volatile Throwable previousFailureReason; CoordinatorExecutorThreadFactory( final String coordinatorThreadName, final ClassLoader contextClassLoader) { @@ -110,18 +115,32 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> @Override public synchronized Thread newThread(Runnable r) { - if (t != null) { + if (t != null && t.isAlive()) { + throw new Error( + "Source Coordinator Thread already exists. There should never be more than one " + + "thread driving the actions of a Source Coordinator. Existing Thread: " + + t); + } + if (t != null && previousFailureReason != null) { throw new Error( - "This indicates that a fatal error has happened and caused the " - + "coordinator executor thread to exit. Check the earlier logs" - + "to see the root cause of the problem."); + "The following fatal error has happened in a previously spawned " + + "Source Coordinator thread. No new thread can be spawned.", + previousFailureReason); } t = new Thread(r, coordinatorThreadName); t.setContextClassLoader(cl); - t.setUncaughtExceptionHandler(errorHandler); + t.setUncaughtExceptionHandler(this); return t; } + @Override + public synchronized void uncaughtException(Thread t, Throwable e) { + if (previousFailureReason == null) { + previousFailureReason = e; + } + errorHandler.uncaughtException(t, e); + } + String getCoordinatorThreadName() { return coordinatorThreadName; }