This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 314b418efea8f35d39b05abef5361289b054b6a7 Author: Jim Hughes <jhug...@confluent.io> AuthorDate: Thu Dec 7 08:12:42 2023 -0500 [FLINK-33757] Implement restore tests for Rank node --- .../plan/nodes/exec/stream/RankRestoreTest.java | 42 +++ .../plan/nodes/exec/stream/RankTestPrograms.java | 187 +++++++++++++ .../rank-n-test/plan/rank-n-test.json | 301 +++++++++++++++++++++ .../rank-n-test/savepoint/_metadata | Bin 0 -> 12161 bytes .../plan/rank-test-append-fast-strategy.json} | 111 +++----- .../savepoint/_metadata | Bin 0 -> 13768 bytes .../plan/rank-test-retract-strategy.json} | 133 ++++----- .../rank-test-retract-strategy/savepoint/_metadata | Bin 0 -> 18147 bytes .../plan/rank-test-update-fast-strategy.json} | 186 +++++++------ .../savepoint/_metadata | Bin 0 -> 22387 bytes 10 files changed, 728 insertions(+), 232 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java new file mode 100644 index 00000000000..cb76c1ca723 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankRestoreTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecRank}. */ +public class RankRestoreTest extends RestoreTestBase { + + public RankRestoreTest() { + super(StreamExecRank.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + RankTestPrograms.RANK_TEST_APPEND_FAST_STRATEGY, + RankTestPrograms.RANK_TEST_RETRACT_STRATEGY, + RankTestPrograms.RANK_TEST_UPDATE_FAST_STRATEGY, + RankTestPrograms.RANK_N_TEST); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java new file mode 100644 index 00000000000..979e2adbb52 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/RankTestPrograms.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import static org.apache.flink.table.factories.TestFormatFactory.CHANGELOG_MODE; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecRank}. */ +public class RankTestPrograms { + + static final TableTestProgram RANK_TEST_APPEND_FAST_STRATEGY = + getTableTestProgram( + "rank-test-append-fast-strategy", + "I", + new String[] { + "+I[2, a, 1]", + "+I[4, b, 1]", + "+I[6, c, 1]", + "-U[2, a, 1]", + "+U[1, a, 1]", + "-U[4, b, 1]", + "+U[3, b, 1]", + "-U[6, c, 1]", + "+U[5, c, 1]" + }, + new String[] {"+I[4, d, 1]", "+I[3, e, 1]"}); + + static final TableTestProgram RANK_TEST_RETRACT_STRATEGY = + getTableTestProgram( + "rank-test-retract-strategy", + "I,UA,UB", + new String[] { + "+I[2, a, 1]", + "+I[4, b, 1]", + "+I[6, c, 1]", + "-D[2, a, 1]", + "+I[1, a, 1]", + "-D[4, b, 1]", + "+I[3, b, 1]", + "-D[6, c, 1]", + "+I[5, c, 1]" + }, + new String[] {"+I[4, d, 1]", "+I[3, e, 1]"}); + + static final TableTestProgram RANK_TEST_UPDATE_FAST_STRATEGY = + TableTestProgram.of("rank-test-update-fast-strategy", "validates rank exec node") + .setupTableSource( + SourceTestStep.newBuilder("MyTable") + .addSchema( + "a INT primary key not enforced", "b VARCHAR", "c INT") + .addOption(CHANGELOG_MODE, "I") + .producedBeforeRestore( + Row.of(2, "a", 6), + Row.of(4, "b", 8), + Row.of(6, "c", 10), + Row.of(1, "a", 5), + Row.of(3, "b", 7), + Row.of(5, "c", 9)) + .producedAfterRestore(Row.of(4, "d", 7), Row.of(0, "a", 8)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "a INT NOT NULL", + "b STRING", + "count_c BIGINT NOT NULL", + "row_num BIGINT NOT NULL") + .consumedBeforeRestore( + "+I[2, a, 1, 1]", + "+I[4, b, 1, 1]", + "+I[6, c, 1, 1]", + "-U[2, a, 1, 1]", + "+U[1, a, 1, 1]", + "+I[2, a, 1, 2]", + "-U[4, b, 1, 1]", + "+U[3, b, 1, 1]", + "+I[4, b, 1, 2]", + "-U[6, c, 1, 1]", + "+U[5, c, 1, 1]", + "+I[6, c, 1, 2]") + .consumedAfterRestore( + new String[] { + "+I[4, d, 1, 1]", + "-U[1, a, 1, 1]", + "+U[0, a, 1, 1]", + "-U[2, a, 1, 2]", + "+U[1, a, 1, 2]", + "+I[2, a, 1, 3]" + }) + .build()) + .runSql( + "INSERT INTO sink_t SELECT * FROM (" + + "SELECT a, b, count_c, ROW_NUMBER() " + + " OVER (PARTITION BY b ORDER BY count_c DESC, a ASC) AS row_num" + + " FROM (" + + " SELECT a, b, COUNT(*) AS count_c" + + " FROM MyTable" + + " GROUP BY a, b" + + " )" + + ") WHERE row_num <= 10") + .build(); + + private static TableTestProgram getTableTestProgram( + final String name, + final String changelogMode, + final String[] resultsBeforeRestore, + final String[] resultsAfterRestore) { + return TableTestProgram.of(name, "validates rank exec node") + .setupTableSource( + SourceTestStep.newBuilder("MyTable") + .addSchema("a INT", "b VARCHAR", "c INT primary key not enforced") + .addOption(CHANGELOG_MODE, changelogMode) + .producedBeforeRestore( + Row.of(2, "a", 6), + Row.of(4, "b", 8), + Row.of(6, "c", 10), + Row.of(1, "a", 5), + Row.of(3, "b", 7), + Row.of(5, "c", 9)) + .producedAfterRestore(Row.of(4, "d", 7), Row.of(3, "e", 8)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b VARCHAR", "c BIGINT") + .consumedBeforeRestore(resultsBeforeRestore) + .consumedAfterRestore(resultsAfterRestore) + .build()) + .runSql( + "insert into `sink_t` select * from " + + "(select a, b, row_number() over(partition by b order by c) as c from MyTable)" + + " where c = 1") + .build(); + } + + static final TableTestProgram RANK_N_TEST = + TableTestProgram.of("rank-n-test", "validates rank node can handle multiple outputs") + .setupTableSource( + SourceTestStep.newBuilder("MyTable1") + .addSchema("a STRING", "b INT", "c INT", "t as proctime()") + .producedBeforeRestore( + Row.of("book", 1, 12), + Row.of("book", 2, 19), + Row.of("book", 4, 11), + Row.of("fruit", 4, 33)) + .producedAfterRestore( + Row.of("cereal", 6, 21), + Row.of("cereal", 7, 23), + Row.of("apple", 8, 31), + Row.of("fruit", 9, 41)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("result1") + .addSchema("a varchar", "b int", "c bigint") + .consumedBeforeRestore( + "+I[book, 1, 1]", "+I[book, 2, 2]", "+I[fruit, 4, 1]") + .consumedAfterRestore( + "+I[cereal, 6, 1]", + "+I[cereal, 7, 2]", + "+I[apple, 8, 1]", + "+I[fruit, 9, 2]") + .build()) + .runSql( + "insert into `result1` select * from " + + "(select a, b, row_number() over(partition by a order by t asc) as c from MyTable1)" + + " where c <= 2") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json new file mode 100644 index 00000000000..11f4b21e9e3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/plan/rank-n-test.json @@ -0,0 +1,301 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 17, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "INT" + }, { + "name" : "t", + "kind" : "COMPUTED", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, + "serializableString" : "PROCTIME()" + } + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` VARCHAR(2147483647), `b` INT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` VARCHAR(2147483647), `b` INT> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable1, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "id" : 18, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "CALL", + "internalName" : "$PROCTIME$1", + "operands" : [ ], + "type" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "INT" + }, { + "name" : "$2", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Calc(select=[a, b, PROCTIME() AS $2])" + }, { + "id" : 19, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "INT" + }, { + "name" : "$2", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 20, + "type" : "stream-exec-rank_1", + "configuration" : { + "table.exec.rank.topn-cache-size" : "10000" + }, + "rankType" : "ROW_NUMBER", + "partition" : { + "fields" : [ 0 ] + }, + "orderBy" : { + "fields" : [ { + "index" : 2, + "isAscending" : true, + "nullIsLast" : false + } ] + }, + "rankRange" : { + "type" : "Constant", + "start" : 1, + "end" : 2 + }, + "rankStrategy" : { + "type" : "AppendFast" + }, + "outputRowNumber" : true, + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "rankState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "a", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "fieldType" : "INT" + }, { + "name" : "$2", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "PROCTIME" + } + }, { + "name" : "w0$o0", + "fieldType" : "BIGINT NOT NULL" + } ] + }, + "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[$2 ASC], select=[a, b, $2, w0$o0])" + }, { + "id" : 21, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 3, + "type" : "BIGINT NOT NULL" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT, `w0$o0` BIGINT NOT NULL>", + "description" : "Calc(select=[a, b, w0$o0])" + }, { + "id" : 22, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`result1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0, 2 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647), `b` INT, `w0$o0` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.result1], fields=[a, b, w0$o0])" + } ], + "edges" : [ { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 21, + "target" : 22, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata new file mode 100644 index 00000000000..8c9ecaab640 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-n-test/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json similarity index 65% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json index 7209ea0be4a..790da11a5b3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/plan/rank-test-append-fast-strategy.json @@ -1,5 +1,5 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { "id" : 1, "type" : "stream-exec-table-source-scan_1", @@ -10,54 +10,30 @@ "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", "dataType" : "VARCHAR(2147483647)" }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" + "name" : "c", + "dataType" : "INT NOT NULL" } ], - "watermarkSpecs" : [ ] + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_c", + "type" : "PRIMARY_KEY", + "columns" : [ "c" ] + } }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", "inputProperties" : [ ] }, { "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "Calc(select=[a, b])" - }, { - "id" : 3, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -67,10 +43,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", "description" : "Exchange(distribution=[hash[b]])" }, { - "id" : 4, + "id" : 3, "type" : "stream-exec-rank_1", "configuration" : { "table.exec.rank.topn-cache-size" : "10000" @@ -81,19 +57,20 @@ }, "orderBy" : { "fields" : [ { - "index" : 0, + "index" : 2, "isAscending" : true, "nullIsLast" : false } ] }, "rankRange" : { - "type" : "Variable", - "endIndex" : 0 + "type" : "Constant", + "start" : 1, + "end" : 1 }, "rankStrategy" : { "type" : "AppendFast" }, - "outputRowNumber" : true, + "outputRowNumber" : false, "generateUpdateBefore" : true, "state" : [ { "index" : 0, @@ -107,18 +84,22 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT NULL>", - "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", + "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[c ASC], select=[a, b, c])" }, { - "id" : 5, + "id" : 4, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", "inputIndex" : 0, - "type" : "BIGINT" + "type" : "INT" }, { "kind" : "INPUT_REF", - "inputIndex" : 2, + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : 1, "type" : "BIGINT NOT NULL" } ], "condition" : null, @@ -129,10 +110,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Calc(select=[a, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>", + "description" : "Calc(select=[a, b, 1 AS $2])" }, { - "id" : 6, + "id" : 5, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -143,28 +124,27 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", "dataType" : "BIGINT" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 1 ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -172,8 +152,8 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, $2])" } ], "edges" : [ { "source" : 1, @@ -203,12 +183,5 @@ "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata new file mode 100644 index 00000000000..9d5f2be6c14 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-append-fast-strategy/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json similarity index 59% copy from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out copy to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json index 7209ea0be4a..e79e6fc6cbc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/plan/rank-test-retract-strategy.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 6, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -10,54 +10,30 @@ "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", "dataType" : "VARCHAR(2147483647)" }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" + "name" : "c", + "dataType" : "INT NOT NULL" } ], - "watermarkSpecs" : [ ] + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_c", + "type" : "PRIMARY_KEY", + "columns" : [ "c" ] + } }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } } }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "Calc(select=[a, b])" - }, { - "id" : 3, + "id" : 7, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -67,10 +43,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", "description" : "Exchange(distribution=[hash[b]])" }, { - "id" : 4, + "id" : 8, "type" : "stream-exec-rank_1", "configuration" : { "table.exec.rank.topn-cache-size" : "10000" @@ -81,19 +57,20 @@ }, "orderBy" : { "fields" : [ { - "index" : 0, + "index" : 2, "isAscending" : true, "nullIsLast" : false } ] }, "rankRange" : { - "type" : "Variable", - "endIndex" : 0 + "type" : "Constant", + "start" : 1, + "end" : 1 }, "rankStrategy" : { - "type" : "AppendFast" + "type" : "Retract" }, - "outputRowNumber" : true, + "outputRowNumber" : false, "generateUpdateBefore" : true, "state" : [ { "index" : 0, @@ -107,18 +84,22 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT NULL>", - "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT NOT NULL>", + "description" : "Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[c ASC], select=[a, b, c])" }, { - "id" : 5, + "id" : 9, "type" : "stream-exec-calc_1", "projection" : [ { "kind" : "INPUT_REF", "inputIndex" : 0, - "type" : "BIGINT" + "type" : "INT" }, { "kind" : "INPUT_REF", - "inputIndex" : 2, + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : 1, "type" : "BIGINT NOT NULL" } ], "condition" : null, @@ -129,10 +110,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Calc(select=[a, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>", + "description" : "Calc(select=[a, b, 1 AS $2])" }, { - "id" : 6, + "id" : 10, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -143,28 +124,27 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT" }, { "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", "dataType" : "BIGINT" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 1 ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -172,43 +152,36 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, w0$o0])" + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `$2` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, $2])" } ], "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, + "source" : 6, + "target" : 7, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 7, + "target" : 8, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 8, + "target" : 9, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 6, + "source" : 9, + "target" : 10, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata new file mode 100644 index 00000000000..4e542ae3a77 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-retract-strategy/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json similarity index 50% rename from flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out rename to flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json index 7209ea0be4a..ce4974b9939 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/RankJsonPlanTest_jsonplan/testRank.out +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/plan/rank-test-update-fast-strategy.json @@ -1,7 +1,7 @@ { - "flinkVersion" : "", + "flinkVersion" : "1.19", "nodes" : [ { - "id" : 1, + "id" : 11, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -10,43 +10,77 @@ "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", "dataType" : "INT NOT NULL" }, { - "name" : "c", + "name" : "b", "dataType" : "VARCHAR(2147483647)" }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" + "name" : "c", + "dataType" : "INT" } ], - "watermarkSpecs" : [ ] + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } + "partitionKeys" : [ ] } - } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)> NOT NULL" + } ] }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])", "inputProperties" : [ ] }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" + "id" : 12, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647)>", + "description" : "Exchange(distribution=[hash[a, b]])" + }, { + "id" : 13, + "type" : "stream-exec-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1" + }, + "grouping" : [ 0, 1 ], + "aggCalls" : [ { + "name" : "count_c", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "generateUpdateBefore" : false, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "groupAggregateState" } ], - "condition" : null, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -54,10 +88,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", - "description" : "Calc(select=[a, b])" + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` BIGINT NOT NULL>", + "description" : "GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c])" }, { - "id" : 3, + "id" : 14, "type" : "stream-exec-exchange_1", "inputProperties" : [ { "requiredDistribution" : { @@ -67,10 +101,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` BIGINT NOT NULL>", "description" : "Exchange(distribution=[hash[b]])" }, { - "id" : 4, + "id" : 15, "type" : "stream-exec-rank_1", "configuration" : { "table.exec.rank.topn-cache-size" : "10000" @@ -81,17 +115,23 @@ }, "orderBy" : { "fields" : [ { + "index" : 2, + "isAscending" : false, + "nullIsLast" : true + }, { "index" : 0, "isAscending" : true, "nullIsLast" : false } ] }, "rankRange" : { - "type" : "Variable", - "endIndex" : 0 + "type" : "Constant", + "start" : 1, + "end" : 10 }, "rankStrategy" : { - "type" : "AppendFast" + "type" : "UpdateFast", + "primaryKeys" : [ 0, 1 ] }, "outputRowNumber" : true, "generateUpdateBefore" : true, @@ -107,32 +147,10 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `w0$o0` BIGINT NOT NULL>", - "description" : "Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankEnd=a], partitionBy=[b], orderBy=[a ASC], select=[a, b, w0$o0])" + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>", + "description" : "Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[b], orderBy=[count_c DESC, a ASC], select=[a, b, count_c, w0$o0])" }, { - "id" : 5, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Calc(select=[a, w0$o0])" - }, { - "id" : 6, + "id" : 16, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -143,28 +161,30 @@ }, "dynamicTableSink" : { "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", + "identifier" : "`default_catalog`.`default_database`.`sink_t`", "resolvedTable" : { "schema" : { "columns" : [ { "name" : "a", - "dataType" : "BIGINT" + "dataType" : "INT NOT NULL" }, { "name" : "b", - "dataType" : "BIGINT" + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "count_c", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "row_num", + "dataType" : "BIGINT NOT NULL" } ], "watermarkSpecs" : [ ] }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } + "partitionKeys" : [ ] } } }, "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0, 1 ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" @@ -172,43 +192,43 @@ "damBehavior" : "PIPELINED", "priority" : 0 } ], - "outputType" : "ROW<`a` BIGINT, `w0$o0` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, w0$o0])" + "outputType" : "ROW<`a` INT NOT NULL, `b` VARCHAR(2147483647), `count_c` BIGINT NOT NULL, `w0$o0` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, count_c, w0$o0])" } ], "edges" : [ { - "source" : 1, - "target" : 2, + "source" : 11, + "target" : 12, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 2, - "target" : 3, + "source" : 12, + "target" : 13, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 3, - "target" : 4, + "source" : 13, + "target" : 14, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 4, - "target" : 5, + "source" : 14, + "target" : 15, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" }, { - "source" : 5, - "target" : 6, + "source" : 15, + "target" : 16, "shuffle" : { "type" : "FORWARD" }, "shuffleMode" : "PIPELINED" } ] -} +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata new file mode 100644 index 00000000000..b3d7adb0e5e Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-rank_1/rank-test-update-fast-strategy/savepoint/_metadata differ