This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 38b9c280128 [FLINK-32876][runtime] Prevent ExecutionTimeBasedSlowTaskDetector from identifying tasks in CREATED state as slow tasks.
38b9c280128 is described below

commit 38b9c280128981b3e809df1f963bdaf8c0491804
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Aug 18 10:42:25 2023 +0800

    [FLINK-32876][runtime] Prevent ExecutionTimeBasedSlowTaskDetector from identifying tasks in CREATED state as slow tasks.

    This closes #23222.
---
 .../ExecutionTimeBasedSlowTaskDetector.java         | 12 +++++++++++-
 .../ExecutionTimeBasedSlowTaskDetectorTest.java     | 21 +++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
index 34cb5b47b36..f6d08548a0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
@@ -214,7 +214,17 @@ public class ExecutionTimeBasedSlowTaskDetector implements SlowTaskDetector {
             ExecutionTimeWithInputBytes baseline,
             long currentTimeMillis) {
         return executions.stream()
-                .filter(e -> !e.getState().isTerminal() && e.getState() != ExecutionState.CANCELING)
+                .filter(
+                        // We will filter out tasks that are in the CREATED state, as we do not
+                        // allow speculative execution for them because they have not been
+                        // scheduled.
+                        // However, for tasks that are already in the SCHEDULED state, we allow
+                        // speculative execution to provide the capability of parallel execution
+                        // running.
+                        e ->
+                                !e.getState().isTerminal()
+                                        && e.getState() != ExecutionState.CANCELING
+                                        && e.getState() != ExecutionState.CREATED)
                 .filter(
                         e -> {
                             ExecutionTimeWithInputBytes timeWithBytes =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
index b11f86c80d4..1714d79edbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -76,6 +76,27 @@ class ExecutionTimeBasedSlowTaskDetectorTest {
         assertThat(slowTasks).hasSize(parallelism);
     }
 
+    @Test
+    void testAllTasksInCreatedAndNoSlowTasks() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
+
+        // all tasks are in the CREATED state, which is not classified as slow tasks.
+        final ExecutionGraph executionGraph =
+                SchedulerTestingUtils.createScheduler(
+                                jobGraph,
+                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .getExecutionGraph();
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0, 1, 0);
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks.size()).isZero();
+    }
+
     @Test
     void testFinishedTaskNotExceedRatio() throws Exception {
         final int parallelism = 3;