This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 904536271081f393a8c29341831275080b04255a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Apr 14 05:02:55 2021 +0200

    [FLINK-21996][coordination] Ensure exactly-once guarantees for 
OperatorEvent RPCs
    
    This consists of two changes that work together:
      - Delay checkpoints until we have clarity about all in-flight 
OperatorEvents
      - Fail target subtask if the result future for an OperatorEvent send fails
---
 .../operators/coordination/EventSender.java        | 37 +++++++++++
 .../coordination/ExecutionSubtaskAccess.java       | 10 +++
 .../coordination/OperatorCoordinatorHolder.java    | 72 +++++++++++++++++++++-
 .../operators/coordination/OperatorEventValve.java |  7 ++-
 .../operators/coordination/SubtaskAccess.java      | 11 ++++
 .../operators/coordination/SubtaskGatewayImpl.java | 30 +++++++--
 .../util/IncompleteFuturesTracker.java             | 24 +++++---
 .../coordination/EventReceivingTasks.java          | 34 ++++++----
 .../OperatorCoordinatorHolderTest.java             | 18 ++++++
 .../util/IncompleteFuturesTrackerTest.java         | 18 +++++-
 .../OperatorEventSendingCheckpointITCase.java      |  5 +-
 11 files changed, 230 insertions(+), 36 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java
