[FLINK-3941] [tableAPI] Add support for UNION to Table API.

- Fix FLINK-3696 (type issues of DataSetUnion) by forwarding expected types to 
input operators.

This closes #2025


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef5832d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef5832d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef5832d8

Branch: refs/heads/master
Commit: ef5832d8f5867826a60f87e2fcaef912dc2950f6
Parents: 5784f39
Author: Yijie Shen <henry.yijies...@gmail.com>
Authored: Tue May 24 12:46:21 2016 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 26 12:21:24 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |  24 +++
 .../api/table/plan/logical/operators.scala      |   4 +-
 .../table/plan/nodes/dataset/DataSetUnion.scala |  17 +-
 .../plan/rules/dataSet/DataSetUnionRule.scala   |   5 +-
 .../org/apache/flink/api/table/table.scala      |  25 ++-
 .../flink/api/java/batch/table/UnionITCase.java | 186 -------------------
 .../flink/api/scala/batch/sql/UnionITCase.scala |  32 ++--
 .../api/scala/batch/table/UnionITCase.scala     |  78 +++-----
 .../scala/stream/table/UnsupportedOpsTest.scala |   9 +
 9 files changed, 121 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 4e52a98..f1e9cd1 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -435,6 +435,18 @@ Table result = left.join(right).where("a = d").select("a, 
b, e");
     <tr>
       <td><strong>Union</strong></td>
       <td>
+        <p>Similar to a SQL UNION clause. Unions two tables with duplicate 
records removed. Both tables must have identical schema, i.e., field names and 
types.</p>
+{% highlight java %}
+Table left = tableEnv.fromDataSet(ds1, "a, b, c");
+Table right = tableEnv.fromDataSet(ds2, "a, b, c");
+Table result = left.union(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>UnionAll</strong></td>
+      <td>
         <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables 
must have identical schema, i.e., field names and types.</p>
 {% highlight java %}
 Table left = tableEnv.fromDataSet(ds1, "a, b, c");
@@ -546,6 +558,18 @@ val result = left.join(right).where('a === 'd).select('a, 
'b, 'e);
     <tr>
       <td><strong>Union</strong></td>
       <td>
+        <p>Similar to a SQL UNION clause. Unions two tables with duplicate 
records removed. Both tables must have identical schema, i.e., field names and 
types.</p>
+{% highlight scala %}
+val left = ds1.toTable(tableEnv, 'a, 'b, 'c);
+val right = ds2.toTable(tableEnv, 'a, 'b, 'c);
+val result = left.union(right);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>UnionAll</strong></td>
+      <td>
         <p>Similar to a SQL UNION ALL clause. Unions two tables, both tables 
must have identical schema(field names and types).</p>
 {% highlight scala %}
 val left = ds1.toTable(tableEnv, 'a, 'b, 'c);

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index bd299b3..6b42a7d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -236,13 +236,13 @@ case class Aggregate(
   }
 }
 
-case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode {
+case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends 
BinaryNode {
   override def output: Seq[Attribute] = left.output
 
   override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
     left.construct(relBuilder)
     right.construct(relBuilder)
-    relBuilder.union(true)
+    relBuilder.union(all)
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index b6f6a19..78f64a4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -55,7 +55,7 @@ class DataSetUnion(
   }
 
   override def toString: String = {
-    "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+    s"Union(union: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -76,8 +76,19 @@ class DataSetUnion(
       tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    var leftDataSet: DataSet[Any] = null
+    var rightDataSet: DataSet[Any] = null
+
+    expectedType match {
+      case None =>
+        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+        rightDataSet =
+          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
Some(leftDataSet.getType))
+      case _ =>
+        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, 
expectedType)
+        rightDataSet = 
right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
+    }
+
     leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
index 7809d6d..ea35637 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.{RelOptRuleCall, Convention, 
RelOptRule, RelTrait
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.rules.UnionToDistinctRule
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetUnion}
 
 class DataSetUnionRule
@@ -33,7 +34,9 @@ class DataSetUnionRule
   {
 
   /**
-   * Only translate UNION ALL
+   * Only translate UNION ALL.
+   * Note: A distinct Union is translated into
+   * an Aggregate on top of a UNION ALL by [[UnionToDistinctRule]]
    */
   override def matches(call: RelOptRuleCall): Boolean = {
     val union: LogicalUnion = call.rel(0).asInstanceOf[LogicalUnion]

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 1e558c5..394b833 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -260,6 +260,29 @@ class Table(
   }
 
   /**
+    * Union two [[Table]]s with duplicate records removed.
+    * Similar to an SQL UNION. The fields of the two union operations must 
fully overlap.
+    *
+    * Note: Both tables must be bound to the same [[TableEnvironment]].
+    *
+    * Example:
+    *
+    * {{{
+    *   left.union(right)
+    * }}}
+    */
+  def union(right: Table): Table = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new TableException(s"Union on stream tables is currently not 
supported.")
+    }
+    // check that right table belongs to the same TableEnvironment
+    if (right.tableEnv != this.tableEnv) {
+      throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
+    }
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, 
false).validate(tableEnv))
+  }
+
+  /**
     * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two 
union operations
     * must fully overlap.
     *
@@ -276,7 +299,7 @@ class Table(
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
     }
-    new Table(tableEnv, Union(logicalPlan, 
right.logicalPlan).validate(tableEnv))
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, 
true).validate(tableEnv))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
deleted file mode 100644
index 853cd7f..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/UnionITCase.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class UnionITCase extends MultipleProgramsTestBase {
-
-       public UnionITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testUnion() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c");
-
-               Table selected = in1.unionAll(in2).select("c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" 
+ "Hello\n" + "Hello world\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testUnionWithFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, 
e").select("a, b, c");
-
-               Table selected = in1.unionAll(in2).where("b < 2").select("c");
-
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "Hi\n" + "Hallo\n";
-               compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testUnionIncompatibleNumberOfFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-               // Must fail. Number of fields of union inputs do not match
-               in1.unionAll(in2);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testUnionIncompatibleFieldsName() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d");
-
-               // Must fail. Field names of union inputs do not match
-               in1.unionAll(in2);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testUnionIncompatibleFieldTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, c, d, 
e").select("a, b, c");
-
-               // Must fail. Field types of union inputs do not match
-               in1.unionAll(in2);
-       }
-
-       @Test
-       public void testUnionWithAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, 
e").select("a, b, c");
-
-               Table selected = in1.unionAll(in2).select("c.count");
-
-               DataSet<Row> ds = tableEnv.toDataSet(selected, Row.class);
-               List<Row> results = ds.collect();
-               String expected = "18";
-               compareResultAsText(results, expected);
-       }
-
-       @Test
-       public void testUnionWithJoin() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds3 = 
CollectionDataSets.getSmall5TupleDataSet(env);
-
-               Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-               Table in2 = tableEnv.fromDataSet(ds2, "a, b, d, c, 
e").select("a, b, c");
-               Table in3 = tableEnv.fromDataSet(ds3, "a2, b2, d2, c2, 
e2").select("a2, b2, c2");
-
-           Table joinDs = in1.unionAll(in2).join(in3).where("a === 
a2").select("c, c2");
-           DataSet<Row> ds = tableEnv.toDataSet(joinDs, Row.class);
-           List<Row> results = ds.collect();
-
-           String expected = "Hi,Hallo\n" + "Hallo,Hallo\n" +
-             "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
-             "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
-             "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n";
-           compareResultAsText(results, expected);
-       }
-
-       @Test(expected = ValidationException.class)
-       public void testUnionTablesFromDifferentEnvs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tEnv1 = 
TableEnvironment.getTableEnvironment(env);
-               BatchTableEnvironment tEnv2 = 
TableEnvironment.getTableEnvironment(env);
-
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-
-               Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-               Table in2 = tEnv2.fromDataSet(ds2, "a, b, c");
-
-               // Must fail. Tables are bound to different TableEnvironments.
-               in1.unionAll(in2);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
index a42d328..527eac7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UnionITCase.scala
@@ -39,7 +39,7 @@ class UnionITCase(
   extends TableProgramsTestBase(mode, configMode) {
 
   @Test
-  def testUnion(): Unit = {
+  def testUnionAll(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -58,16 +58,31 @@ class UnionITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  //TODO: activate for EFFICIENT mode
   @Test
-  def testUnionWithFilter(): Unit = {
+  def testUnion(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
+    val sqlQuery = "SELECT c FROM t1 UNION (SELECT c FROM t2)"
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env)
+    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
+    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testUnionWithFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val sqlQuery = "SELECT c FROM (" +
       "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" +
@@ -85,17 +100,12 @@ class UnionITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  //TODO: activate for EFFICIENT mode
   @Test
   def testUnionWithAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
     val sqlQuery = "SELECT count(c) FROM (" +
       "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
index 29427a5..f472341 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UnionITCase.scala
@@ -39,7 +39,7 @@ class UnionITCase(
   extends TableProgramsTestBase(mode, configMode) {
 
   @Test
-  def testUnion(): Unit = {
+  def testUnionAll(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -54,7 +54,22 @@ class UnionITCase(
   }
 
   @Test
-  def testTernaryUnion(): Unit = {
+  def testUnion(): Unit = {
+    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+
+    val unionDs = ds1.union(ds2).select('c)
+
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testTernaryUnionAll(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
@@ -72,21 +87,18 @@ class UnionITCase(
   }
 
   @Test
-  def testUnionWithFilter(): Unit = {
+  def testTernaryUnion(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'd, 'c, 'e)
+    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
 
-    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
+    val unionDs = ds1.union(ds2).union(ds3).select('c)
 
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hallo\n"
+    val results = unionDs.toDataSet[Row].collect()
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -115,50 +127,6 @@ class UnionITCase(
     ds1.unionAll(ds2)
   }
 
-  @Test
-  def testUnionWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 
'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).select('c.count)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "18"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnionWithJoin(): Unit = {
-    val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    if (tEnv.getConfig.getEfficientTypeUsage) {
-      return
-    }
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv,'a, 'b, 
'd, 'c, 'e)
-    val ds3 = CollectionDataSets.getSmall5TupleDataSet(env).toTable(tEnv, 'a2, 
'b2, 'd2, 'c2, 'e2)
-
-    val joinDs = ds1.unionAll(ds2.select('a, 'b, 'c))
-      .join(ds3.select('a2, 'b2, 'c2))
-      .where('a ==='a2).select('c, 'c2)
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = "Hi,Hallo\n" + "Hallo,Hallo\n" +
-      "Hello,Hallo Welt\n" + "Hello,Hallo Welt wie\n" +
-      "Hallo Welt,Hallo Welt\n" + "Hallo Welt wie,Hallo Welt\n" +
-      "Hallo Welt,Hallo Welt wie\n" + "Hallo Welt wie,Hallo Welt wie\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
   @Test(expected = classOf[ValidationException])
   def testUnionTablesFromDifferentEnvs(): Unit = {
     val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/ef5832d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index a382447..92de6f1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -64,4 +64,13 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.join(t2)
   }
+
+  @Test(expected = classOf[TableException])
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.union(t2)
+  }
 }

Reply via email to