This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b737b718596 [FLINK-34206][runtime] Fix potential job failure due to concurrent global failure and source parallelism inference b737b718596 is described below commit b737b71859672e8020881ce2abf998735ee98abb Author: sunxia <xingbe...@gmail.com> AuthorDate: Tue Jan 30 14:26:26 2024 +0800 [FLINK-34206][runtime] Fix potential job failure due to concurrent global failure and source parallelism inference This closes #24223. --- .../runtime/scheduler/ExecutionVertexVersioner.java | 2 +- .../scheduler/adaptivebatch/AdaptiveBatchScheduler.java | 16 ++++++++++++++-- .../apache/flink/test/streaming/runtime/CacheITCase.java | 2 -- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java index b0a0b17db0d..a86d22fc40c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java @@ -92,7 +92,7 @@ public class ExecutionVertexVersioner { ExecutionVertexVersion::getExecutionVertexId, Function.identity())); } - ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID executionVertexId) { + public ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID executionVertexId) { final long currentVersion = getCurrentVersion(executionVertexId); return new ExecutionVertexVersion(executionVertexId, currentVersion); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 97b4b24f8ac..83fb50f1514 100644 ---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint; @@ -62,6 +63,7 @@ import org.apache.flink.runtime.scheduler.DefaultScheduler; import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; import org.apache.flink.runtime.scheduler.ExecutionOperations; import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -187,8 +189,10 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { protected void startSchedulingInternal() { tryComputeSourceParallelismThenRunAsync( (Void value, Throwable throwable) -> { - initializeVerticesIfPossible(); - super.startSchedulingInternal(); + if (getExecutionGraph().getState() == JobStatus.CREATED) { + initializeVerticesIfPossible(); + super.startSchedulingInternal(); + } }); } @@ -196,8 +200,16 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) { checkNotNull(ioMetrics); updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes()); + ExecutionVertexVersion currentVersion = + executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID()); 
tryComputeSourceParallelismThenRunAsync( (Void value, Throwable throwable) -> { + if (executionVertexVersioner.isModified(currentVersion)) { + log.debug( + "Initialization of vertices will be skipped, because the execution" + + " vertex version has been modified."); + return; + } initializeVerticesIfPossible(); super.onTaskFinished(execution, ioMetrics); }); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java index 8866b205ce1..c60595de6b3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java @@ -48,7 +48,6 @@ import org.apache.flink.util.OutputTag; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -218,7 +217,6 @@ public class CacheITCase extends AbstractTestBase { } @Test - @Disabled void testRetryOnCorruptedClusterDataset(@TempDir java.nio.file.Path tmpDir) throws Exception { File file = prepareTestData(tmpDir);