This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b737b718596 [FLINK-34206][runtime] Fix potential job failure due to 
concurrent global failure and source parallelism inference
b737b718596 is described below

commit b737b71859672e8020881ce2abf998735ee98abb
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Tue Jan 30 14:26:26 2024 +0800

    [FLINK-34206][runtime] Fix potential job failure due to concurrent global 
failure and source parallelism inference
    
    This closes #24223.
---
 .../runtime/scheduler/ExecutionVertexVersioner.java      |  2 +-
 .../scheduler/adaptivebatch/AdaptiveBatchScheduler.java  | 16 ++++++++++++++--
 .../apache/flink/test/streaming/runtime/CacheITCase.java |  2 --
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
index b0a0b17db0d..a86d22fc40c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -92,7 +92,7 @@ public class ExecutionVertexVersioner {
                                 ExecutionVertexVersion::getExecutionVertexId, 
Function.identity()));
     }
 
-    ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID 
executionVertexId) {
+    public ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID 
executionVertexId) {
         final long currentVersion = getCurrentVersion(executionVertexId);
         return new ExecutionVertexVersion(executionVertexId, currentVersion);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 97b4b24f8ac..83fb50f1514 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionOperations;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -187,8 +189,10 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
     protected void startSchedulingInternal() {
         tryComputeSourceParallelismThenRunAsync(
                 (Void value, Throwable throwable) -> {
-                    initializeVerticesIfPossible();
-                    super.startSchedulingInternal();
+                    if (getExecutionGraph().getState() == JobStatus.CREATED) {
+                        initializeVerticesIfPossible();
+                        super.startSchedulingInternal();
+                    }
                 });
     }
 
@@ -196,8 +200,16 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
     protected void onTaskFinished(final Execution execution, final IOMetrics 
ioMetrics) {
         checkNotNull(ioMetrics);
         updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
+        ExecutionVertexVersion currentVersion =
+                
executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID());
         tryComputeSourceParallelismThenRunAsync(
                 (Void value, Throwable throwable) -> {
+                    if (executionVertexVersioner.isModified(currentVersion)) {
+                        log.debug(
+                                "Initialization of vertices will be skipped, 
because the execution"
+                                        + " vertex version has been 
modified.");
+                        return;
+                    }
                     initializeVerticesIfPossible();
                     super.onTaskFinished(execution, ioMetrics);
                 });
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
index 8866b205ce1..c60595de6b3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
@@ -48,7 +48,6 @@ import org.apache.flink.util.OutputTag;
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -218,7 +217,6 @@ public class CacheITCase extends AbstractTestBase {
     }
 
     @Test
-    @Disabled
     void testRetryOnCorruptedClusterDataset(@TempDir java.nio.file.Path 
tmpDir) throws Exception {
         File file = prepareTestData(tmpDir);
 

Reply via email to the mailing list.