[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270710882 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala ## @@ -91,10 +89,13 @@ abstract class Rank( } override def deriveRowType(): RelDataType = { +if (!outputRankNumber) { + return input.getRowType +} val typeFactory = cluster.getRexBuilder.getTypeFactory val typeBuilder = typeFactory.builder() input.getRowType.getFieldList.foreach(typeBuilder.add) -// rank function column is always the last column, and its type is BIGINT NOT NULL +// rank number column is always the last column, and its type is BIGINT NOT NULL val allFieldNames = new util.HashSet[String]() allFieldNames.addAll(input.getRowType.getFieldNames) val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, "rk") Review comment: Actually, I found that the rank number field name may be dropped in `ProjectToWindowRule` and replaced with a name like `w0$oi`. So I think we can keep the current implementation and add a `TODO` to solve this later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
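For readers following the diff above, the field-naming step in `deriveRowType` can be sketched roughly as follows. This is a minimal Java sketch, not the planner's code: `RankRowTypeSketch`, `deriveFieldNames`, and the `_N` suffix scheme are hypothetical, and the real `FlinkRelOptUtil.buildUniqueFieldName` may resolve name collisions differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RankRowTypeSketch {

    // Hypothetical stand-in for FlinkRelOptUtil.buildUniqueFieldName (assumption):
    // append a numeric suffix until the candidate no longer collides.
    static String buildUniqueFieldName(Set<String> existing, String base) {
        String candidate = base;
        int suffix = 0;
        while (existing.contains(candidate)) {
            candidate = base + "_" + suffix;
            suffix++;
        }
        return candidate;
    }

    // Mirrors the shape of deriveRowType in the diff: without a rank number column
    // the input fields pass through unchanged; otherwise one uniquely named
    // column is appended as the last field.
    static List<String> deriveFieldNames(List<String> inputFields, boolean outputRankNumber) {
        List<String> result = new ArrayList<>(inputFields);
        if (!outputRankNumber) {
            return result;
        }
        Set<String> allFieldNames = new HashSet<>(inputFields);
        result.add(buildUniqueFieldName(allFieldNames, "rk"));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(deriveFieldNames(Arrays.asList("a", "b"), true));
        System.out.println(deriveFieldNames(Arrays.asList("a", "rk"), true));
        System.out.println(deriveFieldNames(Arrays.asList("a", "b"), false));
    }
}
```

The sketch only covers naming; in the diff the appended column is also typed BIGINT NOT NULL, which is left out here.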
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270705603 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecFirstLastRowRule.scala ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.physical.stream + +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, RankType} +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalRank, FlinkLogicalSort} +import org.apache.flink.table.plan.nodes.physical.stream.{StreamExecFirstLastRow, StreamExecRank, StreamExecSortLimit} +import org.apache.flink.table.plan.util.FlinkRelOptUtil + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.core.Sort +import org.apache.calcite.rel.{RelCollation, RelNode} + +/** + * Rule that matches [[FlinkLogicalSort]] which is sorted by proc-time attribute and + * fetches only one record started from 0, and converts it to [[StreamExecFirstLastRow]]. + * + * NOTES: Queries that can be converted to [[StreamExecFirstLastRow]] could be converted to + * [[StreamExecSortLimit]] too. [[StreamExecFirstLastRow]] is more efficient than + * [[StreamExecSortLimit]] due to mini-batch and less state access. + * + * e.g. + * 1. ''SELECT a FROM MyTable ORDER BY proctime LIMIT 1'' will be converted to FirstRow Review comment: Yes, this case is rare. I think it does not need to be converted to `StreamExecRowDeduplicate` for now.
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270703894 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ProcessStrategy.scala ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.flink.table.plan.`trait`.TraitUtil + +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelCollation, RelNode} +import org.apache.calcite.util.ImmutableBitSet + +import scala.collection.JavaConversions._ + +/** + * Base class of Strategy to choose different process function. + */ +sealed trait ProcessStrategy Review comment: I will rename `ProcessStrategy` to `RankProcessStrategy` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270701308 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala ## @@ -248,4 +251,262 @@ object FlinkRelOptUtil { } } + private[this] case class LimitPredicate(rankOnLeftSide: Boolean, pred: RexCall) + + private[this] sealed trait Boundary + + private[this] case class LowerBoundary(lower: Long) extends Boundary + + private[this] case class UpperBoundary(upper: Long) extends Boundary + + private[this] case class BothBoundary(lower: Long, upper: Long) extends Boundary + + private[this] case class InputRefBoundary(inputFieldIndex: Int) extends Boundary + + private[this] sealed trait BoundDefine + + private[this] object Lower extends BoundDefine // defined lower bound + private[this] object Upper extends BoundDefine // defined upper bound + private[this] object Both extends BoundDefine // defined lower and uppper bound + + /** +* Extracts the TopN offset and fetch bounds from a predicate. +* +* @param predicate predicate +* @param rankFieldIndex the index of rank field +* @param rexBuilder RexBuilder +* @param config TableConfig +* @return A Tuple2 of extracted rank range and remaining predicates. +*/ + def extractRankRange( Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
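The Javadoc quoted above says `extractRankRange` pulls the TopN bounds out of a predicate. As a rough illustration only, this is how bounds could be folded out of simple comparisons on the rank column. `RankRangeSketch`, `Op`, and `Comparison` are hypothetical names; the real method works on Calcite `RexNode` predicates and also returns the remaining predicates, which this sketch leaves out. It also assumes rank numbers start at 1.

```java
import java.util.Arrays;
import java.util.List;

final class RankRangeSketch {

    enum Op { LT, LE, GT, GE, EQ }

    // One conjunct of the form "rk <op> literal" (hypothetical simplification).
    static final class Comparison {
        final Op op;
        final long literal;

        Comparison(Op op, long literal) {
            this.op = op;
            this.literal = literal;
        }
    }

    // Returns {start, end}; end is Long.MAX_VALUE when no upper bound is given.
    // Overflow at the extremes of long is ignored in this sketch.
    static long[] extractRange(List<Comparison> conjuncts) {
        long lower = 1L; // assumption: rank numbers start at 1
        long upper = Long.MAX_VALUE;
        for (Comparison c : conjuncts) {
            switch (c.op) {
                case LT: upper = Math.min(upper, c.literal - 1); break;
                case LE: upper = Math.min(upper, c.literal); break;
                case GT: lower = Math.max(lower, c.literal + 1); break;
                case GE: lower = Math.max(lower, c.literal); break;
                case EQ:
                    lower = Math.max(lower, c.literal);
                    upper = Math.min(upper, c.literal);
                    break;
            }
        }
        return new long[] {lower, upper};
    }

    public static void main(String[] args) {
        // WHERE rk < 10
        System.out.println(Arrays.toString(
            extractRange(Arrays.asList(new Comparison(Op.LT, 10)))));
        // WHERE rk > 2 AND rk <= 5
        System.out.println(Arrays.toString(
            extractRange(Arrays.asList(new Comparison(Op.GT, 2), new Comparison(Op.LE, 5)))));
    }
}
```

A range with only a lower bound corresponds loosely to the `LowerBoundary` case in the diff, and one with both ends to `BothBoundary`; the `InputRefBoundary` case (for example `rk < a`) is not modeled here.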
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270699783 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala ## @@ -91,10 +89,13 @@ abstract class Rank( } override def deriveRowType(): RelDataType = { +if (!outputRankNumber) { + return input.getRowType +} val typeFactory = cluster.getRexBuilder.getTypeFactory val typeBuilder = typeFactory.builder() input.getRowType.getFieldList.foreach(typeBuilder.add) -// rank function column is always the last column, and its type is BIGINT NOT NULL +// rank number column is always the last column, and its type is BIGINT NOT NULL val allFieldNames = new util.HashSet[String]() allFieldNames.addAll(input.getRowType.getFieldNames) val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, "rk") Review comment: Yes, the rank number field name should be the user-specified name.
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270699044 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala ## @@ -34,44 +34,42 @@ import java.util import scala.collection.JavaConversions._ /** - * Relational expression that returns the rows in which the rank function value of each row + * Relational expression that returns the rows in which the rank number of each row * is in the given range. * - * NOTES: Different from [[org.apache.calcite.sql.fun.SqlStdOperatorTable.RANK]], - * [[Rank]] is a Relational expression, not a window function. + * The node is an optimization of `OVER` for some special cases, + * e.g. + * {{{ + * SELECT * FROM ( + * SELECT a, b, RANK() OVER (PARTITION BY b ORDER BY c) rk FROM MyTable) t + * WHERE rk < 10 + * }}} + * can be converted to this node. * - * [[Rank]] will output rank function value as its last column. - * - * This RelNode only handles single rank function, is an optimization for some cases. e.g. 
- * - * - * single rank function (on `OVER`) with filter in a SQL query statement - * - * - * `ORDER BY` with `LIMIT` in a SQL query statement - * (equivalent to `ROW_NUMBER` with filter and project) - * - * - * - * @param cluster cluster that this relational expression belongs to - * @param traitSet the traits of this rel - * @param input input relational expression - * @param rankFunction rank function, including: CUME_DIST, DENSE_RANK, PERCENT_RANK, RANK, - * ROW_NUMBER - * @param partitionKey partition keys (may be empty) - * @param sortCollation order keys for rank function - * @param rankRange the expected range of rank function value + * @param cluster cluster that this relational expression belongs to + * @param traitSet the traits of this rel + * @param input input relational expression + * @param partitionKey partition keys (may be empty) Review comment: In order to convert SQL like `select * from ( select a, b, rank() over (order by b) as rk from MyTable ) where rk < a` to `Rank` on a stream, partition keys may be empty. Alternatively, we do not convert this kind of SQL to `Rank` and keep it as `Over`.
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270301404 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala ## @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to [[StreamExecTemporalSort]] * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to [[StreamExecRank]] Review comment: As far as we know, Sort without limit is meaningless for an unbounded source. In short, Sort with limit will be converted to `StreamExecRank`, and Sort without limit will be converted to `StreamExecSort`.
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270267482 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRule.scala ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkContext +import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, ConstantRankRangeWithoutEnd} +import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalOverWindow, FlinkLogicalRank} +import org.apache.flink.table.plan.util.FlinkRelOptUtil + +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil} +import org.apache.calcite.rex.{RexProgramBuilder, RexUtil} +import org.apache.calcite.sql.{SqlKind, SqlRankFunction} + +import scala.collection.JavaConversions._ + +/** + * Planner rule that matches a [[FlinkLogicalCalc]] on a [[FlinkLogicalOverWindow]], + * and converts them into a [[FlinkLogicalRank]]. + */ +abstract class FlinkLogicalRankRuleBase + extends RelOptRule( +operand(classOf[FlinkLogicalCalc], + operand(classOf[FlinkLogicalOverWindow], any( { + + override def onMatch(call: RelOptRuleCall): Unit = { +val calc: FlinkLogicalCalc = call.rel(0) +val window: FlinkLogicalOverWindow = call.rel(1) +val group = window.groups.get(0) +val rankFun = group.aggCalls.get(0).getOperator.asInstanceOf[SqlRankFunction] + +// the rank function is the last field of LogicalWindow +val rankFieldIndex = window.getRowType.getFieldCount - 1 +val condition = calc.getProgram.getCondition +val predicate = calc.getProgram.expandLocalRef(condition) + +val config = calc.getCluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig +val (rankRange, remainingPreds) = FlinkRelOptUtil.extractRankRange( + predicate, + rankFieldIndex, + calc.getCluster.getRexBuilder, + config) +require(rankRange.isDefined) + +val cluster = window.getCluster +val rexBuilder = cluster.getRexBuilder + +val calcProgram = calc.getProgram +val exprList = calcProgram.getProjectList.map(calcProgram.expandLocalRef) + +val inputFields = 
RelOptUtil.InputFinder.bits(exprList, null).toList +val outputRankFunColumn = inputFields.contains(rankFieldIndex) + +val rankRowType = if (outputRankFunColumn) { + window.getRowType +} else { + val typeBuilder = rexBuilder.getTypeFactory.builder() + window.getRowType.getFieldList.dropRight(1).foreach(typeBuilder.add) + typeBuilder.build() +} + +rankRange match { + case Some(ConstantRankRange(_, rankEnd)) if rankEnd <= 0 => +throw new TableException(s"Rank end should not less than zero, but now is $rankEnd") + case _ => // do nothing +} + +val rank = new FlinkLogicalRank( + cluster, + window.getTraitSet, + window.getInput, + rankFun, + group.keys, + group.orderKeys, + rankRange.get, + outputRankFunColumn) + +val newRel = if (RexUtil.isIdentity(exprList, rankRowType) && remainingPreds.isEmpty) { + // project is trivial and filter is empty, remove the Calc + rank +} else { + val programBuilder = RexProgramBuilder.create( +rexBuilder, +rankRowType, +calcProgram.getExprList, +calcProgram.getProjectList, +remainingPreds.orNull, +calc.getRowType, +true, // normalize +null) // simplify + + calc.copy(calc.getTraitSet, rank, programBuilder.getProgram) +} +call.transformTo(newRel) + } +} + +/** + * This rule handles RANK function and rank range with end. + */ +class FlinkLogicalRankRuleForRangeEnd extends FlinkLogicalRankRuleBase { Review comment: ok This is an automated message
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270266970 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala ## @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to [[StreamExecTemporalSort]] * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to [[StreamExecRank]] Review comment: The basic strategy for selecting a sort/rank operator is that a query will be converted to `StreamExecRank` whenever possible; otherwise it will be converted to `StreamExecSort`. Currently, `StreamExecSort` is only used for testing with a bounded source. So, in a production environment, `StreamExecSort` is disabled, and an exception will be thrown if a query is converted to `StreamExecSort`.
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270266970 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala ## @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to [[StreamExecTemporalSort]] * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to [[StreamExecRank]] Review comment: The basic strategy for selecting a sort/rank operator is that a query will be converted to `StreamExecTemporalSort`, `StreamExecFirstLastRow` or `StreamExecRank` whenever possible; otherwise it will be converted to `StreamExecSort`. Currently, `StreamExecSort` is only used for testing with a bounded source. So, in a production environment, `StreamExecSort` is disabled, and an exception will be thrown if a query is converted to `StreamExecSort`.
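The selection strategy described in this comment can be sketched as a small decision function. This is only an illustration in Java: `SortOperatorChoiceSketch`, `KeyKind`, and `chooseOperator` are hypothetical names, the real planner decides through separate Calcite rules rather than one method, and treating both time attributes as eligible for `StreamExecTemporalSort` is an assumption (the quoted Scaladoc only shows a `ROWTIME` example).

```java
final class SortOperatorChoiceSketch {

    // Kind of the first ORDER BY key (hypothetical simplification).
    enum KeyKind { ROWTIME, PROCTIME, REGULAR }

    // fetch == null means the query has no LIMIT.
    static String chooseOperator(KeyKind firstSortKey, long offset, Long fetch) {
        boolean hasLimit = fetch != null;
        if (hasLimit && fetch == 1L && offset == 0L && firstSortKey == KeyKind.PROCTIME) {
            // ORDER BY proctime LIMIT 1
            return "StreamExecFirstLastRow";
        }
        if (hasLimit) {
            // ORDER BY a LIMIT 2
            return "StreamExecRank";
        }
        if (firstSortKey != KeyKind.REGULAR) {
            // ORDER BY ROWTIME, a (assumption: proctime is handled the same way)
            return "StreamExecTemporalSort";
        }
        // Fallback; per the comment this is only meant for tests with a bounded source.
        return "StreamExecSort";
    }

    public static void main(String[] args) {
        System.out.println(chooseOperator(KeyKind.PROCTIME, 0L, 1L));
        System.out.println(chooseOperator(KeyKind.REGULAR, 0L, 2L));
        System.out.println(chooseOperator(KeyKind.ROWTIME, 0L, null));
        System.out.println(chooseOperator(KeyKind.REGULAR, 0L, null));
    }
}
```

Checking the most specific case first matches the note in the rule's Scaladoc that queries eligible for `StreamExecFirstLastRow` could also be handled by the more general operator, just less efficiently.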
[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank
godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank URL: https://github.com/apache/flink/pull/8051#discussion_r270265084 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * This class holds configuration constants used by Flink's table planner module. + */ +public class PlannerConfigOptions { Review comment: `TableConfigOptions` is in `flink-table-runtime-blink` module, and `sql.optimizer.XXX` should be in `flink-table-planner-blink` I think. `PlannerConfigOptions` should only contain optimizer configs. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services