This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bce9c09ca4 [FLINK-32876][runtime] Prevent 
ExecutionTimeBasedSlowTaskDetector from identifying tasks in CREATED state as 
slow tasks.
4bce9c09ca4 is described below

commit 4bce9c09ca4fdf3ad8bf95ba5cf4ca361acea156
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Aug 18 10:42:25 2023 +0800

    [FLINK-32876][runtime] Prevent ExecutionTimeBasedSlowTaskDetector from 
identifying tasks in CREATED state as slow tasks.
    
    This closes #23222.
---
 .../ExecutionTimeBasedSlowTaskDetector.java         | 12 +++++++++++-
 .../ExecutionTimeBasedSlowTaskDetectorTest.java     | 21 +++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
index 34cb5b47b36..f6d08548a0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetector.java
@@ -214,7 +214,17 @@ public class ExecutionTimeBasedSlowTaskDetector implements 
SlowTaskDetector {
             ExecutionTimeWithInputBytes baseline,
             long currentTimeMillis) {
         return executions.stream()
-                .filter(e -> !e.getState().isTerminal() && e.getState() != 
ExecutionState.CANCELING)
+                .filter(
+                        // We will filter out tasks that are in the CREATED 
state, as we do not
+                        // allow speculative execution for them because they 
have not been
+                        // scheduled.
+                        // However, for tasks that are already in the 
SCHEDULED state, we allow
+                        // speculative execution to provide the capability of 
parallel execution
+                        // running.
+                        e ->
+                                !e.getState().isTerminal()
+                                        && e.getState() != 
ExecutionState.CANCELING
+                                        && e.getState() != 
ExecutionState.CREATED)
                 .filter(
                         e -> {
                             ExecutionTimeWithInputBytes timeWithBytes =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
index b11f86c80d4..1714d79edbf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java
@@ -76,6 +76,27 @@ class ExecutionTimeBasedSlowTaskDetectorTest {
         assertThat(slowTasks).hasSize(parallelism);
     }
 
+    @Test
+    void testAllTasksInCreatedAndNoSlowTasks() throws Exception {
+        final int parallelism = 3;
+        final JobVertex jobVertex = createNoOpVertex(parallelism);
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(jobVertex);
+
+        // all tasks are in the CREATED state, which is not classified as slow 
tasks.
+        final ExecutionGraph executionGraph =
+                SchedulerTestingUtils.createScheduler(
+                                jobGraph,
+                                
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .getExecutionGraph();
+
+        final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = 
createSlowTaskDetector(0, 1, 0);
+        final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks 
=
+                slowTaskDetector.findSlowTasks(executionGraph);
+
+        assertThat(slowTasks.size()).isZero();
+    }
+
     @Test
     void testFinishedTaskNotExceedRatio() throws Exception {
         final int parallelism = 3;

Reply via email to