new file mode 100644
index 0000000..c70a21a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/EventSender.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/** Simple interface for a component that takes and sends events. */
+@FunctionalInterface
+interface EventSender {
+
+    /**
+     * Takes the given Callable and calls it at a certain point to send the 
event. The result of
+     * that Callable is bridged to the given result future.
+     */
+    void sendEvent(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java
index fc363de..9babeb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java
@@ -81,6 +81,11 @@ final class ExecutionSubtaskAccess implements SubtaskAccess {
     }
 
     @Override
+    public String subtaskName() {
+        return taskExecution.getVertexWithAttempt();
+    }
+
+    @Override
     public CompletableFuture<?> hasSwitchedToRunning() {
         return taskExecution.getInitializingOrRunningFuture();
     }
@@ -91,6 +96,11 @@ final class ExecutionSubtaskAccess implements SubtaskAccess {
                 || taskExecution.getState() == ExecutionState.INITIALIZING;
     }
 
+    @Override
+    public void triggerTaskFailover(Throwable cause) {
+        taskExecution.fail(cause);
+    }
+
     // ------------------------------------------------------------------------
 
     static final class ExecutionJobVertexSubtaskAccess implements 
SubtaskAccessFactory {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 8d96e15..a38ca16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
@@ -34,6 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
@@ -112,11 +116,15 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 public class OperatorCoordinatorHolder
         implements OperatorCoordinatorCheckpointContext, AutoCloseable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(OperatorCoordinatorHolder.class);
+
     private final OperatorCoordinator coordinator;
     private final OperatorID operatorId;
     private final LazyInitializedCoordinatorContext context;
     private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
     private final OperatorEventValve eventValve;
+    private final IncompleteFuturesTracker unconfirmedEvents;
+    private final EventSender eventSender;
 
     private final int operatorParallelism;
     private final int operatorMaxParallelism;
@@ -139,7 +147,9 @@ public class OperatorCoordinatorHolder
         this.operatorParallelism = operatorParallelism;
         this.operatorMaxParallelism = operatorMaxParallelism;
 
+        this.unconfirmedEvents = new IncompleteFuturesTracker();
         this.eventValve = new OperatorEventValve();
+        this.eventSender = new ValveAndTrackerSender(eventValve, 
unconfirmedEvents);
     }
 
     public void lazyInitialize(
@@ -278,7 +288,7 @@ public class OperatorCoordinatorHolder
                     if (failure != null) {
                         result.completeExceptionally(failure);
                     } else if (eventValve.tryShutValve(checkpointId)) {
-                        result.complete(success);
+                        completeCheckpointOnceEventsAreDone(checkpointId, 
result, success);
                     } else {
                         // if we cannot shut the valve, this means the 
checkpoint
                         // has been aborted before, so the future is already
@@ -299,6 +309,43 @@ public class OperatorCoordinatorHolder
         }
     }
 
+    private void completeCheckpointOnceEventsAreDone(
+            final long checkpointId,
+            final CompletableFuture<byte[]> checkpointFuture,
+            final byte[] checkpointResult) {
+
+        final Collection<CompletableFuture<?>> pendingEvents =
+                unconfirmedEvents.getCurrentIncompleteAndReset();
+        if (pendingEvents.isEmpty()) {
+            checkpointFuture.complete(checkpointResult);
+            return;
+        }
+
+        LOG.info(
+                "Coordinator checkpoint {} for coordinator {} is awaiting {} 
pending events",
+                checkpointId,
+                operatorId,
+                pendingEvents.size());
+
+        final CompletableFuture<?> conjunct = 
FutureUtils.waitForAll(pendingEvents);
+        conjunct.whenComplete(
+                (success, failure) -> {
+                    if (failure == null) {
+                        checkpointFuture.complete(checkpointResult);
+                    } else {
+                        // if we reach this situation, then anyways the 
checkpoint cannot
+                        // complete because
+                        // (a) the target task really is down
+                        // (b) we have a potentially lost RPC message and need 
to
+                        //     do a task failover for the receiver to restore 
consistency
+                        checkpointFuture.completeExceptionally(
+                                new FlinkException(
+                                        "Failing OperatorCoordinator 
checkpoint because some OperatorEvents "
+                                                + "before this checkpoint 
barrier were not received by the target tasks."));
+                    }
+                });
+    }
+
     // ------------------------------------------------------------------------
     //  Checkpointing Callbacks
     // ------------------------------------------------------------------------
@@ -336,7 +383,7 @@ public class OperatorCoordinatorHolder
         final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
 
         final OperatorCoordinator.SubtaskGateway gateway =
-                new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+                new SubtaskGatewayImpl(sta, eventSender, mainThreadExecutor);
 
         // We need to do this synchronously here, otherwise we violate the 
contract that
         // 'subtaskFailed()' will never overtake 'subtaskReady()'.
@@ -522,4 +569,25 @@ public class OperatorCoordinatorHolder
             return userCodeClassLoader;
         }
     }
+
+    // ------------------------------------------------------------------------
+
+    private static final class ValveAndTrackerSender implements EventSender {
+
+        private final OperatorEventValve valve;
+        private final IncompleteFuturesTracker tracker;
+
+        ValveAndTrackerSender(OperatorEventValve valve, 
IncompleteFuturesTracker tracker) {
+            this.valve = valve;
+            this.tracker = tracker;
+        }
+
+        @Override
+        public void sendEvent(
+                Callable<CompletableFuture<Acknowledge>> sendAction,
+                CompletableFuture<Acknowledge> result) {
+            valve.sendEvent(sendAction, result);
+            tracker.trackFutureWhileIncomplete(result);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index 8e71e36..7086dab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  * that, one can register a "main thread executor" (as used by the mailbox 
components like RPC
  * components) via {@link 
#setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
  */
-final class OperatorEventValve {
+final class OperatorEventValve implements EventSender {
 
     private static final long NO_CHECKPOINT = Long.MIN_VALUE;
 
@@ -55,7 +55,7 @@ final class OperatorEventValve {
     @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
 
     /** Constructs a new OperatorEventValve. */
-    public OperatorEventValve() {
+    OperatorEventValve() {
         this.currentCheckpointId = NO_CHECKPOINT;
         this.lastCheckpointId = Long.MIN_VALUE;
     }
@@ -82,6 +82,7 @@ final class OperatorEventValve {
      * <p>This method makes no assumptions and gives no guarantees from which 
thread the result
      * future gets completed.
      */
+    @Override
     public void sendEvent(
             Callable<CompletableFuture<Acknowledge>> sendAction,
             CompletableFuture<Acknowledge> result) {
@@ -172,7 +173,7 @@ final class OperatorEventValve {
         }
     }
 
-    private static void callSendAction(
+    private void callSendAction(
             Callable<CompletableFuture<Acknowledge>> sendAction,
             CompletableFuture<Acknowledge> result) {
         try {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java
index c654af8..333310c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java
@@ -56,6 +56,12 @@ interface SubtaskAccess {
     ExecutionAttemptID currentAttempt();
 
     /**
+     * Gets a descriptive name of the operator's subtask, including name, 
subtask-id, parallelism,
+     * and execution attempt.
+     */
+    String subtaskName();
+
+    /**
      * The future returned here completes once the target subtask is in a 
running state. As running
      * state classify the states {@link ExecutionState#RUNNING} and {@link
      * ExecutionState#INITIALIZING}.
@@ -68,6 +74,11 @@ interface SubtaskAccess {
      */
     boolean isStillRunning();
 
+    /**
+     * Triggers a failover for the subtask execution attempt that this access 
instance is bound to.
+     */
+    void triggerTaskFailover(Throwable cause);
+
     // ------------------------------------------------------------------------
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 0e8dade..11f33f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
 
@@ -34,14 +36,17 @@ import java.util.concurrent.Executor;
  */
 class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
 
+    private static final String EVENT_LOSS_ERROR_MESSAGE =
+            "An OperatorEvent from an OperatorCoordinator to a task was lost. "
+                    + "Triggering task failover to ensure consistency. Event: 
'%s', targetTask: %s";
+
     private final SubtaskAccess subtaskAccess;
-    private final OperatorEventValve valve;
+    private final EventSender sender;
     private final Executor sendingExecutor;
 
-    SubtaskGatewayImpl(
-            SubtaskAccess subtaskAccess, OperatorEventValve valve, Executor 
sendingExecutor) {
+    SubtaskGatewayImpl(SubtaskAccess subtaskAccess, EventSender sender, 
Executor sendingExecutor) {
         this.subtaskAccess = subtaskAccess;
-        this.valve = valve;
+        this.sender = sender;
         this.sendingExecutor = sendingExecutor;
     }
 
@@ -64,7 +69,22 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
                 subtaskAccess.createEventSendAction(serializedEvent);
 
         final CompletableFuture<Acknowledge> result = new 
CompletableFuture<>();
-        sendingExecutor.execute(() -> valve.sendEvent(sendAction, result));
+        FutureUtils.assertNoException(
+                result.handleAsync(
+                        (success, failure) -> {
+                            if (failure != null && 
subtaskAccess.isStillRunning()) {
+                                String msg =
+                                        String.format(
+                                                EVENT_LOSS_ERROR_MESSAGE,
+                                                evt,
+                                                subtaskAccess.subtaskName());
+                                subtaskAccess.triggerTaskFailover(new 
FlinkException(msg));
+                            }
+                            return null;
+                        },
+                        sendingExecutor));
+
+        sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
         return result;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java
index 9904ce0..1f8a9cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTracker.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.runtime.operators.coordination.util;
 
-import org.apache.flink.annotation.VisibleForTesting;
-
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -64,6 +61,22 @@ public final class IncompleteFuturesTracker {
         future.whenComplete((success, failure) -> removeFromSet(future));
     }
 
+    public Collection<CompletableFuture<?>> getCurrentIncompleteAndReset() {
+        lock.lock();
+        try {
+            if (incompleteFutures.isEmpty()) {
+                return Collections.emptySet();
+            }
+
+            final ArrayList<CompletableFuture<?>> futures = new 
ArrayList<>(incompleteFutures);
+            incompleteFutures.clear();
+            return futures;
+
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public void failAllFutures(Throwable cause) {
         final Collection<CompletableFuture<?>> futuresToFail;
 
@@ -95,9 +108,4 @@ public final class IncompleteFuturesTracker {
             lock.unlock();
         }
     }
-
-    @VisibleForTesting
-    Set<CompletableFuture<?>> getTrackedFutures() {
-        return Collections.unmodifiableSet(incompleteFutures);
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index baee4b1..dfed1eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -24,8 +24,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.SerializedValue;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -43,22 +41,27 @@ import java.util.stream.Collectors;
 public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory 
{
 
     public static EventReceivingTasks createForNotYetRunningTasks() {
-        return new EventReceivingTasks(false, null);
+        return new EventReceivingTasks(false, 
CompletableFuture.completedFuture(Acknowledge.get()));
     }
 
     public static EventReceivingTasks createForRunningTasks() {
-        return new EventReceivingTasks(true, null);
+        return new EventReceivingTasks(true, 
CompletableFuture.completedFuture(Acknowledge.get()));
     }
 
     public static EventReceivingTasks 
createForRunningTasksFailingRpcs(Throwable rpcException) {
-        return new EventReceivingTasks(true, rpcException);
+        return new EventReceivingTasks(true, 
FutureUtils.completedExceptionally(rpcException));
+    }
+
+    public static EventReceivingTasks createForRunningTasksWithRpcResult(
+            CompletableFuture<Acknowledge> result) {
+        return new EventReceivingTasks(true, result);
     }
 
     // ------------------------------------------------------------------------
 
     final ArrayList<EventWithSubtask> events = new ArrayList<>();
 
-    @Nullable private final Throwable eventSendingFailureCause;
+    private final CompletableFuture<Acknowledge> eventSendingResult;
 
     private final Map<Integer, TestSubtaskAccess> subtasks = new HashMap<>();
 
@@ -66,9 +69,9 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
 
     private EventReceivingTasks(
             final boolean createdTasksAreRunning,
-            @Nullable final Throwable eventSendingFailureCause) {
+            final CompletableFuture<Acknowledge> eventSendingResult) {
         this.createdTasksAreRunning = createdTasksAreRunning;
-        this.eventSendingFailureCause = eventSendingFailureCause;
+        this.eventSendingResult = eventSendingResult;
     }
 
     // ------------------------------------------------------------------------
@@ -123,10 +126,7 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
     Callable<CompletableFuture<Acknowledge>> createSendAction(OperatorEvent 
event, int subtask) {
         return () -> {
             events.add(new EventWithSubtask(event, subtask));
-
-            return eventSendingFailureCause == null
-                    ? CompletableFuture.completedFuture(Acknowledge.get())
-                    : 
FutureUtils.completedExceptionally(eventSendingFailureCause);
+            return eventSendingResult;
         };
     }
 
@@ -207,6 +207,11 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
         }
 
         @Override
+        public String subtaskName() {
+            return "test_task-" + subtaskIndex + " #: " + executionAttemptId;
+        }
+
+        @Override
         public CompletableFuture<?> hasSwitchedToRunning() {
             return running;
         }
@@ -219,5 +224,10 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
         void switchToRunning() {
             running.complete(null);
         }
+
+        @Override
+        public void triggerTaskFailover(Throwable cause) {
+            // ignore this in the tests
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 4eacd16..3d3cddc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -288,6 +289,23 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
         globalFailure = null;
     }
 
+    @Test
+    public void checkpointCompletionWaitsForEventFutures() throws Exception {
+        final CompletableFuture<Acknowledge> ackFuture = new 
CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                
EventReceivingTasks.createForRunningTasksWithRpcResult(ackFuture);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, 
TestingOperatorCoordinator::new);
+
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new 
TestOperatorEvent(0));
+
+        final CompletableFuture<?> checkpointFuture = 
triggerAndCompleteCheckpoint(holder, 22L);
+        assertFalse(checkpointFuture.isDone());
+
+        ackFuture.complete(Acknowledge.get());
+        assertTrue(checkpointFuture.isDone());
+    }
+
     /**
      * This test verifies that the order of Checkpoint Completion and Event 
Sending observed from
      * the outside matches that from within the OperatorCoordinator.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java
index e994288..a2aff7f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/util/IncompleteFuturesTrackerTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
@@ -40,7 +41,7 @@ public class IncompleteFuturesTrackerTest {
 
         tracker.trackFutureWhileIncomplete(future);
 
-        assertThat(tracker.getTrackedFutures(), contains(future));
+        assertThat(tracker.getCurrentIncompleteAndReset(), contains(future));
     }
 
     @Test
@@ -51,7 +52,7 @@ public class IncompleteFuturesTrackerTest {
         tracker.trackFutureWhileIncomplete(future);
         future.complete(null);
 
-        assertThat(tracker.getTrackedFutures(), not(contains(future)));
+        assertThat(tracker.getCurrentIncompleteAndReset(), 
not(contains(future)));
     }
 
     @Test
@@ -62,7 +63,7 @@ public class IncompleteFuturesTrackerTest {
         future.complete(null);
         tracker.trackFutureWhileIncomplete(future);
 
-        assertThat(tracker.getTrackedFutures(), not(contains(future)));
+        assertThat(tracker.getCurrentIncompleteAndReset(), 
not(contains(future)));
     }
 
     @Test
@@ -102,4 +103,15 @@ public class IncompleteFuturesTrackerTest {
             assertSame(expectedException, e.getCause());
         }
     }
+
+    @Test
+    public void testResetClearsTrackedFutures() {
+        final IncompleteFuturesTracker tracker = new 
IncompleteFuturesTracker();
+
+        final CompletableFuture<?> future = new CompletableFuture<>();
+        tracker.trackFutureWhileIncomplete(future);
+        tracker.getCurrentIncompleteAndReset();
+
+        assertThat(tracker.getCurrentIncompleteAndReset(), empty());
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index bd59ab0..9ca560c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -53,7 +53,6 @@ import org.apache.flink.util.function.TriFunction;
 import akka.actor.ActorSystem;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -107,7 +106,6 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
      * event was lost and trigger recovery to prevent data loss. Data loss 
would manifest in a
      * stalled test, because we could wait forever to collect the required 
number of events back.
      */
-    @Ignore // ignore for now, because this test fails due to FLINK-21996
     @Test
     public void testOperatorEventLostNoReaderFailure() throws Exception {
         final int[] eventsToLose = new int[] {2, 4, 6};
@@ -125,7 +123,6 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
      * (which is after the second successful event delivery, the fourth 
event), there is
      * additionally a failure on the reader that triggers recovery.
      */
-    @Ignore // ignore for now, because this test fails due to FLINK-21996
     @Test
     public void testOperatorEventLostWithReaderFailure() throws Exception {
         final int[] eventsToLose = new int[] {1, 3};
@@ -224,6 +221,8 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
                                 });
 
         final List<Long> sequence = numbers.executeAndCollect(numElements);
+        // the recovery may change the order of splits, so the sequence might 
be out-of-order
+        sequence.sort(Long::compareTo);
 
         final List<Long> expectedSequence =
                 LongStream.rangeClosed(1L, 
numElements).boxed().collect(Collectors.toList());

Reply via email to