Repository: flink
Updated Branches:
  refs/heads/master dd53831aa -> e85f787b2
[FLINK-4183] [table] Move checking for StreamTableEnvironment into validation layer This closes #2221. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e85f787b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e85f787b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e85f787b Branch: refs/heads/master Commit: e85f787b280b63960e7f3add5aa8613b4ee23795 Parents: dd53831 Author: twalthr <twal...@apache.org> Authored: Fri Jul 8 16:50:17 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Jul 19 16:27:35 2016 +0200 ---------------------------------------------------------------------- .../api/table/plan/logical/operators.scala | 16 ++++++-- .../org/apache/flink/api/table/table.scala | 16 ++------ .../scala/stream/table/UnsupportedOpsTest.scala | 41 ++++++++++++++++---- 3 files changed, 49 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 4df8a5e..381244e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -128,7 +128,7 @@ case class Distinct(child: LogicalNode) extends UnaryNode { override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Distinct on stream tables is currently not supported.") + failValidation(s"Distinct on stream 
tables is currently not supported.") } this } @@ -144,7 +144,7 @@ case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode { override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Distinct on stream tables is currently not supported.") + failValidation(s"Distinct on stream tables is currently not supported.") } super.validate(tableEnv) } @@ -196,7 +196,7 @@ case class Aggregate( override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Aggregate on stream tables is currently not supported.") + failValidation(s"Aggregate on stream tables is currently not supported.") } val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate] @@ -277,6 +277,10 @@ case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends Bi } override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) { + failValidation(s"Union on stream tables is currently not supported.") + } + val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union] if (left.output.length != right.output.length) { failValidation(s"Union two tables of different column sizes:" + @@ -304,6 +308,10 @@ case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extend } override def validate(tableEnv: TableEnvironment): LogicalNode = { + if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + failValidation(s"Intersect on stream tables is currently not supported.") + } + val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect] if (left.output.length != right.output.length) { failValidation(s"Intersect two tables of different column sizes:" + @@ -392,7 +400,7 @@ case class Join( override def validate(tableEnv: TableEnvironment): LogicalNode = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - 
throw new TableException(s"Join on stream tables is currently not supported.") + failValidation(s"Join on stream tables is currently not supported.") } val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join] http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index e719782..cbb9a07 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -202,7 +202,7 @@ class Table( */ def groupBy(fields: Expression*): GroupedTable = { if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Group by on stream tables is currently not supported.") + throw new ValidationException(s"Group by on stream tables is currently not supported.") } new GroupedTable(this, fields) } @@ -392,7 +392,6 @@ class Table( } private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = { - // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be joined.") @@ -464,14 +463,11 @@ class Table( * }}} */ def union(right: Table): Table = { - if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Union on stream tables is currently not supported.") - } // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } - new Table(tableEnv, Union(logicalPlan, right.logicalPlan, false).validate(tableEnv)) + new 
Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv)) } /** @@ -491,7 +487,7 @@ class Table( if (right.tableEnv != this.tableEnv) { throw new ValidationException("Only tables from the same TableEnvironment can be unioned.") } - new Table(tableEnv, Union(logicalPlan, right.logicalPlan, true).validate(tableEnv)) + new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv)) } /** @@ -509,9 +505,6 @@ class Table( * }}} */ def intersect(right: Table): Table = { - if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Intersect on stream tables is currently not supported.") - } // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException( @@ -535,9 +528,6 @@ class Table( * }}} */ def intersectAll(right: Table): Table = { - if (tableEnv.isInstanceOf[StreamTableEnvironment]) { - throw new TableException(s"Intersect on stream tables is currently not supported.") - } // check that right table belongs to the same TableEnvironment if (right.tableEnv != this.tableEnv) { throw new ValidationException( http://git-wip-us.apache.org/repos/asf/flink/blob/e85f787b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala index df22f2f..89b0fdc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala @@ -20,21 +20,21 @@ package org.apache.flink.api.scala.stream.table import 
org.apache.flink.api.scala.stream.utils.StreamTestData import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.{ValidationException, TableEnvironment, TableException} +import org.apache.flink.api.table.{TableEnvironment, ValidationException} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Test class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testSelectWithAggregation(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testGroupBy(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -42,21 +42,21 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { .groupBy('_1) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testDistinct(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct() } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testSort(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = 
TableEnvironment.getTableEnvironment(env) @@ -65,7 +65,7 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { t1.join(t2) } - @Test(expected = classOf[TableException]) + @Test(expected = classOf[ValidationException]) def testUnion(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -75,6 +75,24 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { } @Test(expected = classOf[ValidationException]) + def testIntersect(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.intersect(t2) + } + + @Test(expected = classOf[ValidationException]) + def testIntersectAll(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.intersectAll(t2) + } + + @Test(expected = classOf[ValidationException]) def testMinus(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) @@ -82,4 +100,13 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) t1.minus(t2) } + + @Test(expected = classOf[ValidationException]) + def testMinusAll(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.minusAll(t2) + } }