[FLINK-4068] [table] Reduce expression also for filter/project

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f00e1e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f00e1e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f00e1e7c

Branch: refs/heads/master
Commit: f00e1e7c5578caf52eaffc5cbdd102589c13f52d
Parents: a711339
Author: twalthr <twal...@apache.org>
Authored: Fri Sep 30 17:30:44 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Oct 4 14:30:48 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/FlinkTypeFactory.scala      |  12 +-
 .../flink/api/table/FlinkTypeSystem.scala       |   9 +
 .../api/table/plan/rules/FlinkRuleSets.scala    |   7 +
 .../api/table/BatchTableEnvironmentTest.scala   | 102 -----
 .../api/table/ExpressionReductionTest.scala     | 400 +++++++++++++++++++
 .../api/table/StreamTableEnvironmentTest.scala  | 106 -----
 6 files changed, 427 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 581ecde..77eb907 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
 import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
@@ -68,6 +68,16 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     }
   }
 
+  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+    // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
+    // always set those to default value
+    if (typeName == VARCHAR && precision < 0) {
+      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+    } else {
+      super.createSqlType(typeName, precision)
+    }
+  }
+
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
     // TODO add specific RelDataTypes
     // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
index df6022a..2df043f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeSystem.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.table
 
 import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
 
 /**
   * Custom type system for Flink.
@@ -33,4 +34,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
   // half should be enough for all use cases
   override def getMaxNumericPrecision: Int = Int.MaxValue / 2
 
+  override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName match {
+    // by default all VARCHARs can have the Java default length
+    case SqlTypeName.VARCHAR =>
+      Int.MaxValue
+    case _ =>
+      super.getDefaultPrecision(typeName)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index ddfa578..7d915e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -75,6 +75,8 @@ object FlinkRuleSets {
     SortRemoveRule.INSTANCE,
 
     // simplify expressions rules
+    ReduceExpressionsRule.FILTER_INSTANCE,
+    ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
     ReduceExpressionsRule.JOIN_INSTANCE,
 
@@ -113,6 +115,9 @@ object FlinkRuleSets {
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
       RemoveDeltaRule.INSTANCE,
+
+      // convert a logical table scan to a relational expression
+      TableScanRule.INSTANCE,
       EnumerableToLogicalTableScan.INSTANCE,
 
       // calc rules
@@ -133,6 +138,8 @@ object FlinkRuleSets {
       ProjectRemoveRule.INSTANCE,
 
       // simplify expressions rules
+      ReduceExpressionsRule.FILTER_INSTANCE,
+      ReduceExpressionsRule.PROJECT_INSTANCE,
       ReduceExpressionsRule.CALC_INSTANCE,
 
       // merge and push unions rules

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
deleted file mode 100644
index 0344dee..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.junit.Assert._
-import org.junit.Test
-
-
-class BatchTableEnvironmentTest {
-
-  @Test
-  def testReduceExpressionForSQL(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest') " +
-      "FROM MyTable WHERE a>(1+7)"
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-  }
-
-  @Test
-  def testReduceExpressionForTableAPI(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-
-    val table = ds
-      .where('a > (1+7))
-      .select((3+4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor())
-
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..4830b75
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito.{mock, when}
+
+class ExpressionReductionTest {
+
+  private def mockBatchTableEnvironment(): BatchTableEnvironment = {
+    val env = mock(classOf[ExecutionEnvironment])
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = mock(classOf[DataSet[(Int, Long, String)]])
+    val jDs = mock(classOf[JDataSet[(Int, Long, String)]])
+    when(ds.javaSet).thenReturn(jDs)
+    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+    tEnv
+  }
+
+  private def mockStreamTableEnvironment(): StreamTableEnvironment = {
+    val env = mock(classOf[StreamExecutionEnvironment])
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = mock(classOf[DataStream[(Int, Long, String)]])
+    val jDs = mock(classOf[JDataStream[(Int, Long, String)]])
+    when(ds.javaStream).thenReturn(jDs)
+    when(jDs.getType).thenReturn(createTypeInformation[(Int, Long, String)])
+
+    val t = ds.toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+    tEnv
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchSQL(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val sqlQuery = "SELECT " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+    val tEnv = mockBatchTableEnvironment()
+
+    val table = tEnv
+      .scan("MyTable")
+      .where('a > (1 + 7))
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains("+(7, a) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
+    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
+    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
+    assertTrue(optimizedString.contains("null AS EXPR$5"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
+    assertTrue(optimizedString.contains("19 AS EXPR$7"))
+    assertTrue(optimizedString.contains("false AS EXPR$8"))
+    assertTrue(optimizedString.contains("true AS EXPR$9"))
+    assertTrue(optimizedString.contains("2 AS EXPR$10"))
+    assertTrue(optimizedString.contains("true AS EXPR$11"))
+    assertTrue(optimizedString.contains("'TRUEX' AS EXPR$12"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamSQL(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val sqlQuery = "SELECT STREAM " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val table = tEnv.sql(sqlQuery)
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table = tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table =  tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+    assertTrue(optimizedString.contains("13 AS _c0"))
+    assertTrue(optimizedString.contains("'b' AS _c1"))
+    assertTrue(optimizedString.contains("'STRING' AS _c2"))
+    assertTrue(optimizedString.contains("'teststring' AS _c3"))
+    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
+    assertTrue(optimizedString.contains("false AS _c5"))
+    assertTrue(optimizedString.contains("true AS _c6"))
+    assertTrue(optimizedString.contains("2E0 AS _c7"))
+    assertTrue(optimizedString.contains("'TRUEX' AS _c8"))
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+    val tEnv = mockStreamTableEnvironment()
+
+    val table = tEnv
+      .ingest("MyTable")
+      .where('a > (1 + 7))
+
+
+    val optimized = tEnv.optimize(table.getRelNode)
+    val optimizedString = optimized.toString
+    assertTrue(optimizedString.contains(">(_1, 8)"))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f00e1e7c/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
deleted file mode 100644
index 52bf9ac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-import org.junit.Assert._
-
-
-
-class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{
-
-  @Test
-  def testReduceExpression(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val sqlQuery = "SELECT STREAM " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest') " +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val table = tEnv.sql(sqlQuery)
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
-    assertTrue(optimizedString.contains("'b' AS EXPR$2"))
-    assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
-    assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
-    assertTrue(optimizedString.contains("null AS EXPR$5"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6"))
-    assertTrue(optimizedString.contains("19 AS EXPR$7"))
-    assertTrue(optimizedString.contains("false AS EXPR$8"))
-    assertTrue(optimizedString.contains("true AS EXPR$9"))
-    assertTrue(optimizedString.contains("2 AS EXPR$10"))
-    assertTrue(optimizedString.contains("true AS EXPR$11"))
-  }
-
-  @Test
-  def testReduceExpressionForTableAPI(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-
-    val table = t
-      .where('a > (1+7))
-      .select((3+4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor())
-
-
-    val optimized = tEnv.optimize(table.getRelNode)
-    val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains(">(_1, 8)"))
-    assertTrue(optimizedString.contains("13 AS _c0"))
-    assertTrue(optimizedString.contains("'b' AS _c1"))
-    assertTrue(optimizedString.contains("'STRING' AS _c2"))
-    assertTrue(optimizedString.contains("'teststring' AS _c3"))
-    assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4"))
-    assertTrue(optimizedString.contains("false AS _c5"))
-    assertTrue(optimizedString.contains("true AS _c6"))
-    assertTrue(optimizedString.contains("2E0 AS _c7"))
-  }
-
-}

Reply via email to