[GitHub] [flink] becketqin commented on a change in pull request #12234: [FLINK-16986][coordination] Provide exactly-once guaranteed around checkpoints and operator event sending

2020-05-29 Thread GitBox


becketqin commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r432805261



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##
@@ -99,29 +187,110 @@ public void close() throws Exception {
 
@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+   mainThreadExecutor.assertRunningInMainThread();
coordinator.handleEventFromOperator(subtask, event);
}
 
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+   mainThreadExecutor.assertRunningInMainThread();
coordinator.subtaskFailed(subtask, reason);
+   eventValve.resetForTask(subtask);
}
 
@Override
-   public CompletableFuture checkpointCoordinator(long checkpointId) throws Exception {
-   return coordinator.checkpointCoordinator(checkpointId);
+   public void checkpointCoordinator(long checkpointId, CompletableFuture result) {
+   // unfortunately, this method does not run in the scheduler executor, but in the
+   // checkpoint coordinator time thread.
+   // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+   // main thread executor
+   mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(checkpointId, result));
}
 
@Override
public void checkpointComplete(long checkpointId) {
-   coordinator.checkpointComplete(checkpointId);
+   // unfortunately, this method does not run in the scheduler executor, but in the
+   // checkpoint coordinator time thread.
+   // we can remove the delegation once the checkpoint coordinator runs fully in the scheduler's
+   // main thread executor
+   mainThreadExecutor.execute(() -> checkpointCompleteInternal(checkpointId));
}
 
@Override
public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+   // ideally we would like to check this here, however this method is called early during
+   // execution graph construction, before the main thread executor is set
+
+   eventValve.reset();

Review comment:
   It seems we have a minor race condition here. The coordinator may send some events between line 224 and line 225, i.e. after the `eventValve` is reset but before the coordinator has been reset to the checkpoint.
   
   I think those events won't actually be sent to the tasks, because the operator coordinators are only restored during a global recovery, when the tasks shouldn't have started yet. If so, there is no real impact here, except that those events will fail with a different exception than the events failed by `eventValve.reset()`.
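A minimal model of the window described above, to make the "different exception" point concrete. It assumes a simplified valve that buffers events while shut and fails them on reset, and a gateway that rejects events because no task is running yet. `EventValveModel`, `send`, `failureMessage`, and the exception messages are hypothetical names for illustration, not the valve class from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical model only; not Flink's event valve implementation.
public class ResetWindowSketch {

    static final class EventValveModel {
        private final List<CompletableFuture<Void>> buffered = new ArrayList<>();
        private boolean shut;

        void shut() {
            shut = true;
        }

        // While shut, events are buffered. While open, they go straight to a
        // (hypothetical) gateway that fails them because no task is running yet.
        CompletableFuture<Void> send(String event) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (shut) {
                buffered.add(future);
            } else {
                future.completeExceptionally(new IllegalStateException("task not running: " + event));
            }
            return future;
        }

        // Fails every buffered event and reopens the valve.
        void reset() {
            for (CompletableFuture<Void> future : buffered) {
                future.completeExceptionally(new IllegalStateException("valve reset"));
            }
            buffered.clear();
            shut = false;
        }
    }

    // join() wraps the original exception in a CompletionException; unwrap its message.
    static String failureMessage(CompletableFuture<Void> future) {
        try {
            future.join();
            return "no failure";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        EventValveModel valve = new EventValveModel();
        valve.shut();
        CompletableFuture<Void> beforeReset = valve.send("e1");
        valve.reset();
        // The window: valve already reset, coordinator not yet restored.
        CompletableFuture<Void> inWindow = valve.send("e2");
        System.out.println(failureMessage(beforeReset)); // valve reset
        System.out.println(failureMessage(inWindow));    // task not running: e2
    }
}
```

Both events fail either way; only the failure reason differs, which matches the "no real impact" reading above as long as no task is running during the window.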

##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;

[GitHub] [flink] becketqin commented on a change in pull request #12234: [FLINK-16986][coordination] Provide exactly-once guaranteed around checkpoints and operator event sending

2020-05-27 Thread GitBox


becketqin commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r431007926



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##
@@ -58,7 +58,17 @@
final Collection> individualSnapshots = new ArrayList<>(coordinators.size());
 
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
-   individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+   final CompletableFuture checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+   coordinator.onCallTriggerCheckpoint(checkpointId);
+
+   individualSnapshots.add(checkpointFuture);
+   checkpointFuture.whenComplete((ignored, failure) -> {
+   if (failure != null) {
+   coordinator.abortCurrentTriggering();
+   } else {
+   coordinator.onCheckpointStateFutureComplete(checkpointId);

Review comment:
   I don't have a code snippet at hand right now, but I can write one and run it in a tight loop to demonstrate the race condition.
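A rough sketch of such a tight loop, under stated assumptions: a single-thread executor stands in for the coordinator executor, the calling thread stands in for the code that registers `whenComplete`, and an `AtomicBoolean` stands in for the valve. `TriggerRaceLoop` and `countLeakedEvents` are hypothetical names; this is not the snippet referred to above. The reported count varies from run to run and may be zero on some machines.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stress loop; all names are illustrative.
public class TriggerRaceLoop {

    // Returns how many rounds sent an event after the snapshot future completed
    // but before the valve-shutting callback had run.
    static int countLeakedEvents(int iterations) {
        ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger leaked = new AtomicInteger();
        try {
            for (int i = 0; i < iterations; i++) {
                AtomicBoolean valveShut = new AtomicBoolean(false);
                CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
                CompletableFuture<Void> roundDone = new CompletableFuture<>();

                // Coordinator side: finish the snapshot, then immediately send an event.
                coordinatorExecutor.execute(() -> {
                    checkpointFuture.complete(new byte[0]);
                    if (!valveShut.get()) {
                        leaked.incrementAndGet(); // the event found the valve still open
                    }
                    roundDone.complete(null);
                });

                // Triggering side: shut the valve once the snapshot future completes.
                // If the future is already done, the callback runs here, possibly too late.
                checkpointFuture.whenComplete((ignored, failure) -> valveShut.set(true));

                roundDone.join();
            }
        } finally {
            coordinatorExecutor.shutdownNow();
        }
        return leaked.get();
    }

    public static void main(String[] args) {
        int iterations = 100_000;
        System.out.println("leaked events: " + countLeakedEvents(iterations) + " / " + iterations);
    }
}
```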





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on a change in pull request #12234: [FLINK-16986][coordination] Provide exactly-once guaranteed around checkpoints and operator event sending

2020-05-26 Thread GitBox


becketqin commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r429987997



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##
@@ -58,7 +58,17 @@
final Collection> individualSnapshots = new ArrayList<>(coordinators.size());
 
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
-   individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+   final CompletableFuture checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+   coordinator.onCallTriggerCheckpoint(checkpointId);
+
+   individualSnapshots.add(checkpointFuture);
+   checkpointFuture.whenComplete((ignored, failure) -> {
+   if (failure != null) {
+   coordinator.abortCurrentTriggering();
+   } else {
+   coordinator.onCheckpointStateFutureComplete(checkpointId);

Review comment:
   If the coordinator executor completes `checkpointFuture` before `whenComplete` is invoked here to shut the valve, some events may pass through the valve after the snapshot is taken but before the valve is shut. That may result in inconsistent state between the TM and the coordinator.
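A deterministic, single-threaded replay of that interleaving, as a sketch under stated assumptions: the coordinator is modeled as a `Runnable` that completes the snapshot future and then sends one event, and an `AtomicBoolean` stands in for the valve. `CallbackOrderSketch` and `eventLeaks` are hypothetical names. Attaching the callback before triggering is shown only as one possible ordering in this model, not as the change the PR makes. With real concurrency, the `CompletableFuture` Javadoc only says that non-async dependents *may* be run by the completing thread, so this replay is an illustration rather than a proof of a fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, single-threaded replay; not Flink code.
public class CallbackOrderSketch {

    // Returns true if an event is sent after the snapshot future completes
    // but before the valve-shutting callback has run.
    static boolean eventLeaks(boolean registerCallbackBeforeTrigger) {
        AtomicBoolean valveShut = new AtomicBoolean(false);
        AtomicBoolean leaked = new AtomicBoolean(false);
        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();

        // Stand-in for the coordinator: complete the snapshot, then send an event.
        Runnable coordinatorSnapshotsThenSends = () -> {
            checkpointFuture.complete(new byte[0]);
            if (!valveShut.get()) {
                leaked.set(true);
            }
        };

        if (registerCallbackBeforeTrigger) {
            // Callback attached first: in this single thread, complete() runs it before returning.
            checkpointFuture.whenComplete((ignored, failure) -> valveShut.set(true));
            coordinatorSnapshotsThenSends.run();
        } else {
            // Worst case of the reviewed order: the snapshot completes before the callback is attached.
            coordinatorSnapshotsThenSends.run();
            checkpointFuture.whenComplete((ignored, failure) -> valveShut.set(true));
        }
        return leaked.get();
    }

    public static void main(String[] args) {
        System.out.println("callback after trigger leaks: " + eventLeaks(false)); // true
        System.out.println("callback before trigger leaks: " + eventLeaks(true)); // false
    }
}
```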




