Repository: flink
Updated Branches:
  refs/heads/master dc5062557 -> bdd7a114d


[FLINK-3940] [table] Additional improvements

- Improve overflow handling (support for more than Int.MaxValue records)
- SQL LIMIT support (see the usage sketch below)
- Bug fixes and improved docs
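
For context, a minimal, self-contained usage sketch of the added functionality;
the environment setup and example data below are illustrative only and not part
of this commit (field names default to _1, _2 for registered tuple data sets):

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._
  import org.apache.flink.api.table.{Row, TableEnvironment}

  object LimitExample {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env)

      val ds = env.fromElements((1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"))

      // Table API: skip the first 3 records of the sorted result, return the next 5
      val result = ds.toTable(tEnv, 'a, 'b).orderBy('a.asc).limit(3, 5)

      // SQL: the new LIMIT support on a sorted result
      tEnv.registerDataSet("MyTable", ds)
      val sqlResult = tEnv.sql("SELECT * FROM MyTable ORDER BY _1 LIMIT 5")

      result.toDataSet[Row].print()
      sqlResult.toDataSet[Row].print()
    }
  }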


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd7a114
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd7a114
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd7a114

Branch: refs/heads/master
Commit: bdd7a114d9411e2bda51ad296061c5fca742dc8b
Parents: 0472cb9
Author: twalthr <twal...@apache.org>
Authored: Wed Aug 10 23:49:27 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Aug 11 10:27:11 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 33 ++++++--
 .../api/table/plan/logical/operators.scala      | 45 +++++-----
 .../table/plan/nodes/dataset/DataSetSort.scala  | 86 ++++++++++++++------
 .../table/runtime/CountPartitionFunction.scala  | 10 ++-
 .../api/table/runtime/LimitFilterFunction.scala | 42 +++++++---
 .../org/apache/flink/api/table/table.scala      | 70 ++++++++--------
 .../flink/api/scala/batch/sql/SortITCase.scala  | 36 +++++++-
 .../api/scala/batch/table/SortITCase.scala      | 33 +++++++-
 .../scala/stream/table/UnsupportedOpsTest.scala |  8 ++
 9 files changed, 254 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 6793fde..57252d9 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -668,7 +668,7 @@ Table result = left.minusAll(right);
     <tr>
       <td><strong>Distinct</strong></td>
       <td>
-        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+        <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.distinct();
@@ -679,7 +679,7 @@ Table result = in.distinct();
     <tr>
       <td><strong>Order By</strong></td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
 Table result = in.orderBy("a.asc");
@@ -690,15 +690,15 @@ Table result = in.orderBy("a.asc");
     <tr>
       <td><strong>Limit</strong></td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Returns specified number of rows from offset position. It is technically part of the ORDER BY clause.</p>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3);
+Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
 {% endhighlight %}
 or
 {% highlight java %}
 Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5);
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
 {% endhighlight %}
       </td>
     </tr>
@@ -890,7 +890,7 @@ val result = left.minusAll(right);
     <tr>
       <td><strong>Distinct</strong></td>
       <td>
-        <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value combinations.</p>
+        <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.distinct();
@@ -901,7 +901,7 @@ val result = in.distinct();
     <tr>
       <td><strong>Order By</strong></td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns rows globally sorted across all parallel partitions.</p>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
 {% highlight scala %}
 val in = ds.toTable(tableEnv, 'a, 'b, 'c);
 val result = in.orderBy('a.asc);
@@ -909,6 +909,22 @@ val result = in.orderBy('a.asc);
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Limit</strong></td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3); // returns unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight scala %}
+val in = ds.toTable(tableEnv, 'a, 'b, 'c);
+val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -1087,6 +1103,9 @@ query:
       | query INTERSECT query
     }
     [ ORDER BY orderItem [, orderItem ]* ]
+    [ LIMIT { count | ALL } ]
+    [ OFFSET start { ROW | ROWS } ]
+    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
 
 orderItem:
   expression [ ASC | DESC ]

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 0d4cf2c..79c3cb1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -19,8 +19,8 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.{LogicalSort, LogicalProject}
-import org.apache.calcite.rex.{RexLiteral, RexInputRef, RexNode}
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.{RexInputRef, RexNode}
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -40,9 +40,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
     val newProjectList =
       afterResolve.projectList.zipWithIndex.map { case (e, i) =>
         e match {
-          case u @ UnresolvedAlias(child) => child match {
+          case u @ UnresolvedAlias(c) => c match {
             case ne: NamedExpression => ne
-            case e if !e.valid => u
+            case expr if !expr.valid => u
            case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
             case other => Alias(other, s"_c$i")
           }
@@ -62,14 +62,14 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
         case n: Alias =>
           // explicit name
           if (names.contains(n.name)) {
-            throw new ValidationException(s"Duplicate field name $n.name.")
+            throw ValidationException(s"Duplicate field name $n.name.")
           } else {
             names.add(n.name)
           }
         case r: ResolvedFieldReference =>
           // simple field forwarding
           if (names.contains(r.name)) {
-            throw new ValidationException(s"Duplicate field name $r.name.")
+            throw ValidationException(s"Duplicate field name $r.name.")
           } else {
             names.add(r.name)
           }
@@ -98,10 +98,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
 
 case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] =
-    throw new UnresolvedException("Invalid call to output on AliasNode")
+    throw UnresolvedException("Invalid call to output on AliasNode")
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
-    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+    throw UnresolvedException("Invalid call to toRelNode on AliasNode")
 
   override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
     if (aliasList.length > child.output.length) {
@@ -150,7 +150,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
   }
 }
 
-case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode {
+case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -160,10 +160,13 @@ case class Limit(offset: Int, fetch: Int, child: LogicalNode) extends UnaryNode
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Limit on stream tables is currently not supported.")
+      failValidation(s"Limit on stream tables is currently not supported.")
     }
     if (!child.validate(tableEnv).isInstanceOf[Sort]) {
-      throw new TableException(s"Limit operator must follow behind orderBy clause.")
+      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
+    }
+    if (offset < 0) {
+      failValidation(s"Offset should be greater than or equal to zero.")
     }
     super.validate(tableEnv)
   }
@@ -193,11 +196,9 @@ case class Aggregate(
     child: LogicalNode) extends UnaryNode {
 
   override def output: Seq[Attribute] = {
-    (groupingExpressions ++ aggregateExpressions) map { agg =>
-      agg match {
-        case ne: NamedExpression => ne.toAttribute
-        case e => Alias(e, e.toString).toAttribute
-      }
+    (groupingExpressions ++ aggregateExpressions) map {
+      case ne: NamedExpression => ne.toAttribute
+      case e => Alias(e, e.toString).toAttribute
     }
   }
 
@@ -205,11 +206,9 @@ case class Aggregate(
     child.construct(relBuilder)
     relBuilder.aggregate(
       relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
-      aggregateExpressions.map { e =>
-        e match {
-          case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
-          case _ => throw new RuntimeException("This should never happen.")
-        }
+      aggregateExpressions.map {
+        case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder)
+        case _ => throw new RuntimeException("This should never happen.")
       }.asJava)
   }
 
@@ -403,7 +402,7 @@ case class Join(
         right)
     }
     val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
-    new Join(node.left, node.right, node.joinType, resolvedCondition)
+    Join(node.left, node.right, node.joinType, resolvedCondition)
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -429,7 +428,7 @@ case class Join(
       failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
     }
 
-    resolvedJoin.condition.foreach(testJoinCondition(_))
+    resolvedJoin.condition.foreach(testJoinCondition)
     resolvedJoin
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index ef3005c..22930e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -28,8 +28,8 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.{LimitFilterFunction, CountPartitionFunction}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
+import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.api.table.typeutils.TypeConverter._
 
 import scala.collection.JavaConverters._
@@ -40,12 +40,24 @@ class DataSetSort(
     inp: RelNode,
     collations: RelCollation,
     rowType2: RelDataType,
-    offset: RexNode,             
+    offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
-  with DataSetRel{
+  with DataSetRel {
 
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={
+  private val limitStart: Long = if (offset != null) {
+    RexLiteral.intValue(offset)
+  } else {
+    0L
+  }
+
+  private val limitEnd: Long = if (fetch != null) {
+    RexLiteral.intValue(fetch) + limitStart
+  } else {
+    Long.MaxValue
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new DataSetSort(
       cluster,
       traitSet,
@@ -58,18 +70,24 @@ class DataSetSort(
   }
 
   override def translateToPlan(
-              tableEnv: BatchTableEnvironment,
-              expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any] = {
+
+    if (fieldCollations.isEmpty) {
+      throw TableException("Limiting the result without sorting is not allowed " +
+        "as it could lead to arbitrary results.")
+    }
 
     val config = tableEnv.getConfig
 
-    val inputDS = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
-    val currentParallelism = inputDS.getExecutionEnvironment.getParallelism
+    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
     var partitionedDs = if (currentParallelism == 1) {
-      inputDS
+      inputDs
     } else {
-      inputDS.partitionByRange(fieldCollations.map(_._1): _*)
+      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
         .withOrders(fieldCollations.map(_._2): _*)
     }
 
@@ -77,28 +95,37 @@ class DataSetSort(
       partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
     }
 
-    val limitedDS = if (offset == null && fetch == null) {
+    val limitedDs = if (offset == null && fetch == null) {
       partitionedDs
     } else {
-      val limitStart = if (offset != null) RexLiteral.intValue(offset) else 0
-      val limitEnd = if (fetch != null) RexLiteral.intValue(fetch) + limitStart else Int.MaxValue
-
       val countFunction = new CountPartitionFunction[Any]
-      val partitionCount = partitionedDs.mapPartition(countFunction)
+
+      val partitionCountName = s"prepare offset/fetch"
+
+      val partitionCount = partitionedDs
+        .mapPartition(countFunction)
+        .name(partitionCountName)
+
+      val broadcastName = "countPartition"
 
       val limitFunction = new LimitFilterFunction[Any](
         limitStart,
         limitEnd,
-        "countPartition")
-      partitionedDs.filter(limitFunction).withBroadcastSet(partitionCount, "countPartition")
-    }
+        broadcastName)
+
+      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
 
+      partitionedDs
+        .filter(limitFunction)
+        .name(limitName)
+        .withBroadcastSet(partitionCount, broadcastName)
+    }
 
     val inputType = partitionedDs.getType
     expectedType match {
 
       case None if config.getEfficientTypeUsage =>
-        limitedDS
+        limitedDs
 
       case _ =>
         val determinedType = determineReturnType(
@@ -119,11 +146,13 @@ class DataSetSort(
             getRowType.getFieldNames.asScala
           )
 
-          limitedDS.map(mapFunc)
+          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          limitedDs.map(mapFunc).name(opName)
         }
         // no conversion necessary, forward
         else {
-          limitedDS
+          limitedDs
         }
     }
   }
@@ -143,10 +172,21 @@ class DataSetSort(
   private val sortFieldsToString = fieldCollations
     .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
 
-  override def toString: String = s"Sort(by: $sortFieldsToString)"
+  private val offsetToString = s"$offset"
+
+  private val fetchToString = if (limitEnd == Long.MaxValue) {
+    "unlimited"
+  } else {
+    s"$limitEnd"
+  }
+
+  override def toString: String =
+    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
 
   override def explainTerms(pw: RelWriter) : RelWriter = {
     super.explainTerms(pw)
       .item("orderBy", sortFieldsToString)
+      .item("offset", offsetToString)
+      .item("fetch", fetchToString)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
index 79b8623..5896f4c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/CountPartitionFunction.scala
@@ -23,14 +23,16 @@ import java.lang.Iterable
 import org.apache.flink.api.common.functions.RichMapPartitionFunction
 import org.apache.flink.util.Collector
 
-class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Int)] {
-  var elementCount = 0
+class CountPartitionFunction[IN] extends RichMapPartitionFunction[IN, (Int, Long)] {
 
-  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Int)]): Unit = {
+  override def mapPartition(value: Iterable[IN], out: Collector[(Int, Long)]): Unit = {
     val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+    var elementCount = 0L
     val iterator = value.iterator()
     while (iterator.hasNext) {
-      elementCount += 1
+      if (elementCount != Long.MaxValue) { // prevent overflow
+        elementCount += 1L
+      }
       iterator.next()
     }
     out.collect(partitionIndex, elementCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
index 311b616..5ec9035 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/LimitFilterFunction.scala
@@ -21,24 +21,44 @@ package org.apache.flink.api.table.runtime
 import org.apache.flink.api.common.functions.RichFilterFunction
 import org.apache.flink.configuration.Configuration
 
-import scala.collection.mutable
 import scala.collection.JavaConverters._
 
-class LimitFilterFunction[T](limitStart: Int,
-                             limitEnd: Int,
-                             broadcast: String) extends RichFilterFunction[T] {
-  var elementCount = 0
-  var countList = mutable.Buffer[Int]()
+
+class LimitFilterFunction[T](
+    limitStart: Long,
+    limitEnd: Long,
+    broadcastName: String)
+  extends RichFilterFunction[T] {
+
+  var partitionIndex: Int = _
+  var elementCount: Long = _
+  var countList: Array[Long] = _
 
   override def open(config: Configuration) {
-    countList = getRuntimeContext.getBroadcastVariable[(Int, Int)](broadcast).asScala
-      .sortWith(_._1 < _._1).map(_._2).scanLeft(0) (_ + _)
+    partitionIndex = getRuntimeContext.getIndexOfThisSubtask
+
+    val countPartitionResult = getRuntimeContext
+      .getBroadcastVariable[(Int, Long)](broadcastName)
+      .asScala
+
+    // sort by partition index, extract number per partition, sum with intermediate results
+    countList = countPartitionResult.sortWith(_._1 < _._1).map(_._2).scanLeft(0L) { case (a, b) =>
+        val sum = a + b
+        if (sum < 0L) { // prevent overflow
+          Long.MaxValue
+        } else {
+          sum
+        }
+    }.toArray
+
+    elementCount = 0
   }
 
   override def filter(value: T): Boolean = {
-    val partitionIndex = getRuntimeContext.getIndexOfThisSubtask
-    elementCount += 1
+    if (elementCount != Long.MaxValue) { // prevent overflow
+      elementCount += 1L
+    }
+    // we filter out records that are not within the limit (Long.MaxValue is unlimited)
     limitStart - countList(partitionIndex) < elementCount &&
-      limitEnd - countList(partitionIndex) >= elementCount
+      (limitEnd == Long.MaxValue || limitEnd - countList(partitionIndex) >= elementCount)
   }
 }
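
For illustration, a minimal, standalone sketch of the prefix-sum bookkeeping that
CountPartitionFunction and LimitFilterFunction implement together; the partition
counts, offset, and fetch values below are made-up example inputs, not taken from
the commit:

  object LimitSketch {
    def main(args: Array[String]): Unit = {
      val limitStart = 3L                                   // offset: skip the first 3 records
      val limitEnd = 3L + 5L                                // offset + fetch: keep global records 4..8
      val partitionCounts = Seq((1, 4L), (0, 3L), (2, 5L))  // (partition index, records in partition)

      // prefix sums of the counts in partition order, as built in LimitFilterFunction.open()
      val countList = partitionCounts.sortBy(_._1).map(_._2).scanLeft(0L)(_ + _)

      // decides whether the n-th record (1-based) of a partition passes the filter
      def keep(partitionIndex: Int, n: Long): Boolean =
        limitStart - countList(partitionIndex) < n &&
          limitEnd - countList(partitionIndex) >= n

      // partition 1 holds the global records 4..7, so all four of its records pass
      println((1L to 4L).map(keep(1, _)))                   // Vector(true, true, true, true)
      // partition 2 holds the global records 8..12, so only its first record passes
      println((1L to 5L).map(keep(2, _)))                   // Vector(true, false, false, false, false)
    }
  }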

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index c9fd78c..bfabd32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -563,7 +563,7 @@ class Table(
     * Example:
     *
     * {{{
-    *   tab.orderBy("name DESC")
+    *   tab.orderBy("name.desc")
     * }}}
     */
   def orderBy(fields: String): Table = {
@@ -572,45 +572,39 @@ class Table(
   }
 
   /**
-   * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-   * The statement is used to retrieve records from table and limit the number of records 
-   * returned based on a limit value.
-   *
-   * Example:
-   *
-   * {{{
-   *   tab.orderBy('name.desc).limit(3)
-   * }}}
-   * 
-   * @param offset  The number of rows to skip before including them in the result.
-   */
+    * Limits a sorted result from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns unlimited number of records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3)
+    * }}}
+    *
+    * @param offset number of records to skip
+    */
   def limit(offset: Int): Table = {
-    if (offset < 0) {
-      throw new ValidationException("Offset should be greater than or equal to zero.")
-    }
-    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
-  }
-
-  /**
-   * LIMIT is called an argument since it is technically part of the ORDER BY clause.
-   * The statement is used to retrieve records from table and limit the number of records 
-   * returned based on a limit value.
-   *
-   * Example:
-   *
-   * {{{
-   *   tab.orderBy('name.desc).limit(3, 5)
-   * }}}
-   *
-   * @param offset The number of rows to skip before including them in the result.
-   * @param fetch The number of records returned.
-   */
+    new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
+  }
+
+  /**
+    * Limits a sorted result to a specified number of records from an offset position.
+    * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
+    * thus must be preceded by it.
+    *
+    * Example:
+    *
+    * {{{
+    *   // returns 5 records beginning with the 4th record
+    *   tab.orderBy('name.desc).limit(3, 5)
+    * }}}
+    *
+    * @param offset number of records to skip
+    * @param fetch number of records to be returned
+    */
   def limit(offset: Int, fetch: Int): Table = {
-    if (offset < 0 || fetch < 1) {
-      throw new ValidationException(
-        "Offset should be greater than or equal to zero and" +
-          " fetch should be greater than or equal to one.")
-    }
     new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
index 7c18e14..f345984 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SortITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala._
-import org.apache.flink.api.table.{TableException, Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -102,4 +102,38 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByLimit(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 LIMIT 5"
+
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutOrder(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds)
+
+    tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
index c4a5a74..d4a1d8d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.scala.batch.table
 
+import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -135,4 +135,33 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
+  @Test
+  def testOrderByFetch(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
+    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
+      x.productElement(0).asInstanceOf[Int])
+
+    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
+    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+    val result = results.filterNot(_.isEmpty).sortBy(p => p.head).reduceLeft(_ ++ _)
+
+    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val t = ds.toTable(tEnv).limit(0, 5)
+
+    t.toDataSet[Row].collect()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd7a114/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 89b0fdc..8ce1472 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -109,4 +109,12 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.minusAll(t2)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testLimit(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.limit(0,5)
+  }
 }

Reply via email to