This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 490e2af  [FLINK-16935][table-planner-blink] Enable or delete most of the ignored test cases in blink planner.
490e2af is described below

commit 490e2af0f9cc3021b6423535768e9f3604b27519
Author: Kurt Young <k...@apache.org>
AuthorDate: Sat Apr 25 20:36:59 2020 +0800

    [FLINK-16935][table-planner-blink] Enable or delete most of the ignored test cases in blink planner.
    
    This closes #11874
---
 .../flink/table/planner/plan/utils/SortUtil.scala  |   3 +-
 .../batch/table/stringexpr/SetOperatorsTest.xml    |  14 +++
 .../plan/stream/table/TemporalTableJoinTest.xml    |  90 +++++++++++++++
 .../planner/expressions/DecimalTypeTest.scala      |   3 +-
 .../expressions/NonDeterministicTests.scala        |  58 +++++-----
 .../validation/ScalarFunctionsValidationTest.scala |  13 +--
 .../table/planner/plan/batch/table/JoinTest.scala  |  15 +--
 .../batch/table/stringexpr/SetOperatorsTest.scala  |   3 +-
 .../plan/stream/table/TemporalTableJoinTest.scala  |  27 ++---
 .../TemporalTableJoinValidationTest.scala          |  18 ++-
 .../planner/runtime/batch/sql/LimitITCase.scala    |   4 +-
 .../planner/runtime/batch/sql/MiscITCase.scala     |  12 +-
 .../planner/runtime/batch/sql/RankITCase.scala     |   1 -
 .../sql/agg/DistinctAggregateITCaseBase.scala      |  34 +-----
 .../batch/sql/agg/WindowAggregateITCase.scala      | 125 +--------------------
 .../sql/join/JoinConditionTypeCoerceITCase.scala   |   4 +-
 .../batch/sql/join/JoinWithoutKeyITCase.scala      |   5 +-
 .../runtime/batch/sql/join/SemiJoinITCase.scala    |   3 +-
 .../runtime/batch/table/AggregationITCase.scala    |   9 +-
 .../planner/runtime/batch/table/CalcITCase.scala   |  16 +--
 .../planner/runtime/stream/sql/SortITCase.scala    |   4 +-
 .../runtime/stream/sql/SplitAggregateITCase.scala  |   5 +-
 .../planner/runtime/stream/table/CalcITCase.scala  |   1 -
 .../planner/runtime/stream/table/JoinITCase.scala  |   2 -
 24 files changed, 177 insertions(+), 292 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala
index 89f7540..59dec37 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala
@@ -46,8 +46,7 @@ object SortUtil {
     if (fetch != null) {
       getLimitStart(offset) + RexLiteral.intValue(fetch)
     } else {
-      // TODO return Long.MaxValue when providing FlinkRelMdRowCount on Sort ?
-      Integer.MAX_VALUE
+      Long.MaxValue
     }
   }
 
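The change above also widens the unbounded case: when no FETCH is present, getLimitEnd now reports Long.MaxValue instead of Integer.MAX_VALUE, so an unlimited sort is no longer mistaken for a limit of about two billion rows. A minimal sketch of the resulting logic, assuming Calcite's RexNode/RexLiteral API and that getLimitStart returns the OFFSET value or 0:

    import org.apache.calcite.rex.{RexLiteral, RexNode}

    // Sketch only: mirrors SortUtil's limit handling after this commit.
    def getLimitStart(offset: RexNode): Long =
      if (offset != null) RexLiteral.intValue(offset) else 0L

    def getLimitEnd(offset: RexNode, fetch: RexNode): Long =
      if (fetch != null) {
        getLimitStart(offset) + RexLiteral.intValue(fetch) // OFFSET + FETCH
      } else {
        Long.MaxValue // no FETCH clause: the range is unbounded
      }
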
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml
index 0ebd86b..c8a528a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml
@@ -16,6 +16,20 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testInWithFilter">
+    <Resource name="planAfter">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(c, a1)], select=[a, b, c], build=[right], tryDistinctBuildRow=[true])
+:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
+:  +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1])
++- Exchange(distribution=[hash[a1]])
+   +- LocalHashAggregate(groupBy=[a1], select=[a1])
+      +- Calc(select=[a AS a1], where=[=(b, _UTF-16LE'two')])
+         +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+
   <TestCase name="testInWithProject">
     <Resource name="planBefore">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml
new file mode 100644
index 0000000..8d55f3f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testSimpleJoin">
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[*(o_amount, rate) AS rate])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
+   :- Exchange(distribution=[hash[o_currency]])
+   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+   +- Exchange(distribution=[hash[currency]])
+      +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+
+  <TestCase name="testSimpleJoin2">
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[*(o_amount, rate) AS rate])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
+   :- Exchange(distribution=[hash[o_currency]])
+   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+   +- Exchange(distribution=[hash[currency]])
+      +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+
+  <TestCase name="testSimpleProctimeJoin">
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[*(o_amount, rate) AS rate])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_proctime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_proctime, currency, rate, proctime])
+   :- Exchange(distribution=[hash[o_currency]])
+   :  +- DataStreamScan(table=[[default_catalog, default_database, ProctimeOrders]], fields=[o_amount, o_currency, o_proctime])
+   +- Exchange(distribution=[hash[currency]])
+      +- DataStreamScan(table=[[default_catalog, default_database, ProctimeRatesHistory]], fields=[currency, rate, proctime])
+]]>
+    </Resource>
+  </TestCase>
+
+  <TestCase name="testTemporalTableFunctionOnTopOfQuery">
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[*(o_amount, rate) AS rate])
++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime])
+   :- Exchange(distribution=[hash[o_currency]])
+   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime])
+   +- Exchange(distribution=[hash[currency]])
+      +- Calc(select=[currency, *(rate, 2) AS rate, rowtime], where=[>(rate, 100)])
+         +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+
+  <TestCase name="testComplexJoin">
+    <Resource name="planAfter">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[=(t3_secondary_key, secondary_key)], select=[rate, secondary_key, t3_comment, t3_secondary_key], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+:- Exchange(distribution=[hash[secondary_key]])
+:  +- Calc(select=[*(o_amount, rate) AS rate, secondary_key])
+:     +- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), OR(=(currency, o_currency), =(secondary_key, o_secondary_key)))], select=[o_rowtime, o_amount, o_currency, o_secondary_key, rowtime, currency, rate, secondary_key])
+:        :- Exchange(distribution=[single])
+:        :  +- Calc(select=[rowtime AS o_rowtime, o_amount, o_currency, o_secondary_key])
+:        :     +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[rowtime, o_comment, o_amount, o_currency, o_secondary_key])
+:        +- Exchange(distribution=[single])
+:           +- Calc(select=[rowtime, currency, rate, secondary_key], where=[>(rate, 110:BIGINT)])
+:              +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[rowtime, comment, currency, rate, secondary_key])
++- Exchange(distribution=[hash[t3_secondary_key]])
+   +- DataStreamScan(table=[[default_catalog, default_database, ThirdTable]], fields=[t3_comment, t3_secondary_key])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
index 635f188..2a88898 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.types.logical.DecimalType
 import org.apache.flink.types.Row
+
 import org.junit.{Ignore, Test}
 
 class DecimalTypeTest extends ExpressionTestBase {
@@ -132,7 +133,7 @@ class DecimalTypeTest extends ExpressionTestBase {
   @Ignore
   @Test
   def testDefaultDecimalCasting(): Unit = {
-    // from String
+//    // from String
     testTableApi(
       "123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)),
       "'123456789123456789123456789'.cast(DECIMAL)",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
index 2565383..9cd8c63 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala
@@ -23,71 +23,65 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
 import org.apache.flink.types.Row
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 /**
-  * Tests that can only be checked manually as they are non-deterministic.
+  * Tests that check all non-deterministic functions can be executed.
   */
 class NonDeterministicTests extends ExpressionTestBase {
 
-  @Ignore
   @Test
   def testCurrentDate(): Unit = {
     testAllApis(
-      currentDate(),
-      "currentDate()",
-      "CURRENT_DATE",
-      "PLEASE CHECK MANUALLY")
+      currentDate().isGreater("1970-01-01".toDate),
+      "currentDate() > '1970-01-01'.toDate",
+      "CURRENT_DATE > DATE '1970-01-01'",
+      "true")
   }
 
-  @Ignore
   @Test
   def testCurrentTime(): Unit = {
     testAllApis(
-      currentTime(),
-      "currentTime()",
-      "CURRENT_TIME",
-      "PLEASE CHECK MANUALLY")
+      currentTime().isGreaterOrEqual("00:00:00".toTime),
+      "currentTime() >= '00:00:00'.toTime",
+      "CURRENT_TIME >= TIME '00:00:00'",
+      "true")
   }
 
-  @Ignore
   @Test
   def testCurrentTimestamp(): Unit = {
     testAllApis(
-      currentTimestamp(),
-      "currentTimestamp()",
-      "CURRENT_TIMESTAMP",
-      "PLEASE CHECK MANUALLY")
+      currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
+      "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
+      "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+      "true")
   }
 
-  @Ignore
   @Test
   def testLocalTimestamp(): Unit = {
     testAllApis(
-      localTimestamp(),
-      "localTimestamp()",
-      "LOCALTIMESTAMP",
-      "PLEASE CHECK MANUALLY")
+      localTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp),
+      "localTimestamp() > '1970-01-01 00:00:00'.toTimestamp",
+      "LOCALTIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'",
+      "true")
   }
 
-  @Ignore
   @Test
   def testLocalTime(): Unit = {
     testAllApis(
-      localTime(),
-      "localTime()",
-      "LOCALTIME",
-      "PLEASE CHECK MANUALLY")
+      localTime().isGreaterOrEqual("00:00:00".toTime),
+      "localTime() >= '00:00:00'.toTime",
+      "LOCALTIME >= TIME '00:00:00'",
+      "true")
   }
 
-  @Ignore
   @Test
   def testUUID(): Unit = {
     testAllApis(
-      uuid(),
-      "uuid()",
-      "UUID()",
-      "PLEASE CHECK MANUALLY")
+      uuid().charLength(),
+      "uuid().charLength",
+      "CHARACTER_LENGTH(UUID())",
+      "36")
   }
 
  // ----------------------------------------------------------------------------------------------
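
The rewritten tests above replace "PLEASE CHECK MANUALLY" with assertions on properties that hold for every invocation: current dates and timestamps are after the epoch, times fall within the valid daily range, and a UUID string always has 36 characters. A standalone JVM sketch of the UUID invariant the last test relies on (plain Scala, not the ExpressionTestBase API):

    import java.util.UUID

    // The canonical textual form of a UUID is 8-4-4-4-12 hex digits
    // plus four hyphens, i.e. always 36 characters.
    val len = UUID.randomUUID().toString.length
    assert(len == 36, s"unexpected UUID length: $len")
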
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
index e23a5bf..f38ab93 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -23,8 +23,9 @@ import org.apache.flink.table.api.{SqlParserException, ValidationException}
 import org.apache.flink.table.expressions.TimePointUnit
 import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase
+
 import org.apache.calcite.avatica.util.TimeUnit
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
 
@@ -32,25 +32,19 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
   // Math functions
   // ----------------------------------------------------------------------------------------------
 
-  @Ignore
   @Test
   def testInvalidLog1(): Unit = {
-    thrown.expect(classOf[ValidationException])
-    // invalid arithmetic argument
     testSqlApi(
       "LOG(1, 100)",
-      "FAIL"
+      "Infinity"
     )
   }
 
-  @Ignore
   @Test
   def testInvalidLog2(): Unit ={
-    thrown.expect(classOf[ValidationException])
-    // invalid arithmetic argument
     testSqlApi(
       "LOG(-1)",
-      "FAIL"
+      "NaN"
     )
   }
 
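The two LOG tests now assert IEEE-754 results instead of expecting a ValidationException: LOG(b, x) evaluates as ln(x) / ln(b), so a base of 1 divides by +0.0 and yields Infinity, and the logarithm of a negative number is NaN. A quick standalone check of the underlying arithmetic (plain Scala, not the testSqlApi harness):

    // LOG(1, 100) = ln(100) / ln(1) = 4.605... / 0.0 = Infinity
    println(math.log(100) / math.log(1)) // Infinity

    // LOG(-1) = ln(-1), undefined over the reals
    println(math.log(-1)) // NaN
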
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
index 5c129d0..46555bd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.batch.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.plan.batch.table.JoinTest.Merger
@@ -127,8 +128,6 @@ class JoinTest extends TableTestBase {
     util.verifyPlan(joined)
   }
 
-  // TODO [FLINK-7942] [table] Reduce aliasing in RexNodes
- // @Ignore
   @Test
   def testFilterJoinRule(): Unit = {
     val util = batchTestUtil()
@@ -143,9 +142,7 @@ class JoinTest extends TableTestBase {
     util.verifyPlan(results)
   }
 
-  // TODO
-  @Ignore("Non-equi-join could be supported later.")
-  @Test
+  @Test(expected = classOf[ValidationException])
   def testFullJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
@@ -154,9 +151,7 @@ class JoinTest extends TableTestBase {
     util.verifyPlan(ds2.fullOuterJoin(ds1, 'b < 'd).select('c, 'g))
   }
 
-  // TODO
-  @Ignore("Non-equi-join could be supported later.")
-  @Test
+  @Test(expected = classOf[ValidationException])
   def testLeftJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
@@ -165,9 +160,7 @@ class JoinTest extends TableTestBase {
     util.verifyPlan(ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g))
   }
 
-  // TODO
-  @Ignore("Non-equi-join could be supported later.")
-  @Test
+  @Test(expected = classOf[ValidationException])
   def testRightJoinNoEquiJoinPredicate(): Unit = {
     val util = batchTestUtil()
     val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c)
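
Rather than leaving the non-equi outer join tests ignored, the commit pins down the current contract: planning such a join must fail with a ValidationException, expressed via JUnit 4's expected-exception attribute. A minimal, hypothetical illustration of the pattern (not taken from the diff):

    import org.apache.flink.table.api.ValidationException
    import org.junit.Test

    class ExpectedExceptionSketch {
      // Passes only if the body throws ValidationException; the
      // unsupported case is asserted instead of skipped.
      @Test(expected = classOf[ValidationException])
      def testRejectsNonEquiOuterJoin(): Unit = {
        throw new ValidationException("non-equi condition in outer join")
      }
    }
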
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
index aae2ce3..35d498a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
@@ -22,13 +22,12 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.planner.utils.TableTestBase
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import java.sql.Timestamp
 
 class SetOperatorsTest extends TableTestBase {
 
-  @Ignore("Support in subQuery in ExpressionConverter")
   @Test
   def testInWithFilter(): Unit = {
     val util = batchTestUtil()
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala
index 697a121..1d7314e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException}
+import org.apache.flink.table.api.{DataTypes, Table, TableSchema, ValidationException}
 import org.apache.flink.table.expressions.{Expression, FieldReferenceExpression}
 import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl}
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
@@ -29,7 +29,7 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATO
 
 import org.hamcrest.Matchers.{equalTo, startsWith}
 import org.junit.Assert.{assertEquals, assertThat}
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import java.sql.Timestamp
 
@@ -37,25 +37,24 @@ class TemporalTableJoinTest extends TableTestBase {
 
   val util: TableTestUtil = streamTestUtil()
 
-  val orders = util.addDataStream[(Long, String, Timestamp)](
+  val orders: Table = util.addDataStream[(Long, String, Timestamp)](
     "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
 
-  val ratesHistory = util.addDataStream[(String, Int, Timestamp)](
+  val ratesHistory: Table = util.addDataStream[(String, Int, Timestamp)](
     "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
 
-  val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+  val rates: TemporalTableFunction = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
   util.addFunction("Rates", rates)
 
-  val proctimeOrders = util.addDataStream[(Long, String)](
+  val proctimeOrders: Table = util.addDataStream[(Long, String)](
     "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)
 
-  val proctimeRatesHistory = util.addDataStream[(String, Int)](
+  val proctimeRatesHistory: Table = util.addDataStream[(String, Int)](
     "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)
 
-  val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency)
+  val proctimeRates: TemporalTableFunction =
+    proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency)
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testSimpleJoin(): Unit = {
     val result = orders
@@ -65,8 +64,6 @@ class TemporalTableJoinTest extends TableTestBase {
     util.verifyPlan(result)
   }
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testSimpleJoin2(): Unit = {
     val resultJava = orders
@@ -76,8 +73,6 @@ class TemporalTableJoinTest extends TableTestBase {
     util.verifyPlan(resultJava)
   }
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testSimpleProctimeJoin(): Unit = {
     val result = proctimeOrders
@@ -92,8 +87,6 @@ class TemporalTableJoinTest extends TableTestBase {
     * Important thing here is that we have complex OR join condition
     * and there are some columns that are not being used (are being pruned).
     */
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testComplexJoin(): Unit = {
     val util = streamTestUtil()
@@ -118,8 +111,6 @@ class TemporalTableJoinTest extends TableTestBase {
     util.verifyPlan(result)
   }
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testTemporalTableFunctionOnTopOfQuery(): Unit = {
     val filteredRatesHistory = ratesHistory
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala
index 83e1683..334c613 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -19,11 +19,11 @@
 package org.apache.flink.table.planner.plan.stream.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, ValidationException}
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
 
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 import java.sql.Timestamp
 
@@ -31,19 +31,19 @@ class TemporalTableJoinValidationTest extends TableTestBase {
 
   val util: TableTestUtil = streamTestUtil()
 
-  val orders = util.addDataStream[(Long, String, Timestamp)](
+  val orders: Table = util.addDataStream[(Long, String, Timestamp)](
     "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
 
-  val ordersProctime = util.addDataStream[(Long, String)](
+  val ordersProctime: Table = util.addDataStream[(Long, String)](
     "OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime)
 
-  val ordersWithoutTimeAttribute = util.addDataStream[(Long, String, Timestamp)](
+  val ordersWithoutTimeAttribute: Table = util.addDataStream[(Long, String, Timestamp)](
     "OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime)
 
-  val ratesHistory = util.addDataStream[(String, Int, Timestamp)](
+  val ratesHistory: Table = util.addDataStream[(String, Int, Timestamp)](
     "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
 
-  val ratesHistoryWithoutTimeAttribute = util.addDataStream[(String, Int, Timestamp)](
+  val ratesHistoryWithoutTimeAttribute: Table = util.addDataStream[(String, Int, Timestamp)](
     "ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime)
 
   @Test
@@ -62,8 +62,6 @@ class TemporalTableJoinValidationTest extends TableTestBase {
     ratesHistory.createTemporalTableFunction("rowtime", "foobar")
   }
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testNonTimeIndicatorOnRightSide(): Unit = {
     expectedException.expect(classOf[ValidationException])
@@ -94,8 +92,6 @@ class TemporalTableJoinValidationTest extends TableTestBase {
     util.verifyExplain(result)
   }
 
-  // TODO
-  @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule")
   @Test
   def testMixedTimeIndicators(): Unit = {
     expectedException.expect(classOf[ValidationException])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
index dd29c24..837d63b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.runtime.batch.sql
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.TestLimitableTableSource
@@ -99,8 +100,7 @@ class LimitITCase extends BatchTestBase {
       5)
   }
 
-  @Ignore // TODO support limit without sort in table api.
-  @Test
+  @Test(expected = classOf[ValidationException])
   def testTableLimitWithLimitTable(): Unit = {
     Assert.assertEquals(
       executeQuery(tEnv.scan("LimitTable").fetch(5)).size,
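
The same expected-exception pattern applies here: per the removed TODO, the Table API does not yet support limit without sort, so calling fetch() directly on a scan is now asserted to throw a ValidationException. A hedged sketch of the distinction (tEnv is the test's TableEnvironment; the sort field 'a is illustrative):

    // Valid: fetch on a sorted result
    val limited = tEnv.scan("LimitTable").orderBy('a.asc).fetch(5)

    // Rejected: fetch without a preceding sort throws ValidationException
    val invalid = tEnv.scan("LimitTable").fetch(5)
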
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
index 8f93ff1..9cbca51 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData.{buildInData, buildInType}
 import org.apache.flink.types.Row
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -477,16 +477,6 @@ class MiscITCase extends BatchTestBase {
     )
   }
 
-  @Ignore // TODO: allows 123L="123"
-  @Test
-  def testCompareLongAndString(): Unit = {
-    checkQuery(
-      Seq((123L, "123"), (19157170390056973L, "19157170390056971")),
-      "select f0=f1 from Table1",
-      Seq(Tuple1(true), Tuple1(false))
-    )
-  }
-
   @Test
   def testOrderByOrdinal(): Unit = {
     env.setParallelism(1)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
index fe51bd8..c31f0cf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala
@@ -84,7 +84,6 @@ class RankITCase extends BatchTestBase {
       Seq(row(2, 2, 2), row(3, 6, 3), row(4, 7, 3), row(4, 10, 3), row(5, 14, 2), row(5, 15, 2)))
   }
 
-  @Ignore
   @Test
   def testRankValueFilterWithLowerValue(): Unit = {
     checkResult("SELECT * FROM (" +
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
index fd0a126..9acd358 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
@@ -18,13 +18,11 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql.agg
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.Types
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
-import org.apache.flink.types.Row
-import org.junit.{Before, Ignore, Test}
+
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -338,32 +336,4 @@ abstract class DistinctAggregateITCaseBase extends BatchTestBase {
       "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3) FROM Table5",
       Seq(row(3, 3, 5, 2, 12)))
   }
-
-  // TODO remove Ignore after supporting generated code cloud be splitted into
-  //  small classes or methods due to code is too large
-  @Ignore
-  @Test
-  def testMaxDistinctAggOnDifferentColumn(): Unit = {
-    // the max groupCount must be less than 64.
-    // so the max number of distinct aggregate on different column without group by column is 63.
-    val fields = (0 until 63).map(i => s"f$i")
-    val types = new RowTypeInfo(Seq.fill(fields.size)(Types.INT): _*)
-    val nullablesOfData = Array.fill(fields.size)(false)
-    val data = new Row(fields.length)
-    fields.indices.foreach(i => data.setField(i, i))
-
-    registerCollection("MyTable", Seq(data), types, fields.mkString(","), 
nullablesOfData)
-
-    val expected = new Row(fields.length * 2)
-    fields.indices.foreach(i => expected.setField(i, 1))
-    fields.indices.foreach(i => expected.setField(i + fields.length, i))
-
-    val distinctList = fields.map(f => s"COUNT(DISTINCT $f)").mkString(", ")
-    val maxList = fields.map(f => s"MAX($f)").mkString(", ")
-    checkResult(
-      s"SELECT $distinctList, $maxList FROM MyTable",
-      Seq(expected)
-    )
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
index 0bd467f..2e013e8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala
@@ -18,26 +18,17 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql.agg
 
-import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.core.io.InputSplit
-
-import scala.collection.JavaConversions._
-import org.apache.flink.table.api.{TableSchema, _}
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
 import org.apache.flink.table.planner.utils.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction}
-import org.apache.flink.table.sources.InputFormatTableSource
-import org.apache.flink.types.Row
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 class WindowAggregateITCase extends BatchTestBase {
 
@@ -405,120 +396,6 @@ class WindowAggregateITCase extends BatchTestBase {
     )
   }
 
-  @Ignore // TODO support table stats
-  @Test
-  def testSlidingWindow2(): Unit = {
-    // for 1 phase agg
-    val tableSchema = new TableSchema(
-      Array("a", "b", "c", "ts"),
-      Array(
-        Types.INT,
-        Types.LONG,
-        Types.STRING,
-        Types.LOCAL_DATE_TIME))
-    //    val colStats = Map[String, ColumnStats](
-    //      "ts" -> new ColumnStats(9000000L, 1L, 8D, 8, null, null),
-    //      "a" -> new ColumnStats(10000000L, 1L, 8D, 8, 5, -5),
-    //      "b" -> new ColumnStats(8000000L, 0L, 4D, 32, 6.1D, 0D),
-    //      "c" -> new ColumnStats(9000000L, 0L, 1024D, 32, 6.1D, 0D))
-    val table = new InputFormatTableSource[Row] {
-      override def getReturnType: TypeInformation[Row] = type3WithTimestamp
-
-      //      override def getTableStats: TableStats = new TableStats(10000000L, colStats)
-
-      override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
-        new CollectionInputFormat[Row](data3WithTimestamp,
-          type3WithTimestamp.createSerializer(env.getConfig))
-      }
-
-      override def getTableSchema: TableSchema = tableSchema
-    }
-    tEnv.registerTableSource("Table3WithTimestamp1", table)
-
-    // keyed; 1-phase; pre-accumulate with paned optimization
-    checkResult(
-      "SELECT c, countFun(a), " +
-          "HOP_START(ts, INTERVAL '4' SECOND, INTERVAL '8' SECOND), " +
-          "HOP_END(ts, INTERVAL '4' SECOND, INTERVAL '8' SECOND)" +
-          "FROM Table3WithTimestamp1 GROUP BY c, HOP(ts, INTERVAL '4' SECOND, 
INTERVAL '8' SECOND)",
-      Seq(
-        row("Comment#1", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("Comment#1", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Comment#10", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#10", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#11", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#11", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#12", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#12", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#13", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#13", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#14", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#14", 1, "1970-01-01 00:00:20.0", "1970-01-01 00:00:28.0"),
-        row("Comment#15", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#15", 1, "1970-01-01 00:00:20.0", "1970-01-01 00:00:28.0"),
-        row("Comment#2", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Comment#2", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#3", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Comment#3", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#4", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Comment#4", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#5", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Comment#5", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#6", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#6", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#7", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#7", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#8", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#8", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Comment#9", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#9", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"),
-        row("Hello world, how are you?", 1, "1970-01-01 00:00:00.0", 
"1970-01-01 00:00:08.0"),
-        row("Hello world, how are you?", 1, "1970-01-01 00:00:04.0", 
"1970-01-01 00:00:12.0"),
-        row("Hello world", 1, "1969-12-31 23:59:56.0", "1970-01-01 
00:00:04.0"),
-        row("Hello world", 1, "1970-01-01 00:00:00.0", "1970-01-01 
00:00:08.0"),
-        row("Hello", 1, "1969-12-31 23:59:56.0", "1970-01-01 00:00:04.0"),
-        row("Hello", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("Hi", 1, "1969-12-31 23:59:56.0", "1970-01-01 00:00:04.0"),
-        row("Hi", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("I am fine.", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("I am fine.", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"),
-        row("Luke Skywalker", 1, "1970-01-01 00:00:00.0", "1970-01-01 
00:00:08.0"),
-        row("Luke Skywalker", 1, "1970-01-01 00:00:04.0", "1970-01-01 
00:00:12.0")
-      )
-    )
-
-    // keyed; 1-phase; pre-accumulate with windowSize = slideSize
-    checkResult(
-      "SELECT c, count(a), " +
-          "HOP_START(ts, INTERVAL '8' SECOND, INTERVAL '8' SECOND), " +
-          "HOP_END(ts, INTERVAL '8' SECOND, INTERVAL '8' SECOND)" +
-          "FROM Table3WithTimestamp1 GROUP BY c, HOP(ts, INTERVAL '8' SECOND, 
INTERVAL '8' SECOND)",
-      Seq(
-        row("Comment#1", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("Comment#10", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#11", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#12", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#13", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#14", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#15", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"),
-        row("Comment#2", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#3", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#4", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#5", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#6", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#7", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#8", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Comment#9", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"),
-        row("Hello world, how are you?", 1, "1970-01-01 00:00:00.0", 
"1970-01-01 00:00:08.0"),
-        row("Hello world", 1, "1970-01-01 00:00:00.0", "1970-01-01 
00:00:08.0"),
-        row("Hello", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("Hi", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("I am fine.", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"),
-        row("Luke Skywalker", 1, "1970-01-01 00:00:00.0", "1970-01-01 
00:00:08.0")
-      )
-    )
-  }
-
   @Test
   def testNullValueInputTimestamp(): Unit = {
     // Sliding window
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala
index 7a7530c..98e7e4b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala
@@ -22,10 +22,8 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
-// @RunWith(classOf[Parameterized]) TODO
-@Ignore // TODO support JoinConditionTypeCoerce
 class JoinConditionTypeCoerceITCase extends BatchTestBase {
 
   @Before
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala
index 27552c1..27ce9b0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -48,7 +48,6 @@ class JoinWithoutKeyITCase extends BatchTestBase {
 
   // single row join
 
-  @Ignore // TODO not support same source until set lazy_from_source
   @Test
   def testCrossJoinWithLeftSingleRowInput(): Unit = {
     checkResult(
@@ -60,7 +59,6 @@ class JoinWithoutKeyITCase extends BatchTestBase {
       ))
   }
 
-  @Ignore // TODO not support same source until set lazy_from_source
   @Test
   def testCrossJoinWithRightSingleRowInput(): Unit = {
     checkResult(
@@ -72,7 +70,6 @@ class JoinWithoutKeyITCase extends BatchTestBase {
       ))
   }
 
-  @Ignore // TODO not support same source until set lazy_from_source
   @Test
   def testCrossJoinWithEmptySingleRowInput(): Unit = {
     checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala
index 2726f6e..414a29d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData._
 
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import java.util
 
@@ -226,7 +226,6 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     )
   }
 
-  @Ignore // TODO not support same source until set lazy_from_source
   @Test
   def testInWithAggregate2(): Unit = {
     checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
index 8ef2947..e8f1063 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala
@@ -317,7 +317,6 @@ class AggregationITCase extends BatchTestBase {
   }
 
   @Test
-  @Ignore // TODO support it
   def testAnalyticAggregation(): Unit = {
     val ds = BatchTableEnvUtil.fromElements(tEnv,
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, BigDecimal.ONE),
@@ -333,13 +332,13 @@ class AggregationITCase extends BatchTestBase {
       '_6.varSamp, '_7.varSamp)
     val expected =
       "0,0,0," +
-        "0,0.5,0.5,0.5," +
+        "0,0.5,0.5,0.500000000000000000," +
         "1,1,1," +
-        "1,0.70710677,0.7071067811865476,0.7071067811865476," +
+        "1,0.70710677,0.7071067811865476,0.707106781186547600," +
         "0,0,0," +
-        "0,0.25,0.25,0.25," +
+        "0,0.25,0.25,0.250000000000000000," +
         "1,1,1," +
-        "1,0.5,0.5,0.5"
+        "1,0.5,0.5,0.500000000000000000"
     val results = executeQuery(res)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
index 82cad55..b7b7391 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala
@@ -264,8 +264,6 @@ class CalcITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("After implement BatchExecHashJoin")
   @Test
   def testCalcJoin(): Unit = {
     val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv, "a, b, c")
@@ -279,8 +277,6 @@ class CalcITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Implicit type converter")
   @Test
   def testAdvancedDataTypes(): Unit = {
 
@@ -315,8 +311,6 @@ class CalcITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Implicit type converter")
   @Test
   def testNumericAutocastInArithmetic() {
     val table = BatchTableEnvUtil.fromElements(tEnv,
@@ -455,8 +449,6 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   def testRowType(): Unit = {
     val data = new mutable.MutableList[(Int, Long, String)]
     data.+=((1, 1L, "Jack#22"))
@@ -636,14 +628,12 @@ class CalcITCase extends BatchTestBase {
   }
 
   @Test
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   def testSplitFieldsOnCustomType(): Unit = {
     tEnv.getConfig.setMaxGeneratedCodeLength(1) // splits fields
 
-    val ds = CollectionBatchExecTable.getCustomTypeDataSet(tEnv, "i, l, s")
-      .filter('s.like("%a%") && 's.charLength() > 12)
-      .select('i, 'l, 's.charLength())
+    val ds = CollectionBatchExecTable.getCustomTypeDataSet(tEnv, "myInt, myLong, myString")
+      .filter('myString.like("%a%") && 'myString.charLength() > 12)
+      .select('myInt, 'myLong, 'myString.charLength())
 
     val expected = "3,3,25\n" + "3,5,14\n"
     val results = executeQuery(ds)
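
For context on the last hunk: setMaxGeneratedCodeLength(1) forces the code generator to split every generated function into smaller methods, so the test exercises the split-code path end to end. A short sketch of the knob's use (tEnv is the test's TableEnvironment):

    // Generated methods longer than this limit are split into smaller
    // ones; a limit of 1 forces splitting everywhere.
    tEnv.getConfig.setMaxGeneratedCodeLength(1)
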
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
index 5d50ec2..6644883 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala
@@ -27,9 +27,9 @@ import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.types.Row
 
 import org.junit.Assert._
+import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Ignore, _}
 
 import scala.collection.mutable
 
@@ -203,8 +203,6 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     assertEquals(expected, sink.getRetractResults)
   }
 
-  // FIXME
-  @Ignore("Enable after StreamExecJoin implements ExecNode.")
   @Test
   def testSortWithWhere(): Unit = {
     val sqlQuery =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index 3581e9a..0065b8e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.Types
 import org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase.PartialAggMode
 import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.{AggMode, LocalGlobalOff, LocalGlobalOn}
 import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchOn
@@ -34,7 +34,7 @@ import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.{Before, Ignore, Test}
+import org.junit.{Before, Test}
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
@@ -296,7 +296,6 @@ class SplitAggregateITCase(
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  @Ignore("[FLINK-12088]: JOIN is not supported")
   @Test
   def testAggWithJoin(): Unit = {
     val t1 = tEnv.sqlQuery(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
index 278a2b0..c32131e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala
@@ -37,7 +37,6 @@ import scala.collection.{Seq, mutable}
 @RunWith(classOf[Parameterized])
 class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
-  @Ignore("CodeGen split")
   @Test
   def testFunctionSplitWhenCodegenOverLengthLimit(): Unit = {
     // test function split
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index f1bd827..99cc2c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -626,7 +626,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
-  // TODO
   @Ignore("Non-equi-join could be supported later.")
   @Test
   def testNonEqualInnerJoin(): Unit = {
@@ -642,7 +641,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
-  // TODO
   @Ignore("Non-equi-join could be supported later.")
   @Test
   def testNonEqualInnerJoinWithRetract(): Unit = {
