This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b867d8a6c187d6bef45294a6e6a524254410167
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 13 14:36:32 2021 +0200

    [FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.
    
    The check was meant as a safeguard to prevent re-instantiation after fatal errors killed a previous thread.
    But the check was also triggered when the executor terminated a previous thread because it was idle.
    
    This updates the check to only fail if a new thread is instantiated while the previous one is still running,
    or after a previously spawned thread crashed.
---
 .../coordinator/SourceCoordinatorProvider.java     | 33 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index bb6f835..56248f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.util.FatalExitExceptionHandler;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,13 +87,16 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
     }
 
     /** A thread factory class that provides some helper methods. */
-    public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
+    public static class CoordinatorExecutorThreadFactory
+            implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
         private final String coordinatorThreadName;
         private final ClassLoader cl;
         private final Thread.UncaughtExceptionHandler errorHandler;
 
-        private Thread t;
+        @Nullable private Thread t;
+
+        @Nullable private volatile Throwable previousFailureReason;
 
         CoordinatorExecutorThreadFactory(
                 final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
@@ -110,18 +115,32 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
 
         @Override
         public synchronized Thread newThread(Runnable r) {
-            if (t != null) {
+            if (t != null && t.isAlive()) {
+                throw new Error(
+                        "Source Coordinator Thread already exists. There 
should never be more than one "
+                                + "thread driving the actions of a Source 
Coordinator. Existing Thread: "
+                                + t);
+            }
+            if (t != null && previousFailureReason != null) {
                 throw new Error(
-                        "This indicates that a fatal error has happened and 
caused the "
-                                + "coordinator executor thread to exit. Check 
the earlier logs "
-                                + "to see the root cause of the problem.");
+                        "The following fatal error has happened in a 
previously spawned "
+                                + "Source Coordinator thread. No new thread 
can be spawned.",
+                        previousFailureReason);
             }
             t = new Thread(r, coordinatorThreadName);
             t.setContextClassLoader(cl);
-            t.setUncaughtExceptionHandler(errorHandler);
+            t.setUncaughtExceptionHandler(this);
             return t;
         }
 
+        @Override
+        public synchronized void uncaughtException(Thread t, Throwable e) {
+            if (previousFailureReason == null) {
+                previousFailureReason = e;
+            }
+            errorHandler.uncaughtException(t, e);
+        }
+
         String getCoordinatorThreadName() {
             return coordinatorThreadName;
         }

Reply via email to