Repository: flink
Updated Branches:
  refs/heads/master dd53831aa -> e85f787b2


[FLINK-4183] [table] Move checking for StreamTableEnvironment into validation layer

This closes #2221.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e85f787b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e85f787b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e85f787b

Branch: refs/heads/master
Commit: e85f787b280b63960e7f3add5aa8613b4ee23795
Parents: dd53831
Author: twalthr <twal...@apache.org>
Authored: Fri Jul 8 16:50:17 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Jul 19 16:27:35 2016 +0200

----------------------------------------------------------------------
 .../api/table/plan/logical/operators.scala      | 16 ++++++--
 .../org/apache/flink/api/table/table.scala      | 16 ++------
 .../scala/stream/table/UnsupportedOpsTest.scala | 41 ++++++++++++++++----
 3 files changed, 49 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 4df8a5e..381244e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -128,7 +128,7 @@ case class Distinct(child: LogicalNode) extends UnaryNode {
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Distinct on stream tables is currently not 
supported.")
+      failValidation(s"Distinct on stream tables is currently not supported.")
     }
     this
   }
@@ -144,7 +144,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) 
extends UnaryNode {
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Distinct on stream tables is currently not 
supported.")
+      failValidation(s"Distinct on stream tables is currently not supported.")
     }
     super.validate(tableEnv)
   }
@@ -196,7 +196,7 @@ case class Aggregate(
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Aggregate on stream tables is currently not 
supported.")
+      failValidation(s"Aggregate on stream tables is currently not supported.")
     }
 
     val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
@@ -277,6 +277,10 @@ case class Union(left: LogicalNode, right: LogicalNode, 
all: Boolean) extends Bi
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
+      failValidation(s"Union on stream tables is currently not supported.")
+    }
+
     val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
     if (left.output.length != right.output.length) {
       failValidation(s"Union two tables of different column sizes:" +
@@ -304,6 +308,10 @@ case class Intersect(left: LogicalNode, right: 
LogicalNode, all: Boolean) extend
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      failValidation(s"Intersect on stream tables is currently not supported.")
+    }
+
     val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
     if (left.output.length != right.output.length) {
       failValidation(s"Intersect two tables of different column sizes:" +
@@ -392,7 +400,7 @@ case class Join(
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Join on stream tables is currently not 
supported.")
+      failValidation(s"Join on stream tables is currently not supported.")
     }
 
     val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]

http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index e719782..cbb9a07 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -202,7 +202,7 @@ class Table(
     */
   def groupBy(fields: Expression*): GroupedTable = {
     if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Group by on stream tables is currently not 
supported.")
+      throw new ValidationException(s"Group by on stream tables is currently 
not supported.")
     }
     new GroupedTable(this, fields)
   }
@@ -392,7 +392,6 @@ class Table(
   }
 
   private def join(right: Table, joinPredicate: Option[Expression], joinType: 
JoinType): Table = {
-
     // check that right table belongs to the same TableEnvironment
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException("Only tables from the same 
TableEnvironment can be joined.")
@@ -464,14 +463,11 @@ class Table(
     * }}}
     */
   def union(right: Table): Table = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Union on stream tables is currently not 
supported.")
-    }
     // check that right table belongs to the same TableEnvironment
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
     }
-    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, 
false).validate(tableEnv))
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = 
false).validate(tableEnv))
   }
 
   /**
@@ -491,7 +487,7 @@ class Table(
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException("Only tables from the same 
TableEnvironment can be unioned.")
     }
-    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, 
true).validate(tableEnv))
+    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = 
true).validate(tableEnv))
   }
 
   /**
@@ -509,9 +505,6 @@ class Table(
     * }}}
     */
   def intersect(right: Table): Table = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Intersect on stream tables is currently not 
supported.")
-    }
     // check that right table belongs to the same TableEnvironment
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException(
@@ -535,9 +528,6 @@ class Table(
     * }}}
     */
   def intersectAll(right: Table): Table = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      throw new TableException(s"Intersect on stream tables is currently not 
supported.")
-    }
     // check that right table belongs to the same TableEnvironment
     if (right.tableEnv != this.tableEnv) {
       throw new ValidationException(

http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index df22f2f..89b0fdc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -20,21 +20,21 @@ package org.apache.flink.api.scala.stream.table
 
 import org.apache.flink.api.scala.stream.utils.StreamTestData
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{ValidationException, TableEnvironment, 
TableException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.junit.Test
 
 class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testSelectWithAggregation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testGroupBy(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -42,21 +42,21 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
       .groupBy('_1)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testSort(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
     
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -65,7 +65,7 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
     t1.join(t2)
   }
 
-  @Test(expected = classOf[TableException])
+  @Test(expected = classOf[ValidationException])
   def testUnion(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -75,6 +75,24 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
+  def testIntersect(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.intersect(t2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectAll(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.intersectAll(t2)
+  }
+
+  @Test(expected = classOf[ValidationException])
   def testMinus(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -82,4 +100,13 @@ class UnsupportedOpsTest extends 
StreamingMultipleProgramsTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
     t1.minus(t2)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusAll(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.minusAll(t2)
+  }
 }

Reply via email to