Repository: flink
Updated Branches:
  refs/heads/master 5df0b2759 -> 975339395


[FLINK-3943] [table] Add support for EXCEPT operator

This closes #2169.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97533939
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97533939
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97533939

Branch: refs/heads/master
Commit: 97533939596c90fab69e2b4000846fc588f2a07b
Parents: 5df0b27
Author: Ivan Mushketyk <ivan.mushke...@gmail.com>
Authored: Sun Jun 26 15:20:25 2016 +0100
Committer: twalthr <twal...@apache.org>
Committed: Mon Jul 11 13:57:49 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |  51 ++++++-
 .../api/table/plan/logical/operators.scala      |  31 +++++
 .../plan/nodes/dataset/DataSetIntersect.scala   |   6 +-
 .../table/plan/nodes/dataset/DataSetMinus.scala | 134 +++++++++++++++++++
 .../api/table/plan/rules/FlinkRuleSets.scala    |   1 +
 .../rules/dataSet/DataSetIntersectRule.scala    |   3 +-
 .../plan/rules/dataSet/DataSetMinusRule.scala   |  56 ++++++++
 .../runtime/IntersectCoGroupFunction.scala      |   1 -
 .../table/runtime/MinusCoGroupFunction.scala    |  47 +++++++
 .../org/apache/flink/api/table/table.scala      |  50 +++++++
 .../scala/batch/sql/SetOperatorsITCase.scala    |  67 ++++++++++
 .../scala/batch/table/SetOperatorsITCase.scala  |  77 +++++++++++
 .../scala/stream/table/UnsupportedOpsTest.scala |  11 +-
 .../api/scala/util/CollectionDataSets.scala     |   4 +-
 14 files changed, 526 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index afddee9..817a84a 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -561,6 +561,30 @@ Table result = left.intersectAll(right);
     </tr>
 
     <tr>
+      <td><strong>Minus</strong></td>
+      <td>
+        <p>Similar to a SQL EXCEPT clause. Minus returns records from the left 
table that do not exist in the right table. Duplicate records in the left table 
are returned exactly once, i.e., duplicates are removed. Both tables must have 
identical field types.</p>
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "a, b, c");
+Table result = left.minus(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MinusAll</strong></td>
+      <td>
+        <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records 
that do not exist in the right table. A record that is present n times in the 
left table and m times in the right table is returned (n - m) times, i.e., as 
many duplicates as are present in the right table are removed. Both tables must 
have identical field types.</p>
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "a, b, c");
+Table result = left.minusAll(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
       <td><strong>Distinct</strong></td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value 
combinations.</p>
@@ -731,7 +755,7 @@ val result = left.intersect(right);
       </td>
     </tr>
 
-       <tr>
+    <tr>
       <td><strong>IntersectAll</strong></td>
       <td>
         <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records 
that exist in both tables. If a record is present in both tables more than 
once, it is returned as many times as it is present in both tables, i.e., the 
resulting table might have duplicate records. Both tables must have identical 
field types.</p>
@@ -744,6 +768,30 @@ val result = left.intersectAll(right);
     </tr>
 
     <tr>
+      <td><strong>Minus</strong></td>
+      <td>
+        <p>Similar to a SQL EXCEPT clause. Minus returns records from the left 
table that do not exist in the right table. Duplicate records in the left table 
are returned exactly once, i.e., duplicates are removed. Both tables must have 
identical field types.</p>
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
+val result = left.minus(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MinusAll</strong></td>
+      <td>
+        <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records 
that do not exist in the right table. A record that is present n times in the 
left table and m times in the right table is returned (n - m) times, i.e., as 
many duplicates as are present in the right table are removed. Both tables must 
have identical field types.</p>
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
+val result = left.minusAll(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
       <td><strong>Distinct</strong></td>
       <td>
         <p>Similar to a SQL DISTINCT clause. Returns rows with distinct value 
combinations.</p>
@@ -884,7 +932,6 @@ Among others, the following SQL features are not supported, 
yet:
 - Non-equi joins and Cartesian products
 - Result selection by order position (`ORDER BY OFFSET FETCH`)
 - Grouping sets
-- `EXCEPT` set operation
 
 *Note: Tables are joined in the order in which they are specified in the 
`FROM` clause. In some cases the table order must be manually tweaked to 
resolve Cartesian products.*
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 028983b..70d7724 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -236,6 +236,37 @@ case class Aggregate(
   }
 }
 
+case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends 
BinaryNode {
+  override def output: Seq[Attribute] = left.output // the result exposes the left input's attributes
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+    left.construct(relBuilder)
+    right.construct(relBuilder)
+    relBuilder.minus(all) // `all` is forwarded unchanged to the RelBuilder
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) { // stream environments are rejected up front
+      failValidation(s"Minus on stream tables is currently not supported.")
+    }
+
+    val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
+    if (left.output.length != right.output.length) { // both inputs need the same number of columns
+      failValidation(s"Minus two table of different column sizes:" +
+        s" ${left.output.size} and ${right.output.size}")
+    }
+    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
+      l.resultType == r.resultType // compared position by position; field names are not checked
+    }
+    if (!sameSchema) {
+      failValidation(s"Minus two table of different schema:" +
+        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] 
and" +
+        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
+    }
+    resolvedMinus
+  }
+}
+
 case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends 
