[FLINK-4068] [table] Reduce expression also for filter/project
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f00e1e7c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f00e1e7c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f00e1e7c Branch: refs/heads/master Commit: f00e1e7c5578caf52eaffc5cbdd102589c13f52d Parents: a711339 Author: twalthr <twal...@apache.org> Authored: Fri Sep 30 17:30:44 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Oct 4 14:30:48 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/FlinkTypeFactory.scala | 12 +- .../flink/api/table/FlinkTypeSystem.scala | 9 + .../api/table/plan/rules/FlinkRuleSets.scala | 7 + .../api/table/BatchTableEnvironmentTest.scala | 102 ----- .../api/table/ExpressionReductionTest.scala | 400 +++++++++++++++++++ .../api/table/StreamTableEnvironmentTest.scala | 106 ----- 6 files changed, 427 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index 581ecde..77eb907 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -22,7 +22,7 @@ import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} import org.apache.calcite.sql.SqlIntervalQualifier -import org.apache.calcite.sql.`type`.SqlTypeName +import 
org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ @@ -68,6 +68,16 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } } + override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = { + // inferred VARCHAR precisions may overflow to negative values (Flink's default is Int.MaxValue); + // use the type system default via super directly, so a negative default can never recurse forever + if (typeName == VARCHAR && precision < 0) { + super.createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName)) + } else { + super.createSqlType(typeName, precision) + } + } + private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match { // TODO add specific RelDataTypes // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala index df6022a..2df043f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl +import org.apache.calcite.sql.`type`.SqlTypeName /** * Custom type system for Flink.
@@ -33,4 +34,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl { // half should be enough for all use cases override def getMaxNumericPrecision: Int = Int.MaxValue / 2 + override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match { + // unless a length is given, a VARCHAR gets the maximum length (Int.MaxValue characters) + case SqlTypeName.VARCHAR => + Int.MaxValue + case _ => + super.getDefaultPrecision(typeName) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index ddfa578..7d915e6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -75,6 +75,8 @@ object FlinkRuleSets { SortRemoveRule.INSTANCE, // simplify expressions rules + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, ReduceExpressionsRule.CALC_INSTANCE, ReduceExpressionsRule.JOIN_INSTANCE, @@ -113,6 +115,9 @@ object FlinkRuleSets { val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( RemoveDeltaRule.INSTANCE, + + // convert a logical table scan to a relational expression + TableScanRule.INSTANCE, EnumerableToLogicalTableScan.INSTANCE, // calc rules @@ -133,6 +138,8 @@ object FlinkRuleSets { ProjectRemoveRule.INSTANCE, // simplify expressions rules + ReduceExpressionsRule.FILTER_INSTANCE, + ReduceExpressionsRule.PROJECT_INSTANCE, ReduceExpressionsRule.CALC_INSTANCE, // merge and push unions rules
http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala deleted file mode 100644 index 0344dee..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.scala.util.CollectionDataSets -import org.junit.Assert._ -import org.junit.Test - - -class BatchTableEnvironmentTest { - - @Test - def testReduceExpressionForSQL(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val sqlQuery = "SELECT " + - "(3+4)+a, " + - "b+(1+2), " + - "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + - "TRIM(BOTH ' STRING '), " + - "'test' || 'string', " + - "NULLIF(1, 1), " + - "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + - "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + - "1 IS NULL, " + - "'TEST' LIKE '%EST', " + - "FLOOR(2.5), " + - "'TEST' IN ('west', 'TEST', 'rest') " + - "FROM MyTable WHERE a>(1+7)" - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", ds) - - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) - assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - } - - @Test - def 
testReduceExpressionForTableAPI(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) - - val table = ds - .where('a > (1+7)) - .select((3+4).toExpr + 6, - (11 === 1) ? ("a", "b"), - " STRING ".trim, - "test" + "string", - "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, - 1.isNull, - "TEST".like("%EST"), - 2.5.toExpr.floor()) - - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala new file mode 100644 index 0000000..4830b75 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import org.apache.flink.api.java.{DataSet => JDataSet} +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation} +import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.junit.Assert._ +import org.junit.Test +import org.mockito.Mockito.{mock, when} + +class ExpressionReductionTest { + + private def mockBatchTableEnvironment(): BatchTableEnvironment = { + val env = mock(classOf[ExecutionEnvironment]) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = mock(classOf[DataSet[(Int, Long, String)]]) + val jDs = mock(classOf[JDataSet[(Int, Long, String)]]) + when(ds.javaSet).thenReturn(jDs) + when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)]) + + val t = ds.toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + tEnv + } + + private def mockStreamTableEnvironment(): StreamTableEnvironment = { + val env = mock(classOf[StreamExecutionEnvironment]) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = mock(classOf[DataStream[(Int, Long, String)]]) + val jDs = mock(classOf[JDataStream[(Int, Long, String)]]) + when(ds.javaStream).thenReturn(jDs) + when(jDs.getType).thenReturn(createTypeInformation[(Int,
Long, String)]) + + val t = ds.toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + tEnv + } + + @Test + def testReduceCalcExpressionForBatchSQL(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val sqlQuery = "SELECT " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest'), " + + "CAST(TRUE AS VARCHAR) || 'X' " + + "FROM MyTable WHERE a>(1+7)" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) + assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + } + + @Test + def testReduceProjectExpressionForBatchSQL(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val sqlQuery = "SELECT " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP
'1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest'), " + + "CAST(TRUE AS VARCHAR) || 'X' " + + "FROM MyTable" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) + assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + } + + @Test + def testReduceFilterExpressionForBatchSQL(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val sqlQuery = "SELECT " + + "* " + + "FROM MyTable WHERE a>(1+7)" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + } + + @Test + def testReduceCalcExpressionForBatchTableAPI(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val table = tEnv + .scan("MyTable") + .where('a > (1 + 7)) + .select((3 + 4).toExpr + 6, + (11 === 1) ?
("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor(), + true.cast(Types.STRING) + "X") + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + } + + @Test + def testReduceProjectExpressionForBatchTableAPI(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val table = tEnv + .scan("MyTable") + .select((3 + 4).toExpr + 6, + (11 === 1) ?
("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor(), + true.cast(Types.STRING) + "X") + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + } + + @Test + def testReduceFilterExpressionForBatchTableAPI(): Unit = { + val tEnv = mockBatchTableEnvironment() + + val table = tEnv + .scan("MyTable") + .where('a > (1 + 7)) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + } + + @Test + def testReduceCalcExpressionForStreamSQL(): Unit = { + val tEnv = mockStreamTableEnvironment() + + val sqlQuery = "SELECT STREAM " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest'), " + + "CAST(TRUE AS VARCHAR) || 'X' " + + "FROM MyTable WHERE a>(1+7)" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("+(7, _1) AS
EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) + assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + } + + @Test + def testReduceProjectExpressionForStreamSQL(): Unit = { + val tEnv = mockStreamTableEnvironment() + + val sqlQuery = "SELECT STREAM " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest'), " + + "CAST(TRUE AS VARCHAR) || 'X' " + + "FROM MyTable" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains("+(7, a) AS EXPR$0")) + assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) +
assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12")) + } + + @Test + def testReduceFilterExpressionForStreamSQL(): Unit = { + val tEnv = mockStreamTableEnvironment() + + val sqlQuery = "SELECT STREAM " + + "* " + + "FROM MyTable WHERE a>(1+7)" + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + } + + @Test + def testReduceCalcExpressionForStreamTableAPI(): Unit = { + val tEnv = mockStreamTableEnvironment() + + val table = tEnv + .ingest("MyTable") + .where('a > (1 + 7)) + .select((3 + 4).toExpr + 6, + (11 === 1) ? ("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor(), + true.cast(Types.STRING) + "X") + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + } + + @Test + def testReduceProjectExpressionForStreamTableAPI(): Unit = { + val tEnv = mockStreamTableEnvironment() + + // no filter here: this test covers a projection-only plan (see the Calc test for where + select) + val table = tEnv + .ingest("MyTable") + .select((3 + 4).toExpr + 6, + (11 === 1) ?
("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor(), + true.cast(Types.STRING) + "X") + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertFalse(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + assertTrue(optimizedString.contains("'TRUEX' AS _c8")) + } + + @Test + def testReduceFilterExpressionForStreamTableAPI(): Unit = { + val tEnv = mockStreamTableEnvironment() + + val table = tEnv + .ingest("MyTable") + .where('a > (1 + 7)) + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala deleted file mode 100644 index 52bf9ac..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -import org.apache.flink.api.scala.stream.utils.StreamTestData -import org.apache.flink.api.scala.table._ -import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.junit.Test -import org.junit.Assert._ - - - -class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{ - - @Test - def testReduceExpression(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val sqlQuery = "SELECT STREAM " + - "(3+4)+a, " + - "b+(1+2), " + - "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + - "TRIM(BOTH ' STRING '), " + - "'test' || 'string', " + - "NULLIF(1, 1), " + - "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + - "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + - "1 IS NULL, " + - "'TEST' LIKE '%EST', " + - "FLOOR(2.5), " + - "'TEST' IN ('west', 'TEST', 'rest') " + - "FROM MyTable WHERE a>(1+7)" - - val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) - tEnv.registerTable("MyTable", t) - - val table = tEnv.sql(sqlQuery) - - val optimized = tEnv.optimize(table.getRelNode) - val 
optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) - assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) - assertTrue(optimizedString.contains("'b' AS EXPR$2")) - assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) - assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) - assertTrue(optimizedString.contains("null AS EXPR$5")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) - assertTrue(optimizedString.contains("19 AS EXPR$7")) - assertTrue(optimizedString.contains("false AS EXPR$8")) - assertTrue(optimizedString.contains("true AS EXPR$9")) - assertTrue(optimizedString.contains("2 AS EXPR$10")) - assertTrue(optimizedString.contains("true AS EXPR$11")) - } - - @Test - def testReduceExpressionForTableAPI(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) - - val table = t - .where('a > (1+7)) - .select((3+4).toExpr + 6, - (11 === 1) ? ("a", "b"), - " STRING ".trim, - "test" + "string", - "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, - 1.isNull, - "TEST".like("%EST"), - 2.5.toExpr.floor()) - - - val optimized = tEnv.optimize(table.getRelNode) - val optimizedString = optimized.toString - assertTrue(optimizedString.contains(">(_1, 8)")) - assertTrue(optimizedString.contains("13 AS _c0")) - assertTrue(optimizedString.contains("'b' AS _c1")) - assertTrue(optimizedString.contains("'STRING' AS _c2")) - assertTrue(optimizedString.contains("'teststring' AS _c3")) - assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) - assertTrue(optimizedString.contains("false AS _c5")) - assertTrue(optimizedString.contains("true AS _c6")) - assertTrue(optimizedString.contains("2E0 AS _c7")) - } - -}