This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 490e2af [FLINK-16935][table-planner-blink] Enable or delete most of the ignored test cases in blink planner. 490e2af is described below commit 490e2af0f9cc3021b6423535768e9f3604b27519 Author: Kurt Young <k...@apache.org> AuthorDate: Sat Apr 25 20:36:59 2020 +0800 [FLINK-16935][table-planner-blink] Enable or delete most of the ignored test cases in blink planner. This closes #11874 --- .../flink/table/planner/plan/utils/SortUtil.scala | 3 +- .../batch/table/stringexpr/SetOperatorsTest.xml | 14 +++ .../plan/stream/table/TemporalTableJoinTest.xml | 90 +++++++++++++++ .../planner/expressions/DecimalTypeTest.scala | 3 +- .../expressions/NonDeterministicTests.scala | 58 +++++----- .../validation/ScalarFunctionsValidationTest.scala | 13 +-- .../table/planner/plan/batch/table/JoinTest.scala | 15 +-- .../batch/table/stringexpr/SetOperatorsTest.scala | 3 +- .../plan/stream/table/TemporalTableJoinTest.scala | 27 ++--- .../TemporalTableJoinValidationTest.scala | 18 ++- .../planner/runtime/batch/sql/LimitITCase.scala | 4 +- .../planner/runtime/batch/sql/MiscITCase.scala | 12 +- .../planner/runtime/batch/sql/RankITCase.scala | 1 - .../sql/agg/DistinctAggregateITCaseBase.scala | 34 +----- .../batch/sql/agg/WindowAggregateITCase.scala | 125 +-------------------- .../sql/join/JoinConditionTypeCoerceITCase.scala | 4 +- .../batch/sql/join/JoinWithoutKeyITCase.scala | 5 +- .../runtime/batch/sql/join/SemiJoinITCase.scala | 3 +- .../runtime/batch/table/AggregationITCase.scala | 9 +- .../planner/runtime/batch/table/CalcITCase.scala | 16 +-- .../planner/runtime/stream/sql/SortITCase.scala | 4 +- .../runtime/stream/sql/SplitAggregateITCase.scala | 5 +- .../planner/runtime/stream/table/CalcITCase.scala | 1 - .../planner/runtime/stream/table/JoinITCase.scala | 2 - 24 files changed, 177 insertions(+), 292 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala index 89f7540..59dec37 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/SortUtil.scala @@ -46,8 +46,7 @@ object SortUtil { if (fetch != null) { getLimitStart(offset) + RexLiteral.intValue(fetch) } else { - // TODO return Long.MaxValue when providing FlinkRelMdRowCount on Sort ? - Integer.MAX_VALUE + Long.MaxValue } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml index 0ebd86b..c8a528a 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.xml @@ -16,6 +16,20 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testInWithFilter"> + <Resource name="planAfter"> + <![CDATA[ +HashJoin(joinType=[LeftSemiJoin], where=[=(c, a1)], select=[a, b, c], build=[right], tryDistinctBuildRow=[true]) +:- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH]) +: +- TableSourceScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], reuse_id=[1]) ++- Exchange(distribution=[hash[a1]]) + +- LocalHashAggregate(groupBy=[a1], select=[a1]) + +- Calc(select=[a AS a1], where=[=(b, _UTF-16LE'two')]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testInWithProject"> <Resource name="planBefore"> <![CDATA[ diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml new file mode 100644 index 0000000..8d55f3f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testSimpleJoin"> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[*(o_amount, rate) AS rate]) ++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime]) + :- Exchange(distribution=[hash[o_currency]]) + : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime]) + +- Exchange(distribution=[hash[currency]]) + +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime]) +]]> + </Resource> + </TestCase> + + <TestCase name="testSimpleJoin2"> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[*(o_amount, rate) AS rate]) ++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime]) + :- Exchange(distribution=[hash[o_currency]]) + : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime]) + +- Exchange(distribution=[hash[currency]]) + +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime]) +]]> + </Resource> + </TestCase> + + <TestCase name="testSimpleProctimeJoin"> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[*(o_amount, rate) AS rate]) ++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_proctime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_proctime, currency, rate, proctime]) + :- Exchange(distribution=[hash[o_currency]]) + : +- DataStreamScan(table=[[default_catalog, default_database, ProctimeOrders]], fields=[o_amount, o_currency, o_proctime]) + +- Exchange(distribution=[hash[currency]]) + +- DataStreamScan(table=[[default_catalog, default_database, ProctimeRatesHistory]], fields=[currency, rate, proctime]) +]]> + </Resource> + </TestCase> + + <TestCase name="testTemporalTableFunctionOnTopOfQuery"> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[*(o_amount, rate) AS rate]) ++- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), =(currency, o_currency))], select=[o_amount, o_currency, o_rowtime, currency, rate, rowtime]) + :- Exchange(distribution=[hash[o_currency]]) + : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[o_amount, o_currency, o_rowtime]) + +- Exchange(distribution=[hash[currency]]) + +- Calc(select=[currency, *(rate, 2) AS rate, rowtime], where=[>(rate, 100)]) + +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, rowtime]) +]]> + </Resource> + </TestCase> + + <TestCase name="testComplexJoin"> + <Resource name="planAfter"> + <![CDATA[ +Join(joinType=[InnerJoin], where=[=(t3_secondary_key, secondary_key)], select=[rate, secondary_key, t3_comment, t3_secondary_key], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +:- Exchange(distribution=[hash[secondary_key]]) +: +- Calc(select=[*(o_amount, rate) AS rate, secondary_key]) +: +- TemporalJoin(joinType=[InnerJoin], where=[AND(__TEMPORAL_JOIN_CONDITION(o_rowtime, rowtime, currency), OR(=(currency, o_currency), =(secondary_key, o_secondary_key)))], select=[o_rowtime, o_amount, o_currency, o_secondary_key, rowtime, currency, rate, secondary_key]) +: :- Exchange(distribution=[single]) +: : +- Calc(select=[rowtime AS o_rowtime, o_amount, o_currency, o_secondary_key]) +: : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[rowtime, o_comment, o_amount, o_currency, o_secondary_key]) +: +- Exchange(distribution=[single]) +: +- Calc(select=[rowtime, currency, rate, secondary_key], where=[>(rate, 110:BIGINT)]) +: +- DataStreamScan(table=[[default_catalog, default_database, RatesHistory]], fields=[rowtime, comment, currency, rate, secondary_key]) ++- Exchange(distribution=[hash[t3_secondary_key]]) + +- DataStreamScan(table=[[default_catalog, default_database, ThirdTable]], fields=[t3_comment, t3_secondary_key]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala index 635f188..2a88898 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/DecimalTypeTest.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo import org.apache.flink.table.types.logical.DecimalType import org.apache.flink.types.Row + import org.junit.{Ignore, Test} class DecimalTypeTest extends ExpressionTestBase { @@ -132,7 +133,7 @@ class DecimalTypeTest extends ExpressionTestBase { @Ignore @Test def testDefaultDecimalCasting(): Unit = { - // from String +// // from String testTableApi( "123456789123456789123456789".cast(DataTypes.DECIMAL(38, 0)), "'123456789123456789123456789'.cast(DECIMAL)", diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala index 2565383..9cd8c63 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala @@ -23,71 +23,65 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row -import org.junit.{Ignore, Test} +import org.junit.Test /** - * Tests that can only be checked manually as they are non-deterministic. + * Tests that check all non-deterministic functions can be executed. */ class NonDeterministicTests extends ExpressionTestBase { - @Ignore @Test def testCurrentDate(): Unit = { testAllApis( - currentDate(), - "currentDate()", - "CURRENT_DATE", - "PLEASE CHECK MANUALLY") + currentDate().isGreater("1970-01-01".toDate), + "currentDate() > '1970-01-01'.toDate", + "CURRENT_DATE > DATE '1970-01-01'", + "true") } - @Ignore @Test def testCurrentTime(): Unit = { testAllApis( - currentTime(), - "currentTime()", - "CURRENT_TIME", - "PLEASE CHECK MANUALLY") + currentTime().isGreaterOrEqual("00:00:00".toTime), + "currentTime() >= '00:00:00'.toTime", + "CURRENT_TIME >= TIME '00:00:00'", + "true") } - @Ignore @Test def testCurrentTimestamp(): Unit = { testAllApis( - currentTimestamp(), - "currentTimestamp()", - "CURRENT_TIMESTAMP", - "PLEASE CHECK MANUALLY") + currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp), + "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp", + "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'", + "true") } - @Ignore @Test def testLocalTimestamp(): Unit = { testAllApis( - localTimestamp(), - "localTimestamp()", - "LOCALTIMESTAMP", - "PLEASE CHECK MANUALLY") + localTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp), + "localTimestamp() > '1970-01-01 00:00:00'.toTimestamp", + "LOCALTIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'", + "true") } - @Ignore @Test def testLocalTime(): Unit = { testAllApis( - localTime(), - "localTime()", - "LOCALTIME", - "PLEASE CHECK MANUALLY") + localTime().isGreaterOrEqual("00:00:00".toTime), + "localTime() >= '00:00:00'.toTime", + "LOCALTIME >= TIME '00:00:00'", + "true") } - @Ignore @Test def testUUID(): Unit = { testAllApis( - uuid(), - "uuid()", - "UUID()", - "PLEASE CHECK MANUALLY") + uuid().charLength(), + "uuid().charLength", + "CHARACTER_LENGTH(UUID())", + "36") } // ---------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala index e23a5bf..f38ab93 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/validation/ScalarFunctionsValidationTest.scala @@ -23,8 +23,9 @@ import org.apache.flink.table.api.{SqlParserException, ValidationException} import org.apache.flink.table.expressions.TimePointUnit import org.apache.flink.table.planner.codegen.CodeGenException import org.apache.flink.table.planner.expressions.utils.ScalarTypesTestBase + import org.apache.calcite.avatica.util.TimeUnit -import org.junit.{Ignore, Test} +import org.junit.Test class ScalarFunctionsValidationTest extends ScalarTypesTestBase { @@ -32,25 +33,19 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { // Math functions // ---------------------------------------------------------------------------------------------- - @Ignore @Test def testInvalidLog1(): Unit = { - thrown.expect(classOf[ValidationException]) - // invalid arithmetic argument testSqlApi( "LOG(1, 100)", - "FAIL" + "Infinity" ) } - @Ignore @Test def testInvalidLog2(): Unit ={ - thrown.expect(classOf[ValidationException]) - // invalid arithmetic argument testSqlApi( "LOG(-1)", - "FAIL" + "NaN" ) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala index 5c129d0..46555bd 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.batch.table import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.plan.batch.table.JoinTest.Merger @@ -127,8 +128,6 @@ class JoinTest extends TableTestBase { util.verifyPlan(joined) } - // TODO [FLINK-7942] [table] Reduce aliasing in RexNodes - // @Ignore @Test def testFilterJoinRule(): Unit = { val util = batchTestUtil() @@ -143,9 +142,7 @@ class JoinTest extends TableTestBase { util.verifyPlan(results) } - // TODO - @Ignore("Non-equi-join could be supported later.") - @Test + @Test(expected = classOf[ValidationException]) def testFullJoinNoEquiJoinPredicate(): Unit = { val util = batchTestUtil() val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c) @@ -154,9 +151,7 @@ class JoinTest extends TableTestBase { util.verifyPlan(ds2.fullOuterJoin(ds1, 'b < 'd).select('c, 'g)) } - // TODO - @Ignore("Non-equi-join could be supported later.") - @Test + @Test(expected = classOf[ValidationException]) def testLeftJoinNoEquiJoinPredicate(): Unit = { val util = batchTestUtil() val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c) @@ -165,9 +160,7 @@ class JoinTest extends TableTestBase { util.verifyPlan(ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)) } - // TODO - @Ignore("Non-equi-join could be supported later.") - @Test + @Test(expected = classOf[ValidationException]) def testRightJoinNoEquiJoinPredicate(): Unit = { val util = batchTestUtil() val ds1 = util.addTableSource[(Int, Long, String)]("Table3",'a, 'b, 'c) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala index aae2ce3..35d498a 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala @@ -22,13 +22,12 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.utils.TableTestBase -import org.junit.{Ignore, Test} +import org.junit.Test import java.sql.Timestamp class SetOperatorsTest extends TableTestBase { - @Ignore("Support in subQuery in ExpressionConverter") @Test def testInWithFilter(): Unit = { val util = batchTestUtil() diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala index 697a121..1d7314e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableJoinTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.stream.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{DataTypes, TableSchema, ValidationException} +import org.apache.flink.table.api.{DataTypes, Table, TableSchema, ValidationException} import org.apache.flink.table.expressions.{Expression, FieldReferenceExpression} import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl} import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} @@ -29,7 +29,7 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATO import org.hamcrest.Matchers.{equalTo, startsWith} import org.junit.Assert.{assertEquals, assertThat} -import org.junit.{Ignore, Test} +import org.junit.Test import java.sql.Timestamp @@ -37,25 +37,24 @@ class TemporalTableJoinTest extends TableTestBase { val util: TableTestUtil = streamTestUtil() - val orders = util.addDataStream[(Long, String, Timestamp)]( + val orders: Table = util.addDataStream[(Long, String, Timestamp)]( "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime) - val ratesHistory = util.addDataStream[(String, Int, Timestamp)]( + val ratesHistory: Table = util.addDataStream[(String, Int, Timestamp)]( "RatesHistory", 'currency, 'rate, 'rowtime.rowtime) - val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency) + val rates: TemporalTableFunction = ratesHistory.createTemporalTableFunction('rowtime, 'currency) util.addFunction("Rates", rates) - val proctimeOrders = util.addDataStream[(Long, String)]( + val proctimeOrders: Table = util.addDataStream[(Long, String)]( "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime) - val proctimeRatesHistory = util.addDataStream[(String, Int)]( + val proctimeRatesHistory: Table = util.addDataStream[(String, Int)]( "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime) - val proctimeRates = proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency) + val proctimeRates: TemporalTableFunction = + proctimeRatesHistory.createTemporalTableFunction('proctime, 'currency) - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testSimpleJoin(): Unit = { val result = orders @@ -65,8 +64,6 @@ class TemporalTableJoinTest extends TableTestBase { util.verifyPlan(result) } - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testSimpleJoin2(): Unit = { val resultJava = orders @@ -76,8 +73,6 @@ class TemporalTableJoinTest extends TableTestBase { util.verifyPlan(resultJava) } - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testSimpleProctimeJoin(): Unit = { val result = proctimeOrders @@ -92,8 +87,6 @@ class TemporalTableJoinTest extends TableTestBase { * Important thing here is that we have complex OR join condition * and there are some columns that are not being used (are being pruned). */ - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testComplexJoin(): Unit = { val util = streamTestUtil() @@ -118,8 +111,6 @@ class TemporalTableJoinTest extends TableTestBase { util.verifyPlan(result) } - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testTemporalTableFunctionOnTopOfQuery(): Unit = { val filteredRatesHistory = ratesHistory diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala index 83e1683..334c613 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/TemporalTableJoinValidationTest.scala @@ -19,11 +19,11 @@ package org.apache.flink.table.planner.plan.stream.table.validation import org.apache.flink.api.scala._ -import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{Table, ValidationException} import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} -import org.junit.{Ignore, Test} +import org.junit.Test import java.sql.Timestamp @@ -31,19 +31,19 @@ class TemporalTableJoinValidationTest extends TableTestBase { val util: TableTestUtil = streamTestUtil() - val orders = util.addDataStream[(Long, String, Timestamp)]( + val orders: Table = util.addDataStream[(Long, String, Timestamp)]( "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime) - val ordersProctime = util.addDataStream[(Long, String)]( + val ordersProctime: Table = util.addDataStream[(Long, String)]( "OrdersProctime", 'o_amount, 'o_currency, 'o_rowtime.proctime) - val ordersWithoutTimeAttribute = util.addDataStream[(Long, String, Timestamp)]( + val ordersWithoutTimeAttribute: Table = util.addDataStream[(Long, String, Timestamp)]( "OrdersWithoutTimeAttribute", 'o_amount, 'o_currency, 'o_rowtime) - val ratesHistory = util.addDataStream[(String, Int, Timestamp)]( + val ratesHistory: Table = util.addDataStream[(String, Int, Timestamp)]( "RatesHistory", 'currency, 'rate, 'rowtime.rowtime) - val ratesHistoryWithoutTimeAttribute = util.addDataStream[(String, Int, Timestamp)]( + val ratesHistoryWithoutTimeAttribute: Table = util.addDataStream[(String, Int, Timestamp)]( "ratesHistoryWithoutTimeAttribute", 'currency, 'rate, 'rowtime) @Test @@ -62,8 +62,6 @@ class TemporalTableJoinValidationTest extends TableTestBase { ratesHistory.createTemporalTableFunction("rowtime", "foobar") } - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testNonTimeIndicatorOnRightSide(): Unit = { expectedException.expect(classOf[ValidationException]) @@ -94,8 +92,6 @@ class TemporalTableJoinValidationTest extends TableTestBase { util.verifyExplain(result) } - // TODO - @Ignore("Fix bug in LogicalCorrelateToTemporalTableJoinRule") @Test def testMixedTimeIndicators(): Unit = { expectedException.expect(classOf[ValidationException]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala index dd29c24..837d63b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/LimitITCase.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.utils.TestLimitableTableSource @@ -99,8 +100,7 @@ class LimitITCase extends BatchTestBase { 5) } - @Ignore // TODO support limit without sort in table api. - @Test + @Test(expected = classOf[ValidationException]) def testTableLimitWithLimitTable(): Unit = { Assert.assertEquals( executeQuery(tEnv.scan("LimitTable").fetch(5)).size, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala index 8f93ff1..9cbca51 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/MiscITCase.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData.{buildInData, buildInType} import org.apache.flink.types.Row -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} import scala.collection.Seq @@ -477,16 +477,6 @@ class MiscITCase extends BatchTestBase { ) } - @Ignore // TODO: allows 123L="123" - @Test - def testCompareLongAndString(): Unit = { - checkQuery( - Seq((123L, "123"), (19157170390056973L, "19157170390056971")), - "select f0=f1 from Table1", - Seq(Tuple1(true), Tuple1(false)) - ) - } - @Test def testOrderByOrdinal(): Unit = { env.setParallelism(1) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala index fe51bd8..c31f0cf 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/RankITCase.scala @@ -84,7 +84,6 @@ class RankITCase extends BatchTestBase { Seq(row(2, 2, 2), row(3, 6, 3), row(4, 7, 3), row(4, 10, 3), row(5, 14, 2), row(5, 15, 2))) } - @Ignore @Test def testRankValueFilterWithLowerValue(): Unit = { checkResult("SELECT * FROM (" + diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala index fd0a126..9acd358 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala @@ -18,13 +18,11 @@ package org.apache.flink.table.planner.runtime.batch.sql.agg -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.api.Types import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ -import org.apache.flink.types.Row -import org.junit.{Before, Ignore, Test} + +import org.junit.{Before, Test} import scala.collection.Seq @@ -338,32 +336,4 @@ abstract class DistinctAggregateITCaseBase extends BatchTestBase { "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3) FROM Table5", Seq(row(3, 3, 5, 2, 12))) } - - // TODO remove Ignore after supporting generated code cloud be splitted into - // small classes or methods due to code is too large - @Ignore - @Test - def testMaxDistinctAggOnDifferentColumn(): Unit = { - // the max groupCount must be less than 64. - // so the max number of distinct aggregate on different column without group by column is 63. - val fields = (0 until 63).map(i => s"f$i") - val types = new RowTypeInfo(Seq.fill(fields.size)(Types.INT): _*) - val nullablesOfData = Array.fill(fields.size)(false) - val data = new Row(fields.length) - fields.indices.foreach(i => data.setField(i, i)) - - registerCollection("MyTable", Seq(data), types, fields.mkString(","), nullablesOfData) - - val expected = new Row(fields.length * 2) - fields.indices.foreach(i => expected.setField(i, 1)) - fields.indices.foreach(i => expected.setField(i + fields.length, i)) - - val distinctList = fields.map(f => s"COUNT(DISTINCT $f)").mkString(", ") - val maxList = fields.map(f => s"MAX($f)").mkString(", ") - checkResult( - s"SELECT $distinctList, $maxList FROM MyTable", - Seq(expected) - ) - } - } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala index 0bd467f..2e013e8 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/WindowAggregateITCase.scala @@ -18,26 +18,17 @@ package org.apache.flink.table.planner.runtime.batch.sql.agg -import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO} import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.io.CollectionInputFormat import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ -import org.apache.flink.core.io.InputSplit - -import scala.collection.JavaConversions._ -import org.apache.flink.table.api.{TableSchema, _} import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime import org.apache.flink.table.planner.utils.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction} -import org.apache.flink.table.sources.InputFormatTableSource -import org.apache.flink.types.Row -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} class WindowAggregateITCase extends BatchTestBase { @@ -405,120 +396,6 @@ class WindowAggregateITCase extends BatchTestBase { ) } - @Ignore // TODO support table stats - @Test - def testSlidingWindow2(): Unit = { - // for 1 phase agg - val tableSchema = new TableSchema( - Array("a", "b", "c", "ts"), - Array( - Types.INT, - Types.LONG, - Types.STRING, - Types.LOCAL_DATE_TIME)) - // val colStats = Map[String, ColumnStats]( - // "ts" -> new ColumnStats(9000000L, 1L, 8D, 8, null, null), - // "a" -> new ColumnStats(10000000L, 1L, 8D, 8, 5, -5), - // "b" -> new ColumnStats(8000000L, 0L, 4D, 32, 6.1D, 0D), - // "c" -> new ColumnStats(9000000L, 0L, 1024D, 32, 6.1D, 0D)) - val table = new InputFormatTableSource[Row] { - override def getReturnType: TypeInformation[Row] = type3WithTimestamp - - // override def getTableStats: TableStats = new TableStats(10000000L, colStats) - - override def getInputFormat: InputFormat[Row, _ <: InputSplit] = { - new CollectionInputFormat[Row](data3WithTimestamp, - type3WithTimestamp.createSerializer(env.getConfig)) - } - - override def getTableSchema: TableSchema = tableSchema - } - tEnv.registerTableSource("Table3WithTimestamp1", table) - - // keyed; 1-phase; pre-accumulate with paned optimization - checkResult( - "SELECT c, countFun(a), " + - "HOP_START(ts, INTERVAL '4' SECOND, INTERVAL '8' SECOND), " + - "HOP_END(ts, INTERVAL '4' SECOND, INTERVAL '8' SECOND)" + - "FROM Table3WithTimestamp1 GROUP BY c, HOP(ts, INTERVAL '4' SECOND, INTERVAL '8' SECOND)", - Seq( - row("Comment#1", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Comment#1", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Comment#10", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#10", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#11", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#11", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#12", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#12", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#13", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#13", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#14", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#14", 1, "1970-01-01 00:00:20.0", "1970-01-01 00:00:28.0"), - row("Comment#15", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#15", 1, "1970-01-01 00:00:20.0", "1970-01-01 00:00:28.0"), - row("Comment#2", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Comment#2", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#3", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Comment#3", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#4", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Comment#4", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#5", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Comment#5", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#6", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#6", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#7", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#7", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#8", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#8", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Comment#9", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#9", 1, "1970-01-01 00:00:12.0", "1970-01-01 00:00:20.0"), - row("Hello world, how are you?", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hello world, how are you?", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Hello world", 1, "1969-12-31 23:59:56.0", "1970-01-01 00:00:04.0"), - row("Hello world", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hello", 1, "1969-12-31 23:59:56.0", "1970-01-01 00:00:04.0"), - row("Hello", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hi", 1, "1969-12-31 23:59:56.0", "1970-01-01 00:00:04.0"), - row("Hi", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("I am fine.", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("I am fine.", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0"), - row("Luke Skywalker", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Luke Skywalker", 1, "1970-01-01 00:00:04.0", "1970-01-01 00:00:12.0") - ) - ) - - // keyed; 1-phase; pre-accumulate with windowSize = slideSize - checkResult( - "SELECT c, count(a), " + - "HOP_START(ts, INTERVAL '8' SECOND, INTERVAL '8' SECOND), " + - "HOP_END(ts, INTERVAL '8' SECOND, INTERVAL '8' SECOND)" + - "FROM Table3WithTimestamp1 GROUP BY c, HOP(ts, INTERVAL '8' SECOND, INTERVAL '8' SECOND)", - Seq( - row("Comment#1", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Comment#10", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#11", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#12", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#13", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#14", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#15", 1, "1970-01-01 00:00:16.0", "1970-01-01 00:00:24.0"), - row("Comment#2", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#3", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#4", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#5", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#6", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#7", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#8", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Comment#9", 1, "1970-01-01 00:00:08.0", "1970-01-01 00:00:16.0"), - row("Hello world, how are you?", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hello world", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hello", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Hi", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("I am fine.", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0"), - row("Luke Skywalker", 1, "1970-01-01 00:00:00.0", "1970-01-01 00:00:08.0") - ) - ) - } - @Test def testNullValueInputTimestamp(): Unit = { // Sliding window diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala index 7a7530c..98e7e4b 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinConditionTypeCoerceITCase.scala @@ -22,10 +22,8 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} -// @RunWith(classOf[Parameterized]) TODO -@Ignore // TODO support JoinConditionTypeCoerce class JoinConditionTypeCoerceITCase extends BatchTestBase { @Before diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala index 27552c1..27ce9b0 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinWithoutKeyITCase.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} import scala.collection.Seq @@ -48,7 +48,6 @@ class JoinWithoutKeyITCase extends BatchTestBase { // single row join - @Ignore // TODO not support same source until set lazy_from_source @Test def testCrossJoinWithLeftSingleRowInput(): Unit = { checkResult( @@ -60,7 +59,6 @@ class JoinWithoutKeyITCase extends BatchTestBase { )) } - @Ignore // TODO not support same source until set lazy_from_source @Test def testCrossJoinWithRightSingleRowInput(): Unit = { checkResult( @@ -72,7 +70,6 @@ class JoinWithoutKeyITCase extends BatchTestBase { )) } - @Ignore // TODO not support same source until set lazy_from_source @Test def testCrossJoinWithEmptySingleRowInput(): Unit = { checkResult( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala index 2726f6e..414a29d 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/SemiJoinITCase.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.TestData._ import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} import java.util @@ -226,7 +226,6 @@ class SemiJoinITCase(expectedJoinType: JoinType) extends BatchTestBase { ) } - @Ignore // TODO not support same source until set lazy_from_source @Test def testInWithAggregate2(): Unit = { checkResult( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala index 8ef2947..e8f1063 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala @@ -317,7 +317,6 @@ class AggregationITCase extends BatchTestBase { } @Test - @Ignore // TODO support it def testAnalyticAggregation(): Unit = { val ds = BatchTableEnvUtil.fromElements(tEnv, (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, BigDecimal.ONE), @@ -333,13 +332,13 @@ class AggregationITCase extends BatchTestBase { '_6.varSamp, '_7.varSamp) val expected = "0,0,0," + - "0,0.5,0.5,0.5," + + "0,0.5,0.5,0.500000000000000000," + "1,1,1," + - "1,0.70710677,0.7071067811865476,0.7071067811865476," + + "1,0.70710677,0.7071067811865476,0.707106781186547600," + "0,0,0," + - "0,0.25,0.25,0.25," + + "0,0.25,0.25,0.250000000000000000," + "1,1,1," + - "1,0.5,0.5,0.5" + "1,0.5,0.5,0.500000000000000000" val results = executeQuery(res) TestBaseUtils.compareResultAsText(results.asJava, expected) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala index 82cad55..b7b7391 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala @@ -264,8 +264,6 @@ class CalcITCase extends BatchTestBase { TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO - @Ignore("After implement BatchExecHashJoin") @Test def testCalcJoin(): Unit = { val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv, "a, b, c") @@ -279,8 +277,6 @@ class CalcITCase extends BatchTestBase { TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO - @Ignore("Implicit type converter") @Test def testAdvancedDataTypes(): Unit = { @@ -315,8 +311,6 @@ class CalcITCase extends BatchTestBase { TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO - @Ignore("Implicit type converter") @Test def testNumericAutocastInArithmetic() { val table = BatchTableEnvUtil.fromElements(tEnv, @@ -455,8 +449,6 @@ class CalcITCase extends BatchTestBase { } @Test - // TODO - @Ignore("Type question, should be fixed later.") def testRowType(): Unit = { val data = new mutable.MutableList[(Int, Long, String)] data.+=((1, 1L, "Jack#22")) @@ -636,14 +628,12 @@ class CalcITCase extends BatchTestBase { } @Test - // TODO - @Ignore("Type question, should be fixed later.") def testSplitFieldsOnCustomType(): Unit = { tEnv.getConfig.setMaxGeneratedCodeLength(1) // splits fields - val ds = CollectionBatchExecTable.getCustomTypeDataSet(tEnv, "i, l, s") - .filter('s.like("%a%") && 's.charLength() > 12) - .select('i, 'l, 's.charLength()) + val ds = CollectionBatchExecTable.getCustomTypeDataSet(tEnv, "myInt, myLong, myString") + .filter('myString.like("%a%") && 'myString.charLength() > 12) + .select('myInt, 'myLong, 'myString.charLength()) val expected = "3,3,25\n" + "3,5,14\n" val results = executeQuery(ds) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala index 5d50ec2..6644883 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala @@ -27,9 +27,9 @@ import org.apache.flink.table.planner.runtime.utils._ import org.apache.flink.types.Row import org.junit.Assert._ +import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{Ignore, _} import scala.collection.mutable @@ -203,8 +203,6 @@ class SortITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode assertEquals(expected, sink.getRetractResults) } - // FIXME - @Ignore("Enable after StreamExecJoin implements ExecNode.") @Test def testSortWithWhere(): Unit = { val sqlQuery = diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala index 3581e9a..0065b8e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala @@ -20,9 +20,9 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.Types import org.apache.flink.table.planner.runtime.stream.sql.SplitAggregateITCase.PartialAggMode import org.apache.flink.table.planner.runtime.utils.StreamingWithAggTestBase.{AggMode, LocalGlobalOff, LocalGlobalOn} import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchOn @@ -34,7 +34,7 @@ import org.apache.flink.types.Row import org.junit.Assert.assertEquals import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.{Before, Ignore, Test} +import org.junit.{Before, Test} import java.lang.{Integer => JInt, Long => JLong} import java.math.{BigDecimal => JBigDecimal} @@ -296,7 +296,6 @@ class SplitAggregateITCase( assertEquals(expected.sorted, sink.getRetractResults.sorted) } - @Ignore("[FLINK-12088]: JOIN is not supported") @Test def testAggWithJoin(): Unit = { val t1 = tEnv.sqlQuery( diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala index 278a2b0..c32131e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/CalcITCase.scala @@ -37,7 +37,6 @@ import scala.collection.{Seq, mutable} @RunWith(classOf[Parameterized]) class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) { - @Ignore("CodeGen split") @Test def testFunctionSplitWhenCodegenOverLengthLimit(): Unit = { // test function split diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala index f1bd827..99cc2c7 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala @@ -626,7 +626,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode assertEquals(expected.sorted, sink.getRetractResults.sorted) } - // TODO @Ignore("Non-equi-join could be supported later.") @Test def testNonEqualInnerJoin(): Unit = { @@ -642,7 +641,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode assertEquals(expected.sorted, sink.getAppendResults.sorted) } - // TODO @Ignore("Non-equi-join could be supported later.") @Test def testNonEqualInnerJoinWithRetract(): Unit = {