[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270710882
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala
 ##
 @@ -91,10 +89,13 @@ abstract class Rank(
   }
 
   override def deriveRowType(): RelDataType = {
+if (!outputRankNumber) {
+  return input.getRowType
+}
 val typeFactory = cluster.getRexBuilder.getTypeFactory
 val typeBuilder = typeFactory.builder()
 input.getRowType.getFieldList.foreach(typeBuilder.add)
-// rank function column is always the last column, and its type is BIGINT 
NOT NULL
+// rank number column is always the last column, and its type is BIGINT 
NOT NULL
 val allFieldNames = new util.HashSet[String]()
 allFieldNames.addAll(input.getRowType.getFieldNames)
 val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, 
"rk")
 
 Review comment:
   Actually, I found that the rank number field name may be dropped in 
`ProjectToWindowRule` and replaced with a generated name like `w0$oi`. So I think we can 
keep the current implementation and add a `TODO` to solve this later.
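
A minimal sketch of the row-type derivation discussed above, assuming plain field-name lists instead of Calcite's `RelDataType`. The `buildUniqueFieldName` below is a hypothetical stand-in for `FlinkRelOptUtil.buildUniqueFieldName`; the `_N` suffix scheme is an assumption, since the real helper's behavior is not shown in this thread.

```scala
object UniqueFieldNameSketch {
  // Hypothetical stand-in for FlinkRelOptUtil.buildUniqueFieldName.
  // The "_N" suffix scheme is an assumption made for illustration only.
  def buildUniqueFieldName(existing: Set[String], base: String): String =
    if (!existing.contains(base)) {
      base
    } else {
      // Try base_0, base_1, ... until an unused name is found.
      Iterator.from(0).map(i => s"${base}_$i").find(n => !existing.contains(n)).get
    }

  // Mirrors the shape of deriveRowType in the diff: pass the input fields
  // through unchanged, or append one uniquely named rank column at the end.
  def deriveFieldNames(input: Seq[String], outputRankNumber: Boolean): Seq[String] =
    if (!outputRankNumber) input
    else input :+ buildUniqueFieldName(input.toSet, "rk")
}
```

The generated name is only a fallback: as the comment notes, the user-specified alias may already be lost before this point, which is why the fix is deferred to a `TODO`.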


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270705603
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecFirstLastRowRule.scala
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, RankType}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalRank, 
FlinkLogicalSort}
+import 
org.apache.flink.table.plan.nodes.physical.stream.{StreamExecFirstLastRow, 
StreamExecRank, StreamExecSortLimit}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.{RelCollation, RelNode}
+
+/**
+  * Rule that matches [[FlinkLogicalSort]] which is sorted by proc-time 
attribute and
+  * fetches only one record started from 0, and converts it to 
[[StreamExecFirstLastRow]].
+  *
+  * NOTES: Queries that can be converted to [[StreamExecFirstLastRow]] could 
be converted to
+  * [[StreamExecSortLimit]] too. [[StreamExecFirstLastRow]] is more efficient 
than
+  * [[StreamExecSortLimit]] due to mini-batch and less state access.
+  *
+  * e.g.
+  * 1. ''SELECT a FROM MyTable ORDER BY proctime LIMIT 1'' will be converted 
to FirstRow
 
 Review comment:
   Yes, this case is rare. I don't think it needs to be converted to 
`StreamExecRowDeduplicate` for now.




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270703894
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ProcessStrategy.scala
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.table.plan.`trait`.TraitUtil
+
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelNode}
+import org.apache.calcite.util.ImmutableBitSet
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base class of Strategy to choose different process function.
+  */
+sealed trait ProcessStrategy
 
 Review comment:
   I will rename `ProcessStrategy` to `RankProcessStrategy`




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270701308
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala
 ##
 @@ -248,4 +251,262 @@ object FlinkRelOptUtil {
 }
   }
 
+  private[this] case class LimitPredicate(rankOnLeftSide: Boolean, pred: 
RexCall)
+
+  private[this] sealed trait Boundary
+
+  private[this] case class LowerBoundary(lower: Long) extends Boundary
+
+  private[this] case class UpperBoundary(upper: Long) extends Boundary
+
+  private[this] case class BothBoundary(lower: Long, upper: Long) extends 
Boundary
+
+  private[this] case class InputRefBoundary(inputFieldIndex: Int) extends 
Boundary
+
+  private[this] sealed trait BoundDefine
+
+  private[this] object Lower extends BoundDefine // defined lower bound
+  private[this] object Upper extends BoundDefine // defined upper bound
+  private[this] object Both extends BoundDefine // defined lower and upper 
bound
+
+  /**
+* Extracts the TopN offset and fetch bounds from a predicate.
+*
+* @param  predicate   predicate
+* @param  rankFieldIndex  the index of rank field
+* @param  rexBuilder  RexBuilder
+* @param  config  TableConfig
+* @return A Tuple2 of extracted rank range and remaining predicates.
+*/
+  def extractRankRange(
 
 Review comment:
   OK
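
A minimal sketch of what extracting a rank range from a predicate can look like, assuming a tiny hypothetical predicate model in place of Calcite's `RexNode`. The names `Pred`, `Lt`, `Le`, `Gt`, `Ge`, `EqTo` and `Other` are illustrative only, and this is not the `extractRankRange` implementation from the PR.

```scala
object RankRangeSketch {
  // Hypothetical, simplified predicate model: comparisons between the rank
  // column and a constant, combined with AND. Calcite's RexNode is far richer.
  sealed trait Pred
  final case class Lt(n: Long) extends Pred        // rk < n
  final case class Le(n: Long) extends Pred        // rk <= n
  final case class Gt(n: Long) extends Pred        // rk > n
  final case class Ge(n: Long) extends Pred        // rk >= n
  final case class EqTo(n: Long) extends Pred      // rk = n
  final case class Other(sql: String) extends Pred // does not involve rk

  // Inclusive rank range; rank numbers start at 1.
  final case class ConstantRankRange(start: Long, end: Long)

  // Returns the extracted range plus the conjuncts that remain in the filter.
  def extractRankRange(conjuncts: Seq[Pred]): (Option[ConstantRankRange], Seq[Pred]) = {
    var lower = 1L
    var upper = Option.empty[Long]
    val remaining = Seq.newBuilder[Pred]
    def tightenUpper(n: Long): Unit = {
      upper = Some(upper.fold(n)(u => math.min(u, n)))
    }
    conjuncts.foreach {
      case Lt(n) => tightenUpper(n - 1)
      case Le(n) => tightenUpper(n)
      case Gt(n) => lower = math.max(lower, n + 1)
      case Ge(n) => lower = math.max(lower, n)
      case EqTo(n) =>
        lower = math.max(lower, n)
        tightenUpper(n)
      case other => remaining += other
    }
    upper match {
      case Some(u) => (Some(ConstantRankRange(lower, u)), remaining.result())
      // Without an upper bound this sketch extracts nothing and keeps the filter as is.
      case None => (None, conjuncts)
    }
  }
}
```

For example, `rk < 10 AND a > 1` yields the range 1 to 9 with `a > 1` left as the remaining predicate.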




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270699783
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala
 ##
 @@ -91,10 +89,13 @@ abstract class Rank(
   }
 
   override def deriveRowType(): RelDataType = {
+if (!outputRankNumber) {
+  return input.getRowType
+}
 val typeFactory = cluster.getRexBuilder.getTypeFactory
 val typeBuilder = typeFactory.builder()
 input.getRowType.getFieldList.foreach(typeBuilder.add)
-// rank function column is always the last column, and its type is BIGINT 
NOT NULL
+// rank number column is always the last column, and its type is BIGINT 
NOT NULL
 val allFieldNames = new util.HashSet[String]()
 allFieldNames.addAll(input.getRowType.getFieldNames)
 val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, 
"rk")
 
 Review comment:
   Yes, the rank number field name should be the user-specified name.




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-31 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270699044
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala
 ##
 @@ -34,44 +34,42 @@ import java.util
 import scala.collection.JavaConversions._
 
 /**
-  * Relational expression that returns the rows in which the rank function 
value of each row
+  * Relational expression that returns the rows in which the rank number of 
each row
   * is in the given range.
   *
-  * NOTES: Different from 
[[org.apache.calcite.sql.fun.SqlStdOperatorTable.RANK]],
-  * [[Rank]] is a Relational expression, not a window function.
+  * The node is an optimization of `OVER` for some special cases,
+  * e.g.
+  * {{{
+  * SELECT * FROM (
+  *  SELECT a, b, RANK() OVER (PARTITION BY b ORDER BY c) rk FROM MyTable) t
+  * WHERE rk < 10
+  * }}}
+  * can be converted to this node.
   *
-  * [[Rank]] will output rank function value as its last column.
-  *
-  * This RelNode only handles single rank function, is an optimization for 
some cases. e.g.
-  * 
-  * 
-  *   single rank function (on `OVER`) with filter in a SQL query statement
-  * 
-  * 
-  *   `ORDER BY` with `LIMIT` in a SQL query statement
-  *   (equivalent to `ROW_NUMBER` with filter and project)
-  * 
-  * 
-  *
-  * @param cluster cluster that this relational expression belongs to
-  * @param traitSet   the traits of this rel
-  * @param input  input relational expression
-  * @param rankFunction   rank function, including: CUME_DIST, DENSE_RANK, 
PERCENT_RANK, RANK,
-  *   ROW_NUMBER
-  * @param partitionKey   partition keys (may be empty)
-  * @param sortCollation  order keys for rank function
-  * @param rankRange  the expected range of rank function value
+  * @param cluster  cluster that this relational expression belongs to
+  * @param traitSet the traits of this rel
+  * @param input input relational expression
+  * @param partitionKey partition keys (may be empty)
 
 Review comment:
   Partition keys may be empty so that SQL like 
   `select * from (
 select a, b, rank() over (order by b) as rk from MyTable
   ) where rk < a` can be converted to `Rank` on Stream.
   
   Alternatively, we do not convert this kind of SQL to `Rank` and keep it as `Over`.
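
A minimal batch-style sketch of the semantics of the query above, assuming an in-memory list of rows. `Row`, `rank` and `topByColumnBound` are hypothetical names; the sketch only illustrates that an empty partition key list means one global partition, not how `Rank` is executed on a stream.

```scala
object RankSketch {
  final case class Row(a: Long, b: Long)

  // Assigns SQL RANK() (ties share a rank, gaps follow) within each partition,
  // ordered ascending by `orderKey`. An empty partition key list puts every
  // row into a single global partition.
  def rank(
      rows: Seq[Row],
      partitionKeys: Seq[Row => Long],
      orderKey: Row => Long): Seq[(Row, Long)] =
    rows
      .groupBy(r => partitionKeys.map(k => k(r)))
      .values
      .flatMap { part =>
        val sorted = part.sortBy(orderKey)
        // RANK() = 1 + number of rows that sort strictly before this one.
        sorted.map(r => (r, sorted.count(x => orderKey(x) < orderKey(r)) + 1L))
      }
      .toSeq

  // `rank() over (order by b) ... where rk < a`: no partition keys, and the
  // rank bound is read from column `a` of each row rather than a constant.
  def topByColumnBound(rows: Seq[Row]): Seq[(Row, Long)] =
    rank(rows, partitionKeys = Nil, orderKey = _.b).filter { case (r, rk) => rk < r.a }
}
```

With `partitionKeys = Nil` every row maps to the same empty grouping key, which is the case the `(may be empty)` note in the Scaladoc is meant to cover.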




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-29 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270301404
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala
 ##
 @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode
   * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to 
[[StreamExecTemporalSort]]
   * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to 
[[StreamExecRank]]
 
 Review comment:
   As far as we know, a Sort without a limit is meaningless for an unbounded source. 
   Simply put, a Sort with a limit will be converted to `StreamExecRank`, and a 
Sort without a limit will be converted to `StreamExecSort`.




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-28 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270267482
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/FlinkLogicalRankRule.scala
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkContext
+import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, 
ConstantRankRangeWithoutEnd}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalOverWindow, FlinkLogicalRank}
+import org.apache.flink.table.plan.util.FlinkRelOptUtil
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rex.{RexProgramBuilder, RexUtil}
+import org.apache.calcite.sql.{SqlKind, SqlRankFunction}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Planner rule that matches a [[FlinkLogicalCalc]] on a 
[[FlinkLogicalOverWindow]],
+  * and converts them into a [[FlinkLogicalRank]].
+  */
+abstract class FlinkLogicalRankRuleBase
+  extends RelOptRule(
+operand(classOf[FlinkLogicalCalc],
+  operand(classOf[FlinkLogicalOverWindow], any()))) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val calc: FlinkLogicalCalc = call.rel(0)
+val window: FlinkLogicalOverWindow = call.rel(1)
+val group = window.groups.get(0)
+val rankFun = 
group.aggCalls.get(0).getOperator.asInstanceOf[SqlRankFunction]
+
+// the rank function is the last field of LogicalWindow
+val rankFieldIndex = window.getRowType.getFieldCount - 1
+val condition = calc.getProgram.getCondition
+val predicate = calc.getProgram.expandLocalRef(condition)
+
+val config = 
calc.getCluster.getPlanner.getContext.asInstanceOf[FlinkContext].getTableConfig
+val (rankRange, remainingPreds) = FlinkRelOptUtil.extractRankRange(
+  predicate,
+  rankFieldIndex,
+  calc.getCluster.getRexBuilder,
+  config)
+require(rankRange.isDefined)
+
+val cluster = window.getCluster
+val rexBuilder = cluster.getRexBuilder
+
+val calcProgram = calc.getProgram
+val exprList = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
+
+val inputFields = RelOptUtil.InputFinder.bits(exprList, null).toList
+val outputRankFunColumn = inputFields.contains(rankFieldIndex)
+
+val rankRowType = if (outputRankFunColumn) {
+  window.getRowType
+} else {
+  val typeBuilder = rexBuilder.getTypeFactory.builder()
+  window.getRowType.getFieldList.dropRight(1).foreach(typeBuilder.add)
+  typeBuilder.build()
+}
+
+rankRange match {
+  case Some(ConstantRankRange(_, rankEnd)) if rankEnd <= 0 =>
+throw new TableException(s"Rank end should not less than zero, but now 
is $rankEnd")
+  case _ => // do nothing
+}
+
+val rank = new FlinkLogicalRank(
+  cluster,
+  window.getTraitSet,
+  window.getInput,
+  rankFun,
+  group.keys,
+  group.orderKeys,
+  rankRange.get,
+  outputRankFunColumn)
+
+val newRel = if (RexUtil.isIdentity(exprList, rankRowType) && 
remainingPreds.isEmpty) {
+  // project is trivial and filter is empty, remove the Calc
+  rank
+} else {
+  val programBuilder = RexProgramBuilder.create(
+rexBuilder,
+rankRowType,
+calcProgram.getExprList,
+calcProgram.getProjectList,
+remainingPreds.orNull,
+calc.getRowType,
+true, // normalize
+null) // simplify
+
+  calc.copy(calc.getTraitSet, rank, programBuilder.getProgram)
+}
+call.transformTo(newRel)
+  }
+}
+
+/**
+  * This rule handles RANK function and rank range with end.
+  */
+class FlinkLogicalRankRuleForRangeEnd extends FlinkLogicalRankRuleBase {
 
 Review comment:
   ok



[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-28 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270266970
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala
 ##
 @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode
   * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to 
[[StreamExecTemporalSort]]
   * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to 
[[StreamExecRank]]
 
 Review comment:
   The basic strategy for selecting a sort/rank operator is that a query will be 
converted to `StreamExecRank` whenever possible; otherwise it will be 
converted to `StreamExecSort`. Currently, `StreamExecSort` is only used for 
testing with a bounded source. So, in a production environment, `StreamExecSort` is 
disabled, and an exception will be thrown if a query is converted to 
`StreamExecSort`.




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-28 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270266970
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala
 ##
 @@ -38,16 +38,22 @@ import org.apache.calcite.rex.RexNode
   * ''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to 
[[StreamExecTemporalSort]]
   * ''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to 
[[StreamExecRank]]
 
 Review comment:
   The basic strategy for selecting a sort/rank operator is that a query will be 
converted to `StreamExecTemporalSort`, `StreamExecFirstLastRow` or 
`StreamExecRank` whenever possible; otherwise it will be converted to 
`StreamExecSort`. Currently, `StreamExecSort` is only used for testing with a 
bounded source. So, in a production environment, `StreamExecSort` is disabled, and 
an exception will be thrown if a query is converted to `StreamExecSort`.
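
A minimal sketch of the selection strategy described above, assuming a hypothetical `SortSpec` summary of the `ORDER BY` clause. The exact matching conditions (for example, what counts as a time-attribute sort) are assumptions drawn from the examples in the Scaladoc, not the planner rules themselves.

```scala
object SortOperatorChoiceSketch {
  sealed trait TimeKind
  case object ProcTime extends TimeKind
  case object RowTime extends TimeKind

  // Hypothetical, simplified description of an ORDER BY clause.
  final case class SortSpec(
      firstKeyTime: Option[TimeKind], // time attribute kind of the leading sort key, if any
      keyCount: Int,                  // number of sort keys
      offset: Long,                   // OFFSET, 0 if absent
      fetch: Option[Long])            // LIMIT, None if absent

  // Returns the name of the physical node that would be chosen.
  def chooseOperator(s: SortSpec, isProduction: Boolean): String = s match {
    // ORDER BY proctime LIMIT 1 starting from offset 0
    case SortSpec(Some(ProcTime), 1, 0L, Some(1L)) => "StreamExecFirstLastRow"
    // any other sort with a limit
    case SortSpec(_, _, _, Some(_)) => "StreamExecRank"
    // no limit, but the leading key is a time attribute
    case SortSpec(Some(_), _, _, None) => "StreamExecTemporalSort"
    // fallback: only allowed when testing with a bounded source
    case _ if isProduction =>
      throw new UnsupportedOperationException("StreamExecSort is disabled in production")
    case _ => "StreamExecSort"
  }
}
```

The order of the cases encodes the priority: the more specialized operators are tried first, and `StreamExecSort` is reached only when none of them applies.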




[GitHub] [flink] godfreyhe commented on a change in pull request #8051: [FLINK-12018] [table-planner-blink] Add support for generating optimized logical plan for Sort and Rank

2019-03-28 Thread GitBox
godfreyhe commented on a change in pull request #8051:  [FLINK-12018] 
[table-planner-blink] Add support for generating optimized logical plan for 
Sort and Rank
URL: https://github.com/apache/flink/pull/8051#discussion_r270265084
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by Flink's table planner 
module.
+ */
+public class PlannerConfigOptions {
 
 Review comment:
   `TableConfigOptions` is in the `flink-table-runtime-blink` module, and I think 
`sql.optimizer.XXX` should be in `flink-table-planner-blink`.  
   `PlannerConfigOptions` should only contain optimizer configs.