BinaryNode {
   override def output: Seq[Attribute] = left.output
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
index 3d88f6b..042c28b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -41,8 +41,7 @@ class DataSetIntersect(
     left: RelNode,
     right: RelNode,
     rowType: RelDataType,
-    all: Boolean,
-    ruleDescription: String)
+    all: Boolean)
   extends BiRel(cluster, traitSet, left, right)
     with DataSetRel {
 
@@ -55,8 +54,7 @@ class DataSetIntersect(
       inputs.get(0),
       inputs.get(1),
       rowType,
-      all,
-      ruleDescription
+      all
     )
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
new file mode 100644
index 0000000..d3a2fe7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.BatchTableEnvironment
+import org.apache.flink.api.table.runtime.MinusCoGroupFunction
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which implements set minus operation.
+  *
+  */
+class DataSetMinus(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    rowType: RelDataType,
+    all: Boolean)
+  extends BiRel(cluster, traitSet, left, right)
+    with DataSetRel {
+
+  override def deriveRowType() = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetMinus(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowType,
+      all
+    )
+  }
+
+  /** Short description of this node: the operator name followed by its field names. */
+  override def toString: String = {
+    // The field list sits inside one balanced pair of parentheses; the previous
+    // version left a stray '}' in the literal, which leaked into the output.
+    s"Minus(minus: ($minusSelectionToString))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("minus", minusSelectionToString)
+  }
+
+  override def computeSelfCost (planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, 
child) =>
+      val rowCnt = metadata.getRowCount(child)
+      val rowSize = this.estimateRowSize(child.getRowType)
+      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
rowSize))
+    }
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = 
left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+    val coGroupOpName = s"minus: ($minusSelectionToString)"
+    val coGroupFunction = new MinusCoGroupFunction[Any](all)
+
+    val minusDs = coGroupedDs.where("*").equalTo("*")
+      .`with`(coGroupFunction).name(coGroupOpName)
+
+    val config = tableEnv.getConfig
+    val leftType = leftDataSet.getType
+
+    // here we only care about left type information, because we emit records 
from left DataSet
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        minusDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != leftType) {
+          val mapFunc = getConversionMapper(
+            config,
+            false,
+            leftType,
+            determinedType,
+            "DataSetMinusConversion",
+            getRowType.getFieldNames)
+
+          val opName = s"convert: 
(${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          minusDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          minusDs
+        }
+    }
+  }
+
+  private def minusSelectionToString: String = {
+    rowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 68ce354..ddfa578 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -101,6 +101,7 @@ object FlinkRuleSets {
     DataSetScanRule.INSTANCE,
     DataSetUnionRule.INSTANCE,
     DataSetIntersectRule.INSTANCE,
+    DataSetMinusRule.INSTANCE,
     DataSetSortRule.INSTANCE,
     DataSetValuesRule.INSTANCE,
     BatchTableSourceScanRule.INSTANCE

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
index f86ec9b..c0e3269 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetIntersectRule.scala
@@ -45,8 +45,7 @@ class DataSetIntersectRule
       convLeft,
       convRight,
       rel.getRowType,
-      intersect.all,
-      description)
+      intersect.all)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
new file mode 100644
index 0000000..44bead0
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, 
RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.rules.UnionToDistinctRule
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetMinus}
+
+class DataSetMinusRule
+  extends ConverterRule(
+    classOf[LogicalMinus], // operand class handed to ConverterRule
+    Convention.NONE,
+    DataSetConvention.INSTANCE,
+    "DataSetMinusRule")
+{
+
+  def convert(rel: RelNode): RelNode = {
+
+    val minus: LogicalMinus = rel.asInstanceOf[LogicalMinus] // the rule is declared for LogicalMinus, so the cast is expected to hold
+    val traitSet: RelTraitSet = 
rel.getTraitSet.replace(DataSetConvention.INSTANCE) // original traits with the DataSet convention swapped in
+    val convLeft: RelNode = RelOptRule.convert(minus.getInput(0), 
DataSetConvention.INSTANCE)
+    val convRight: RelNode = RelOptRule.convert(minus.getInput(1), 
DataSetConvention.INSTANCE)
+
+    new DataSetMinus(
+      rel.getCluster,
+      traitSet,
+      convLeft,
+      convRight,
+      rel.getRowType,
+      minus.all) // the ALL flag of the logical node is carried over
+  }
+}
+
+object DataSetMinusRule {
+  val INSTANCE: RelOptRule = new DataSetMinusRule
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
index c39c497..9930811 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/IntersectCoGroupFunction.scala
@@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable}
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.util.Collector
 
