Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


TanYuxin-tyx commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1851182058

   @lsyldliu Thanks for helping review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


lsyldliu merged PR #23771:
URL: https://github.com/apache/flink/pull/23771





Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


TanYuxin-tyx commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1850001991

   In addition, I've rebased the PR onto the latest master to verify it still 
works properly.





Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


TanYuxin-tyx commented on code in PR #23771:
URL: https://github.com/apache/flink/pull/23771#discussion_r1422403200


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java:
##
@@ -30,19 +33,43 @@
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link InputPriorityConflictResolver}. */
+@ExtendWith(ParameterizedTestExtension.class)
 class InputPriorityConflictResolverTest {
 
-    @Test
+    @Parameter
+    public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode;
+
+    @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}")
+    public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() {
+        return Arrays.asList(
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH),
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH),

Review Comment:
   Currently, `StreamExchangeMode` and `BatchShuffleMode` are not strictly bound to each other, so I added these test cases to verify both modes.
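
   Because the two settings vary independently, a parameterized test needs one case per pair of values. The following is a minimal, self-contained sketch of how such pairs could be enumerated. It does not use Flink: the two enums are local stand-ins that only borrow value names from Flink's `BatchShuffleMode` and `StreamExchangeMode`, and `ModePair` and `allCombinations` are hypothetical names introduced for illustration. The test in this PR lists its pairs explicitly with `Tuple2.of(...)`; the quoted diff is truncated, so its full list is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

class ShuffleModeCombinations {

    // Local stand-in for Flink's BatchShuffleMode (assumption: subset of values only).
    enum BatchShuffleMode { ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL }

    // Local stand-in for Flink's StreamExchangeMode (assumption: subset of values only).
    enum StreamExchangeMode { BATCH, HYBRID_FULL }

    // Hypothetical pair type; the real test uses Flink's Tuple2 instead.
    static final class ModePair {
        final BatchShuffleMode shuffleMode;
        final StreamExchangeMode exchangeMode;

        ModePair(BatchShuffleMode shuffleMode, StreamExchangeMode exchangeMode) {
            this.shuffleMode = shuffleMode;
            this.exchangeMode = exchangeMode;
        }

        @Override
        public String toString() {
            return "(" + shuffleMode + ", " + exchangeMode + ")";
        }
    }

    // Builds every pair of values, since neither setting constrains the other.
    static List<ModePair> allCombinations() {
        List<ModePair> pairs = new ArrayList<>();
        for (BatchShuffleMode shuffleMode : BatchShuffleMode.values()) {
            for (StreamExchangeMode exchangeMode : StreamExchangeMode.values()) {
                pairs.add(new ModePair(shuffleMode, exchangeMode));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Prints one line per parameter pair a test would run with.
        allCombinations().forEach(System.out::println);
    }
}
```

   Listing pairs explicitly, as the PR does, keeps the parameter set small and intentional; generating the full cross product as above is only worthwhile when every combination is meaningful.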






Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


TanYuxin-tyx commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1849994039

   @lsyldliu Thanks for reviewing. I have addressed the comments. PTAL.





Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-12-11 Thread via GitHub


lsyldliu commented on code in PR #23771:
URL: https://github.com/apache/flink/pull/23771#discussion_r1422369948


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java:
##
@@ -30,19 +33,43 @@
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link InputPriorityConflictResolver}. */
+@ExtendWith(ParameterizedTestExtension.class)
 class InputPriorityConflictResolverTest {
 
-    @Test
+    @Parameter
+    public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode;
+
+    @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}")
+    public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() {
+        return Arrays.asList(
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH),
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH),

Review Comment:
   I have one question: what is the purpose of this test case?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/ShuffleModePlanOptimizeTest.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Test optimized plans for different shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
+class ShuffleModePlanOptimizeTest extends TableTestBase {
+
+    @Parameters(name = "mode = {0}")
+    public static Collection<BatchShuffleMode> parameters() {
+        return Arrays.asList(
+                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
+    }
+
+    @Parameter public BatchShuffleMode mode;
+
+    private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
+    private final TestValuesCatalog catalog =
+            new TestValuesCatalog("testCatalog", "test_database", true);
+
+    @BeforeEach
+    void setup() {
+        catalog.open();
+        util.tableEnv().registerCatalog("testCatalog", catalog);
+        util.tableEnv().useCatalog("testCatalog");
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
+        if (mode != null) {
+            tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, mode);
+        }
+
+        // partition fact table.
+        util.tableEnv()
+                .executeSql(
+

Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-11-29 Thread via GitHub


TanYuxin-tyx commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1833175583

   @lsyldliu Could you please take a look at this PR?  Thanks.





Re: [PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-11-21 Thread via GitHub


flinkbot commented on PR #23771:
URL: https://github.com/apache/flink/pull/23771#issuecomment-1822252927

   
   ## CI report:
   
   * b28265317fdb66e766d7f80e47ea4c7061591c2f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan [flink]

2023-11-21 Thread via GitHub


TanYuxin-tyx opened a new pull request, #23771:
URL: https://github.com/apache/flink/pull/23771

   
   
   
   ## What is the purpose of the change
   
   *To improve the performance of hybrid shuffle, this change addresses an inconsistency between hybrid shuffle mode and blocking shuffle mode in certain TPC-DS query plans (such as q88.sql, q14a.sql, and q14b.sql).
   In hybrid shuffle mode, these plans introduce additional blocking shuffle edges, which increases shuffle times and can reduce overall efficiency.*
   
   
   ## Brief change log
   
 - *Hybrid shuffle mode avoids unnecessary blocking edges in the plan*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *StreamExchangeModeUtilsTest and InputPriorityConflictResolverTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   

