[GitHub] [flink] zhuzhurk commented on a diff in pull request #22798: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler
zhuzhurk commented on code in PR #22798: URL: https://github.com/apache/flink/pull/22798#discussion_r122596 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java: ## @@ -65,6 +65,18 @@ public boolean isInputConsumable( return true; } +/** + * This method is only used to filter consumable consumed partition group in the + * onExecutionStateChange method of {@link VertexwiseSchedulingStrategy}. For hybrid shuffle + * mode, the downstream vertices will be scheduled together with their upstreams. Therefore, + * only blocking consumed partition group needs to be considered here. + */ +@Override +public boolean isConsumedPartitionGroupConsumable( +final ConsumedPartitionGroup consumedPartitionGroup) { +return consumedPartitionGroup.areAllPartitionsFinished(); Review Comment: Looks to me the logic here can be ``` if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { return false; } else { return consumedPartitionGroup.areAllPartitionsFinished(); } ``` It returns `return false` because we only expect this method to be invoked to find groups that becomes consumable after a vertex finishes. Maybe we can name it as `isConsumableBasedOnFinishedProducers`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zhuzhurk commented on a diff in pull request #22798: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler
zhuzhurk commented on code in PR #22798: URL: https://github.com/apache/flink/pull/22798#discussion_r122596 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java: ## @@ -65,6 +65,18 @@ public boolean isInputConsumable( return true; } +/** + * This method is only used to filter consumable consumed partition group in the + * onExecutionStateChange method of {@link VertexwiseSchedulingStrategy}. For hybrid shuffle + * mode, the downstream vertices will be scheduled together with their upstreams. Therefore, + * only blocking consumed partition group needs to be considered here. + */ +@Override +public boolean isConsumedPartitionGroupConsumable( +final ConsumedPartitionGroup consumedPartitionGroup) { +return consumedPartitionGroup.areAllPartitionsFinished(); Review Comment: Looks to me the logic here should be ``` if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) { return false; } else { return consumedPartitionGroup.areAllPartitionsFinished(); } ``` It returns `return false` because we only expect this method to be invoked to find groups that becomes consumable after a vertex finishes, otherwise it can be `return true` because when using `DefaultInputConsumableDecider`, a `ConsumedPartitionGroup` is always consumable for a hybrid shuffle downstream task. However, `return false` would be helpful to improve the scheduling performance for hybrid shuffle. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org