-
 class IntersectCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, 
T]{
   override def coGroup(first: JIterable[T], second: JIterable[T], out: 
Collector[T]): Unit = {
     if (first == null || second == null) return

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
new file mode 100644
index 0000000..cac4fe6
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MinusCoGroupFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CoGroupFunction
+import org.apache.flink.util.Collector
+
+class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] { // `all` picks the duplicate-preserving branch below
+  override def coGroup(first: Iterable[T], second: Iterable[T], out: 
Collector[T]): Unit = {
+    if (first == null || second == null) return // nothing is emitted when either side is null
+    val leftIter = first.iterator
+    val rightIter = second.iterator
+
+    if (all) {
+      while (rightIter.hasNext && leftIter.hasNext) { // pair off one left record per right record and drop both
+        leftIter.next()
+        rightIter.next()
+      }
+
+      while (leftIter.hasNext) { // emit the left records that were not paired off
+        out.collect(leftIter.next())
+      }
+    } else {
+      if (!rightIter.hasNext && leftIter.hasNext) { // right group empty: emit one left record only, so duplicates collapse
+        out.collect(leftIter.next())
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 5c4cdf0..0acf0f9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.plan.logical
 import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.sinks.TableSink
 import org.apache.flink.api.table.typeutils.TypeConverter
@@ -403,6 +404,55 @@ class Table(
   }
 
   /**
+    * Minus of two [[Table]]s with duplicate records removed.
+    * Similar to a SQL EXCEPT clause. Minus returns records from the left 
table that do not
+    * exist in the right table. Duplicate records in the left table are 
returned
+    * exactly once, i.e., duplicates are removed. Both tables must have 
identical field types.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.minus(right)
+    * }}}
+    */
+  def minus(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same 
TableEnvironment can be " +
+        "subtracted.")
+    }
+    new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = 
false)
+      .validate(tableEnv))
+  }
+
+  /**
+    * Minus of two [[Table]]s.
+    * Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do 
not exist in
+    * the right table. A record that is present n times in the left table and 
m times
+    * in the right table is returned (n - m) times, i.e., as many duplicates 
as are present
+    * in the right table are removed. Both tables must have identical field 
types.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.minusAll(right)
+    * }}}
+    */
+  def minusAll(right: Table): Table = {
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same 
TableEnvironment can be " +
+        "subtracted.")
+    }
+    new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = 
true)
+      .validate(tableEnv))
+  }
+
+  /**
     * Unions two [[Table]]s with duplicate records removed.
     * Similar to an SQL UNION. The fields of the two union operations must 
fully overlap.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
index 10ada9d..b25f84a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsITCase.scala
@@ -124,6 +124,73 @@ class SetOperatorsITCase(
   }
 
   @Test
+  def testExcept(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = env.fromElements((1, 1L, "Hi"))
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  // Calcite's SQL parser does not support EXCEPT ALL, so this test is disabled.
+  // @Test is kept next to @Ignore so that JUnit reports the test as ignored and
+  // it runs again as soon as @Ignore is removed.
+  @Test
+  @Ignore("Calcite SQL parser does not support EXCEPT ALL")
+  def testExceptAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM t1 EXCEPT ALL SELECT c FROM t2"
+
+    // left: 1 x3, 2 x2 -- right: 1 x1, 2 x2, 3 x1
+    val data1 = new mutable.MutableList[Int]
+    data1 += (1, 1, 1, 2, 2)
+    val data2 = new mutable.MutableList[Int]
+    data2 += (1, 2, 2, 3)
+    val ds1 = env.fromCollection(data1)
+    val ds2 = env.fromCollection(data2)
+
+    tEnv.registerDataSet("t1", ds1, 'c)
+    tEnv.registerDataSet("t2", ds2, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    // EXCEPT ALL keeps (n - m) copies: two 1s remain, every 2 is cancelled
+    val expected = "1\n1"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT c FROM (" +
+      "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
+      "WHERE b < 2"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testIntersect(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
index 0c7a09c..83579e0 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
@@ -143,6 +143,83 @@ class SetOperatorsITCase(
   }
 
   @Test
+  def testMinusAll(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
+
+    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
+      .minusAll(ds2.unionAll(ds2)).select('c)
+
+    val results = minusDs.toDataSet[Row].collect()
+    val expected = "Hi\n" +
+      "Hello\n" + "Hello world\n" +
+      "Hello\n" + "Hello world\n" +
+      "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testMinus(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
+
+    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
+      .minus(ds2.unionAll(ds2)).select('c)
+
+    val results = minusDs.toDataSet[Row].collect()
+    val expected = "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // must fail. Minus inputs have different field types.
+    ds1.minus(ds2)
+  }
+
+  @Test
+  def testMinusDifferentFieldNames(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'd, 'e, 'f)
+
+    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
+      .minus(ds2.unionAll(ds2)).select('c)
+
+    val results = minusDs.toDataSet[Row].collect()
+    val expected = "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusAllTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 
'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 
'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.minusAll(ds2).select('c)
+  }
+
+  @Test
   def testIntersect(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 92de6f1..df22f2f 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.stream.table
 
 import org.apache.flink.api.scala.stream.utils.StreamTestData
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, TableException}
+import org.apache.flink.api.table.{ValidationException, TableEnvironment, 
TableException}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Test
@@ -73,4 +73,13 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.union(t2)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinus(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.minus(t2)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/97533939/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
index d1b6f52..ec1a810 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.api.scala.util
 
-import org.apache.hadoop.io.IntWritable
-
 import org.apache.flink.api.scala._
+import org.apache.hadoop.io.IntWritable
 
 import scala.collection.mutable
-import scala.reflect.classTag
 import scala.util.Random
 
 /**

Reply via email to