This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b462d0ec4d1d421a369e45f8dca33284b5be6bc2
Author: sunxia <xingbe...@gmail.com>
AuthorDate: Fri Jun 16 10:14:21 2023 +0800

    [FLINK-32288][runtime] Improve the scheduling performance of 
AdaptiveBatchScheduler
    
    This close #22798.
---
 .../AllFinishedInputConsumableDecider.java         |  5 +++--
 .../strategy/DefaultInputConsumableDecider.java    | 13 +++++++++++++
 .../scheduler/strategy/InputConsumableDecider.java | 10 +++++++++-
 .../PartialFinishedInputConsumableDecider.java     |  5 +++--
 .../strategy/VertexwiseSchedulingStrategy.java     |  5 +++++
 .../DefaultInputConsumableDeciderTest.java         | 22 ++++++++++++++++++++++
 .../strategy/TestingInputConsumableDecider.java    |  6 ++++++
 7 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
index 6c23757a1a7..f8cbb260488 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/AllFinishedInputConsumableDecider.java
@@ -37,14 +37,15 @@ public class AllFinishedInputConsumableDecider implements 
InputConsumableDecider
                 executionVertex.getConsumedPartitionGroups()) {
 
             if (!consumableStatusCache.computeIfAbsent(
-                    consumedPartitionGroup, 
this::isConsumedPartitionGroupConsumable)) {
+                    consumedPartitionGroup, 
this::isConsumableBasedOnFinishedProducers)) {
                 return false;
             }
         }
         return true;
     }
 
-    private boolean isConsumedPartitionGroupConsumable(
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
             final ConsumedPartitionGroup consumedPartitionGroup) {
         return consumedPartitionGroup.getNumberOfUnfinishedPartitions() == 0;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
index 93db09c94b6..ccd354b0d0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java
@@ -65,6 +65,19 @@ public class DefaultInputConsumableDecider implements 
InputConsumableDecider {
         return true;
     }
 
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
+            final ConsumedPartitionGroup consumedPartitionGroup) {
+        if 
(consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
+            // For canBePipelined consumed partition group, whether it is 
consumable does not depend
+            // on task finish. To optimize performance and avoid unnecessary 
computation, we simply
+            // return false.
+            return false;
+        } else {
+            return consumedPartitionGroup.areAllPartitionsFinished();
+        }
+    }
+
     private boolean isConsumedPartitionGroupConsumable(
             final ConsumedPartitionGroup consumedPartitionGroup,
             final Set<ExecutionVertexID> verticesToSchedule) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
index e34cb06bc4e..1d19dd2cf62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputConsumableDecider.java
@@ -24,7 +24,7 @@ import java.util.function.Function;
 
 /**
  * {@link InputConsumableDecider} is responsible for determining whether the 
input of an
- * executionVertex is consumable.
+ * executionVertex or a consumed partition group is consumable.
  */
 public interface InputConsumableDecider {
     /**
@@ -41,6 +41,14 @@ public interface InputConsumableDecider {
             Set<ExecutionVertexID> verticesToSchedule,
             Map<ConsumedPartitionGroup, Boolean> consumableStatusCache);
 
+    /**
+     * Determining whether the consumed partition group is consumable based on 
finished producers.
+     *
+     * @param consumedPartitionGroup to be determined whether it is consumable.
+     */
+    boolean isConsumableBasedOnFinishedProducers(
+            final ConsumedPartitionGroup consumedPartitionGroup);
+
     /** Factory for {@link InputConsumableDecider}. */
     interface Factory {
         InputConsumableDecider createInstance(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
index 48d626e4259..df7c353e94e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PartialFinishedInputConsumableDecider.java
@@ -43,14 +43,15 @@ public class PartialFinishedInputConsumableDecider 
implements InputConsumableDec
                 executionVertex.getConsumedPartitionGroups()) {
 
             if (!consumableStatusCache.computeIfAbsent(
-                    consumedPartitionGroup, 
this::isConsumedPartitionGroupConsumable)) {
+                    consumedPartitionGroup, 
this::isConsumableBasedOnFinishedProducers)) {
                 return false;
             }
         }
         return true;
     }
 
-    private boolean isConsumedPartitionGroupConsumable(
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
             final ConsumedPartitionGroup consumedPartitionGroup) {
         if (consumedPartitionGroup
                 .getResultPartitionType()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
index 60ee3ab7871..b2d25b1f82f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.java
@@ -103,6 +103,11 @@ public class VertexwiseSchedulingStrategy
                     
IterableUtils.toStream(executionVertex.getProducedResults())
                             
.map(SchedulingResultPartition::getConsumerVertexGroups)
                             .flatMap(Collection::stream)
+                            .filter(
+                                    group ->
+                                            inputConsumableDecider
+                                                    
.isConsumableBasedOnFinishedProducers(
+                                                            
group.getConsumedPartitionGroup()))
                             .flatMap(IterableUtils::toStream)
                             .collect(Collectors.toSet());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
index c0184fe0061..af61b1e3676 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDeciderTest.java
@@ -50,6 +50,17 @@ class DefaultInputConsumableDeciderTest {
         DefaultInputConsumableDecider inputConsumableDecider =
                 createDefaultInputConsumableDecider(Collections.emptySet(), 
topology);
 
+        consumer.forEach(
+                vertex ->
+                        vertex.getConsumedPartitionGroups()
+                                .forEach(
+                                        group ->
+                                                assertThat(
+                                                                
inputConsumableDecider
+                                                                        
.isConsumableBasedOnFinishedProducers(
+                                                                               
 group))
+                                                        .isFalse()));
+
         assertThat(
                         inputConsumableDecider.isInputConsumable(
                                 consumer.get(0), Collections.emptySet(), new 
HashMap<>()))
@@ -78,6 +89,17 @@ class DefaultInputConsumableDeciderTest {
         DefaultInputConsumableDecider inputConsumableDecider =
                 createDefaultInputConsumableDecider(Collections.emptySet(), 
topology);
 
+        consumer.forEach(
+                vertex ->
+                        vertex.getConsumedPartitionGroups()
+                                .forEach(
+                                        group ->
+                                                assertThat(
+                                                                
inputConsumableDecider
+                                                                        
.isConsumableBasedOnFinishedProducers(
+                                                                               
 group))
+                                                        .isTrue()));
+
         assertThat(
                         inputConsumableDecider.isInputConsumable(
                                 consumer.get(0), Collections.emptySet(), new 
HashMap<>()))
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
index da2aa8241ac..67f3af93180 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingInputConsumableDecider.java
@@ -42,6 +42,12 @@ public class TestingInputConsumableDecider implements 
InputConsumableDecider {
                 || inputConsumableExecutionVertices.contains(executionVertex);
     }
 
+    @Override
+    public boolean isConsumableBasedOnFinishedProducers(
+            ConsumedPartitionGroup consumedPartitionGroup) {
+        return true;
+    }
+
     public void setInputConsumable(SchedulingExecutionVertex executionVertex) {
         inputConsumableExecutionVertices.add(executionVertex);
     }

Reply via email to