This is an automated email from the ASF dual-hosted git repository. jingge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b1544e4e513 [FLINK-32622][table-planner] Optimize mini-batch assignment (#23470) b1544e4e513 is described below commit b1544e4e513d2b75b350c20dbb1c17a8232c22fd Author: Jeyhun Karimov <je.kari...@gmail.com> AuthorDate: Mon Apr 29 04:46:22 2024 +0200 [FLINK-32622][table-planner] Optimize mini-batch assignment (#23470) * [FLINK-32622][table-planner] Do not add mini-batch assigner operator if it is useless --- .../StreamCommonSubGraphBasedOptimizer.scala | 30 ++++- .../table/planner/plan/utils/FlinkRexUtil.scala | 30 +++++ .../plan/optimize/MiniBatchOptimizationTest.java | 137 +++++++++++++++++++++ .../flink/table/planner/utils/TableTestBase.scala | 22 ++++ 4 files changed, 214 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala index b9845aaa304..0d7dc4fff69 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysical import org.apache.flink.table.planner.plan.optimize.program.{FlinkStreamProgram, StreamOptimizeContext} import org.apache.flink.table.planner.plan.schema.IntermediateRelTable import org.apache.flink.table.planner.plan.stats.FlinkStatistic +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil.shouldSkipMiniBatch import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext import org.apache.flink.table.planner.utils.TableConfigUtils import org.apache.flink.util.Preconditions @@ -46,10 +47,9 @@ 
import scala.collection.JavaConversions._ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) extends CommonSubGraphBasedOptimizer { - override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = { - val tableConfig = planner.getTableConfig - // build RelNodeBlock plan - val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) + private def optimizeSinkBlocks( + tableConfig: TableConfig, + sinkBlocks: Seq[RelNodeBlock]): Seq[RelNodeBlock] = { // infer trait properties for sink block sinkBlocks.foreach { sinkBlock => @@ -84,7 +84,6 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) block.setOptimizedPlan(optimizedTree) return sinkBlocks } - // TODO FLINK-24048: Move changeLog inference out of optimizing phase // infer modifyKind property for each blocks independently sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true)) @@ -104,6 +103,27 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner) sinkBlocks } + override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = { + val tableConfig = planner.getTableConfig + // build RelNodeBlock plan + val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig) + // get the original configuration, and disable it if it is unnecessary + val origMiniBatchEnabled = tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED) + try { + if (origMiniBatchEnabled) { + tableConfig.set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, + Boolean.box(!shouldSkipMiniBatch(sinkBlocks))) + } + optimizeSinkBlocks(tableConfig, sinkBlocks) + } finally { + // revert the changed configuration back in the end + tableConfig.getConfiguration.set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, + origMiniBatchEnabled) + } + } + private def optimizeBlock(block: RelNodeBlock, isSinkBlock: Boolean): Unit = { block.children.foreach { child => diff --git 
Returns true if the input blocks consist only of [[Filter]], [[Project]], [[TableScan]], + * [[Calc]], [[Values]], [[Sink]]/[[LegacySink]], and UNION ALL nodes (i.e. mini-batch brings no benefit).
+ */ + def shouldSkipMiniBatch(blocks: Seq[RelNodeBlock]): Boolean = { + + val noMiniBatchRequired = { + (node: RelNode) => + node match { + case _: Filter | _: Project | _: TableScan | _: Calc | _: Values | _: Sink | + _: LegacySink => + true + case unionNode: LogicalUnion => unionNode.all + case _ => false + } + } + + def nodeTraverser(node: RelNode): Boolean = { + noMiniBatchRequired(node) && node.getInputs + .map(n => nodeTraverser(n)) + .forall(r => r) + } + + blocks.map(b => nodeTraverser(b.outputNode)).forall(r => r) + } + /** Returns true if the RexNode contains any node in the given expected [[RexInputRef]] nodes. */ def containsExpectedInputRef(rex: RexNode, expectedInputRefs: ImmutableBitSet): Boolean = { val visitor = new InputRefVisitor diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java new file mode 100644 index 00000000000..9a7c97c5e1a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/MiniBatchOptimizationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.Collections; + +/** + * Test for enabling/disabling mini-batch assigner operator based on query plan. The optimization is + * performed in {@link StreamCommonSubGraphBasedOptimizer}. 
+ */ +@ExtendWith(ParameterizedTestExtension.class) +public class MiniBatchOptimizationTest extends TableTestBase { + + private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault()); + + @Parameter public boolean isMiniBatchEnabled; + + public long miniBatchLatency; + public long miniBatchSize; + + @BeforeEach + public void setup() { + miniBatchLatency = 5L; + miniBatchSize = 10L; + util.tableEnv() + .getConfig() + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, isMiniBatchEnabled) + .set( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(miniBatchLatency)) + .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, miniBatchSize); + util.tableEnv() + .executeSql( + "CREATE TABLE MyTableA (\n" + + " a BIGINT,\n" + + " b INT NOT NULL,\n" + + " c VARCHAR,\n" + + " d BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"); + util.tableEnv() + .executeSql( + "CREATE TABLE MyTableB (\n" + + " a BIGINT,\n" + + " b INT NOT NULL,\n" + + " c VARCHAR,\n" + + " d BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false')"); + } + + @TestTemplate + public void testMiniBatchWithAggregation() { + final String aggQuery = + "SELECT\n" + + " AVG(a) AS avg_a,\n" + + " COUNT(*) AS cnt,\n" + + " count(b) AS cnt_b,\n" + + " min(b) AS min_b,\n" + + " MAX(c) FILTER (WHERE a > 1) AS max_c\n" + + "FROM MyTableA"; + + if (isMiniBatchEnabled) { + util.verifyRelPlanExpected( + aggQuery, + JavaScalaConversionUtil.toScala( + Collections.singletonList("MiniBatchAssigner"))); + } else { + util.verifyRelPlanNotExpected( + aggQuery, + JavaScalaConversionUtil.toScala( + Collections.singletonList("MiniBatchAssigner"))); + } + } + + @TestTemplate + public void testMiniBatchWithJoin() { + final String joinQuery = "SELECT * FROM MyTableA a, MyTableB b WHERE a.a = b.a"; + + if (isMiniBatchEnabled) { + util.verifyRelPlanExpected( + joinQuery, + JavaScalaConversionUtil.toScala( + 
Verify whether the optimized rel plan for the given SELECT query contains the + * `expected` strings.
+ */ + def verifyRelPlanExpected(table: Table, expected: String*): Unit = { + require(expected.nonEmpty) + val relNode = TableTestUtil.toRelNode(table) + val optimizedRel = getPlanner.optimize(relNode) + val optimizedPlan = getOptimizedRelPlan(Array(optimizedRel), Array.empty, withRowType = false) + val result = expected.forall(optimizedPlan.contains(_)) + val message = s"\nactual plan:\n$optimizedPlan\nexpected:\n${expected.mkString(", ")}" + assertTrue(result, message) + } + /** * Verify whether the optimized rel plan for the given SELECT query does not contain the * `notExpected` strings.