Repository: flink
Updated Branches:
  refs/heads/master dc5062557 -> bdd7a114d

[FLINK-3940] [table] Additional improvements

- Improve overflow handling (support for more records than Int.MAX)
- SQL LIMIT support
- Bug fixing and improved docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd7a114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd7a114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd7a114

Branch: refs/heads/master
Commit: bdd7a114d9411e2bda51ad296061c5fca742dc8b
Parents: 0472cb9
Author: twalthr <twal...@apache.org>
Authored: Wed Aug 10 23:49:27 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 33 ++++++--
 .../api/table/plan/logical/operators.scala      | 45 +++++-----
 .../table/plan/nodes/dataset/DataSetSort.scala  | 86 ++++++++++++------
 .../table/runtime/CountPartitionFunction.scala  | 10 ++-
 .../api/table/runtime/LimitFilterFunction.scala | 42 +++++++---
 .../org/apache/flink/api/table/table.scala      | 70 ++++++++--------
 .../flink/api/scala/batch/sql/SortITCase.scala  | 36 +++++++-
 .../api/scala/batch/table/SortITCase.scala      | 33 +++++++-
 .../scala/stream/table/UnsupportedOpsTest.scala |  8 ++
 9 files changed, 254 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 6793fde..57252d9 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -668,7 +668,7 @@ Table result = left.minusAll(right);
   <tr>
     <td><strong>Distinct</strong></td>
     <td>
-      <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+      <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.distinct();
@@ -679,7 +679,7 @@ Table result = in.distinct();
   <tr>
     <td><strong>Order By</strong></td>
     <td>
-      <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+      <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.orderBy("a.asc");
@@ -690,15 +690,15 @@ Table result = in.orderBy("a.asc");
   <tr>
     <td><strong>Limit</strong></td>
     <td>
-      <p>Similar to a SQL LIMIT clause. Returns specified number of rows from offset position. It is technically part of the ORDER BY clause.</p>
+      <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3);
+Table result = in.orderBy("a.asc").limit(3); // returns an unlimited number of records beginning with the 4th record
 {% endhighlight %}
 or
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5);
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
 {% endhighlight %}
     </td>
   </tr>

@@ -890,7 +890,7 @@ val result = left.minusAll(right);
   <tr>
     <td><strong>Distinct</strong></td>
     <td>
-      <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+      <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.distinct();
@@ -901,7 +901,7 @@ val result = in.distinct();
   <tr>
     <td><strong>Order By</strong></td>
     <td>
-      <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+      <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.orderBy('a.asc);
@@ -909,6 +909,22 @@ val result = in.orderBy('a.asc);
     </td>
   </tr>
 
+  <tr>
+    <td><strong>Limit</strong></td>
+    <td>
+      <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3); // returns an unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+    </td>
+  </tr>
+
   </tbody>
 </table>
 </div>
@@ -1087,6 +1103,9 @@ query:
   | query INTERSECT query
     }
     [ ORDER BY orderItem [, orderItem ]* ]
+    [ LIMIT { count | ALL } ]
+    [ OFFSET start { ROW | ROWS } ]
+    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]
 
 orderItem:
   expression [ ASC | DESC ]
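
To make the grammar extension concrete, a hedged, self-contained usage sketch (not part of the commit): the LIMIT query is taken verbatim from the SQL tests further down, while the OFFSET/FETCH variant is an assumption derived from the grammar above and is not exercised by this commit's tests. The object name SqlLimitExample is illustrative.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{Row, TableEnvironment}

object SqlLimitExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register the 3-tuple test data set under the default field names _1, _2, _3
    tEnv.registerDataSet("MyTable", CollectionDataSets.get3TupleDataSet(env))

    // LIMIT must be accompanied by ORDER BY; a bare LIMIT is rejected at translation time
    val top5 = tEnv.sql("SELECT * FROM MyTable ORDER BY _1 LIMIT 5")

    // standard-SQL spelling: skip 3 records, then return the next 5 (assumed from the grammar)
    val page = tEnv.sql(
      "SELECT * FROM MyTable ORDER BY _1 OFFSET 3 ROWS FETCH NEXT 5 ROWS ONLY")

    top5.toDataSet[Row].print()
    page.toDataSet[Row].print()
  }
}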

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 0d4cf2c..79c3cb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
-import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -40,9 +40,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
     val newProjectList = afterResolve.projectList.zipWithIndex.map { case (e, i) =>
       e match {
-        case u @ UnresolvedAlias(child) => child match {
+        case u @ UnresolvedAlias(c) => c match {
           case ne: NamedExpression => ne
-          case e if !e.valid => u
+          case expr if !expr.valid => u
           case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
           case other => Alias(other, s"_c$i")
         }
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
           case n: Alias =>
             // explicit name
             if (names.contains(n.name)) {
-              throw new ValidationException(s"Duplicate field name $n.name.")
+              throw ValidationException(s"Duplicate field name ${n.name}.")
             } else {
               names.add(n.name)
             }
           case r: ResolvedFieldReference =>
             // simple field forwarding
             if (names.contains(r.name)) {
-              throw new ValidationException(s"Duplicate field name $r.name.")
+              throw ValidationException(s"Duplicate field name ${r.name}.")
             } else {
               names.add(r.name)
             }
@@ -98,10 +98,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
 case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] =
-    throw new UnresolvedException("Invalid call to output on AliasNode")
+    throw UnresolvedException("Invalid call to output on AliasNode")
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
-    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+    throw UnresolvedException("Invalid call to toRelNode on AliasNode")
 
   override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
     if (aliasList.length > child.output.length) {
@@ -150,7 +150,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }
 
-case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -160,10 +160,13 @@ case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Limit on stream tables is currently not supported.")
+      failValidation(s"Limit on stream tables is currently not supported.")
     }
     if (!child.validate(tableEnv).isInstanceOf[Sort]) {
-      throw new TableException(s"Limit operator must follow behind orderBy clause.")
+      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
+    }
+    if (offset < 0) {
+      failValidation(s"Offset should be greater than or equal to zero.")
     }
     super.validate(tableEnv)
   }
@@ -193,11 +196,9 @@ case class Aggregate(
     child: LogicalNode) extends UnaryNode {
 
   override def output: Seq[Attribute] = {
-    (groupingExpressions ++ aggregateExpressions) map { agg =>
-      agg match {
-        case ne: NamedExpression => ne.toAttribute
-        case e => Alias(e, e.toString).toAttribute
-      }
+    (groupingExpressions ++ aggregateExpressions) map {
+      case ne: NamedExpression => ne.toAttribute
+      case e => Alias(e, e.toString).toAttribute
     }
   }
 
@@ -205,11 +206,9 @@ case class Aggregate(
     child.construct(relBuilder)
     relBuilder.aggregate(
      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
-      aggregateExpressions.map { e =>
-        e match {
-          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
-          case _ => throw new RuntimeException("This should never happen.")
-        }
+      aggregateExpressions.map {
+        case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
+        case _ => throw new RuntimeException("This should never happen.")
       }.asJava)
   }
 
@@ -403,7 +402,7 @@ case class Join(
         right)
     }
     val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
-    new Join(node.left, node.right, node.joinType, resolvedCondition)
+    Join(node.left, node.right, node.joinType, resolvedCondition)
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -429,7 +428,7 @@ case class Join(
       failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
     }
 
-    resolvedJoin.condition.foreach(testJoinCondition(_))
+    resolvedJoin.condition.foreach(testJoinCondition)
     resolvedJoin
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index ef3005c..22930e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -28,8 +28,8 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 import scala.collection.JavaConverters._
@@ -40,12 +40,24 @@ class DataSetSort(
     inp: RelNode,
     collations: RelCollation,
     rowType2: RelDataType,
-    offset: RexNode,
+    offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
-  with DataSetRel{
+  with DataSetRel {
 
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new DataSetSort(
       cluster,
       traitSet,
@@ -58,18 +70,24 @@ class DataSetSort(
   }
 
   override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any] = {
+
+    if (fieldCollations.isEmpty) {
+      throw TableException("Limiting the result without sorting is not allowed " +
+        "as it could lead to arbitrary results.")
+    }
 
     val config = tableEnv.getConfig
 
-    val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
     var partitionedDs = if (currentParallelism == 1) {
-      inputDS
+      inputDs
     } else {
-      inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
         .withOrders(fieldCollations.map(_._2): _*)
     }
 
@@ -77,28 +95,37 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
-    val limitedDS = if (offset == null && fetch == null) {
+    val limitedDs = if (offset == null && fetch == null) {
       partitionedDs
     } else {
-      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
-      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
-
       val countFunction = new CountPartitionFunction[Any]
-      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val partitionCountName = s"prepare offset/fetch"
+
+      val partitionCount = partitionedDs
+        .mapPartition(countFunction)
+        .name(partitionCountName)
+
+      val broadcastName = "countPartition"
 
       val limitFunction = new LimitFilterFunction[Any](
         limitStart,
         limitEnd,
-        "countPartition")
-      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
-    }
+        broadcastName)
+
+      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+
+      partitionedDs
+        .filter(limitFunction)
+        .name(limitName)
+        .withBroadcastSet(partitionCount, broadcastName)
+    }
 
     val inputType = partitionedDs.getType
     expectedType match {
       case None if config.getEfficientTypeUsage =>
-        limitedDS
+        limitedDs
 
       case _ =>
         val determinedType = determineReturnType(
@@ -119,11 +146,13 @@ class DataSetSort(
           getRowType.getFieldNames.asScala
         )
 
-        limitedDS.map(mapFunc)
+        val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+        limitedDs.map(mapFunc).name(opName)
        }
        // no conversion necessary, forward
        else {
-         limitedDS
+         limitedDs
        }
    }
  }
@@ -143,10 +172,21 @@ class DataSetSort(
   private val sortFieldsToString = fieldCollations
     .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
 
-  override def toString: String = s"Sort(by: $sortFieldsToString)"
+  private val offsetToString = s"$offset"
+
+  private val fetchToString = if (limitEnd == Long.MaxValue) {
+    "unlimited"
+  } else {
+    s"$limitEnd"
+  }
+
+  override def toString: String =
+    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
 
   override def explainTerms(pw: RelWriter) : RelWriter = {
     super.explainTerms(pw)
       .item("orderBy", sortFieldsToString)
+      .item("offset", offsetToString)
+      .item("fetch", fetchToString)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
index 79b8623..5896f4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -23,14 +23,16 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction
 import org.apache.flink.util.Collector
 
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
-  var elementCount = 0
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
 
-  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
     val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    var elementCount = 0L
     val iterator = value.iterator()
     while (iterator.hasNext) {
-      elementCount += 1
+      if (elementCount != Long.MaxValue) { // prevent overflow
+        elementCount += 1L
+      }
       iterator.next()
     }
     out.collect(partitionIndex, elementCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
index 311b616..5ec9035 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -21,24 +21,45 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.configuration.Configuration
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-class LimitFilterFunction[T](limitStart: Int,
-                             limitEnd: Int,
-                             broadcast: String) extends RichFilterFunction[T] {
-  var elementCount = 0
-  var countList = mutable.Buffer[Int]()
+
+class LimitFilterFunction[T](
+    limitStart: Long,
+    limitEnd: Long,
+    broadcastName: String)
+  extends RichFilterFunction[T] {
+
+  var partitionIndex: Int = _
+  var elementCount: Long = _
+  var countList: Array[Long] = _
 
   override def open(config: Configuration) {
-    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
-      .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+    val countPartitionResult = getRuntimeContext
+      .getBroadcastVariable[(Int, Long)](broadcastName)
+      .asScala
+
+    // sort by partition index, extract number per partition, sum with intermediate results
+    countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
+      val sum = a + b
+      if (sum < 0L) { // prevent overflow
+        Long.MaxValue
+      } else {
+        sum
+      }
+    }.toArray
+
+    elementCount = 0
   }
 
   override def filter(value: T): Boolean = {
-    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-    elementCount += 1
+    if (elementCount != Long.MaxValue) { // prevent overflow
+      elementCount += 1L
+    }
+    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
     limitStart - countList(partitionIndex) < elementCount &&
-    limitEnd - countList(partitionIndex) >= elementCount
+      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
   }
 }
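
The two functions above implement the limit in two phases: every partition first reports its record count, the counts are broadcast, and each filter instance turns them into an exclusive prefix sum so it can decide purely locally whether a record's global position falls into (offset, offset + fetch]. Below is a minimal, self-contained sketch of that logic, with partitions modeled as plain sequences instead of Flink DataSet partitions; the names LimitSketch, countPartitions and applyLimit are illustrative and not part of the commit.

object LimitSketch {

  // Phase 1 (cf. CountPartitionFunction): report (partitionIndex, recordCount) per partition.
  def countPartitions[T](partitions: Seq[Seq[T]]): Seq[(Int, Long)] =
    partitions.zipWithIndex.map { case (p, i) => (i, p.size.toLong) }

  // Phase 2 (cf. LimitFilterFunction): build an exclusive prefix sum, so countList(i)
  // is the number of records in all partitions before partition i. A record is kept
  // iff its global position lies in (limitStart, limitEnd].
  def applyLimit[T](partitions: Seq[Seq[T]], limitStart: Long, limitEnd: Long): Seq[Seq[T]] = {
    val countList = countPartitions(partitions)
      .sortBy(_._1).map(_._2)
      .scanLeft(0L)((a, b) => if (a + b < 0L) Long.MaxValue else a + b) // saturate on overflow
      .toArray
    partitions.zipWithIndex.map { case (partition, partitionIndex) =>
      var elementCount = 0L // 1-based position within the local partition
      partition.filter { _ =>
        elementCount += 1L
        limitStart - countList(partitionIndex) < elementCount &&
          (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // nine records, already range-partitioned and sorted: 1..9 across three partitions
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))
    // offset 3, fetch 5  =>  limitStart = 3, limitEnd = 3 + 5 = 8  =>  records 4..8
    println(applyLimit(partitions, 3L, 8L).flatten) // List(4, 5, 6, 7, 8)
  }
}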

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index c9fd78c..bfabd32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -563,7 +563,7 @@ class Table(
    * Example:
    *
    * {{{
-   *   tab.orderBy("name DESC")
+   *   tab.orderBy("name.desc")
    * }}}
    */
   def orderBy(fields: String): Table = {
@@ -572,45 +572,39 @@ class Table(
   }
 
   /**
-    * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-    * The statement is used to retrieve records from table and limit the number of records
-    * returned based on a limit value.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.orderBy('name.desc).limit(3)
-    * }}}
-    *
-    * @param offset The number of rows to skip before including them in the result.
-    */
+    * Limits a sorted result from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns an unlimited number of records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3)
+    * }}}
+    *
+    * @param offset number of records to skip
+    */
   def limit(offset: Int): Table = {
-    if (offset < 0) {
-      throw new ValidationException("Offset should be greater than or equal to zero.")
-    }
-    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-    * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-    * The statement is used to retrieve records from table and limit the number of records
-    * returned based on a limit value.
-    *
-    * Example:
-    *
-    * {{{
-    *   tab.orderBy('name.desc).limit(3, 5)
-    * }}}
-    *
-    * @param offset The number of rows to skip before including them in the result.
-    * @param fetch The number of records returned.
-    */
+    new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to a specified number of records from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns 5 records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3, 5)
+    * }}}
+    *
+    * @param offset number of records to skip
+    * @param fetch number of records to be returned
+    */
   def limit(offset: Int, fetch: Int): Table = {
-    if (offset < 0 || fetch < 1) {
-      throw new ValidationException(
-        "Offset should be greater than or equal to zero and" +
-          " fetch should be greater than or equal to one.")
-    }
     new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 7c18e14..f345984 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala._
-import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -102,4 +102,38 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByLimit(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutOrder(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index c4a5a74..d4a1d8d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -18,13 +18,13 @@ package org.apache.flink.api.scala.batch.table
 
+import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -135,4 +135,33 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).limit(0, 5)
+
+    t.toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 89b0fdc..8ce1472 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -109,4 +109,12 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.minusAll(t2)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testLimit(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.limit(0, 5)
+  }
 }
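
For completeness, a compact runnable example assembled from the documentation snippet and the batch ITCases above — a sketch, not part of the commit, and the object name SortLimitExample is illustrative:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.{Row, TableEnvironment}

object SortLimitExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 3-tuple test data, with default field names _1, _2, _3
    val ds = CollectionDataSets.get3TupleDataSet(env)

    // sort globally on the first field, skip 3 records, return the next 5
    val result = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)

    result.toDataSet[Row].print()
  }
}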