[FLINK-3940] [table] Add support for ORDER BY OFFSET FETCH

This closes #2282.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0472cb9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0472cb9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0472cb9b

Branch: refs/heads/master
Commit: 0472cb9ba44de7223fffbc0eca0232d558730772
Parents: dc50625
Author: gallenvara <gallenv...@126.com>
Authored: Fri Jul 22 11:39:46 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 17 +++++++-
 .../api/table/plan/logical/operators.scala      | 23 +++++++++-
 .../table/plan/nodes/dataset/DataSetSort.scala  | 33 ++++++++++++---
 .../plan/rules/dataSet/DataSetSortRule.scala    | 25 ++---------
 .../table/runtime/CountPartitionFunction.scala  | 38 +++++++++++++++++
 .../api/table/runtime/LimitFilterFunction.scala | 44 ++++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 43 +++++++++++++++++++
 .../flink/api/scala/batch/sql/SortITCase.scala  | 34 +++++++++++----
 .../api/scala/batch/table/SortITCase.scala      | 36 ++++++++++++++++
 .../api/scala/batch/utils/SortTestUtils.scala   | 13 +++++-
 10 files changed, 267 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index cb56656..6793fde 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -687,6 +687,22 @@ Table result = in.orderBy("a.asc");
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Limit</strong></td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Returns a specified number of rows starting from an offset position. LIMIT is technically part of the ORDER BY clause and must be preceded by a call to orderBy.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3);
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5);
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -1009,7 +1025,6 @@ Among others, the following SQL features are not supported, yet:
 - Timestamps are limited to milliseconds precision
 - Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
 - Non-equi joins and Cartesian products
-- Result selection by order position (`ORDER BY OFFSET FETCH`)
 - Grouping sets
 
 *Note: Tables are joined in the order in which they are specified in the `FROM` clause. In some cases the table order must be manually tweaked to resolve Cartesian products.*
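Note: the same windowing is reachable through SQL, as exercised by the updated tests further down. A minimal sketch (`tableEnv` and the registered table `MyTable` are placeholders):

    // limit(3): skip the first 3 rows of the sorted result
    val skipped = tableEnv.sql("SELECT * FROM MyTable ORDER BY a OFFSET 3 ROWS")
    // limit(3, 5): skip 3 rows, then return the next 5
    val page = tableEnv.sql(
      "SELECT * FROM MyTable ORDER BY a OFFSET 3 ROWS FETCH NEXT 5 ROWS ONLY")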

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index ad8618c..0d4cf2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
+import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -150,6 +150,25 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }
 
+case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    child.construct(relBuilder)
+    relBuilder.limit(offset, fetch)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Limit on stream tables is currently not 
supported.")
+    }
+    if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+      throw new TableException(s"Limit operator must follow behind orderBy 
clause.")
+    }
+    super.validate(tableEnv)
+  }
+}
+
 case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 

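Note: validate only checks that the immediate child of Limit is a Sort, which at the API level means limit must directly follow orderBy. A sketch (table and field names are illustrative):

    // Accepted: the validated child is a Sort.
    table.orderBy('a.asc).limit(3)

    // Rejected: no preceding orderBy, so validate throws a TableException.
    table.limit(3)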
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 1af03d8..ef3005c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 import scala.collection.JavaConverters._
@@ -37,7 +39,9 @@ class DataSetSort(
     traitSet: RelTraitSet,
     inp: RelNode,
     collations: RelCollation,
-    rowType2: RelDataType)
+    rowType2: RelDataType,
+    offset: RexNode,
+    fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
   with DataSetRel{
 
@@ -47,7 +51,9 @@ class DataSetSort(
       traitSet,
       inputs.get(0),
       collations,
-      rowType2
+      rowType2,
+      offset,
+      fetch
     )
   }
 
@@ -71,11 +77,28 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
+    val limitedDs = if (offset == null && fetch == null) {
+      partitionedDs
+    } else {
+      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
+      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
+
+      val countFunction = new CountPartitionFunction[Any]
+      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val limitFunction = new LimitFilterFunction[Any](
+        limitStart,
+        limitEnd,
+        "countPartition")
+      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
+    }
+
+
     val inputType = partitionedDs.getType
     expectedType match {
 
       case None if config.getEfficientTypeUsage =>
-        partitionedDs
+        limitedDs
 
       case _ =>
         val determinedType = determineReturnType(
@@ -96,11 +119,11 @@ class DataSetSort(
             getRowType.getFieldNames.asScala
           )
 
-          partitionedDs.map(mapFunc)
+          limitedDs.map(mapFunc)
         }
         // no conversion necessary, forward
         else {
-          partitionedDs
+          limitedDs
         }
     }
   }

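Note: the translation applies OFFSET/FETCH in two passes over the sorted partitions: a mapPartition pass counts each partition's elements, the counts are broadcast, and a filter pass keeps only the rows whose global position falls inside the window. A worked example of the boundary arithmetic for OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY:

    val limitStart = 2              // RexLiteral.intValue(offset)
    val limitEnd = 5 + limitStart   // RexLiteral.intValue(fetch) + limitStart = 7
    // A row is kept iff its 1-based global position g satisfies
    // limitStart < g && g <= limitEnd, i.e. rows 3 through 7 survive:
    // exactly 5 rows after skipping the first 2.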
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
index b26d1de..5c1fb53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
@@ -18,12 +18,10 @@
 
 package org.apache.flink.api.table.plan.rules.dataSet
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.logical.{LogicalJoin, LogicalSort}
-import org.apache.flink.api.table.TableException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
 
 class DataSetSortRule
@@ -33,23 +31,6 @@ class DataSetSortRule
     DataSetConvention.INSTANCE,
     "DataSetSortRule") {
 
-  /**
-    * Only translate when no OFFSET or LIMIT specified
-    */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val sort = call.rel(0).asInstanceOf[LogicalSort]
-
-    if (sort.offset != null) {
-      throw new TableException("ORDER BY OFFSET is currently not supported.")
-    }
-
-    if (sort.fetch != null) {
-      throw new TableException("ORDER BY FETCH is currently not supported.")
-    }
-
-    sort.offset == null && sort.fetch == null
-  }
-
   override def convert(rel: RelNode): RelNode = {
 
     val sort: LogicalSort = rel.asInstanceOf[LogicalSort]
@@ -61,7 +42,9 @@ class DataSetSortRule
       traitSet,
       convInput,
       sort.getCollation,
-      rel.getRowType
+      rel.getRowType,
+      sort.offset,
+      sort.fetch
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
new file mode 100644
index 0000000..79b8623
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.util.Collector
+
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
+  var elementCount = 0
+
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    val iterator = value.iterator()
+    while (iterator.hasNext) {
+      elementCount += 1
+      iterator.next()
+    }
+    out.collect((partitionIndex, elementCount))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
new file mode 100644
index 0000000..311b616
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.configuration.Configuration
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+class LimitFilterFunction[T](limitStart: Int,
+                             limitEnd: Int,
+                             broadcast: String) extends RichFilterFunction[T] {
+  var elementCount = 0
+  var countList = mutable.Buffer[Int]()
+
+  override def open(config: Configuration) {
+    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
+      .sortWith(_._1 < _._1).map(_._2).scanLeft(0)(_ + _)
+  }
+
+  override def filter(value: T): Boolean = {
+    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    elementCount += 1
+    limitStart - countList(partitionIndex) < elementCount &&
+      limitEnd - countList(partitionIndex) >= elementCount
+  }
+}

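Note: the broadcast variable holds one (partition index, element count) pair per partition, produced by CountPartitionFunction. Sorting by partition index and folding with scanLeft turns the counts into prefix sums, so countList(p) is the number of rows that precede partition p globally. A self-contained sketch with made-up counts:

    // Partition 0 holds 4 rows, partition 1 holds 3, partition 2 holds 5.
    val partitionCounts = Seq((1, 3), (0, 4), (2, 5))
    val countList = partitionCounts
      .sortWith(_._1 < _._1)   // order by partition index: (0,4), (1,3), (2,5)
      .map(_._2)               // per-partition counts: 4, 3, 5
      .scanLeft(0)(_ + _)      // prefix sums: 0, 4, 7, 12
    // countList(2) == 7: the row at local position n in partition 2
    // has global position 7 + n.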
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index cbb9a07..c9fd78c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -572,6 +572,49 @@ class Table(
   }
 
   /**
+   * Skips the first `offset` rows of a sorted result and returns the remaining rows.
+   * LIMIT is technically part of the ORDER BY clause and must therefore be preceded
+   * by a call to orderBy.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc).limit(3)
+   * }}}
+   *
+   * @param offset The number of rows to skip before returning the remaining rows.
+   */
+  def limit(offset: Int): Table = {
+    if (offset < 0) {
+      throw new ValidationException("Offset should be greater than or equal to 
zero.")
+    }
+    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
+  }
+
+  /**
+   * Skips the first `offset` rows of a sorted result and returns at most `fetch` of
+   * the remaining rows. LIMIT is technically part of the ORDER BY clause and must
+   * therefore be preceded by a call to orderBy.
+   *
+   * Example:
+   *
+   * {{{
+   *   tab.orderBy('name.desc).limit(3, 5)
+   * }}}
+   *
+   * @param offset The number of rows to skip before returning rows.
+   * @param fetch The maximum number of rows to return after the offset.
+   */
+  def limit(offset: Int, fetch: Int): Table = {
+    if (offset < 0 || fetch < 1) {
+      throw new ValidationException(
+        "Offset should be greater than or equal to zero and" +
+          " fetch should be greater than or equal to one.")
+    }
+    new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
+  }
+
+  /**
    * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
     *
     * A batch [[Table]] can only be written to a

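Note: taken together, the two overloads expose an OFFSET-only and an OFFSET-plus-FETCH window over a sorted table. A usage sketch modeled on the new test cases (environment setup elided; `ds` and `tableEnv` are placeholders):

    val in = ds.toTable(tableEnv, 'a, 'b, 'c)
    // Skip the first 3 rows, return everything that follows:
    val rest = in.orderBy('a.asc).limit(3)
    // Skip the first 3 rows, then return the next 5:
    val page = in.orderBy('a.asc).limit(3, 5)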
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 858f75a..7c18e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -60,28 +60,46 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[TableException])
-  def testOrderByOffset(): Unit = {
+  @Test
+  def testOrderByWithOffset(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC OFFSET 2 ROWS"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      -x.productElement(0).asInstanceOf[Int])
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[TableException])
-  def testOrderByFirst(): Unit = {
+  @Test
+  def testOrderByWithOffsetAndFetch(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 FETCH NEXT 2 ROWS ONLY"
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 
5 ROWS ONLY"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
-    tEnv.sql(sqlQuery).toDataSet[Row]
+
+    val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
 }

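Note on the collection idiom used in these tests: mapPartition wraps each partition in a single Seq, so collect() yields one Seq per partition. Sorting the non-empty partitions by their first row and concatenating them reconstructs the globally sorted output without assuming how Flink interleaves partitions. Condensed (`resultDataSet` is a placeholder):

    val partitions = resultDataSet.mapPartition(rows => Seq(rows.toSeq)).collect()
    val globallySorted = partitions
      .filterNot(_.isEmpty)    // drop partitions that received no rows
      .sortBy(_.head)          // order partitions by their smallest row
      .reduceLeft(_ ++ _)      // concatenate into one ordered sequence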
http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index 235fc45..c4a5a74 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -99,4 +99,40 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByOffset(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testOrderByOffsetAndFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3, 5)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0472cb9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
index 07765fd..8d1f653 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
@@ -42,8 +42,17 @@ object SortTestUtils {
     ,(20, 6L, "Comment#14")
     ,(21, 6L, "Comment#15"))
 
-  def sortExpectedly(dataSet: List[Product])(implicit ordering: Ordering[Product]): String = {
-    dataSet.sorted(ordering).mkString("\n").replaceAll("[\\(\\)]", "")
+  def sortExpectedly(dataSet: List[Product])
+                    (implicit ordering: Ordering[Product]): String =
+    sortExpectedly(dataSet, 0, dataSet.length)
+
+  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+                    (implicit ordering: Ordering[Product]): String = {
+    dataSet
+      .sorted(ordering)
+      .slice(start, end)
+      .mkString("\n")
+      .replaceAll("[\\(\\)]", "")
   }
 
 }

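Note: the sliced expectation mirrors an OFFSET/FETCH window over the sorted data. For instance, the SQL test above verifies a window of five rows after skipping two:

    // Sorts all 21 tuples, then keeps indices 2 until 7 (five rows),
    // matching ORDER BY _1 OFFSET 2 ROWS FETCH NEXT 5 ROWS ONLY.
    val expected = sortExpectedly(tupleDataSetStrings, 2, 7)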