This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ddd456712d9e964d3c6759482d061076c8d6075
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Apr 10 17:38:36 2021 +0200

    [FLINK-21996][refactor] Unify exception handling for Operator Coordinator Events sent to not-running tasks
    
    Sending an event to a not-running task sometimes throws an exception
    directly from the method (if the event is sent immediately) and sometimes
    completes the resulting future with an exception (for example, if the event
    had to be enqueued until after checkpoint barrier injection to preserve
    exactly-once semantics).
    
    This change makes the code always report those exceptions through the
    result future and never by throwing them directly, which simplifies and
    unifies how the calling code handles them.
---
 .../coordination/OperatorCoordinator.java          |  3 +--
 .../coordination/OperatorCoordinatorHolder.java    |  8 ++++++-
 .../RecreateOnResetOperatorCoordinator.java        |  2 +-
 .../coordinator/SourceCoordinatorContext.java      | 28 ++++------------------
 .../CoordinatorEventsExactlyOnceITCase.java        | 14 +++++------
 .../MockOperatorCoordinatorContext.java            |  3 +--
 6 files changed, 20 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index b71fad4..4f50c8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -202,8 +202,7 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
          * target TaskManager. The future is completed exceptionally if the 
event cannot be sent.
          * That includes situations where the target task is not running.
          */
-        CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int 
targetSubtask)
-                throws TaskNotRunningException;
+        CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int 
targetSubtask);
 
         /**
          * Fails the job and trigger a global failover operation.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 83159d2..d09a4a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -464,7 +465,12 @@ public class OperatorCoordinatorHolder
                 throw new FlinkRuntimeException("Cannot serialize operator 
event", e);
             }
 
-            return eventValve.sendEvent(serializedEvent, targetSubtask);
+            try {
+                return eventValve.sendEvent(serializedEvent, targetSubtask);
+            } catch (Throwable t) {
+                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                return FutureUtils.completedExceptionally(t);
+            }
         }
 
         @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 9685083..5b12388 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -212,7 +212,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
 
         @Override
         public synchronized CompletableFuture<Acknowledge> sendEvent(
-                OperatorEvent evt, int targetSubtask) throws 
TaskNotRunningException {
+                OperatorEvent evt, int targetSubtask) {
             // Do not enter the sending procedure if the context has been 
quiesced.
             if (quiesced) {
                 return CompletableFuture.completedFuture(Acknowledge.get());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 6789832..1c05321 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
@@ -147,16 +146,8 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
     public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
         callInCoordinatorThread(
                 () -> {
-                    try {
-                        operatorCoordinatorContext.sendEvent(
-                                new SourceEventWrapper(event), subtaskId);
-                        return null;
-                    } catch (TaskNotRunningException e) {
-                        throw new FlinkRuntimeException(
-                                String.format(
-                                        "Failed to send event %s to subtask 
%d", event, subtaskId),
-                                e);
-                    }
+                    operatorCoordinatorContext.sendEvent(new 
SourceEventWrapper(event), subtaskId);
+                    return null;
                 },
                 String.format("Failed to send event %s to subtask %d", event, 
subtaskId));
     }
@@ -195,12 +186,6 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
                                             
operatorCoordinatorContext.sendEvent(
                                                     new 
AddSplitEvent<>(splits, splitSerializer),
                                                     id);
-                                        } catch (TaskNotRunningException e) {
-                                            throw new FlinkRuntimeException(
-                                                    String.format(
-                                                            "Failed to assign 
splits %s to reader %d.",
-                                                            splits, id),
-                                                    e);
                                         } catch (IOException e) {
                                             throw new FlinkRuntimeException(
                                                     "Failed to serialize 
splits.", e);
@@ -216,13 +201,8 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
         // Ensure the split assignment is done by the the coordinator executor.
         callInCoordinatorThread(
                 () -> {
-                    try {
-                        operatorCoordinatorContext.sendEvent(new 
NoMoreSplitsEvent(), subtask);
-                        return null; // void return value
-                    } catch (TaskNotRunningException e) {
-                        throw new FlinkRuntimeException(
-                                "Failed to send 'NoMoreSplits' to reader " + 
subtask, e);
-                    }
+                    operatorCoordinatorContext.sendEvent(new 
NoMoreSplitsEvent(), subtask);
+                    return null; // void return value
                 },
                 "Failed to send 'NoMoreSplits' to reader " + subtask);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 968537d..e227278 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -349,15 +349,13 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
             if (nextNumber > maxNumber) {
                 return;
             }
-            try {
-                if (nextNumber == maxNumber) {
-                    context.sendEvent(new EndEvent(), 0);
-                } else {
-                    context.sendEvent(new IntegerEvent(nextNumber), 0);
-                }
-                nextNumber++;
-            } catch (TaskNotRunningException ignored) {
+
+            if (nextNumber == maxNumber) {
+                context.sendEvent(new EndEvent(), 0);
+            } else {
+                context.sendEvent(new IntegerEvent(nextNumber), 0);
             }
+            nextNumber++;
         }
 
         private void checkWhetherToTriggerFailure() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index 4dc71dd..c20915c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -75,8 +75,7 @@ public class MockOperatorCoordinatorContext implements 
OperatorCoordinator.Conte
     }
 
     @Override
-    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int 
targetSubtask)
-            throws TaskNotRunningException {
+    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int 
targetSubtask) {
         eventsToOperator.computeIfAbsent(targetSubtask, subtaskId -> new 
ArrayList<>()).add(evt);
         if (failEventSending) {
             CompletableFuture<Acknowledge> future = new CompletableFuture<>();

Reply via email to